Using CockroachDB CDC With Apache Pulsar

Motivation

Apache Pulsar is a cloud-native distributed messaging and streaming platform. In my customer conversations, it most often comes up in comparison with Apache Kafka. I have a customer who needs Pulsar sink support because they rely on Pulsar's multi-region capabilities. CockroachDB does not have a native Pulsar sink; however, Pulsar can speak the Kafka protocol through the Kafka-on-Pulsar (KoP) protocol handler, and that's the core of today's article.

This tutorial assumes you have an enterprise license. You can also use our managed offerings, where enterprise changefeeds are enabled by default. I am going to demonstrate the steps using a Docker environment instead.

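High-Level Steps

Deploy Apache Pulsar
Deploy a CockroachDB cluster with enterprise changefeeds
Deploy a Kafka consumer
Verify
Conclusion
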
Step-By-Step Instructions

Deploy Apache Pulsar

Since I'm using Docker, I'm relying on the KoP Docker Compose environment provided by StreamNative, the company that spearheads the development of Apache Pulsar.

I've used the service taken from the KoP example almost as-is aside from a few differences:

pulsar:
    container_name: pulsar
    hostname: pulsar
    image: streamnative/sn-pulsar:2.11.0.5
    command: >
      bash -c "bin/apply-config-from-env.py conf/standalone.conf &&
      exec bin/pulsar standalone -nss -nfw" # disable stream storage and functions worker
    environment:
      allowAutoTopicCreationType: partitioned
      brokerDeleteInactiveTopicsEnabled: "false"
      PULSAR_PREFIX_messagingProtocols: kafka
      PULSAR_PREFIX_kafkaListeners: PLAINTEXT://pulsar:9092
      PULSAR_PREFIX_brokerEntryMetadataInterceptors: org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
      PULSAR_PREFIX_webServicePort: "8088"
    ports:
      - 6650:6650
      - 8088:8088
      - 9092:9092


I removed PULSAR_PREFIX_kafkaAdvertisedListeners: PLAINTEXT://127.0.0.1:19092 because I don't need it, and I changed the exposed port mapping from 19092:9092 to 9092:9092. My PULSAR_PREFIX_kafkaListeners address points to the Docker container with the hostname pulsar, because I need to reach it from other containers and can't rely on localhost. I'm also using a more recent version of the image than the one in their docs.
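
The link between the KoP listener and the address that other containers must use can be sketched in Python. This is only an illustration: listener_to_sink_uri is a hypothetical helper, not part of CockroachDB or KoP. It assumes the listener has the PROTOCOL://host:port form shown in the Compose service above and turns it into a kafka:// URI that another container on the same Docker network could use.

```python
from urllib.parse import urlsplit


def listener_to_sink_uri(listener: str) -> str:
    """Turn a KoP listener such as PLAINTEXT://pulsar:9092 into kafka://pulsar:9092."""
    parts = urlsplit(listener)
    if not parts.hostname or parts.port is None:
        raise ValueError(f"expected PROTOCOL://host:port, got {listener!r}")
    if parts.hostname in ("localhost", "127.0.0.1"):
        # Inside another container, localhost refers to that container itself.
        raise ValueError("listener must use a hostname reachable from other containers")
    return f"kafka://{parts.hostname}:{parts.port}"


print(listener_to_sink_uri("PLAINTEXT://pulsar:9092"))
```

A loopback listener is rejected on purpose, which mirrors the reason for dropping the 127.0.0.1 advertised listener.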

Deploy a CockroachDB Cluster With Enterprise Changefeeds

I am using a 3-node cluster in Docker. If you've followed my previous articles, you should be familiar with it.

I am using Flyway to set up the schema and seed the tables. The actual schema and data are taken from the changefeed examples we have in our docs. The only difference is I'm using a database called example.

To enable CDC, we need to execute the following commands:

SET CLUSTER SETTING cluster.organization = '<organization name>';

SET CLUSTER SETTING enterprise.license = '<secret>';

SET CLUSTER SETTING kv.rangefeed.enabled = true;


Again, if you don't have an enterprise license, you won't be able to complete this tutorial. Feel free to use our Dedicated or Serverless instances if you want to follow along.

Finally, after the tables and the data are in place, we can create a changefeed on these tables.

CREATE CHANGEFEED FOR TABLE office_dogs, employees INTO 'kafka://pulsar:9092';


Here I am using the Kafka port and the address of the Pulsar cluster, in my case pulsar.
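
If the changefeed is created from a script rather than the SQL shell, the statement can be assembled from a table list and the sink URI. The sketch below is an assumption-laden illustration: changefeed_statement is a hypothetical helper, and it only builds the SQL text; running it against the cluster is left to whatever SQL client you use.

```python
def changefeed_statement(tables, sink_uri):
    """Build a CREATE CHANGEFEED statement for the given tables and sink URI."""
    if not tables:
        raise ValueError("at least one table is required")
    for name in tables:
        # Keep the sketch simple: accept only plain identifiers.
        if not name.isidentifier():
            raise ValueError(f"unexpected table name: {name!r}")
    escaped_uri = sink_uri.replace("'", "''")  # escape single quotes for SQL
    return f"CREATE CHANGEFEED FOR TABLE {', '.join(tables)} INTO '{escaped_uri}';"


print(changefeed_statement(["office_dogs", "employees"], "kafka://pulsar:9092"))
```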

        job_id
----------------------
  855538618543276035
(1 row)

NOTICE: changefeed will emit to topic office_dogs
NOTICE: changefeed will emit to topic employees

Time: 50ms total (execution 49ms / network 1ms)


Everything seems to work, and the changefeed does not error out.

Deploy a Kafka Consumer

To validate that data is being written to Pulsar, we need to stand up a Kafka client. I've created an image that downloads and installs Kafka. Once the entire Docker Compose environment is running, we can access the client and run the console consumer to verify.

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server pulsar:9092 --topic office_dogs --from-beginning
{"after": {"id": 1, "name": "Petee H"}}
{"after": {"id": 2, "name": "Carl"}}


To validate that new data is flowing, let's insert another record into CockroachDB:

INSERT INTO office_dogs VALUES (3, 'Test');


The consumer will print a new row:

{"after": {"id": 3, "name": "Test"}}
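
Each line printed by the console consumer is a JSON document with the row under the after key. A consumer application might unpack it as in the sketch below. parse_change is a hypothetical helper, and treating a null after value as a delete is an assumption about the default changefeed envelope, not something shown in the output above.

```python
import json


def parse_change(line):
    """Return ("upsert", row) or ("delete", None) for one changefeed message."""
    event = json.loads(line)
    row = event.get("after")
    if row is None:
        # Assumption: a delete is emitted with "after": null.
        return ("delete", None)
    return ("upsert", row)


kind, row = parse_change('{"after": {"id": 3, "name": "Test"}}')
print(kind, row["id"], row["name"])
```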


Since we've created two topics, let's now look at the employees topic.

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server pulsar:9092 --topic employees --from-beginning
{"after": {"dog_id": 1, "employee_name": "Lauren", "rowid": 855539880336523267}}
{"after": {"dog_id": 2, "employee_name": "Spencer", "rowid": 855539880336654339}}


Similarly, let's update a record and see the changes propagate to Pulsar.

UPDATE employees SET employee_name = 'Spencer Kimball' WHERE dog_id = 2;


The consumer prints the updated row:

{"after": {"dog_id": 2, "employee_name": "Spencer Kimball", "rowid": 855539880336654339}}


Verify

We've confirmed that we can produce messages to Pulsar topics using the Kafka protocol via KoP, and that we can consume them with the Kafka console consumer. We can also use native Pulsar tooling to confirm the data is consumable from Pulsar. I installed the Pulsar Python client (pip install pulsar-client) on the Kafka client machine and created a Python script, consume_messages.py, with the following code:

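import pulsar

client = pulsar.Client('pulsar://pulsar:6650')
# Subscribe to the topic that the changefeed writes to through KoP
consumer = client.subscribe('employees',
                            subscription_name='my-sub')

try:
    while True:
        # Block until the next message arrives
        msg = consumer.receive()
        print("Received message: '%s'" % msg.data())
        # Acknowledge so the message is not redelivered
        consumer.acknowledge(msg)
except KeyboardInterrupt:
    pass
finally:
    # Release the connection when the loop is interrupted with Ctrl+C
    client.close()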


I execute the script:

root@kafka-client:/opt/kafka# python3 consume_messages.py 
2023-04-11 14:17:21.761 INFO  [281473255101472] Client:87 | Subscribing on Topic :employees
2023-04-11 14:17:21.762 INFO  [281473255101472] ClientConnection:190 | [<none> -> pulsar://pulsar:6650] Create ClientConnection, timeout=10000
2023-04-11 14:17:21.762 INFO  [281473255101472] ConnectionPool:97 | Created connection for pulsar://pulsar:6650
2023-04-11 14:17:21.763 INFO  [281473230237984] ClientConnection:388 | [172.28.0.3:33826 -> 172.28.0.6:6650] Connected to broker
2023-04-11 14:17:21.771 INFO  [281473230237984] HandlerBase:72 | [persistent://public/default/employees-partition-0, my-sub, 0] Getting connection from pool
2023-04-11 14:17:21.776 INFO  [281473230237984] ClientConnection:190 | [<none> -> pulsar://pulsar:6650] Create ClientConnection, timeout=10000
2023-04-11 14:17:21.776 INFO  [281473230237984] ConnectionPool:97 | Created connection for pulsar://localhost:6650
2023-04-11 14:17:21.776 INFO  [281473230237984] ClientConnection:390 | [172.28.0.3:33832 -> 172.28.0.6:6650] Connected to broker through proxy. Logical broker: pulsar://localhost:6650
2023-04-11 14:17:21.786 INFO  [281473230237984] ConsumerImpl:238 | [persistent://public/default/employees-partition-0, my-sub, 0] Created consumer on broker [172.28.0.3:33832 -> 172.28.0.6:6650] 
2023-04-11 14:17:21.786 INFO  [281473230237984] MultiTopicsConsumerImpl:274 | Successfully Subscribed to a single partition of topic in TopicsConsumer. Partitions need to create : 0
2023-04-11 14:17:21.786 INFO  [281473230237984] MultiTopicsConsumerImpl:137 | Successfully Subscribed to Topics
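
The log lines show that the short topic name employees resolves to persistent://public/default/employees-partition-0. A sketch of that naming follows, assuming the public tenant and default namespace seen in the log. full_topic_name is a hypothetical helper for illustration, not a Pulsar client API.

```python
def full_topic_name(topic, tenant="public", namespace="default", partition=None):
    """Expand a short topic name the way it appears in the client log."""
    if "://" in topic:
        name = topic  # already fully qualified
    else:
        name = f"persistent://{tenant}/{namespace}/{topic}"
    if partition is not None:
        name = f"{name}-partition-{partition}"
    return name


print(full_topic_name("employees", partition=0))
```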


Let's insert a record into the employees table and then update it:

INSERT INTO employees (dog_id, employee_name) VALUES (3, 'Test');
UPDATE employees SET employee_name = 'Artem' WHERE dog_id = 3;


The Pulsar client output is as follows:

Received message: 'b'{"after": {"dog_id": 3, "employee_name": "Test", "rowid": 855745376561364994}}''
Received message: 'b'{"after": {"dog_id": 3, "employee_name": "Artem", "rowid": 855745376561364994}}''
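
The b'...' wrapper in the output is there because msg.data() returns bytes. A downstream consumer would typically decode the payload and keep the latest version of each row. Below is a minimal sketch under that assumption, using the two payloads above as sample input. apply_changes is a hypothetical helper, and keying on rowid is an assumption that fits this table only.

```python
import json


def apply_changes(payloads, key="rowid"):
    """Fold changefeed payloads (bytes) into a dict holding the latest row per key."""
    state = {}
    for payload in payloads:
        row = json.loads(payload.decode("utf-8")).get("after")
        if row is not None:
            state[row[key]] = row  # later messages overwrite earlier ones
    return state


sample = [
    b'{"after": {"dog_id": 3, "employee_name": "Test", "rowid": 855745376561364994}}',
    b'{"after": {"dog_id": 3, "employee_name": "Artem", "rowid": 855745376561364994}}',
]
state = apply_changes(sample)
print(state[855745376561364994]["employee_name"])
```

Inside the consumer loop, the same function could be fed one message at a time by passing [msg.data()].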


Conclusion

This is how you can use CockroachDB's existing Kafka sink with services it does not support natively, such as Apache Pulsar. Hopefully, you've found this article useful and can start using the Kafka sink with other Kafka-compatible message brokers.
