Debezium Serialization With Apache Avro and Apicurio Service Registry

This article demonstrates how to use Debezium to monitor a MySQL database and then use Apache Avro with the Apicurio service registry to externalize the data schema and reduce the payload of each one of the captured events.  

What Is Debezium?

Debezium is a set of distributed services that captures row-level database changes so that applications can see and respond to them. Debezium connectors record all events to a Red Hat AMQ Streams Kafka cluster, and applications use AMQ Streams to consume the change events. Debezium is built on the Apache Kafka Connect framework, so all of its connectors are Kafka Connect source connectors. As such, they can be deployed and managed using AMQ Streams’ Kafka Connect custom Kubernetes resources.

Debezium provides connectors for monitoring the following databases:

In this post, you will use the MySQL connector.

Avro Serialization

By default, the JSON converter used with Debezium includes the record’s message schema in every record, which makes each record very verbose. Alternatively, you can serialize the record keys and values by using Apache Avro. To use Apache Avro serialization, you must deploy a schema registry that manages Avro message schemas and their versions.
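
To get a feel for that overhead, the sketch below compares the size of a JSON record that embeds its schema with the size of the payload alone. The field names, values, and schema name are made up for illustration and are far simpler than a real Debezium change event; only the schema/payload envelope shape mirrors what the JSON converter emits when schemas are included.

```python
import json

# Hypothetical, heavily simplified record; real Debezium events carry more fields.
payload = {"id": 1001, "first_name": "Sally", "email": "sally@example.com"}

schema = {
    "type": "struct",
    "name": "example.Customer",  # illustrative name, not taken from the article
    "optional": False,
    "fields": [
        {"field": "id", "type": "int32", "optional": False},
        {"field": "first_name", "type": "string", "optional": False},
        {"field": "email", "type": "string", "optional": False},
    ],
}


def encoded_size(obj) -> int:
    """Return the size in bytes of obj serialized as compact UTF-8 JSON."""
    return len(json.dumps(obj, separators=(",", ":")).encode("utf-8"))


# Size of the envelope that repeats the schema next to every payload.
with_schema = encoded_size({"schema": schema, "payload": payload})
# Size of the payload when the schema lives somewhere else.
payload_only = encoded_size(payload)

print(f"schema + payload: {with_schema} bytes")
print(f"payload only:     {payload_only} bytes")
```

Avro goes a step further than dropping the embedded schema: its binary encoding does not repeat field names in each record either, because readers resolve them from the schema stored in the registry.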

The Apicurio Registry open-source project provides several components that work with Avro:

In the following sections we will show how to get started with Avro serialization using the Apicurio service registry for Debezium events. To successfully execute the commands you will need the following prerequisites:

Starting the Services

  1. Clone this repository:
    git clone https://github.com/hguerrero/debezium-examples.git

  2. Change to the following directory:
    cd debezium-examples/debezium-registry-avro

  3. Start the environment:
    docker-compose up -d

The last command will start the following components:

Apicurio Converters

Configuring Avro for the Debezium connector involves specifying the converter and the schema registry as part of the connector's configuration. In addition to the usual connector settings, the configuration file explicitly sets the (de-)serializers so that the connector uses Avro, and it specifies the location of the Apicurio registry.

The container image used in this environment includes all the required libraries to access the connectors and converters.

The following are the lines required to set the key and value converters and their respective registry configuration:

        "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "key.converter.apicurio.registry.url": "http://registry:8080/api",
        "key.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy",
        "key.converter.apicurio.registry.as-confluent": "true",
        "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "value.converter.apicurio.registry.url": "http://registry:8080/api",
        "value.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy",
        "value.converter.apicurio.registry.as-confluent": "true"


The compatibility mode, enabled by the as-confluent setting, allows you to use other providers' tooling to deserialize and reuse the schemas in the Apicurio service registry.
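
As a rough illustration of what that compatibility implies on the wire, the sketch below splits a message framed the way Confluent's serializers frame it: one magic byte (0), a 4-byte big-endian schema ID, and then the Avro body. That the as-confluent setting produces exactly this framing is an assumption based on the Confluent wire format, and the sample bytes are fabricated.

```python
import struct

MAGIC_BYTE = 0  # first byte of a Confluent-framed message


def split_confluent_frame(message: bytes):
    """Split a framed message into (schema_id, avro_body).

    Assumes the layout: 1 magic byte (0), a 4-byte big-endian schema ID,
    then the Avro-encoded body.
    """
    if len(message) < 5:
        raise ValueError("message too short to contain a 5-byte header")
    # ">bI" = big-endian, one signed byte followed by one unsigned 32-bit int.
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[5:]


# Fabricated bytes for illustration: header for schema ID 7 plus a dummy body.
sample = b"\x00" + (7).to_bytes(4, "big") + b"\x02\x0aSally"
schema_id, body = split_confluent_frame(sample)
print(schema_id, body)
```

A consumer that understands this framing uses the extracted ID to look up the schema in the registry before decoding the body, which is why tools written for other registries can read these records.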

Create the Connector

Let's create the Debezium connector to start capturing the changes of the database.

  1. Create the connector using the REST API:
    curl -X POST http://localhost:8083/connectors -H 'content-type:application/json' -d @dbz-mysql-connector-avro.json
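
Once the request returns, it can be worth confirming that the connector and its tasks are actually running. The following is a minimal sketch that uses the status endpoint of the Kafka Connect REST API. The connector name is a placeholder, since it depends on the name value inside the JSON file, and the helper names are hypothetical.

```python
import json
import urllib.parse
import urllib.request

CONNECT_URL = "http://localhost:8083"  # Kafka Connect REST endpoint used by the curl command


def status_url(connector_name: str, base_url: str = CONNECT_URL) -> str:
    """Return the status endpoint URL for the given connector."""
    quoted = urllib.parse.quote(connector_name, safe="")
    return f"{base_url}/connectors/{quoted}/status"


def is_healthy(status: dict) -> bool:
    """True when the connector and all of its tasks (at least one) report RUNNING."""
    tasks = status.get("tasks", [])
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    return connector_ok and bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)


def fetch_status(connector_name: str) -> dict:
    """Query Kafka Connect for the connector status (needs the running environment)."""
    with urllib.request.urlopen(status_url(connector_name)) as response:
        return json.load(response)


# Example call with a placeholder name; replace it with the "name" value from the JSON file:
# print(is_healthy(fetch_status("my-connector")))
```

A task can fail even when the connector itself reports RUNNING, which is why the check looks at both levels.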

Check the Data

The previous step created and started the connector. The database had some initial data, which Debezium captured and sent as events into Kafka.

Let's check the information there.

  1. Review the data using kafkacat:

kafkacat -b localhost:9092 -t avro.inventory.customers -e

You will notice that the information is not human readable. That is expected, because the records are now serialized in Avro's binary format.

  2. To get back a readable version of the data, we need to tell kafkacat to query the schema from the Apicurio service registry and use it to deserialize the records. Run the following command with the registry config:

kafkacat -b localhost:9092 -t avro.inventory.customers -s avro -r http://localhost:8081/api/ccompat -e

If you have the jq utility installed, try the following instead:

kafkacat -b localhost:9092 -t avro.inventory.customers -s avro -r http://localhost:8081/api/ccompat -e | jq

You can now see the Kafka record information containing only the payload as expected, but without the overhead of the Debezium schema as it is now externalized in the registry.
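
To look at the externalized schema itself, you can ask the registry for it through the same compatibility API that kafkacat uses. The sketch below assumes that the endpoint follows the Confluent-style path for the latest version of a subject and that the subject is named after the topic with a -value suffix; both are assumptions to verify against your registry, and the helper names are hypothetical.

```python
import json
import urllib.parse
import urllib.request

REGISTRY_URL = "http://localhost:8081/api/ccompat"  # same URL passed to kafkacat with -r
SUBJECT = "avro.inventory.customers-value"          # assumed subject name (topic + "-value")


def latest_schema_url(subject: str, base_url: str = REGISTRY_URL) -> str:
    """Return the URL of the latest schema version registered for a subject."""
    quoted = urllib.parse.quote(subject, safe="")
    return f"{base_url}/subjects/{quoted}/versions/latest"


def parse_schema_entry(entry: dict) -> dict:
    """Decode the schema, which a Confluent-style response carries as a JSON string."""
    return json.loads(entry["schema"])


def fetch_latest_schema(subject: str = SUBJECT) -> dict:
    """Fetch and decode the latest schema (needs the running environment)."""
    with urllib.request.urlopen(latest_schema_url(subject)) as response:
        return parse_schema_entry(json.load(response))


# Example call against the running environment:
# print(json.dumps(fetch_latest_schema(), indent=2))
```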

Summary

Although Debezium makes it easy to capture database changes and record them in Kafka, one of the more important decisions you have to make is how those change events will be serialized in Kafka. Debezium lets you choose the key and value converters from several options. The Apicurio service registry allows you to store externalized versions of the schema, which minimizes the payload of each event.

Debezium Apache Kafka connectors are available through Red Hat Integration, which offers a comprehensive set of integration and messaging technologies that connect applications and data across hybrid infrastructures. 
