Custom Partitioner in Kafka Using Scala: Take a Quick Tour!

In this blog, we are going to explore the Kafka partitioner. We will try to understand why the default partitioner is not enough and when you might need a custom partitioner. We will also look at a use case and create code for the custom partitioner. I'm assuming that you have sound knowledge of Kafka. Let’s understand the behavior of the default partitioner.

The default partitioner follows these rules:

  1. If a producer provides a partition number in the message record, use it.
  2. If a producer doesn’t provide a partition number, but it provides a key, choose a partition based on a hash value of the key.
  3. When no partition number or key is present, pick a partition in a round-robin fashion.

So, you can use the default partitioner in three scenarios:

  1. If you already know the partition number to which you want to send a message record, use the first rule.
  2. When you want to distribute data based on a hash of the key, use the second rule of the default partitioner.
  3. If you don’t care about which partition a message record ends up in, use the third rule of the default partitioner. Each scenario maps onto a different ProducerRecord constructor, as shown in the sketch below.
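
Here is a minimal sketch of how each scenario maps to a ProducerRecord constructor. The topic name demo and the broker address are assumptions for a local setup; no partitioner.class is configured, so the default partitioner is in play.

Scala

package com.knoldus

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object DefaultPartitionerRules extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  // No partitioner.class is set, so the default partitioner is used.

  val producer = new KafkaProducer[String, String](props)

  // Rule 1: an explicit partition number is used as-is (partition 0 must exist on the topic).
  producer.send(new ProducerRecord[String, String]("demo", Integer.valueOf(0), "key-1", "value-1"))

  // Rule 2: no partition, but a key -> the partition is derived from a hash of the key.
  producer.send(new ProducerRecord[String, String]("demo", "key-1", "value-2"))

  // Rule 3: no partition and no key -> partitions are picked in a round-robin fashion.
  producer.send(new ProducerRecord[String, String]("demo", "value-3"))

  producer.close()
}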

There are two problems with key-based partitioning:

  1. If the producer provides the same key for every message record, hashing will always give you the same partition. However, the reverse is not guaranteed: two different keys can still hash to the same value.
  2. The default partitioner uses the hash value of the key and the total number of partitions on a topic to determine the partition number. If you increase the number of partitions, the default partitioner will return different partition numbers even when you provide the same key, as the sketch after this list shows.
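
To see the second problem concretely, the small sketch below reuses the same murmur2 hashing that the default partitioner applies (the key IT and the partition counts are just example values). The same key can map to a different partition once the partition count changes.

Scala

package com.knoldus

import java.nio.charset.StandardCharsets
import org.apache.kafka.common.utils.Utils

// Replays the default partitioner's key hashing for one key with two partition counts.
object RepartitioningEffect extends App {
  val keyBytes = "IT".getBytes(StandardCharsets.UTF_8)

  val withFivePartitions = Utils.toPositive(Utils.murmur2(keyBytes)) % 5
  val withSixPartitions  = Utils.toPositive(Utils.murmur2(keyBytes)) % 6

  // The two results are not guaranteed to match, so adding partitions can re-route old keys.
  println(s"5 partitions -> $withFivePartitions, 6 partitions -> $withSixPartitions")
}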

Now, you might be wondering how to solve this problem.

The answer is very simple: you can implement your own algorithm based on your requirements and use it in a custom partitioner.


Kafka Custom Partitioner Example

Let’s create an example use case and implement a custom partitioner. Here is the problem statement.

Assume we are collecting data from different departments. All the departments are sending data to a single topic named department. I have planned five partitions for the topic, but I want two partitions dedicated to a specific department named IT, and the remaining three partitions for the rest of the departments. How would you achieve this?

You can solve this requirement, and any other custom partitioning need, by implementing a custom partitioner.
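
Before writing any producer code, the topic itself needs five partitions. Here is a hedged sketch of one way to create it with Kafka’s AdminClient; the broker address and the replication factor are assumptions for a local setup.

Scala

package com.knoldus

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object CreateDepartmentTopic extends App {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

  val admin = AdminClient.create(props)
  try {
    // Topic "department" with 5 partitions and a replication factor of 1 (single-broker setup).
    val topic = new NewTopic("department", 5, 1.toShort)
    admin.createTopics(Collections.singletonList(topic)).all().get()
  } finally {
    admin.close()
  }
}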

Kafka Producer

Let’s look at the producer code.

Scala

package com.knoldus

import java.util.Properties
import org.apache.kafka.clients.producer._

object KafkaProducer extends App {
  val props = new Properties()
  val topicName = "department"
  props.put("bootstrap.servers", "localhost:9092,localhost:9093")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("partitioner.class", "com.knoldus.CustomPartitioner")

  val producer = new KafkaProducer[String, String](props)

  try {
    for (i <- 0 to 5) {
      val record = new ProducerRecord[String, String](topicName, "IT" + i, "My Site is knoldus.com " + i)
      producer.send(record)
    }
    for (i <- 0 to 5) {
      val record = new ProducerRecord[String, String](topicName, "COMP" + i, "My Site is knoldus.com " + i)
      producer.send(record)
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    producer.close()
  }
}

The first step in writing messages to Kafka is to create a producer object with the properties you want to pass to the producer. A Kafka producer has three mandatory properties, as you can see in the above code:

  1. bootstrap.servers: A list of host:port pairs of Kafka brokers that the producer will use to establish the initial connection to the Kafka cluster. It is recommended to include at least two brokers so that, if one broker goes down, the producer can still connect to the Kafka cluster.
  2. key.serializer: Name of the class that will be used to serialize the key.
  3. value.serializer: Name of the class that will be used to serialize the value.

If you look at the rest of the code, there are only three steps:

  1. Create a KafkaProducer object.
  2. Create a ProducerRecord object.
  3. Send the record to the broker.

That is all that we do in a Kafka Producer.
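
If you also want to confirm which partition each record actually lands on, you can pass a Callback to send. The helper below is a hypothetical addition, not part of the original producer; you could place it inside the KafkaProducer object and use it instead of the plain producer.send(record) calls.

Scala

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

// Sends a record and prints the partition the broker assigned to it.
def sendAndLog(producer: KafkaProducer[String, String],
               record: ProducerRecord[String, String]): Unit =
  producer.send(record, new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
      if (exception != null) exception.printStackTrace()
      else println(s"key=${record.key()} -> partition=${metadata.partition()}")
  })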

Kafka Custom Partitioner

We need to create our class by implementing the Partitioner interface. Your custom partitioner class must implement three methods from the interface:

  1. configure.
  2. partition.
  3. close.

Let’s look at the code.

Scala

package com.knoldus

import java.util
import org.apache.kafka.common.record.InvalidRecordException
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster

class CustomPartitioner extends Partitioner {
  val departmentName = "IT"

  override def configure(configs: util.Map[String, _]): Unit = {}

  override def partition(topic: String, key: Any, keyBytes: Array[Byte], value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
    val partitions = cluster.partitionsForTopic(topic)
    val numPartitions = partitions.size
    // Reserve 40% of the partitions (2 out of 5) for the IT department.
    val it = Math.abs(numPartitions * 0.4).toInt

    if ((keyBytes == null) || (!key.isInstanceOf[String]))
      throw new InvalidRecordException("All messages must have department name as key")

    if (key.asInstanceOf[String].startsWith(departmentName)) {
      // IT keys are hashed into the reserved partitions: 0 or 1.
      Utils.toPositive(Utils.murmur2(keyBytes)) % it
    } else {
      // All other keys are hashed into the remaining partitions: 2, 3 or 4.
      Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions - it) + it
    }
  }

  override def close(): Unit = {}
}



The configure and close methods are used for initialization and cleanup. In our example, we don’t have anything to initialize or clean up.

The partition method is where all the action happens. The producer calls this method for each message record, passing in the topic, the key, the value, and the cluster details. All we need to do is return an integer as the partition number. This is where we write our algorithm.

Algorithm

Let’s try to understand the algorithm that I have implemented. I am applying my algorithm in four simple steps.

  1. The first step is to determine the number of partitions and reserve 40% of them for the IT department. If I have five partitions for the topic, this logic will reserve two partitions for IT. The next question is, how do we get the number of partitions in the topic?

    We get a cluster object as input, and its partitionsForTopic method gives us a list of all partitions for the topic. The size of that list is the number of partitions in the topic. We then set the variable it to 40% of that number, so with five partitions, it comes out to 2.
  2. If we don’t get a message key, we throw an exception. We need the key because it tells us the department name. Without knowing the department name, we can’t decide whether the message should go to one of the two reserved partitions or to the other three.
  3. The next step is to determine the partition number. If the key starts with IT, we hash the key bytes and take the result modulo 2 (the number of reserved partitions). Using the modulo ensures we always get 0 or 1.
  4. If the key doesn’t start with IT, we take the hash modulo 3 (the number of remaining partitions), which gives a value between 0 and 2, and then add 2 to shift it into the range 2 to 4. The dry-run sketch after this list replays this arithmetic.
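
To sanity-check these steps, the dry-run sketch below replays the same arithmetic outside Kafka. The five-partition count and the sample keys are assumptions; the actual partition numbers depend on the murmur2 hash of each key.

Scala

package com.knoldus

import java.nio.charset.StandardCharsets
import org.apache.kafka.common.utils.Utils

object PartitionDryRun extends App {
  val numPartitions = 5
  val it = (numPartitions * 0.4).toInt  // 2 partitions reserved for IT

  def partitionFor(key: String): Int = {
    val keyBytes = key.getBytes(StandardCharsets.UTF_8)
    if (key.startsWith("IT"))
      Utils.toPositive(Utils.murmur2(keyBytes)) % it                        // 0 or 1
    else
      Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions - it) + it // 2, 3 or 4
  }

  Seq("IT0", "IT1", "COMP0", "COMP1").foreach { key =>
    println(s"$key -> partition ${partitionFor(key)}")
  }
}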

Kafka Consumer

Let’s look at the consumer code.

Scala

package com.knoldus

import java.time.Duration
import java.util
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

object KafkaConsumer extends App {

  val props: Properties = new Properties()
  val topicName = "department"

  props.put("group.id", "test")
  props.put("bootstrap.servers", "localhost:9092,localhost:9093")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](props)
  try {
    consumer.subscribe(util.Arrays.asList(topicName))
    while (true) {
      val records = consumer.poll(Duration.ofMillis(100))
      // Print the partition of each record to verify how messages were distributed.
      for (record <- records.asScala) {
        println("Topic: " + record.topic() + ", Offset: " + record.offset() + ", Partition: " + record.partition())
      }
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    consumer.close()
  }
}


 
A Kafka consumer has three mandatory properties, as you can see in the above code:

  1. bootstrap.servers: A list of host:port pairs of Kafka brokers that the consumer will use to establish the initial connection to the Kafka cluster. It is recommended to include at least two brokers so that, if one broker goes down, the consumer can still connect to the Kafka cluster.
  2. key.deserializer: Name of the class that will be used to deserialize the key.
  3. value.deserializer: Name of the class that will be used to deserialize the value.

If you look at the rest of the code, there are only two steps:

  1. Subscribe to the topic.
  2. Consume messages from the topic.

That is all that we do in a Kafka Consumer.

I hope you enjoyed this blog. You can now create a custom partitioner in Kafka using Scala. If you want the source code, please feel free to download it.

Thanks for reading!
