Introduction to Apache Kafka [Tutorial]
Apache Kafka is a distributed streaming system that can publish and subscribe to streams of records. Put another way, it is an enterprise messaging system. It is a fast, horizontally scalable, and fault-tolerant system. Kafka has four core APIs:
Producer API: Allows clients to connect to Kafka servers running in the cluster and publish streams of records to one or more Kafka topics (a minimal producer sketch follows this list).
Consumer API: Allows clients to connect to Kafka servers running in the cluster and consume streams of records from one or more Kafka topics.
Streams API: Allows clients to act as stream processors by consuming streams from one or more input topics and producing streams to one or more output topics, transforming the input streams into output streams along the way.
Connector API: Allows writing reusable producer and consumer code. For example, if we want to read data from an RDBMS and publish it to a topic, or consume data from a topic and write it to an RDBMS, we can create reusable source and sink connector components for various data sources.
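To make the Producer API concrete, below is a minimal Java sketch that publishes a single record. It is an illustrative sketch, not the only way to do it; the broker address (localhost:9091) and topic name (first_topic) match the cluster we build later in this tutorial.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Any one broker works as the bootstrap server; the client discovers the rest of the cluster from it.
        props.put("bootstrap.servers", "localhost:9091");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Records with the same key always go to the same partition.
            producer.send(new ProducerRecord<>("first_topic", "key-1", "First message"));
        }
    }
}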
What Is Kafka Used for?
Kafka is used for the following use cases.
Messaging System
Kafka is used as an enterprise messaging system to decouple source and target systems and exchange data between them. Compared to JMS, Kafka provides higher throughput through partitioning and fault tolerance through replication.
Web Activity Tracking
Kafka is used to track user journey events on a website for analytics and offline data processing.
Log Aggregation
Kafka can aggregate logs from various systems, especially in distributed environments with microservices architectures, where the systems are deployed on various hosts. We need to collect the logs from all these systems and make them available in a central place for analysis. Go through the article on distributed logging architecture, where Kafka is used.
Metrics Collection
Kafka is used to collect metrics from various systems and networks for operations monitoring. There are Kafka metrics reporters available for monitoring tools like Ganglia, Graphite, etc.
What Is a Broker?
An instance in a Kafka cluster is called a broker. If you connect to any one broker in a Kafka cluster, you can access the entire cluster. The broker instance we connect to in order to access the cluster is known as the bootstrap server. Each broker is identified by a numeric ID within the cluster. Three brokers are a good starting point for a Kafka cluster, but there are clusters with hundreds of brokers.
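To illustrate the bootstrap-server idea, here is a minimal sketch, assuming the three-broker cluster we set up later in this tutorial: it connects through a single broker (localhost:9091) and uses the AdminClient to list every broker in the cluster.

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;

public class ClusterInfo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // A single bootstrap broker is enough to discover the entire cluster.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
        try (AdminClient admin = AdminClient.create(props)) {
            for (Node node : admin.describeCluster().nodes().get()) {
                System.out.printf("Broker id=%d at %s:%d%n", node.id(), node.host(), node.port());
            }
        }
    }
}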
What Is a Topic?
A topic is a logical name to which records are published. Internally, a topic is divided into partitions to which the data is written. These partitions are distributed across the brokers in the cluster. For example, if a topic has three partitions and the cluster has three brokers, each broker holds one partition. Data published to a partition is append-only, with each record assigned an incrementing offset.
Below are some points we need to remember when working with partitions.
- Topics are identified by name. We can have many named topics in a cluster.
- The order of messages is maintained at the partition level, not across the partitions of a topic.
- Once data is written to a partition, it is never overwritten. This is called immutability.
- Messages in partitions are stored with a key, value, and timestamp. Kafka guarantees that messages with the same key are published to the same partition (see the partitioner sketch after this list).
- Within the cluster, each partition has a leader that handles the read/write operations for that partition.
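To see how a key determines the partition, here is a sketch of what Kafka's default partitioner does for records with a non-null key: it hashes the serialized key with murmur2 and takes the result modulo the partition count. The key user-42 is a hypothetical example.

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class PartitionForKey {
    public static void main(String[] args) {
        int numPartitions = 3;   // matches the topic created later in this tutorial
        String key = "user-42";  // hypothetical key
        // Same logic as the default partitioner: murmur2 hash of the key bytes, modulo partition count.
        int partition = Utils.toPositive(Utils.murmur2(key.getBytes(StandardCharsets.UTF_8))) % numPartitions;
        System.out.println("Key '" + key + "' maps to partition " + partition);
    }
}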
In the example below, I create a topic with three partitions and a replication factor of 3. Since the cluster has three brokers, the three partitions are evenly distributed, and the replicas of each partition are replicated to the other two brokers. With a replication factor of 3, there is no data loss even if two brokers go down. Always keep the replication factor greater than 1 and less than or equal to the number of brokers in the cluster; you cannot create a topic with a replication factor greater than the number of brokers in the cluster.
For each partition, one replica is the leader and the other in-sync replicas are followers. For partition 0, broker-1 is the leader, and broker-2 and broker-3 are followers. All reads and writes for partition 0 go to broker-1 and are then copied to broker-2 and broker-3.
Now, let's create a Kafka cluster with three brokers.
Step 1
Download the latest version of Apache Kafka. In this example, I am using 1.0, which is the latest at the time of writing. Extract the archive and move into its bin folder. Start ZooKeeper, which is required to run a Kafka cluster. ZooKeeper is the coordination service that manages the brokers, performs leader election for partitions, and sends notifications when topics change (topic created, topic deleted, etc.) or brokers change (broker added, broker dead, etc.). In this example, I have started only one ZooKeeper instance. In production environments, we should run more ZooKeeper instances to handle failover. The Kafka cluster cannot work without ZooKeeper.
./zookeeper-server-start.sh ../config/zookeeper.properties
Step 2
Now, start the Kafka brokers. In this example, we are going to start three brokers. Go to the config folder under the Kafka root, copy the server.properties file three times, and name the copies server_1.properties, server_2.properties, and server_3.properties. Change the below properties in those files.
#####server_1.properties#####
# Unique numeric id of this broker in the cluster
broker.id=1
# Port this broker listens on
listeners=PLAINTEXT://:9091
# Directory where this broker stores its log segments
log.dirs=/tmp/kafka-logs-1
#####server_2.properties#####
broker.id=2
listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka-logs-2
#####server_3.properties#####
broker.id=3
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-3
Now run the three brokers with the below commands.
###Start Broker 1 #######
./kafka-server-start.sh ../config/server_1.properties
###Start Broker 2 #######
./kafka-server-start.sh ../config/server_2.properties
###Start Broker 3 #######
./kafka-server-start.sh ../config/server_3.properties
Step 3
Create a topic with the below command.
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic first_topic
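If you prefer creating the topic programmatically, below is a minimal Java sketch using the AdminClient with the same settings as the command above; the bootstrap address assumes broker-1 from our cluster.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
        try (AdminClient admin = AdminClient.create(props)) {
            // Three partitions with a replication factor of 3, matching the CLI command above.
            NewTopic topic = new NewTopic("first_topic", 3, (short) 3);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}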
Step 4
Produce some messages to the topic created in the above step using the Kafka console producer. For the console producer, mention any one of the broker addresses; it will act as the bootstrap server for access to the entire cluster.
./kafka-console-producer.sh --broker-list localhost:9091 --topic first_topic
>First message
>Second message
>Third message
>Fourth message
>
Step 5
Consume the messages using the Kafka console consumer. For the Kafka consumer, mention any of the broker addresses as the bootstrap server. Remember that while reading the messages, you may not see the order, as the order is maintained at the partition level, not at the topic level.
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic --from-beginning
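For comparison, here is a minimal Java consumer sketch that mirrors the console consumer above. The group id tutorial-group is a hypothetical name, and auto.offset.reset=earliest plays the role of --from-beginning.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "tutorial-group");      // hypothetical consumer group name
        props.put("auto.offset.reset", "earliest");   // read from the beginning, like --from-beginning
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("first_topic"));
            while (true) {
                // Poll blocks up to 100 ms waiting for records (newer clients use poll(Duration)).
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}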
If you want, you can describe the topic with the below command to see how the partitions are distributed and which broker leads each partition.
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic first_topic
#####Result of the above command#####
Topic: first_topic    PartitionCount: 3    ReplicationFactor: 3    Configs:
    Topic: first_topic    Partition: 0    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3
    Topic: first_topic    Partition: 1    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1
    Topic: first_topic    Partition: 2    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
In the above output, broker-1 is the leader for partition 0, and each partition has replicas on broker-1, broker-2, and broker-3.