Tutorial: Replicating Data Between Clusters

This guide describes how to start two Apache Kafka® clusters and then a Replicator process to replicate data between them. Note that for tutorial purposes, we run both clusters on the same machine. To do that, we jump through a hoop or two to make sure each cluster has its own ports and data directories. You will not need to make these changes to the ZooKeeper and broker configurations if you are running in a normal environment where each cluster has its own servers.
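For reference, here is the port layout this tutorial uses; the destination cluster keeps the defaults, while the origin cluster is shifted to avoid collisions:

# Port layout used throughout this tutorial
# Destination cluster: ZooKeeper on 2181, Kafka broker on 9092 (defaults)
# Origin cluster:      ZooKeeper on 2171, Kafka broker on 9082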

[Figure: Replicator Quick Start Configuration]

Start the destination cluster

First, start up a ZooKeeper server. In this guide, we assume services will run on localhost.

# Start ZooKeeper. Run this command in its own terminal.
  <path-to-confluent>/bin/zookeeper-server-start <path-to-confluent>/etc/kafka/zookeeper.properties

Tip

These instructions assume you are installing Confluent Platform by using ZIP or TAR archives. For more information, see On-Premises Deployments.
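Optionally, you can verify that ZooKeeper is accepting connections. A minimal check, assuming nc (netcat) is installed; note that on newer ZooKeeper versions the four-letter-word commands must be enabled with 4lw.commands.whitelist:

# Should print "imok" if ZooKeeper is up and healthy
echo ruok | nc localhost 2181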

Next, start up a Kafka broker that will serve as our single-node Kafka cluster.

# Start Kafka. Run this command in its own terminal.
  <path-to-confluent>/bin/kafka-server-start <path-to-confluent>/etc/kafka/server.properties

For complete details on getting these services up and running, see the quick start instructions for Confluent Platform.
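Optionally, you can confirm the broker is reachable before moving on. One way is the kafka-broker-api-versions tool that ships with Confluent Platform:

# Prints the API versions the broker supports if it is reachable
<path-to-confluent>/bin/kafka-broker-api-versions --bootstrap-server localhost:9092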

Note

The destination cluster should be running the same (or higher) version of Confluent Platform as the source cluster. This is because Replicator runs within a Connect cluster linked to the destination cluster and reads messages from the source cluster; if the destination is running an older version, it will not be able to interpret the newer message format.

Start the origin cluster

While we configured the destination cluster to run on the default ports, we need to run the origin cluster on different ports to avoid collisions. The Kafka broker in the origin cluster is configured on port 9082, and ZooKeeper is configured on port 2171. Copy the configuration files to a temporary location and modify them so they do not conflict with the destination cluster.

# Copy the config files to /tmp
cp <path-to-confluent>/etc/kafka/zookeeper.properties /tmp/zookeeper_origin.properties
cp <path-to-confluent>/etc/kafka/server.properties /tmp/server_origin.properties

# Update the port numbers
sed -i '' -e "s/2181/2171/g" /tmp/zookeeper_origin.properties
sed -i '' -e "s/9092/9082/g" /tmp/server_origin.properties
sed -i '' -e "s/2181/2171/g" /tmp/server_origin.properties
sed -i '' -e "s/#listen/listen/g" /tmp/server_origin.properties

# Update data directories
sed -i '' -e "s/zookeeper/zookeeper_origin/g" /tmp/zookeeper_origin.properties
sed -i '' -e "s/kafka-logs/kafka-logs-origin/g" /tmp/server_origin.properties

From here, you can start up the origin cluster.

# Start ZooKeeper. Run this command in its own terminal.
  <path-to-confluent>/bin/zookeeper-server-start /tmp/zookeeper_origin.properties

# Start Kafka. Run this command in its own terminal.
  <path-to-confluent>/bin/kafka-server-start /tmp/server_origin.properties

Create a topic

Now, let's create a topic named “test-topic” in the origin cluster with the following command:

<path-to-confluent>/bin/kafka-topics --create --topic test-topic --replication-factor 1 --partitions 1 --zookeeper localhost:2171

Once we configure and run Replicator, this topic will be replicated to the destination cluster with the exact configuration we defined above. Note that for the sake of this example, we created a topic with just one partition; Replicator works with any number of topics and partitions.
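Before starting Replicator, you can confirm that the topic exists in the origin cluster:

<path-to-confluent>/bin/kafka-topics --describe --topic test-topic --zookeeper localhost:2171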

Configure and run Replicator

Confluent Replicator can run as an executable or as a connector in the Kafka Connect framework. In this quick start, we will run Replicator as an executable.

The Replicator executable expects three configuration files: one for the origin cluster, one for the destination cluster, and one for the Replicator configuration itself.

Start by configuring the origin cluster in a new file named consumer.properties. Edit the file and make sure it contains the addresses of brokers from the origin cluster. The default broker list will match the origin cluster you started earlier:

# Origin cluster connection configuration
bootstrap.servers=localhost:9082

Next configure the destination cluster in a new file named producer.properties. Edit the file and make sure it contains the addresses of brokers from the destination cluster. The default broker list will match the destination cluster you started earlier:

# Destination cluster connection configuration
bootstrap.servers=localhost:9092

Finally, create the Replicator configuration in a new file named replication.properties. This quick start shows a configuration for topic.rename.format, but any of the Configuration Properties that are not connection related can be supplied in this file:

# Replication configuration
topic.rename.format=${topic}.replica
replication.factor=1
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
confluent.topic.replication.factor=1
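If you prefer to create all three files from the shell, a quoted heredoc works well; quoting the EOF delimiter prevents the shell from expanding ${topic}:

cat > consumer.properties <<'EOF'
bootstrap.servers=localhost:9082
EOF

cat > producer.properties <<'EOF'
bootstrap.servers=localhost:9092
EOF

cat > replication.properties <<'EOF'
topic.rename.format=${topic}.replica
replication.factor=1
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
confluent.topic.replication.factor=1
EOF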

Note

The replication factor properties above are needed because our test clusters have only one broker each. In production, the recommended minimum cluster size is 3, which is also the default for these properties.

Once you have created the necessary configuration files, start the Replicator executable with the command below:

<path-to-confluent>/bin/replicator --cluster.id replicator --consumer.config consumer.properties --producer.config producer.properties --replication.config replication.properties --whitelist 'test-topic'

Some Replicator executable parameters can be passed on the command line:

  • --cluster.id - An identifier used to determine which Replicator cluster this executable should join. Multiple Replicator executable instances with the same cluster.id will work together (see the example after this list).
  • --consumer.config - The path to the origin cluster configuration
  • --producer.config - The path to the destination cluster configuration
  • --replication.config - The path to a file containing any non-connection-specific configuration. Command line arguments will override these configurations.
  • --whitelist - A list of topics to replicate from origin to destination

For a full list of command line options, see Command Line Parameters of Replicator Executable.
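As an example of the cluster.id behavior mentioned above, you could scale out replication by starting a second executable instance, in another terminal or on another machine, with the identical command; because both instances share cluster.id replicator, they divide the replication work between them:

# A second instance joins the same Replicator cluster and shares the load
<path-to-confluent>/bin/replicator --cluster.id replicator --consumer.config consumer.properties --producer.config producer.properties --replication.config replication.properties --whitelist 'test-topic'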

When Replicator has finished initialization, it will check the origin cluster for topics that need to be replicated. In this case, it will find test-topic and will try to create the corresponding topic in the destination cluster. You can check this with the following command:

<path-to-confluent>/bin/kafka-topics --describe --topic test-topic.replica --zookeeper localhost:2181

Note that we’re checking the existence of test-topic.replica since test-topic was renamed according to our configuration.

At any time after you’ve created the topic in the origin cluster, you can begin sending data to it using a Kafka producer to write to test-topic in the origin cluster. You can then confirm that the data has been replicated by consuming from test-topic.replica in the destination cluster. For example, to send a sequence of numbers using Kafka’s console producer, you can use the following command:

seq 10000 | <path-to-confluent>/bin/kafka-console-producer --topic test-topic --broker-list localhost:9082

You can then confirm delivery in the destination cluster using the console consumer:

<path-to-confluent>/bin/kafka-console-consumer --from-beginning --topic test-topic.replica --bootstrap-server localhost:9092

If the numbers 1 to 10,000 appear in the consumer output, you have successfully replicated data between the two clusters.