Mirror Maker v2.0

Before we start, let's establish some abbreviations: mmv1 refers to Mirror Maker v1.0 and mmv2 to Mirror Maker v2.0.

Find the Project

You can find all the material for this document here. Here is what the repository contains.

Have questions or want to know more? Follow me on Twitter: Rodrigo_Loza_L

Introduction 

The new release of Mirror Maker introduces a lot of new features usable in disaster recovery, backup, streaming, and aggregation scenarios. Before mmv2, the Kafka Replicator product from Confluent offered some of the same features. I personally built a lot of custom code to give mmv1 the features of mmv2; it was not as dynamic or scalable, but still usable.

This document presents a tutorial on mmv2, as detailed as possible. Specifically, it covers the latest release as of May 29th, Kafka 2.5.0.

Active/Active vs. Active/Passive

Comparison Between mmv1 and mmv2

There are a lot of differences: architecture, replication policies, monitoring, etc. Here is a summary.

Mirror Maker v1.0

mmv1

Mirror Maker v2.0

MirrorSourceConnector and workers

Examples

Let's move on to the examples. I suggest you read the catch-ups before we start; most of these are neither documented nor explicitly detailed, so they might be useful for debugging problems.

Catch-ups

Active/Passive Datacenter Replication

Architecture

Architecture

Configuration File

Create a file named mm2.properties and fill it with the following content.

Properties files

# Kafka datacenters.
clusters = source, target
source.bootstrap.servers = kafka-source:9092
target.bootstrap.servers = kafka-target:9092

# Source and target cluster configurations.
source.config.storage.replication.factor = 1
target.config.storage.replication.factor = 1

source.offset.storage.replication.factor = 1
target.offset.storage.replication.factor = 1

source.status.storage.replication.factor = 1
target.status.storage.replication.factor = 1

source->target.enabled = true
target->source.enabled = false

# Mirror maker configurations.
offset-syncs.topic.replication.factor = 1
heartbeats.topic.replication.factor = 1
checkpoints.topic.replication.factor = 1

topics = .*
groups = .*

tasks.max = 1
replication.factor = 1
refresh.topics.enabled = true
sync.topic.configs.enabled = true
refresh.topics.interval.seconds = 30

topics.blacklist = .*[\-\.]internal, .*\.replica, __consumer_offsets
groups.blacklist = console-consumer-.*, connect-.*, __.*

# Enable heartbeats and checkpoints.
source->target.emit.heartbeats.enabled = true
source->target.emit.checkpoints.enabled = true


What's important to highlight: source->target.enabled = true together with target->source.enabled = false makes the replication one-directional (active/passive); the topics and groups patterns select everything for replication; and the blacklists exclude internal topics and transient consumer groups.
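To get a feel for what the blacklists exclude, you can approximate them with grep. This is only an approximation: mmv2 applies Java regexes to the full topic name, while grep -E matches substrings, but for these patterns the result is the same.

```shell
# Some candidate topic names, filtered with patterns equivalent to the blacklist.
printf '%s\n' topic_1 mm2-offset-syncs.target.internal __consumer_offsets orders.replica |
  grep -Ev '[-.]internal|\.replica|__consumer_offsets'
# Only topic_1 survives the filter.
```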

Once you have your infrastructure and configuration file setup, run the following commands on the Docker container used for mmv2:

Shell

./run-kakfa-mirror-maker.sh
cd kafka/bin/
./connect-mirror-maker.sh mm2.properties


The logs are very verbose, and it is hard to catch everything at first. Make sure you familiarize yourself with Kafka Connect and its initialization steps. If the following issue pops up, verify that you are not moving the JARs manually at some point in your installation. Otherwise, upgrade your Kafka installation to release 2.5.0, which fixed this problem for me.

Once mmv2 starts, it periodically logs that the connectors are copying the topics, offsets, and heartbeats. Check for warnings or errors, as they may reflect a misconfiguration.
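Until you get comfortable with the output, filtering for problems helps. A rough sketch follows; the sample log lines are invented for illustration, and in practice you would redirect the output of connect-mirror-maker.sh to a file first.

```shell
# Invented sample of mmv2 output, stored in a temp file for the demo.
cat > /tmp/mm2-sample.log <<'EOF'
INFO Starting MirrorSourceConnector
WARN Could not sync configs for topic compacted_topic_1
ERROR Failed to reconfigure connector's tasks, retrying
EOF

# Surface only the problem lines from the verbose output.
grep -E 'WARN|ERROR' /tmp/mm2-sample.log
```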

Checking for warnings and errors

Let's make sure mmv2 has successfully connected the two Kafka nodes. For this, let's list the topics on both Kafka clusters.

On the source cluster.

Shell

./kafka-topics.sh --zookeeper zookeeper-source:2181 --list



Source cluster output

On the target cluster.

Shell

./kafka-topics.sh --zookeeper zookeeper-target:2181 --list


Target cluster output

Let's check the topic replication. Create a bunch of topics: normal, compacted, etc.

Shell

./kafka-topics.sh --zookeeper zookeeper-source:2181 --create --partitions 1 --replication-factor 1 --topic topic_1 --config cleanup.policy=delete

./kafka-topics.sh --zookeeper zookeeper-source:2181 --create --partitions 2 --replication-factor 1 --topic topic_2 --config cleanup.policy=delete

./kafka-topics.sh --zookeeper zookeeper-source:2181 --create --partitions 5 --replication-factor 1 --topic compact_3 --config cleanup.policy=delete

./kafka-topics.sh --zookeeper zookeeper-source:2181 --create --partitions 1 --replication-factor 1 --topic compacted_topic_1 --config cleanup.policy=compact

./kafka-topics.sh --zookeeper zookeeper-source:2181 --create --partitions 1 --replication-factor 1 --topic compacted_topic_2 --config cleanup.policy=compact


The topics on the source node look as follows.

Topics on source node

The topics on the target node look as follows.

Topics on target node

Note that the replicated topics have the alias of the source cluster as a prefix. This is how mmv2 ensures topics will not step on each other in an active/active replication architecture.
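As a quick sanity check, the names you should expect on the target cluster can be derived mechanically. This is a sketch of the default replication policy's naming rule: cluster alias, a dot, then the original topic name.

```shell
# Derive the expected replicated names for the topics created above.
cluster_alias="source"
for topic in topic_1 topic_2 compact_3 compacted_topic_1 compacted_topic_2; do
  echo "${cluster_alias}.${topic}"
done
```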

Let's describe the topics one by one to verify whether the configurations have been replicated. Check the config on compacted_topic_1: it does not have cleanup.policy=compact yet. Remember, configs are replicated eventually. If you are in a hurry, restart mmv2.

Restart mmv2

After 10 minutes the config is copied.

Copied config
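If ten minutes is too long to wait, the config-sync interval can be lowered in mm2.properties. This is a sketch: to my knowledge, sync.topic.configs.interval.seconds is the property controlling this (600 seconds by default), but verify it against your Kafka version before relying on it.

```properties
# Sync topic configs more aggressively (assumed property name; default is 600).
sync.topic.configs.enabled = true
sync.topic.configs.interval.seconds = 60
```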

Active/Active Datacenter Replication 

Architecture

Active to active replication

Configuration File

Create a file named mm2.properties and fill it with the following content.

Properties files

# Kafka datacenters.
clusters = source, target
source.bootstrap.servers = kafka-source:9092
target.bootstrap.servers = kafka-target:9092

# Source and target cluster configurations.
source.config.storage.replication.factor = 1
target.config.storage.replication.factor = 1

source.offset.storage.replication.factor = 1
target.offset.storage.replication.factor = 1

source.status.storage.replication.factor = 1
target.status.storage.replication.factor = 1

source->target.enabled = true
target->source.enabled = true

# Mirror maker configurations.
offset-syncs.topic.replication.factor = 1
heartbeats.topic.replication.factor = 1
checkpoints.topic.replication.factor = 1

topics = .*
groups = .*

tasks.max = 1
replication.factor = 1
refresh.topics.enabled = true
sync.topic.configs.enabled = true
refresh.topics.interval.seconds = 10

topics.blacklist = .*[\-\.]internal, .*\.replica, __consumer_offsets
groups.blacklist = console-consumer-.*, connect-.*, __.*

# Enable heartbeats and checkpoints.
source->target.emit.heartbeats.enabled = true
source->target.emit.checkpoints.enabled = true


What's important to highlight: both source->target.enabled and target->source.enabled are now set to true, so replication runs in both directions, and refresh.topics.interval.seconds was lowered to 10 so new topics are picked up faster.
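Under the default replication policy, remote topics are prefixed with the alias of the cluster they came from, in both directions. A minimal sketch of the expected names; the remote_name helper is hypothetical, not part of mmv2.

```shell
# Hypothetical helper mimicking mmv2's default remote-topic naming.
remote_name() {
  echo "$1.$2"  # $1 = alias of the cluster the topic came from, $2 = topic name
}

remote_name source topic_1  # name of topic_1's replica on the target cluster
remote_name target topic_2  # name of topic_2's replica on the source cluster
```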

Once you have your infrastructure and configuration file setup, run the following commands on the Docker container used for mmv2:

Shell

./run-kakfa-mirror-maker.sh
cd kafka/bin/
./connect-mirror-maker.sh /data/active-to-active-mm2.properties


Check the topics on the source cluster.

Source cluster topics

Check the topics on the target cluster.

Target cluster topics

Let's create some topics, each on a different Kafka node. Note that the ZooKeeper address is different in each command.

Shell

./kafka-topics.sh --zookeeper zookeeper-source:2181 --create --partitions 1 --replication-factor 1 --topic topic_1
./kafka-topics.sh --zookeeper zookeeper-target:2181 --create --partitions 1 --replication-factor 1 --topic topic_2


Let's list the topics.

On the source cluster, check the local topics and the one replicated from the target cluster.

Source cluster topics

On the target cluster, check the local topics and the one replicated from the source cluster.

local topics on target cluster

That is it for this post. Check the next one for more examples, such as aggregation. I'll also show you how to create your own replication policy using Java.
