Using CockroachDB CDC With Apache Pulsar
Previous Articles on CockroachDB CDC
- Using CockroachDB CDC with Azure Event Hubs
- Using CockroachDB CDC with Confluent Cloud Kafka and Schema Registry
- SaaS Galore: Integrating CockroachDB with Confluent Kafka, Fivetran, and Snowflake
- CockroachDB CDC using Minio as a cloud storage sink
- CockroachDB CDC using Hadoop Ozone S3 Gateway as a cloud storage sink
Motivation
Apache Pulsar is a cloud-native distributed messaging and streaming platform. In my customer conversations, it most often comes up in comparison to Apache Kafka. I have a customer who needs Pulsar sink support because they rely on Pulsar's multi-region capabilities. CockroachDB does not have a native Pulsar sink; however, the Pulsar project supports the Kafka protocol via Kafka-on-Pulsar (KoP), and that's the core of today's article.
This tutorial assumes you have an enterprise license; you can also leverage our managed offerings, where enterprise changefeeds are enabled by default. I am going to demonstrate the steps using a Docker environment instead.
High-Level Steps
- Deploy Apache Pulsar
- Deploy a CockroachDB cluster with enterprise changefeeds
- Deploy a Kafka Consumer
- Verify
- Conclusion
Step-By-Step Instructions
Deploy Apache Pulsar
Since I'm using Docker, I'm relying on the KoP Docker Compose environment provided by StreamNative, the company that spearheads the development of Apache Pulsar.
I've used the service definition from the KoP example almost as-is, aside from a few differences:
pulsar:
  container_name: pulsar
  hostname: pulsar
  image: streamnative/sn-pulsar:2.11.0.5
  command: >
    bash -c "bin/apply-config-from-env.py conf/standalone.conf &&
    exec bin/pulsar standalone -nss -nfw" # disable stream storage and functions worker
  environment:
    allowAutoTopicCreationType: partitioned
    brokerDeleteInactiveTopicsEnabled: "false"
    PULSAR_PREFIX_messagingProtocols: kafka
    PULSAR_PREFIX_kafkaListeners: PLAINTEXT://pulsar:9092
    PULSAR_PREFIX_brokerEntryMetadataInterceptors: org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
    PULSAR_PREFIX_webServicePort: "8088"
  ports:
    - 6650:6650
    - 8088:8088
    - 9092:9092
I removed PULSAR_PREFIX_kafkaAdvertisedListeners: PLAINTEXT://127.0.0.1:19092 as I don't need it, and I changed the exposed port mapping from 19092:9092 to 9092:9092. My PULSAR_PREFIX_kafkaListeners address points to a Docker container with the hostname pulsar, since I need to access that address from other containers and can't rely on localhost. I'm also using a more recent version of the image than the one in their docs.
Deploy a CockroachDB Cluster With Enterprise Changefeeds
I am using a 3-node cluster in Docker. If you've followed my previous articles, you should be familiar with it.
I am using Flyway to set up the schema and seed the tables. The actual schema and data are taken from the changefeed examples we have in our docs. The only difference is that I'm using a database called example.
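If you're curious what Flyway applies, the migrations boil down to something like the following sketch, adapted from the changefeed example in our docs (the seed values are inferred from the changefeed output shown later, so treat the exact statements as an approximation):

CREATE TABLE office_dogs (
    id INT PRIMARY KEY,
    name STRING);

INSERT INTO office_dogs VALUES (1, 'Petee'), (2, 'Carl');
UPDATE office_dogs SET name = 'Petee H' WHERE id = 1;

-- employees has no explicit primary key, so CockroachDB adds a hidden
-- rowid column; that's why rowid shows up in the changefeed messages below.
CREATE TABLE employees (
    dog_id INT REFERENCES office_dogs (id),
    employee_name STRING);

INSERT INTO employees (dog_id, employee_name) VALUES (1, 'Lauren'), (2, 'Spencer');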
To enable CDC we need to execute the following commands:
SET CLUSTER SETTING cluster.organization = '<organization name>';
SET CLUSTER SETTING enterprise.license = '<secret>';
SET CLUSTER SETTING kv.rangefeed.enabled = true;
Again, if you don't have an enterprise license, you won't be able to complete this tutorial. Feel free to use our Dedicated or Serverless instances if you want to follow along.
Finally, after the tables and the data are in place, we can create a changefeed on these tables.
CREATE CHANGEFEED FOR TABLE office_dogs, employees INTO 'kafka://pulsar:9092';
Here I am using the Kafka port and the address of the Pulsar cluster, in my case pulsar.
        job_id
----------------------
  855538618543276035
(1 row)

NOTICE: changefeed will emit to topic office_dogs
NOTICE: changefeed will emit to topic employees

Time: 50ms total (execution 49ms / network 1ms)
Everything seems to work, and the changefeed does not error out.
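Beyond the NOTICE output, you can confirm the job from SQL at any time with SHOW CHANGEFEED JOBS:

SHOW CHANGEFEED JOBS;
-- Lists each changefeed with its job_id, the full CREATE CHANGEFEED
-- statement, the sink URI, and a status that should read 'running'
-- as long as the sink connection stays healthy.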
Deploy a Kafka Consumer
To validate data is being written to Pulsar, we need to stand up a Kafka client. I've created an image that downloads and installs Kafka. Once the entire Docker Compose environment is running, we can access the client and run the console consumer to verify.
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server pulsar:9092 --topic office_dogs --from-beginning
{"after": {"id": 1, "name": "Petee H"}} {"after": {"id": 2, "name": "Carl"}}
If we want to validate that new data is flowing, let's insert another record into CockroachDB:
INSERT INTO office_dogs VALUES (3, 'Test');
The consumer will print a new row:
{"after": {"id": 3, "name": "Test"}}
Since we've created two topics, let's now look at the employees topic.
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server pulsar:9092 --topic employees --from-beginning
{"after": {"dog_id": 1, "employee_name": "Lauren", "rowid": 855539880336523267}} {"after": {"dog_id": 2, "employee_name": "Spencer", "rowid": 855539880336654339}}
Similarly, let's update a record and see the changes propagate to Pulsar.
UPDATE employees SET employee_name = 'Spencer Kimball' WHERE dog_id = 2;
{"after": {"dog_id": 2, "employee_name": "Spencer Kimball", "rowid": 855539880336654339}}
Verify
We've confirmed we can produce messages to Pulsar topics using the Kafka protocol via KoP. We've also confirmed we can consume using the Kafka console consumer. We can also use the native Pulsar tooling to confirm the data is consumable from Pulsar. I installed the Pulsar Python client, pip install pulsar-client, on the Kafka client machine and created a Python script with the following code:
import pulsar

# Connect over the native Pulsar protocol and subscribe to the topic
# the changefeed writes to.
client = pulsar.Client('pulsar://pulsar:6650')
consumer = client.subscribe('employees', subscription_name='my-sub')

while True:
    msg = consumer.receive()
    print("Received message: '%s'" % msg.data())
    # Acknowledge so the broker can discard the message for this subscription.
    consumer.acknowledge(msg)

client.close()
I execute the script:
root@kafka-client:/opt/kafka# python3 consume_messages.py
2023-04-11 14:17:21.761 INFO [281473255101472] Client:87 | Subscribing on Topic :employees
2023-04-11 14:17:21.762 INFO [281473255101472] ClientConnection:190 | [<none> -> pulsar://pulsar:6650] Create ClientConnection, timeout=10000
2023-04-11 14:17:21.762 INFO [281473255101472] ConnectionPool:97 | Created connection for pulsar://pulsar:6650
2023-04-11 14:17:21.763 INFO [281473230237984] ClientConnection:388 | [172.28.0.3:33826 -> 172.28.0.6:6650] Connected to broker
2023-04-11 14:17:21.771 INFO [281473230237984] HandlerBase:72 | [persistent://public/default/employees-partition-0, my-sub, 0] Getting connection from pool
2023-04-11 14:17:21.776 INFO [281473230237984] ClientConnection:190 | [<none> -> pulsar://pulsar:6650] Create ClientConnection, timeout=10000
2023-04-11 14:17:21.776 INFO [281473230237984] ConnectionPool:97 | Created connection for pulsar://localhost:6650
2023-04-11 14:17:21.776 INFO [281473230237984] ClientConnection:390 | [172.28.0.3:33832 -> 172.28.0.6:6650] Connected to broker through proxy. Logical broker: pulsar://localhost:6650
2023-04-11 14:17:21.786 INFO [281473230237984] ConsumerImpl:238 | [persistent://public/default/employees-partition-0, my-sub, 0] Created consumer on broker [172.28.0.3:33832 -> 172.28.0.6:6650]
2023-04-11 14:17:21.786 INFO [281473230237984] MultiTopicsConsumerImpl:274 | Successfully Subscribed to a single partition of topic in TopicsConsumer. Partitions need to create : 0
2023-04-11 14:17:21.786 INFO [281473230237984] MultiTopicsConsumerImpl:137 | Successfully Subscribed to Topics
Let's insert a record into the employees table and then update it:
INSERT INTO employees (dog_id, employee_name) VALUES (3, 'Test');
UPDATE employees SET employee_name = 'Artem' WHERE dog_id = 3;
The Pulsar client output is as follows:
Received message: 'b'{"after": {"dog_id": 3, "employee_name": "Test", "rowid": 855745376561364994}}''
Received message: 'b'{"after": {"dog_id": 3, "employee_name": "Artem", "rowid": 855745376561364994}}''
Conclusion
This is how you can leverage existing CockroachDB capabilities with non-standard services like Apache Pulsar. Hopefully, you've found this article useful and can start leveraging the existing Kafka sink with other Kafka-compatible message brokers.