Change Data Capture (CDC) from a MySQL Database to Kafka with Kafka Connect and Debezium

Introduction

Debezium is an open-source project developed by Red Hat that aims to simplify change data capture by letting you extract changes from various database systems (e.g. MySQL, PostgreSQL, MongoDB) and push them to Kafka.

Architecture


Debezium Connectors

Debezium has a library of connectors that capture changes from a variety of databases and produce events with very similar structures, making it easier for applications to consume and respond to the events regardless of where the changes originated. Debezium currently provides connectors for several databases, including MySQL, PostgreSQL, and MongoDB.

In this article, we are going to capture change events from a MySQL database and write them to Kafka topics.

The Debezium MySQL connector tracks the structure of the tables, performs snapshots, transforms binlog events into Debezium change events, and records where those events are recorded in Kafka.

Prerequisites:

  1. Kafka installed on OpenShift
  2. Kafka Connect installed on OpenShift
  3. MySQL Server installed and set up

I have installed a single-node Kafka cluster with Kafka Connect:

```shell
[kkakarla@kkakarla amq-streams-1.4.0-ocp-install-examples]$ oc get pods
NAME                                          READY   STATUS      RESTARTS   AGE
my-cluster-entity-operator-759bfbc89d-s68wx   3/3     Running     0          9m
my-cluster-kafka-0                            2/2     Running     2          9m55s
my-cluster-zookeeper-0                        2/2     Running     0          11m
my-cluster-zookeeper-1                        2/2     Running     0          11m
my-cluster-zookeeper-2                        2/2     Running     0          11m
my-connect-cluster-connect-1-build            0/1     Completed   0          6m4s
my-connect-cluster-connect-1-deploy           0/1     Completed   0          5m27s
my-connect-cluster-connect-1-ltphz            1/1     Running     0          5m25s
strimzi-cluster-operator-64d9cf9bc7-wsf4q     1/1     Running     0          15m
```


Installing the MySQL Connector

Installing the Debezium MySQL connector is a simple process: download the connector archive, extract it into the Kafka Connect environment, and make sure the plug-in’s parent directory is on the Kafka Connect plugin path.

  1. Download the Debezium MySQL connector.
  2. Extract the files into your Kafka Connect environment.
  3. Add the plug-in’s parent directory to your Kafka Connect plugin path.
  4. Create a directory containing the Kafka Connect plug-ins:

```shell
[kkakarla@kkakarla upstream-plugins]$ tree
.
└── my-plugins
    └── debezium-connector-mysql
        ├── antlr4-runtime-4.7.2.jar
        ├── CHANGELOG.md
        ├── CONTRIBUTE.md
        ├── COPYRIGHT.txt
        ├── debezium-api-1.1.2.Final.jar
        ├── debezium-connector-mysql-1.1.2.Final.jar
        ├── debezium-core-1.1.2.Final.jar
        ├── debezium-ddl-parser-1.1.2.Final.jar
        ├── LICENSE-3rd-PARTIES.txt
        ├── LICENSE.txt
        ├── mysql-binlog-connector-java-0.19.1.jar
        ├── mysql-connector-java-8.0.16.jar
        └── README.md

2 directories, 13 files
```



The Kafka Connect S2I image takes your binaries (with plug-ins and connectors) and stores them in the /tmp/kafka-plugins/s2i directory. It creates a new Kafka Connect image from this directory, which can then be used with the Kafka Connect deployment. When started using the enhanced image, Kafka Connect loads any third-party plug-ins from the /tmp/kafka-plugins/s2i directory.
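Before uploading the directory to the build, it can help to confirm that each plug-in subdirectory actually contains the JARs you expect. The following is a minimal sketch in Python, not part of Debezium or Strimzi; the helper names `find_plugin_jars` and `has_jar_with_prefix` are hypothetical and exist only for this illustration.

```python
import tempfile
from pathlib import Path


def find_plugin_jars(plugins_root):
    """Return {plug-in directory name: sorted JAR file names} for each subdirectory."""
    result = {}
    for child in sorted(Path(plugins_root).iterdir()):
        if child.is_dir():
            result[child.name] = sorted(p.name for p in child.glob("*.jar"))
    return result


def has_jar_with_prefix(jars, prefix):
    """Return True if any JAR file name starts with the given prefix."""
    return any(name.startswith(prefix) for name in jars)


if __name__ == "__main__":
    # Build a throwaway directory that mimics part of the layout shown above.
    with tempfile.TemporaryDirectory() as tmp:
        plugin_dir = Path(tmp) / "debezium-connector-mysql"
        plugin_dir.mkdir()
        for name in ("debezium-connector-mysql-1.1.2.Final.jar", "README.md"):
            (plugin_dir / name).touch()

        jars = find_plugin_jars(tmp)["debezium-connector-mysql"]
        print(jars)
        print(has_jar_with_prefix(jars, "debezium-connector-mysql-"))
```

In practice you would point `find_plugin_jars` at the real my-plugins directory instead of a temporary one.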

```shell
[kkakarla@kkakarla upstream-plugins]$ oc start-build my-connect-cluster-connect --from-dir ./my-plugins/
Uploading directory "my-plugins" as binary input for the build ...
....................
Uploading finished
build.build.openshift.io/my-connect-cluster-connect-2 started
```


```shell
[kkakarla@kkakarla upstream-plugins]$ oc get build
NAME                           TYPE     FROM     STATUS     STARTED          DURATION
my-connect-cluster-connect-1   Source            Complete   59 minutes ago   36s
my-connect-cluster-connect-2   Source   Binary   Complete   6 minutes ago    2m7s
```



Installing MySQL


```shell
[kkakarla@kkakarla upstream-plugins]$ oc new-app --name=mysql debezium/example-mysql:1.0

--> Found container image d8322b2 (3 days old) from Docker Hub for "debezium/example-mysql:1.0"

    * An image stream tag will be created as "mysql:1.0" that will track this image
    * This image will be deployed in deployment config "mysql"
    * Ports 3306/tcp, 33060/tcp will be load balanced by service "mysql"
      * Other containers can access this service through the hostname "mysql"
    * This image declares volumes and will default to use non-persistent, host-local storage.
      You can add persistent volumes later by running 'oc set volume dc/mysql --add ...'
    * WARNING: Image "debezium/example-mysql:1.0" runs as the 'root' user which may not be permitted by your cluster administrator

--> Creating resources ...
    imagestream.image.openshift.io "mysql" created
    deploymentconfig.apps.openshift.io "mysql" created
    service "mysql" created
--> Success
    Application is not exposed. You can expose services to the outside world by executing one or more of the commands below:
     'oc expose svc/mysql'
    Run 'oc status' to view your app.
```



```shell
[kkakarla@kkakarla upstream-plugins]$ oc set env dc/mysql MYSQL_ROOT_PASSWORD=debezium MYSQL_USER=mysqluser MYSQL_PASSWORD=mysqlpw
deploymentconfig.apps.openshift.io/mysql updated

[kkakarla@kkakarla upstream-plugins]$ oc get pods -l app=mysql
NAME            READY   STATUS    RESTARTS   AGE
mysql-2-v2gbg   1/1     Running   0          64s
```



```shell
# Connect as mysqluser. Note that there is no space between -p and the password;
# with a space, mysql prompts for a password and treats "mysqlpw" as the database name.
oc exec mysql-2-v2gbg -it -- mysql -u mysqluser -pmysqlpw inventory

# Or log in as root from inside the pod (you are prompted for the password):
mysql -u root -p

mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| inventory          |
| mysql              |
| performance_schema |
| sys                |
+--------------------+
5 rows in set (0.00 sec)

mysql> use inventory
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> show tables;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses           |
| customers           |
| geom                |
| orders              |
| products            |
| products_on_hand    |
+---------------------+
6 rows in set (0.00 sec)

mysql> select * from customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)
```



```yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnector
metadata:
  name: inventory-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: mysql
    database.port: 3306
    database.user: debezium
    database.password: dbz
    database.server.id: 184054
    database.server.name: dbserver1
    database.whitelist: inventory
    database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.inventory
```


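Save this definition as inventory-connector.yaml and apply it with oc apply -f inventory-connector.yaml. Then check the Kafka Connect logs to confirm that the connector has started: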

```shell
oc logs $(oc get pods -o name -l strimzi.io/name=my-connect-cluster-connect)
```
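Once the connector is running, the Debezium MySQL connector writes the change events for each captured table to its own Kafka topic, named from the logical server name, the database name, and the table name. The sketch below only illustrates that naming convention for the values used in this article (dbserver1 and inventory); `topic_for_table` is a hypothetical helper, not a Debezium API.

```python
def topic_for_table(server_name, database, table):
    """Build the topic name '<server name>.<database>.<table>' for a captured table."""
    return ".".join((server_name, database, table))


if __name__ == "__main__":
    # Table names taken from the 'show tables' output earlier in the article.
    tables = ("addresses", "customers", "geom", "orders", "products", "products_on_hand")
    for table in tables:
        print(topic_for_table("dbserver1", "inventory", table))
```

With this configuration, changes to the customers table would therefore be expected on the topic dbserver1.inventory.customers.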



Capture Data Change Events

Insert

```sql
INSERT INTO `inventory`.`customers`
(
    `first_name`,
    `last_name`,
    `email`)
VALUES
(
    'Ramu',
    'kakarla',
    'Ramu@gmail.com'
);
```

The resulting change event (only the payload portion of the message is shown):

```json
   "payload":{
      "before":null,
      "after":{
         "id":1005,
         "first_name":"Ramu",
         "last_name":"kakarla",
         "email":"Ramu@gmail.com"
      },
      "source":{
         "version":"1.1.2.Final",
         "connector":"mysql",
         "name":"dbserver1",
         "ts_ms":1591593372000,
         "snapshot":"false",
         "db":"inventory",
         "table":"customers",
         "server_id":223344,
         "gtid":null,
         "file":"mysql-bin.000003",
         "pos":364,
         "row":0,
         "thread":3,
         "query":null
      },
      "op":"c",
      "ts_ms":1591593372420,
      "transaction":null
   }
}
```


The before object is null while the after object shows the newly inserted value. Notice that the op attribute value is c, meaning it’s a CREATE event.
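The event carries two timestamps: source.ts_ms is the time the change was made in the database, and the top-level ts_ms is the time the connector processed the event. As a rough sketch, the difference between the two gives an idea of the capture lag. The helper names below are hypothetical, and the payload is trimmed to the fields used here.

```python
from datetime import datetime, timezone


def capture_lag_ms(payload):
    """Milliseconds between the database change and the connector processing it."""
    return payload["ts_ms"] - payload["source"]["ts_ms"]


def to_utc(ts_ms):
    """Convert an epoch timestamp in milliseconds to an aware UTC datetime."""
    return datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)


if __name__ == "__main__":
    # Values copied from the insert event above; other fields are omitted.
    payload = {
        "source": {"ts_ms": 1591593372000},
        "op": "c",
        "ts_ms": 1591593372420,
    }
    print(capture_lag_ms(payload), "ms")
    print(to_utc(payload["source"]["ts_ms"]).isoformat())
```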

Update

```sql
mysql> UPDATE customers SET email='ramu@123@msn.com' WHERE id=1005;
Query OK, 1 row affected (0.02 sec)
Rows matched: 1  Changed: 1  Warnings: 0
```

The resulting change event (only the payload portion of the message is shown):

```json
   "payload":{
      "before":{
         "id":1005,
         "first_name":"Ramu",
         "last_name":"kakarla",
         "email":"Ramu@gmail.com"
      },
      "after":{
         "id":1005,
         "first_name":"Ramu",
         "last_name":"kakarla",
         "email":"ramu@123@msn.com"
      },
      "source":{
         "version":"1.1.2.Final",
         "connector":"mysql",
         "name":"dbserver1",
         "ts_ms":1591594059000,
         "snapshot":"false",
         "db":"inventory",
         "table":"customers",
         "server_id":223344,
         "gtid":null,
         "file":"mysql-bin.000003",
         "pos":673,
         "row":0,
         "thread":8,
         "query":null
      },
      "op":"u",
      "ts_ms":1591594059965,
      "transaction":null
   }
}
```


The before object shows the state of the row before the update, while the after object shows its current state. Notice that the op attribute value is u, meaning it’s an UPDATE event.
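Because an update event contains both row images, a consumer can work out which columns actually changed by comparing them. The following is a minimal sketch with a hypothetical `changed_fields` helper, using the before and after values from the event above.

```python
def changed_fields(payload):
    """Return {column: (old value, new value)} for columns that differ between row images."""
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    return {
        column: (before.get(column), after.get(column))
        for column in sorted(before.keys() | after.keys())
        if before.get(column) != after.get(column)
    }


if __name__ == "__main__":
    # Row images copied from the update event above; other fields are omitted.
    payload = {
        "before": {"id": 1005, "first_name": "Ramu", "last_name": "kakarla",
                   "email": "Ramu@gmail.com"},
        "after": {"id": 1005, "first_name": "Ramu", "last_name": "kakarla",
                  "email": "ramu@123@msn.com"},
        "op": "u",
    }
    print(changed_fields(payload))
```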

Delete

```sql
mysql> DELETE FROM `inventory`.`customers` WHERE id = 1005;
Query OK, 1 row affected (0.01 sec)
```

The resulting change event (only the payload portion of the message is shown):

```json
   "payload":{
      "before":{
         "id":1005,
         "first_name":"Ramu",
         "last_name":"kakarla",
         "email":"ramu@123@msn.com"
      },
      "after":null,
      "source":{
         "version":"1.1.2.Final",
         "connector":"mysql",
         "name":"dbserver1",
         "ts_ms":1591594605000,
         "snapshot":"false",
         "db":"inventory",
         "table":"customers",
         "server_id":223344,
         "gtid":null,
         "file":"mysql-bin.000003",
         "pos":1018,
         "row":0,
         "thread":8,
         "query":null
      },
      "op":"d",
      "ts_ms":1591594605296,
      "transaction":null
   }
}
```


The before object holds the last state of the deleted row, while the after object is null. Notice that the op attribute value is d, meaning it’s a DELETE event.
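Taken together, the three event types are enough to keep a downstream copy of the table in sync. The sketch below replays create, update, and delete payloads against an in-memory dictionary. It assumes that id is the primary key of the customers table, and `apply_event` is a hypothetical helper; rejecting any other op value is a choice made for this sketch, not Debezium behavior.

```python
def apply_event(table, payload):
    """Apply one change-event payload to an in-memory copy of the table, keyed by id."""
    op = payload["op"]
    if op in ("c", "u"):
        # Create and update both carry the new row image in "after".
        row = payload["after"]
        table[row["id"]] = row
    elif op == "d":
        # Delete carries the last row image in "before".
        table.pop(payload["before"]["id"], None)
    else:
        raise ValueError(f"unhandled op: {op!r}")


if __name__ == "__main__":
    # Row images trimmed to two columns for brevity.
    old = {"id": 1005, "email": "Ramu@gmail.com"}
    new = {"id": 1005, "email": "ramu@123@msn.com"}
    customers = {}
    apply_event(customers, {"op": "c", "before": None, "after": old})
    apply_event(customers, {"op": "u", "before": old, "after": new})
    print(customers)
    apply_event(customers, {"op": "d", "before": new, "after": None})
    print(customers)
```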

Enjoy!