Data Ingestion From RDBMS: Leveraging Confluent's JDBC Kafka Connector

[Figure: JDBC Kafka Connector]

Kafka Connect plays a significant role in streaming data between Apache Kafka and other data systems. As a tool, it provides a scalable and reliable way to move data into and out of Apache Kafka. Importing data from a database into Apache Kafka is arguably the most common use case for the JDBC connector (source and sink), a connector plugin for Kafka Connect.

This article describes the steps to integrate Confluent's JDBC Kafka connector with an operational multi-broker Apache Kafka cluster in order to ingest data from a running MySQL database into a Kafka topic. We are not going to consume the data from the topic or pass it through any subsequent data pipeline. For that reason, Schema Registry has not been set up with the existing multi-broker Kafka cluster to support schema evolution, compatibility, and so on. Schema Registry is not required to integrate or run Kafka connectors such as JDBC or File. It does, however, make it easy to use Avro, Protobuf, and JSON Schema as common data formats for the records that Kafka connectors read and write. The Confluent Platform includes Apache Kafka plus additional tools and services, including Schema Registry, under both community and commercial licenses.

Article Structure

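This article is structured in seven parts:

1. Environment details and assumptions
2. Building/downloading and installing Confluent's JDBC source connector
3. Allowing remote access to the functioning MySQL database
4. Configuration changes on the Kafka cluster and the JDBC connector
5. Running the JDBC connector
6. Data verification in messages
7. Final note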

Environment Details and Assumptions

Here we consider an operational Kafka cluster of four nodes. Each node is already running Kafka 2.6.0 with ZooKeeper 3.5.6 on Ubuntu 14.04 LTS and Java 1.8.0_101. As mentioned, no operations will be performed on the data after ingestion, so Confluent Schema Registry is not run or included in this exercise. The data transported from the MySQL database therefore arrives as raw JSON documents. However, it is advisable to integrate and run Schema Registry in real-time scenarios or in production. Doing so maintains a compatible data format across the data sources (RDBMS) and multiple consumers, with compatibility rules enforced. You can read here how to integrate Confluent Schema Registry with a multi-node Kafka cluster.

Build/Download Confluent's JDBC Source Connector and Installation

The JDBC connector can be installed with the Confluent Hub client. Instead, we manually downloaded the confluentinc-kafka-connect-jdbc-10.0.2.zip file from here and extracted it on a healthy node in the cluster. The version is 10.0.2, and the license is free to use.

[Figure: JDBC Connector (Source and Sink)]

Alternatively, you can build the development version manually from the source code available at https://github.com/confluentinc/kafka-connect-jdbc.

We downloaded confluentinc-kafka-connect-jdbc-10.0.1.zip, copied it to the node, extracted it, and finally copied the contents to the plugin location. The location should be under /usr/share/java. To keep it separate from other connectors, we created a subfolder named 'kafka-connect-jdbc' with sudo privileges.

[Figure: kafka-connect-jdbc]

It is not necessary to copy all the JAR files from the extracted folder. Only kafka-connect-jdbc-10.0.1.jar needs to be copied to /usr/share/java/kafka-connect-jdbc, because the rest of the JARs are already available in Apache Kafka's lib folder.

[Figure: kafka.connect.jdbc Folder]

Because we are importing data from a MySQL database, the Kafka JDBC source connector needs the MySQL JDBC driver (MySQL Connector/J) to establish the JDBC connection. The JAR file 'mysql-connector-java-5.1.48.jar' should also be copied to /usr/share/java/kafka-connect-jdbc. This MySQL-specific driver can be downloaded from here.
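Before starting the worker, it can be worth confirming that both JARs landed in the plugin folder. The Python sketch below assumes the folder path used above. It matches the JARs by filename prefix, so other versions pass as well. The function name check_plugin_dir is hypothetical and not part of any Kafka tooling.

```python
from pathlib import Path

# Filename prefixes of the two JARs that must sit in the plugin folder.
# Matching by prefix (an assumption) lets other versions pass too.
REQUIRED_PREFIXES = ("kafka-connect-jdbc-", "mysql-connector-java-")


def check_plugin_dir(plugin_dir: str = "/usr/share/java/kafka-connect-jdbc") -> list[str]:
    """Return the required JAR prefixes that have no matching file in plugin_dir."""
    # glob() on a missing directory simply yields nothing.
    jar_names = [p.name for p in Path(plugin_dir).glob("*.jar")]
    missing = []
    for prefix in REQUIRED_PREFIXES:
        if not any(name.startswith(prefix) for name in jar_names):
            missing.append(prefix)
    return missing


if __name__ == "__main__":
    missing = check_plugin_dir()
    if missing:
        print("Missing JARs:", ", ".join(missing))
    else:
        print("JDBC connector and MySQL driver JARs found.")
```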

Allow Remote Access to Functioning MySQL Database

We changed the configuration of the MySQL database server instance, which runs on a separate node in the cluster, so that it is accessible from another node as the root user. Normally, the MySQL server would be accessed remotely through a dedicated database user, but we decided not to create one. By default, the MySQL server instance is configured to listen only for local connections. To make it listen on an external IP address, we need to modify the my.cnf file on the node where the MySQL server is running.

> sudo vi /etc/mysql/my.cnf

[Figure: Modifying my.cnf]

By default, the bind-address key is set to 127.0.0.1, which means the MySQL server listens only for local connections. To change this, we replaced 127.0.0.1 with the IP address of the node, 192.168.10.130.

[Figure: Replacing IP Address]
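The my.cnf change can also be scripted instead of being done in vi. This is a minimal Python sketch, assuming bind-address appears as an uncommented key = value line. set_bind_address is a hypothetical helper, and writing the real /etc/mysql/my.cnf would still require sudo.

```python
import re

# Matches an uncommented line such as "bind-address = 127.0.0.1",
# capturing the key, the "=" and surrounding spaces so they are preserved.
BIND_RE = re.compile(r"^(\s*bind-address\s*=\s*)\S+", re.MULTILINE)


def set_bind_address(cnf_text: str, new_address: str) -> str:
    """Return cnf_text with every bind-address value replaced by new_address."""
    # A function replacement avoids backslash handling in the new value.
    updated, count = BIND_RE.subn(lambda m: m.group(1) + new_address, cnf_text)
    if count == 0:
        raise ValueError("no bind-address line found")
    return updated
```

For example, set_bind_address(text, "192.168.10.130") returns the file contents with the address replaced. Commented-out lines are left untouched, and a ValueError is raised when no bind-address line exists.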

After changing this line, save and close the file. Then restart the MySQL service so that the changes made to my.cnf take effect.

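> sudo systemctl restart mysql

If the MySQL node runs a release without systemd, such as Ubuntu 14.04 (which uses Upstart by default), the equivalent command is sudo service mysql restart.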

Next, grant the root user full access to the database from the remote host 192.168.10.110, which is the node where the Kafka JDBC connector will run. Log in to the MySQL server running on node 192.168.10.130 and connect as the root user. In the MySQL client, execute the following SQL statement to grant all privileges:

GRANT ALL ON <local database name>.*
TO '<remote username>'@'<remote node IP address>'
IDENTIFIED BY '<database user password>';

In our scenario, it was:

GRANT ALL ON order_streaming.* TO 'root'@'192.168.10.110' IDENTIFIED BY 'mypassword';

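Then flush the privileges and exit the MySQL client:

FLUSH PRIVILEGES;
EXIT;

The GRANT ... IDENTIFIED BY form is accepted by MySQL 5.x but was removed in MySQL 8.0. On 8.0, create the remote user first with CREATE USER, then grant the privileges.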

Configuration Changes on the Kafka Cluster and the JDBC Connector

connect-standalone.properties

We will run the JDBC connector in standalone mode on one node of the multi-broker Kafka cluster, so the connect-standalone.properties file in Apache Kafka's config directory must be updated as follows. In standalone mode, a single process executes all connectors and their associated tasks.

bootstrap.servers=192.168.10.130:9092, 192.168.10.150:9092, 192.168.10.110:9092
key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

The plugin.path key should be set to /usr/share/java. This is the parent of the kafka-connect-jdbc folder created in the installation section, which contains kafka-connect-jdbc-10.0.1.jar and mysql-connector-java-5.1.48.jar.

plugin.path=/usr/share/java

[Figure: connect-standalone.properties]
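A quick sanity check of the worker file can catch a missing key before connect-standalone.sh fails at startup. This Python sketch parses simple key=value lines and checks the worker settings listed above. It does not implement the full Java .properties syntax, which also allows line continuations and escapes. parse_properties and check_worker_config are hypothetical names.

```python
def parse_properties(text: str) -> dict[str, str]:
    """Parse simple key=value / key:value lines, skipping '#' and '!' comments.

    Simplification (assumption): no line continuations or escape sequences.
    """
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        # Split on whichever separator ('=' or ':') appears first.
        cuts = [i for i in (line.find("="), line.find(":")) if i != -1]
        if not cuts:
            props[line] = ""  # a key with no value
            continue
        cut = min(cuts)
        props[line[:cut].strip()] = line[cut + 1:].strip()
    return props


def check_worker_config(props: dict[str, str]) -> list[str]:
    """Return human-readable problems found in a standalone worker config."""
    problems = []
    for key in ("bootstrap.servers", "key.converter", "value.converter",
                "offset.storage.file.filename", "plugin.path"):
        if not props.get(key):
            problems.append(f"missing {key}")
    # Entries are split on commas and trimmed, so spaces after commas are harmless.
    servers = [s.strip() for s in props.get("bootstrap.servers", "").split(",") if s.strip()]
    for server in servers:
        host, sep, port = server.rpartition(":")
        if not sep or not host or not port.isdigit():
            problems.append(f"bad bootstrap server {server!r}")
    return problems
```

For example, check_worker_config(parse_properties(open("../config/connect-standalone.properties").read())) returns an empty list when the settings above are present.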

connect-jdbc-mysql-source.properties

On the same node where the JDBC connector will run, we created another configuration file named 'connect-jdbc-mysql-source.properties' in Apache Kafka's config directory. Add the following properties to it:

name=local-mysql-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://192.168.10.130:3306/order_streaming
connection.user=root
connection.password=XXXX
mode=bulk
topic.prefix=connect-test

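We had already created a topic named connect-test with a replication factor of 3 and 5 partitions, as shown below. According to the connector's documentation, however, topic.prefix is used as the full topic name only when a custom query is configured. In table mode, as here, each table is written to a topic named topic.prefix followed by the table name. For the order_details table, that is connect-testorder_details. If that topic does not already exist, Kafka must be allowed to create it, for example through the brokers' automatic topic creation.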

[Figure: Topic connect-test]
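Typos in this file, such as a misspelled connector class or tasks.max key, only surface when the connector starts. The Python sketch below checks the connector settings held in a plain dict. check_source_config and topic_for_table are hypothetical helpers. The allowed mode values are bulk, incrementing, timestamp and timestamp+incrementing, and the jdbc:mysql:// check is specific to this MySQL setup. topic_for_table reflects the documented table-mode naming, in which topic.prefix is prepended to the table name.

```python
# Values accepted by the JDBC source connector's "mode" setting.
VALID_MODES = {"bulk", "incrementing", "timestamp", "timestamp+incrementing"}
EXPECTED_CLASS = "io.confluent.connect.jdbc.JdbcSourceConnector"


def check_source_config(props: dict[str, str]) -> list[str]:
    """Return problems found in a JDBC source connector config (sketch)."""
    problems = []
    if props.get("connector.class") != EXPECTED_CLASS:
        problems.append(f"connector.class should be {EXPECTED_CLASS}")
    # A missing or misspelled tasks.max key ends up here as an empty string.
    tasks = props.get("tasks.max", "")
    if not tasks.isdigit() or int(tasks) < 1:
        problems.append("tasks.max must be a positive integer")
    if props.get("mode") not in VALID_MODES:
        problems.append(f"mode must be one of {sorted(VALID_MODES)}")
    # Specific to this article's MySQL source (assumption).
    if not props.get("connection.url", "").startswith("jdbc:mysql://"):
        problems.append("connection.url should start with jdbc:mysql://")
    return problems


def topic_for_table(topic_prefix: str, table: str) -> str:
    """Topic name used in table mode: the prefix followed by the table name."""
    return topic_prefix + table
```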

Execution/Running the JDBC Connector

Running the JDBC Kafka connector on a multi-node Apache Kafka cluster is straightforward. The connector can be run on the node where it was installed and configured. Just make sure that the MySQL database server running on the other node is reachable from it.
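A plain TCP check from the connector node is a cheap way to confirm that the bind-address change took effect and that nothing blocks the MySQL port. Here is a Python sketch in which can_reach is a hypothetical helper and 3306 is MySQL's default port, as in the connection URL.

```python
import socket


def can_reach(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the host and tries to connect.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False
```

Running can_reach("192.168.10.130", 3306) on 192.168.10.110 should return True. A successful connection proves only the network path. Whether root may log in from that host still depends on the GRANT statement.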

Make sure the multi-node Kafka cluster is up and running and that the topic referenced in connect-jdbc-mysql-source.properties exists. Then open a new terminal on the node where the JDBC connector is installed and run the following command from Kafka's bin directory:

/kafka/bin $ ./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-jdbc-mysql-source.properties


If all the settings and configurations are correct, log output like the following appears in the same terminal after running the above command:


[2021-03-08 04:22:04,818] INFO Using JDBC dialect MySql (io.confluent.connect.jdbc.source.JdbcSourceTask:102)

[2021-03-08 04:22:04,819] INFO Attempting to open connection #1 to MySql (io.confluent.connect.jdbc.util.CachedConnectionProvider:82)

[2021-03-08 04:22:04,887] INFO Started JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:261)


In addition, the built-in Kafka Connect REST API, which listens on port 8083 by default, can be used to check the status of the JDBC connector. The API is also useful for getting status information and for adding and removing connectors without stopping the process. Open another terminal and type the following command:

$ curl -X GET http://<IP of the node running the connector>:8083/connectors/<connector name>/status

$ curl -X GET http://192.168.10.110:8083/connectors/local-mysql-source/status

The command above returns:

{"name":"local-mysql-source","connector":{"state":"RUNNING","worker_id":"192.168.10.110:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"192.168.10.110:8083"}],"type":"source"}
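The same status call can be scripted, for example as a health check. This Python sketch uses only the standard library. fetch_status and is_healthy are hypothetical names, and treating a connector with no tasks as unhealthy is a design choice of the sketch.

```python
import json
from urllib.request import urlopen


def fetch_status(host: str, connector: str, port: int = 8083) -> dict:
    """GET /connectors/<name>/status from the Kafka Connect REST API."""
    url = f"http://{host}:{port}/connectors/{connector}/status"
    with urlopen(url, timeout=5) as resp:
        return json.load(resp)


def is_healthy(status: dict) -> bool:
    """True if the connector and every one of its tasks report RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    return bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)
```

For the response shown above, is_healthy(fetch_status("192.168.10.110", "local-mysql-source")) returns True.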

Open another terminal and run Kafka's built-in console consumer script to view the data ingested from the MySQL database immediately, in raw JSON format.

[Figure: Running ingest data script]
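The console consumer can also be driven from a script that parses each message. This Python sketch assumes Kafka is installed under /kafka, as in the command used earlier. consumer_command, parse_record and stream_rows are hypothetical names. The topic name passed in is also an assumption: in table mode the connector names topics as topic.prefix followed by the table name, which here would give connect-testorder_details. By default the console consumer prints only each record's value. With the JSON converter settings above, that value is one plain JSON object per line.

```python
import json
import subprocess


def consumer_command(bootstrap: str, topic: str, bin_dir: str = "/kafka/bin") -> list[str]:
    """Build the argv for Kafka's console consumer, reading from the beginning."""
    return [
        f"{bin_dir}/kafka-console-consumer.sh",
        "--bootstrap-server", bootstrap,
        "--topic", topic,
        "--from-beginning",
    ]


def parse_record(line: str) -> dict:
    """Each printed line is one record value written as plain JSON."""
    return json.loads(line)


def stream_rows(bootstrap: str, topic: str):
    """Run the console consumer and yield each record as a dict until stopped."""
    with subprocess.Popen(consumer_command(bootstrap, topic),
                          stdout=subprocess.PIPE, text=True) as proc:
        for line in proc.stdout:
            if line.strip():
                yield parse_record(line)
```

For example, for row in stream_rows("192.168.10.110:9092", "connect-testorder_details"): print(row) prints each ingested row as a dict until the process is interrupted.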

Data Verification in Messages

In this exercise, we imported a very small sample of dummy data from the database 'order_streaming', which has a single table, 'order_details', with 3 rows.

[Figure: Order Table]

As mentioned at the beginning, we have not attached any downstream data pipeline to consume the data once it arrives on the topic. We therefore only verified the messages on the console, using Apache Kafka's built-in console consumer script. Because there is no Schema Registry and no schema definition, the messages appear as raw JSON documents. With value.converter.schemas.enable=false, the JsonConverter writes each row as a plain JSON object without an embedded schema.

[Figure: Raw JSON Document]

Consider real-time implementations where data is continuously ingested from databases into Apache Kafka and then passed through multiple data pipelines into other data systems. In those cases, it is advisable to integrate Schema Registry to reduce operational complexity. Click here to learn more about Confluent Schema Registry.

Final Note

Ingesting data from an RDBMS into Apache Kafka with the JDBC Kafka connector is a good option despite some downsides. Without writing a single line of code, we can start shipping data continuously from tables and subsequently sink it into other data systems. However, the connector cannot be used for full event streaming from database tables. In incrementing mode, it detects only new rows, based on a strictly incrementing column. To detect updates to existing rows, the table needs an additional timestamp column that records the last modification. Using both columns (mode=timestamp+incrementing), the JDBC connector detects and ingests both newly added and updated row data into the Kafka topic. In addition, the JDBC connector cannot capture DELETE events on table rows, because it retrieves data using only SELECT queries. Hope you have enjoyed this read.
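To make the difference between the modes concrete, here is a pure-Python simulation of how incrementing and timestamp+incrementing polling pick rows. It is not the connector's own code. The predicate approximates the one the connector applies in timestamp+incrementing mode. The column names id and updated_at and the Row type are hypothetical, and the real connector additionally caps the timestamp at the current database time.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Row:
    id: int          # strictly incrementing column (hypothetical name)
    updated_at: int  # last-modified timestamp column (hypothetical name)
    payload: str


def poll_incrementing(rows, last_id):
    """incrementing mode: only rows whose id exceeds the stored offset."""
    new = sorted((r for r in rows if r.id > last_id), key=lambda r: r.id)
    return new, (new[-1].id if new else last_id)


def poll_timestamp_incrementing(rows, last_ts, last_id):
    """timestamp+incrementing mode, approximating the predicate
    (updated_at = last_ts AND id > last_id) OR updated_at > last_ts,
    ordered by (updated_at, id)."""
    new = sorted(
        (r for r in rows
         if (r.updated_at == last_ts and r.id > last_id) or r.updated_at > last_ts),
        key=lambda r: (r.updated_at, r.id),
    )
    if new:
        last_ts, last_id = new[-1].updated_at, new[-1].id
    return new, last_ts, last_id


if __name__ == "__main__":
    table = [Row(1, 100, "a"), Row(2, 100, "b")]             # two inserts
    _, off = poll_incrementing(table, 0)
    _, ts, inc = poll_timestamp_incrementing(table, 0, 0)

    table = [Row(1, 105, "a-updated"), Row(2, 100, "b")]     # update row 1
    print(poll_incrementing(table, off)[0])                   # update missed
    changed, ts, inc = poll_timestamp_incrementing(table, ts, inc)
    print(changed)                                            # row 1 picked up

    table = [Row(1, 105, "a-updated")]                        # delete row 2
    print(poll_timestamp_incrementing(table, ts, inc)[0])     # delete invisible
```

Walking a table through an insert, an update and a delete shows the limitations described above. Incrementing mode misses the update, and neither mode sees the delete, because a deleted row simply stops matching any query.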