Kafka JDBC Source Connector for Large Data

Recently I had a task to migrate some data from an old monolith's Oracle database to a microservice with a PostgreSQL database. The problem was that the data needed for the migration lived in a parent table with around 2 million records and 150 columns, and on top of that, everything was exposed through a view with a payload column aggregating data from various tables as XML. As you can imagine, the SELECT from that view was pretty slow, and by pretty, I mean insanely slow, which was not going to work very well for the connector. So, in this article, we'll take a look at a similar, simplified use case and how we can deal with it.

Use Case

We have a course-catalog application with a PostgreSQL database that deals with instructors and their courses. Now we need to migrate some legacy instructors from another PostgreSQL database that is soon going to be decommissioned. So we have instructors-legacy-db and course-catalog-db. In our case, neither database is particularly overwhelmed with records, with just about 200 rows in instructors-legacy-db, but for the sake of the example, just imagine that instructors-legacy-db holds that table with 2 million cumbersome records.

Right, here is the docker-compose.yml

YAML
 
version: '3'
services:
  course-catalog-operational-db:
    image: postgres:13.3
    container_name: course-catalog-operational-db
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
    environment:
      POSTGRES_PASSWORD: 123456
      POSTGRES_DB: course-catalog-db
    ports:
      - "5433:5432"
  instructors-legacy-db:
    image: postgres:13.3
    container_name: instructors-legacy-db
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
    environment:
      POSTGRES_PASSWORD: 123456
      POSTGRES_DB: instructors-db
    ports:
      - "5434:5432"
    volumes:
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql


Here is the course-catalog-db/instructors table:

SQL
 
create table instructors
(
    id          integer   not null
        primary key,
    created_at  timestamp not null,
    updated_at  timestamp not null,
    description varchar(3000),
    name        varchar(255),
    summary     varchar(3000)
);


And here is the instructors-legacy-db/instructors table:

SQL
 
create table instructors
(
    id         integer   not null
        primary key,
    created_at timestamp not null,
    updated_at timestamp not null,
    first_name varchar(255),
    last_name  varchar(255),
    title      varchar(255)
);


Also, as you may have noticed, for the instructors-legacy-db container I'm using an init.sql script to create the table and do some inserts on startup, so we'll have some data to play around with. There is nothing special in that script, just 200 randomly generated inserts; here are some of them (the rest can be viewed in the repo):

SQL
 
INSERT INTO public.instructors (id, created_at, updated_at, first_name, last_name, title) VALUES (0, '2022-08-28 21:00:19.315552', '2022-08-28 21:00:19.315552', 'Liam', 'Martinez', 'Mr.');
INSERT INTO public.instructors (id, created_at, updated_at, first_name, last_name, title) VALUES (1, '2022-08-28 21:00:19.315552', '2022-08-28 21:00:19.315552', 'Thomas', 'Williams', 'Mr.');
INSERT INTO public.instructors (id, created_at, updated_at, first_name, last_name, title) VALUES (2, '2022-08-28 21:00:19.315552', '2022-08-28 21:00:19.315552', 'Mateo', 'Martinez', 'Mr.');
INSERT INTO public.instructors (id, created_at, updated_at, first_name, last_name, title) VALUES (3, '2022-08-28 21:00:19.315552', '2022-08-28 21:00:19.315552', 'Ciro', 'Smith', 'Mr.');


And here is the view that we are going to address:

SQL
 
create view instructor_aggregate_vw(id, created_at, updated_at, name) as
SELECT instructors.id,
       instructors.created_at,
       instructors.updated_at,
       (((instructors.title::text || ' '::text) || instructors.first_name::text) || ' '::text) ||
       instructors.last_name::text AS name
FROM instructors;


Okay, now that we have everything in place, how do we get our data queried and published?

Kafka Connect in Action

That’s right, we are going to use io.confluent.connect.jdbc.JdbcSourceConnector for that.

The Kafka Connect JDBC Source connector allows you to import data from any relational database with a JDBC driver into an Apache Kafka® topic. This connector can support a wide variety of databases.

But first, we need to set up our Kafka environment. For this, I am going to use the landoop/fast-data-dev Docker image, since it comes with almost everything properly configured, from ZooKeeper, the Schema Registry, Kafka Connect, and the broker, to a nice UI provided by Landoop for managing everything Kafka-related. Here is what we are going to add to our docker-compose.yml:

YAML
 
  course-catalog-kafka-cluster:
    container_name: course-catalog-kafka-cluster
    image: landoop/fast-data-dev
    environment:
      ADV_HOST: 127.0.0.1         # hostname advertised to Kafka clients
      RUNTESTS: 0                 # skip the integration tests that run on startup
    ports:
      - 2181:2181                 # ZooKeeper
      - 3030:3030                 # Landoop UI
      - 8081-8083:8081-8083       # Schema Registry, REST Proxy, Kafka Connect
      - 9581-9585:9581-9585       # JMX ports
      - 9092:9092                 # Kafka broker
    volumes:
      # Specify an absolute path mapping
      - C:\Users\ionpa\Projects\course-catalog\infra:/my-data


Okay, docker-compose up, and our entire environment is up. You can go ahead and check the UI on http://localhost:3030/ and explore it a bit. 
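
Before adding any connectors, you can also quickly check that init.sql did its job, straight against the container. Here is a small sketch using the credentials from our docker-compose.yml:

Shell

docker exec -it instructors-legacy-db \
  psql -U postgres -d instructors-db -c "select count(*) from instructors;"
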

Now about the connector. Connectors can be added either through the UI on localhost here, following a property-based or JSON-based configuration, or through Kafka Connect's REST API, by sending a PUT request to http://localhost:8083/connectors/<connector-name>/config with a JSON body containing your connector's configuration (there is a sketch of such a request right after the properties below). Let's take a look at the configuration that we are going to use for our connector:

Properties files
 
name=legacy.instructors.connector
topic.prefix=legacy-instructors
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
table.types=VIEW
connection.url=jdbc:postgresql://instructors-legacy-db:5432/instructors-db
connection.user=postgres
connection.password=123456
connection.attempts=100
connection.backoff.ms=300000
poll.interval.ms=100
transforms=AddNamespace,createKey,AddKeyNamespace
transforms.AddNamespace.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
transforms.AddNamespace.schema.name=inc.evil.coursecatalog.InstructorAggregate
transforms.AddKeyNamespace.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Key
transforms.AddKeyNamespace.schema.name=inc.evil.coursecatalog.Key
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
mode=timestamp+incrementing
timestamp.column.name=updated_at
timestamp.delay.interval.ms=3000
incrementing.column.name=id
numeric.mapping=best_fit
query=select * from(select * from instructor_aggregate_vw where "updated_at" < ? AND (("updated_at" = ? AND "id" > ?) OR "updated_at" > ?) order by "updated_at","id" ASC) limit 50 --


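By the way, registering this same configuration over the REST API would look roughly like the sketch below; the connector name is taken from the URL, and the properties are the ones from the file above, just translated to JSON:

Shell

curl -X PUT http://localhost:8083/connectors/legacy.instructors.connector/config \
  -H "Content-Type: application/json" \
  -d '{
    "topic.prefix": "legacy-instructors",
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "table.types": "VIEW",
    "connection.url": "jdbc:postgresql://instructors-legacy-db:5432/instructors-db",
    "connection.user": "postgres",
    "connection.password": "123456",
    "connection.attempts": "100",
    "connection.backoff.ms": "300000",
    "poll.interval.ms": "100",
    "transforms": "AddNamespace,createKey,AddKeyNamespace",
    "transforms.AddNamespace.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
    "transforms.AddNamespace.schema.name": "inc.evil.coursecatalog.InstructorAggregate",
    "transforms.AddKeyNamespace.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Key",
    "transforms.AddKeyNamespace.schema.name": "inc.evil.coursecatalog.Key",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "id",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "updated_at",
    "timestamp.delay.interval.ms": "3000",
    "incrementing.column.name": "id",
    "numeric.mapping": "best_fit",
    "query": "select * from(select * from instructor_aggregate_vw where \"updated_at\" < ? AND ((\"updated_at\" = ? AND \"id\" > ?) OR \"updated_at\" > ?) order by \"updated_at\",\"id\" ASC) limit 50 --"
  }'

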
First of all, hats off to Confluent for doing such a great job of documenting everything. You can check the meaning of each property here, but anyway, here is a short description of what we have:

- name and topic.prefix: the connector's name and the topic to publish to; since we use a custom query, the prefix is not prepended to a table name but used as the full topic name, so our records end up in legacy-instructors.
- connector.class, table.types, and the connection.* properties: the JDBC source connector, allowed to work with views, pointed at instructors-legacy-db, with rather generous retry settings (100 connection attempts with a 5-minute backoff between them).
- poll.interval.ms=100: how frequently the connector polls the database for new or updated rows.
- transforms: three single message transforms; SetSchemaMetadata assigns proper names to the key and value AVRO schemas, and ValueToKey builds the record key from the id column.
- mode=timestamp+incrementing with timestamp.column.name=updated_at and incrementing.column.name=id: the connector picks up updated rows via the timestamp column and brand-new rows via the strictly incrementing id, so both inserts and updates are captured. timestamp.delay.interval.ms=3000 waits an extra 3 seconds after a row's timestamp before publishing it, giving in-flight transactions time to complete, and numeric.mapping=best_fit maps numeric columns to the best-fitting primitive types.
- query: the actual trick for dealing with a heavy view. With a custom query in timestamp+incrementing mode, the connector appends its own WHERE clause carrying the timestamp/id criteria; here we inline those placeholders ourselves, wrap everything in a subselect with limit 50, and neutralize the appended clause with the trailing --. As a result, every poll fetches at most 50 rows instead of the entire view.

This should be it. If you create this connector, you won't see much, since our view is very light on data and the streaming finishes almost instantly; but if you were to point it at a heavy view or table, you would notice the data arriving on the topic in batches of 50 messages.
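
If you want to peek at what is landing on the topic, the fast-data-dev image bundles the Confluent command-line tools, so something along these lines should do (a sketch; the ports are the ones mapped in our docker-compose.yml, and, because of the custom query, the topic is named exactly legacy-instructors):

Shell

docker exec -it course-catalog-kafka-cluster \
  kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --topic legacy-instructors \
    --from-beginning \
    --property schema.registry.url=http://localhost:8081
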

Consuming

If we want to go even further and consume this data into course-catalog-db/instructors, it is just a matter of either defining a sink connector (since we don't do any transformation) or using a simple consumer. In Spring Boot with Kotlin, a Kafka listener would look like this:

Kotlin
 
@KafkaListener(
    topics = ["\${kafka.consumer.legacyInstructorsTopic}"],
    containerFactory = KafkaConfiguration.LEGACY_INSTRUCTORS_CONTAINER_FACTORY
)
fun handle(
    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) key: Key,
    @Payload instructorAggregate: InstructorAggregate
) {
    log.info("Received old instructor [${instructorAggregate}]")
    instructorService.upsert(instructorAggregate)
}

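
The listener refers to KafkaConfiguration.LEGACY_INSTRUCTORS_CONTAINER_FACTORY; the actual configuration lives in the repo, but a minimal sketch of such a container factory could look like the following (the group id and bootstrap servers are assumptions, and Key and InstructorAggregate are the AVRO-generated classes; error handling and concurrency settings are left out for brevity):

Kotlin

import inc.evil.coursecatalog.InstructorAggregate
import inc.evil.coursecatalog.Key
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory

@Configuration
class KafkaConfiguration {

    companion object {
        const val LEGACY_INSTRUCTORS_CONTAINER_FACTORY = "legacyInstructorsContainerFactory"
    }

    @Bean(LEGACY_INSTRUCTORS_CONTAINER_FACTORY)
    fun legacyInstructorsContainerFactory(): ConcurrentKafkaListenerContainerFactory<Key, InstructorAggregate> {
        val props = mapOf<String, Any>(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",   // assumption: broker exposed by fast-data-dev
            ConsumerConfig.GROUP_ID_CONFIG to "course-catalog",            // assumption: any stable group id
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to KafkaAvroDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to KafkaAvroDeserializer::class.java,
            "schema.registry.url" to "http://localhost:8081",              // where the AVRO schemas are fetched from
            "specific.avro.reader" to true                                 // deserialize into the generated classes
        )
        val factory = ConcurrentKafkaListenerContainerFactory<Key, InstructorAggregate>()
        factory.setConsumerFactory(DefaultKafkaConsumerFactory<Key, InstructorAggregate>(props))
        return factory
    }
}
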

For the AVRO deserialization, I took the generated AVRO schemas from the Schema Registry here and saved them as .avsc files. Then, with the help of this plugin, id("com.github.davidmc24.gradle.plugin.avro"), configured like this:

Kotlin
 
tasks.withType<com.github.davidmc24.gradle.plugin.avro.GenerateAvroJavaTask> {
    source(file("${projectDir}\\src\\main\\resources\\avro"))
    setOutputDir(file("${projectDir}\\src\\main\\kotlin"))
}


I obtained the POJOs used in the listener.

Conclusion

We've seen how a connector can be configured to work in batches and to track both new and updated records in a robust way. You can check all the mentioned code and files here.

That’s all folks, I hope that you’ve learned something. Happy coding!
