Apache Kafka With Scala Tutorial

This article was first published on the Knoldus blog.

Before the introduction of Apache Kafka, data pipelines used to be very complex and time-consuming. A separate streaming pipeline was needed for every consumer. You can see the complexity of it with the help of the below diagram.

Apache Kafka solved this problem and provided a universal pipeline that is fault-tolerant, scalable, and simple to use. There is now a single pipeline needed to cater to multiple consumers, which can be also seen with the help of the below diagram.

This blog will help you in getting started with Apache Kafka, understand its basic terminologies and how to create Kafka producers and consumers using its APIs in Scala.

Apache Kafka is an open source project initially created by LinkedIn, that is designed to be a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system.

Topic and Messages in Apache Kafka

A topic in Kafka is where all the messages are stored that are produced. Messages are a unit of data which can be byte arrays and any object can be stored in any format

There are two components of any message: a key and a value. The key is used to represent the data about the message and the value represents the body of the message.

Apache Kafka is a feed of messages which are organized into what is called a topic. Think of it as a category of messages. Kafka topics can be divided into a number of Partitions as shown in below diagram.

Apache Kafka uses partitions to scale a topic across many servers for producer writes. A Kafka cluster is comprised of one or more servers which are called brokers. Each of these Kafka brokers stores one or more partitions on it.

Kafka Producer and Consumer

Kafka provides the Producer API and Consumer API. Producers are used to publish messages to Kafka topics that are stored in different topic partitions. Each of these topic partitions is an ordered, immutable sequence of messages that are continually appended to.

The Kafka Producer maps each message it would like to produce to a topic. Kafka Producer is the client that publishes records to the Kafka cluster and notes that it is thread-safe. The producer client controls which partition it publishes messages to.

Consumers are to subscribe to the Kafka topics and process the feed of published messages in real-time. Kafka retains all the messages that are published regardless of whether they have been consumed for a configurable period of time or not. The below diagram illustrates this concept.

Here we have multiple producers publishing message into the topic on the different broker and from where the consumers read from any topic to which they have subscribed.

Apache Kafka is able to spread a single topic partition across multiple brokers, which allows for horizontal scaling. By spreading the topic’s partitions across multiple brokers, consumers can read from a single topic in parallel. 

This was a basic introduction to common terminologies used while working with Apache Kafka. Now, we will move ahead and understand how to create a simple producer-consumer in Kafka.

As a pre-requisite, we should have ZooKeeper and a Kafka server up and running. You can refer to this quick start guide for setting up a single node Kafka cluster on your local machine.

Assuming that you have your server started, we will now start building a simple producer-consumer application where the producer will publish the message in a Kafka topic and a consumer can subscribe to the topic and fetch messages in real-time.

In your sbt project, add the following library dependency.

libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.0"

With the help of the following code, we will be publishing messages into Kafka topicquick-start”.

import java.util.Properties
import org.apache.kafka.clients.producer._

class Producer {

  def main(args: Array[String]): Unit = {
    writeToKafka("quick-start")
  }

  def writeToKafka(topic: String): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9094")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    val record = new ProducerRecord[String, String](topic, "key", "value")
    producer.send(record)
    producer.close()
  }
}

At the same time, we can have our Kafka Consumer up and running which is subscribing to the Kafka topic “quick-start and displaying the messages.

import java.util
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.util.Properties
import scala.collection.JavaConverters._

class Consumer {

  def main(args: Array[String]): Unit = {
    consumeFromKafka("quick-start")
  }

  def consumeFromKafka(topic: String) = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9094")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "latest")
    props.put("group.id", "consumer-group")
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
    consumer.subscribe(util.Arrays.asList(topic))
    while (true) {
      val record = consumer.poll(1000).asScala
      for (data <- record.iterator)
        println(data.value())
    }
  }
}

 

 

 

 

Top