ClickHouse Kafka Engine Tutorial

Our colleague Mikhail Filimonov just published an excellent ClickHouse Kafka Engine FAQ. It provides users with answers to common questions about using stable versions, configuration parameters, standard SQL definitions, and many other topics. Even experienced users are likely to learn something new. 

But what if you are getting started and need help setting up Kafka and ClickHouse for the first time? Good news! This article is for you. 

We’ll work through an end-to-end example that loads data from a Kafka topic into a ClickHouse table using the Kafka engine. We will also show how to reset offsets and reload data, as well as how to change the table schema. Finally, we’ll demonstrate how to write data from ClickHouse back out to a Kafka topic.  

Prerequisites

The exercises that follow assume you have Kafka and ClickHouse already installed and running. We used Kubernetes for convenience. The Kafka version is Confluent 5.4.0, installed using a Kafka helm chart with three Kafka brokers. The ClickHouse version is 20.4.2, installed on a single node using the ClickHouse Kubernetes Operator. For non-Kubernetes installations, see the Confluent Kafka documentation and the ClickHouse documentation.

The exercises should work for any type of installation, but you’ll need to change host names accordingly. You may also need to change the replication factor if you have fewer Kafka brokers.

Overview of Kafka-ClickHouse Integration

Kafka is an extremely scalable message bus. Its core is a distributed log managed by brokers running on different hosts. Here is a short description of the application model.  

Producers write messages to a topic, which is a set of messages spread over partitions. Consumers read messages from those partitions. Consumers are arranged in consumer groups, which allow applications to read messages from Kafka in parallel without loss or duplication.

The following diagram illustrates the principal parts described above.


ClickHouse can read messages directly from a Kafka topic using the Kafka table engine coupled with a materialized view that fetches messages and pushes them to a ClickHouse target table. The target table is typically implemented using MergeTree engine or a variant like ReplicatedMergeTree. The flow of messages is illustrated below.


It is also possible to write from ClickHouse back to Kafka. The flow of messages is simpler--just insert into the Kafka table. Here is a diagram of the flow. 

Creating a Topic on Kafka

Let’s now set up a topic on Kafka that we can use to load messages. Log in to a Kafka server and create the topic using a command like the sample below. ‘kafka’ in this example is the DNS name of the server. If you have a different DNS name, use that instead. You may also adjust the number of partitions as well as the replication factor.  

kafka-topics \
    --bootstrap-server kafka:9092 \
    --topic readings \
    --create --partitions 6 \
    --replication-factor 2


Check that the topic has been successfully created. 

kafka-topics --bootstrap-server kafka:9092 --describe --topic readings


You’ll see output like the following showing the topic and current state of its partitions. 

Topic: readings    PartitionCount: 6    ReplicationFactor: 2    Configs:
    Topic: readings    Partition: 0    Leader: 0    Replicas: 0,2    Isr: 0,2
    Topic: readings    Partition: 1    Leader: 2    Replicas: 2,1    Isr: 2,1
    Topic: readings    Partition: 2    Leader: 1    Replicas: 1,0    Isr: 1,0
    Topic: readings    Partition: 3    Leader: 0    Replicas: 0,1    Isr: 0,1
    Topic: readings    Partition: 4    Leader: 2    Replicas: 2,0    Isr: 2,0
    Topic: readings    Partition: 5    Leader: 1    Replicas: 1,2    Isr: 1,2


At this point, we’re ready to go on the Kafka side. Let’s turn to ClickHouse.

ClickHouse Kafka Engine Setup

To read data from a Kafka topic to a ClickHouse table, we need three things:

A target MergeTree table to provide a home for ingested data
A Kafka engine table to make the topic visible to ClickHouse
A materialized view to move data automatically from Kafka to the target table

Let’s take them in order. First, we will define the target MergeTree table. Log in to ClickHouse and issue the following SQL to create a table from our famous 500B Rows on an Intel NUC article.

CREATE TABLE readings (
    readings_id Int32 Codec(DoubleDelta, LZ4),
    time DateTime Codec(DoubleDelta, LZ4),
    date ALIAS toDate(time),
    temperature Decimal(5,2) Codec(T64, LZ4)
) Engine = MergeTree
PARTITION BY toYYYYMM(time)
ORDER BY (readings_id, time);


Next, we need to create a table using the Kafka engine to connect to the topic and read data. The engine will read from the broker at host kafka using topic ‘readings’ and a consumer group named ‘readings_consumer_group1’. The input format is CSV. Note that we omit the ‘date’ column. It’s an alias in the target table that will populate automatically from the ‘time’ column.

CREATE TABLE readings_queue (
    readings_id Int32,
    time DateTime,
    temperature Decimal(5,2)
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka-headless.kafka:9092',
         kafka_topic_list = 'readings',
         kafka_group_name = 'readings_consumer_group1',
         kafka_format = 'CSV',
         kafka_max_block_size = 1048576;


The preceding settings handle the simplest case: a single broker, a single topic, and no specialized configuration. Check out the Kafka Table Engine docs as well as our Kafka FAQ to learn how you can change the engine behavior.  
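
For example, here is a hedged sketch (the table name, consumer group, and setting values below are illustrative, not part of the tutorial) showing two commonly used options: kafka_num_consumers to run several consumers in parallel within the table, and kafka_skip_broken_messages to tolerate a limited number of unparseable rows per block.

-- Illustrative only: a Kafka engine table with a couple of optional settings.
CREATE TABLE readings_queue_tuned (
    readings_id Int32,
    time DateTime,
    temperature Decimal(5,2)
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka-headless.kafka:9092',
         kafka_topic_list = 'readings',
         kafka_group_name = 'readings_consumer_group2',
         kafka_format = 'CSV',
         kafka_num_consumers = 2,            -- parallel consumers inside this table
         kafka_skip_broken_messages = 10,    -- skip up to 10 malformed CSV rows per block
         kafka_max_block_size = 1048576;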

Finally, we create a materialized view to transfer data between Kafka and the merge tree table. 

CREATE MATERIALIZED VIEW readings_queue_mv TO readings AS
SELECT readings_id, time, temperature
FROM readings_queue;


That’s it for the Kafka to ClickHouse integration. Let’s test it.  

Loading Data

It’s now time to load some input data using the kafka-console-producer command. Here’s an example that adds three records using CSV format. 

kafka-console-producer --broker-list kafka:9092 --topic readings <<END
1,"2020-05-16 23:55:44",14.2
2,"2020-05-16 23:55:45",20.1
3,"2020-05-16 23:55:51",12.9
END


Transfer to the readings table will take a couple of seconds. If we select from it we get the following output. 
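
The result below is reconstructed from the three rows we just published; your client’s formatting may differ slightly.

SELECT * FROM readings;

┌─readings_id─┬────────────────time─┬─temperature─┐
│           1 │ 2020-05-16 23:55:44 │       14.20 │
│           2 │ 2020-05-16 23:55:45 │       20.10 │
│           3 │ 2020-05-16 23:55:51 │       12.90 │
└─────────────┴─────────────────────┴─────────────┘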

Great!  Kafka and ClickHouse are now connected. 

Rereading Messages from Kafka

The previous example started from the beginning position in the Kafka topic and read messages as they arrived. That’s the normal way, but sometimes it is useful to read messages again. For example, you might want to reread messages after fixing a bug in the schema or after reloading a backup. Fortunately, this is easy to do. We just reset the offsets in the consumer group. 

Suppose we lose all the messages in the readings table and want to reload them from Kafka. First, let’s “lose” the messages using a TRUNCATE command.  

TRUNCATE TABLE readings;


Before resetting offsets on the partitions, we need to turn off message consumption. Do this by detaching the readings_queue table in ClickHouse as follows. 

DETACH TABLE readings_queue


Next, use the following Kafka command to reset the partition offsets in the consumer group used for the readings_queue table. (Important--this is not a SQL command. You run it against Kafka, not ClickHouse.)

kafka-consumer-groups --bootstrap-server kafka:9092 \
    --topic readings --group readings_consumer_group1 \
    --reset-offsets --to-earliest --execute


Now re-attach the readings_queue table. Here you are back in ClickHouse. 

ATTACH TABLE readings_queue


Wait a few seconds, and the missing records will be restored. You can run a SELECT to confirm they arrived. 
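
For example, a simple count (shown here as a quick sanity check) should report the three original rows once they have been reloaded from Kafka.

SELECT count() FROM readings;
-- expected result: 3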

Adding Virtual Columns

It is often useful to tag rows with information showing the original Kafka message coordinates. The Kafka table engine automatically defines virtual columns for this purpose, including _topic, _partition, and _offset. Here’s how to change our readings table to show the source topic, partition, and offset.

First, let’s disable message consumption by detaching the Kafka table. Messages can pile up on the topic but we won’t miss them. 

DETACH TABLE readings_queue


Next, we alter the target table and materialized view by executing the following SQL commands in succession.  Note that we just drop and recreate the materialized view whereas we alter the target table, which preserves existing data.

ALTER TABLE readings
  ADD COLUMN _topic String,
  ADD COLUMN _offset UInt64,
  ADD COLUMN _partition UInt64;

DROP TABLE readings_queue_mv;

CREATE MATERIALIZED VIEW readings_queue_mv TO readings AS
  SELECT readings_id, time, temperature, _topic, _offset, _partition
  FROM readings_queue;


Finally, we enable message consumption again by re-attaching the readings_queue table. 

ATTACH TABLE readings_queue


You can confirm the new schema by truncating the table and reloading the messages as we did in the previous section. If you select the data it will look like the following. 

SELECT
    readings_id AS id, time, temperature AS temp,
    _topic, _offset, _partition
FROM readings

┌─id─┬────────────────time─┬──temp─┬─_topic───┬─_offset─┬─_partition─┐
│  1 │ 2020-05-16 23:55:44 │ 14.20 │ readings │       0 │          5 │
│  2 │ 2020-05-16 23:55:45 │ 20.10 │ readings │       1 │          5 │
│  3 │ 2020-05-16 23:55:51 │ 12.90 │ readings │       2 │          5 │
└────┴─────────────────────┴───────┴──────────┴─────────┴────────────┘


Incidentally, the foregoing procedure is the same way you would upgrade the schema when message formats change. Also, materialized views provide a very general way to adapt Kafka messages to target table rows. You can even define multiple materialized views to split the message stream across different target tables, as sketched below.
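
For instance, here is a hedged sketch (the readings_cold table and second view are hypothetical additions, not part of the tutorial) showing how a second materialized view on the same Kafka engine table could route a subset of messages to a different target table. Each attached view receives the consumed blocks and applies its own SELECT.

-- Hypothetical second target table for sub-zero readings.
CREATE TABLE readings_cold (
    readings_id Int32,
    time DateTime,
    temperature Decimal(5,2)
) Engine = MergeTree
PARTITION BY toYYYYMM(time)
ORDER BY (readings_id, time);

-- A second materialized view on the same Kafka engine table with its own filter.
CREATE MATERIALIZED VIEW readings_cold_mv TO readings_cold AS
SELECT readings_id, time, temperature
FROM readings_queue
WHERE toFloat32(temperature) < 0.0;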

Writing From ClickHouse to Kafka

We will end the tutorial by showing how to write messages from ClickHouse back to Kafka.  This is a relatively new feature that is available in the current Altinity stable build 19.16.18.85. 

Let’s start by creating a new topic in Kafka to contain messages. We’ll call it ‘readings_high’ for reasons that will become apparent shortly. 

kafka-topics \
    --bootstrap-server kafka:9092 \
    --topic readings_high \
    --create --partitions 6 \
    --replication-factor 2


Next, we need to define a table using the Kafka table engine that points to our new topic.  This table can read and write messages, as it turns out, but in this example we’ll just use it for writing. 

CREATE TABLE readings_high_queue (
    readings_id Int32,
    time DateTime,
    temperature Decimal(5,2)
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
         kafka_topic_list = 'readings_high',
         kafka_group_name = 'readings_high_consumer_group1',
         kafka_format = 'CSV',
         kafka_max_block_size = 1048576;


Finally, let’s add a materialized view to transfer any row with a temperature of 20.0 or higher to the readings_high_queue table. This example illustrates yet another use case for ClickHouse materialized views, namely, to generate events under particular conditions.

CREATE MATERIALIZED VIEW readings_high_queue_mv TO readings_high_queue AS
SELECT readings_id, time, temperature FROM readings
WHERE toFloat32(temperature) >= 20.0


Start a consumer in a separate terminal window to print out messages from the readings_high topic on Kafka as follows. This will allow you to see rows as ClickHouse writes them to Kafka.

kafka-console-consumer --bootstrap-server kafka:9092 --topic readings_high


Finally, load some data that will demonstrate writing back to Kafka. Let’s add a new batch to our original topic.  Run the following command in another window. 

kafka-console-producer --broker-list kafka:9092 --topic readings <<END
4,"2020-05-16 23:55:52",9.7
5,"2020-05-16 23:55:56",25.3
6,"2020-05-16 23:55:58",14.1
END


After a few seconds you will see the second row pop out in the window running the kafka-console-consumer command. It should look like this:

5,"2020-05-16 23:55:56",25.3


Dealing with Failures

If you run into problems with any examples, have a look at the ClickHouse log. Enable trace logging if you have not already done so. You can see messages like the following that signal activity in the Kafka Table Engine.

2020.05.17 07:24:20.609147 [ 64 ] {} <Debug> StorageKafka (readings_queue): Started streaming to 1 attached views


Errors, if any, will appear in the clickhouse-server.err.log. 

Conclusion and Further Reading

As this blog article shows, the Kafka Table Engine offers a simple and powerful way to integrate Kafka topics and ClickHouse tables. There is obviously a lot more to managing the integration--especially in a production system. We hope this article will help get you started and enable you to explore other possibilities yourself.

For more information on the ClickHouse side, check out the Kafka Table Engine documentation as well as the excellent ClickHouse Kafka Engine FAQ on this blog. For Kafka, you can start with the Apache Kafka website or documentation for your distribution. For a higher-level understanding of Kafka in general, have a look at the primer on Streams and Tables in Apache Kafka published by Confluent. 

 

 

 

 
