Important
You are viewing documentation for an older version of Confluent Platform.
Replicator Tutorial
This tutorial shows how to run Replicator to replicate data from two source Kafka clusters to a destination Kafka cluster. By the end, you will have run Replicator and replicated data for two topics from different source clusters to a destination cluster. You will also have set up a Kafka Connect cluster, because Replicator is built on Connect.
Note
In this tutorial, Kafka and ZooKeeper are configured to store data locally in the Docker containers. For production deployments (or generally whenever you care about not losing data), you should use mounted volumes for persisting data in the event that a container stops running or is restarted. This is important when running a system like Kafka on Docker, as it relies heavily on the filesystem for storing and caching messages. For an example of how to add mounted volumes to the host machine, see the documentation on Docker external volumes.
Installing and Running Docker
For this tutorial, Docker is run using the Docker client. If you are interested in information on using Docker Compose to run the images, skip to the bottom of this guide.
To get started, you’ll need to first install Docker and get it running. The Confluent Platform Docker Images require Docker version 1.11 or greater.
Docker Client: Setting Up a Three Node Kafka Cluster
If you’re running on Windows or Mac OS X, you’ll need to use Docker Machine to start the Docker host. Docker runs natively on Linux, so the Docker host will be your local machine if you go that route. If you are running on Mac OS X or Windows, be sure to allocate at least 4 GB of RAM to the Docker Machine.
Now that you have all of the Docker dependencies installed, you can create a Docker machine and begin starting up Confluent Platform.
Note
In the following steps, each Docker container is run in detached mode and you are shown how to access the logs for a running container. You can also run the containers in the foreground by replacing the -d flags with -it.
Create and configure the Docker machine.
docker-machine create --driver virtualbox --virtualbox-memory 6000 confluent
Next, configure your terminal window to attach it to your new Docker Machine:
eval $(docker-machine env confluent)
Clone the git repository:
git clone https://github.com/confluentinc/cp-docker-images
cd cp-docker-images
Start the services by using the example Docker Compose file. It starts two source Kafka clusters, one destination Kafka cluster, and a Kafka Connect cluster. Navigate to cp-docker-images/examples/enterprise-replicator, where the file is located.
Start the Kafka and Kafka Connect clusters using the Docker Compose create and start commands.
docker-compose create
You should see the following:
Creating enterprisereplicator_kafka-1-src-b_1
Creating enterprisereplicator_kafka-1-src-a_1
Creating enterprisereplicator_kafka-2-dest_1
Creating enterprisereplicator_zookeeper-src-b_1
Creating enterprisereplicator_zookeeper-src-a_1
Creating enterprisereplicator_connect-host-1_1
Creating enterprisereplicator_kafka-2-src-a_1
Creating enterprisereplicator_kafka-2-src-b_1
Creating enterprisereplicator_kafka-1-dest_1
Creating enterprisereplicator_zookeeper-dest_1
Creating enterprisereplicator_connect-host-2_1
Start all the services
docker-compose start
You should see the following:
Starting kafka-1-src-b ... done
Starting kafka-1-src-a ... done
Starting kafka-2-dest ... done
Starting zookeeper-src-b ... done
Starting zookeeper-src-a ... done
Starting connect-host-1 ... done
Starting kafka-2-src-a ... done
Starting kafka-2-src-b ... done
Starting kafka-1-dest ... done
Starting zookeeper-dest ... done
Starting connect-host-2 ... done
Before we move on, let’s make sure the services are up and running:
docker-compose ps
You should see the following:
Name                                      Command                     State   Ports
----------------------------------------------------------------------------------
enterprisereplicator_connect-host-1_1     /etc/confluent/docker/run   Up
enterprisereplicator_connect-host-2_1     /etc/confluent/docker/run   Up
enterprisereplicator_kafka-1-dest_1       /etc/confluent/docker/run   Up
enterprisereplicator_kafka-1-src-a_1      /etc/confluent/docker/run   Up
enterprisereplicator_kafka-1-src-b_1      /etc/confluent/docker/run   Up
enterprisereplicator_kafka-2-dest_1       /etc/confluent/docker/run   Up
enterprisereplicator_kafka-2-src-a_1      /etc/confluent/docker/run   Up
enterprisereplicator_kafka-2-src-b_1      /etc/confluent/docker/run   Up
enterprisereplicator_zookeeper-dest_1     /etc/confluent/docker/run   Up
enterprisereplicator_zookeeper-src-a_1    /etc/confluent/docker/run   Up
enterprisereplicator_zookeeper-src-b_1    /etc/confluent/docker/run   Up
Now check the ZooKeeper logs for the destination cluster to verify that ZooKeeper is healthy:
docker-compose logs zookeeper-dest | grep -i binding
You should see the following in your terminal window:
zookeeper-dest_1 | [2016-10-20 17:31:40,784] INFO binding to port 0.0.0.0/0.0.0.0:42181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
Next, check the Kafka logs for the destination cluster to verify that it is healthy:
docker-compose logs kafka-1-dest | grep -i started
You should see messages that look like the following:
kafka-1-dest_1 | [2016-10-20 17:31:45,364] INFO [Socket Server on Broker 1002], Started 1 acceptor threads (kafka.network.SocketServer)
kafka-1-dest_1 | [2016-10-20 17:31:45,792] INFO [Kafka Server 1002], started (kafka.server.KafkaServer)
....
Similarly, verify that the source-a and source-b Kafka clusters are ready by running the following commands and checking the output as described in the steps above.
docker-compose logs zookeeper-src-a | grep -i binding
docker-compose logs zookeeper-src-b | grep -i binding
docker-compose logs kafka-1-src-a | grep -i started
docker-compose logs kafka-1-src-b | grep -i started
Now, you can make sure that the Connect worker is up by running the following command to search the logs:
docker-compose logs connect-host-1 | grep started
You should see the following:
connect-host-1_1 | [2016-10-20 17:31:48,942] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect)
connect-host-1_1 | [2016-10-20 17:31:50,403] INFO Worker started (org.apache.kafka.connect.runtime.Worker)
connect-host-1_1 | [2016-10-20 17:31:50,988] INFO Herder started (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
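Because the containers run in detached mode, a log line such as "Kafka Connect started" may not be there yet the first time you search for it. The following is a minimal sketch of a polling helper, assuming a POSIX shell on the host. The function names retry_until and log_contains are hypothetical and introduced only for illustration, and the attempt count and delay in the usage comment are arbitrary.

```shell
# retry_until ATTEMPTS DELAY_SECONDS COMMAND [ARGS...]
# Runs COMMAND up to ATTEMPTS times, sleeping DELAY_SECONDS between tries.
# Returns 0 as soon as COMMAND succeeds, 1 if every attempt fails.
retry_until() {
  _attempts="$1"
  _delay="$2"
  shift 2
  _i=1
  while [ "$_i" -le "$_attempts" ]; do
    if "$@"; then
      return 0
    fi
    _i=$((_i + 1))
    # Only sleep when another attempt is still to come.
    if [ "$_i" -le "$_attempts" ]; then
      sleep "$_delay"
    fi
  done
  return 1
}

# log_contains SERVICE PATTERN
# Succeeds if the docker-compose logs for SERVICE contain PATTERN
# (case-insensitive), mirroring the grep checks used in this tutorial.
log_contains() {
  docker-compose logs "$1" 2>/dev/null | grep -qi -- "$2"
}

# Example (run from the enterprise-replicator directory):
#   retry_until 30 2 log_contains connect-host-1 "Kafka Connect started"
```

The retry logic is kept separate from the log check so the same helper can wrap any of the other health checks in this section.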
Now create the first Kafka Connect Replicator connector, which replicates the topic foo from the source cluster source-a.
First, create a topic named foo.
docker run \
  --net=host \
  --rm confluentinc/cp-kafka:4.1.5-SNAPSHOT \
  kafka-topics --create --topic foo --partitions 3 --replication-factor 2 --if-not-exists --zookeeper localhost:22181
You should see the following output in your terminal window:
Created topic "foo".
Before moving on, verify that the topic was created successfully:
docker run \
  --net=host \
  --rm confluentinc/cp-kafka:4.1.5-SNAPSHOT \
  kafka-topics --describe --topic foo --zookeeper localhost:22181
You should see the following output in your terminal window:
Topic:foo   PartitionCount:3    ReplicationFactor:2    Configs:
    Topic: foo    Partition: 0    Leader: 1002    Replicas: 1002,1001    Isr: 1002,1001
    Topic: foo    Partition: 1    Leader: 1001    Replicas: 1001,1002    Isr: 1001,1002
    Topic: foo    Partition: 2    Leader: 1002    Replicas: 1002,1001    Isr: 1002,1001
Next, we’ll try generating some data to our new topic:
docker run \
  --net=host \
  --rm \
  confluentinc/cp-kafka:4.1.5-SNAPSHOT \
  bash -c "seq 1000 | kafka-console-producer --request-required-acks 1 --broker-list localhost:9092 --topic foo && echo 'Produced 1000 messages.'"
This command uses the built-in Kafka Console Producer to produce 1000 simple messages (the numbers 1 through 1000) to the topic. Upon running it, you should see the following:
Produced 1000 messages.
Now create the connector using the Kafka Connect REST API. First, let’s exec into the Connect container.
docker-compose exec connect-host-1 bash
You should see a bash prompt now. We will call this the docker exec command prompt:
root@confluent:/#
The next step is to create the Replicator connector. Run the following command on the docker exec command prompt.
curl -X POST \
  -H "Content-Type: application/json" \
  --data '{
    "name": "replicator-src-a-foo",
    "config": {
      "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
      "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
      "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
      "src.zookeeper.connect": "localhost:22181",
      "src.kafka.bootstrap.servers": "localhost:9092",
      "dest.zookeeper.connect": "localhost:42181",
      "topic.whitelist": "foo",
      "topic.rename.format": "${topic}.replica"}}' \
  http://localhost:28082/connectors
Upon running the command, you should see the following output in your docker exec command prompt:
{"name":"replicator-src-a-foo","config":{"connector.class":"io.confluent.connect.replicator.ReplicatorSourceConnector","key.converter":"io.confluent.connect.replicator.util.ByteArrayConverter","value.converter":"io.confluent.connect.replicator.util.ByteArrayConverter","src.zookeeper.connect":"localhost:22181","src.kafka.bootstrap.servers":"localhost:9092","dest.zookeeper.connect":"localhost:42181","topic.whitelist":"foo","topic.rename.format":"${topic}.replica","name":"replicator-src-a-foo"},"tasks":[]}
Before moving on, check the status of the connector using curl on the docker exec command prompt.
curl -X GET http://localhost:28082/connectors/replicator-src-a-foo/status
You should see the following output, including the state of the connector as RUNNING:
{"name":"replicator-src-a-foo","connector":{"state":"RUNNING","worker_id":"localhost:38082"},"tasks":[{"state":"RUNNING","id":0,"worker_id":"localhost:28082"}]}
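Reading the status JSON by eye works for one connector, but a scripted check is handy once there are several tasks. Below is a rough sketch that relies only on grep and sed rather than a JSON parser. The function name all_running is hypothetical, and the pattern matching assumes the compact single-line format shown in the status output above.

```shell
# all_running: reads a Connect status document on stdin.
# Succeeds only if at least one "state" field is present and every
# "state" field (connector and tasks alike) has the value RUNNING.
all_running() {
  # Extract each "state":"VALUE" pair, then keep only VALUE.
  _states=$(grep -o '"state" *: *"[A-Z_]*"' \
    | sed 's/.*"\([A-Z_]*\)"$/\1/' || true)
  # No state fields at all means the input was not a status document.
  [ -n "$_states" ] || return 1
  for _s in $_states; do
    [ "$_s" = "RUNNING" ] || return 1
  done
  return 0
}

# Example, on the docker exec command prompt:
#   curl -s http://localhost:28082/connectors/replicator-src-a-foo/status | all_running \
#     && echo "connector and all tasks are RUNNING"
```

A task that has failed reports a state other than RUNNING, so the helper returns a non-zero exit code in that case and can be used in a script or retry loop.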
Exit the docker exec command prompt by typing exit on the prompt.
exit
Now that the connector is up and running, it should replicate data from the foo topic on the source-a cluster to the foo.replica topic on the dest cluster.
Read a sample of 1000 records from the foo.replica topic to check that the connector is replicating data to the destination Kafka cluster as expected. Run the following command in your terminal (make sure you have exited the docker exec command prompt):
docker run \
  --net=host \
  --rm \
  confluentinc/cp-kafka:4.1.5-SNAPSHOT \
  kafka-console-consumer --bootstrap-server localhost:9072 --topic foo.replica --new-consumer --from-beginning --max-messages 1000
If everything is working as expected, each of the original messages we produced should be written back out:
1
....
1000
Processed a total of 1000 messages
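Kafka orders records only within a partition, and foo.replica has three partitions, so the consumer may print the numbers in a different order than they were produced. A quick way to confirm that nothing was lost or duplicated is to sort the output and compare it with the expected sequence. The sketch below assumes the messages are the integers produced by seq earlier; check_sequence is a hypothetical helper name.

```shell
# check_sequence N
# Reads consumer output on stdin and succeeds only if the numeric lines,
# once sorted, are exactly the integers 1..N with no gaps or duplicates.
check_sequence() {
  _n="$1"
  _expected=$(seq "$_n")
  # Ignore any non-numeric lines (for example a summary line), then sort.
  _actual=$(grep -E '^[0-9]+$' | sort -n || true)
  [ "$_actual" = "$_expected" ]
}

# Example, in your terminal:
#   docker run --net=host --rm confluentinc/cp-kafka:4.1.5-SNAPSHOT \
#     kafka-console-consumer --bootstrap-server localhost:9072 \
#     --topic foo.replica --new-consumer --from-beginning --max-messages 1000 \
#     | check_sequence 1000 && echo "all 1000 messages replicated"
```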
Now verify that the destination topic was created with the correct replication factor and partition count:
docker run \
  --net=host \
  --rm confluentinc/cp-kafka:4.1.5-SNAPSHOT \
  kafka-topics --describe --topic foo.replica --zookeeper localhost:42181
You should see that the topic foo.replica is created with 3 partitions and 2 replicas, the same as the original topic foo.
Topic:foo.replica   PartitionCount:3    ReplicationFactor:2    Configs:message.timestamp.type=CreateTime
    Topic: foo.replica    Partition: 0    Leader: 1002    Replicas: 1002,1001    Isr: 1002,1001
    Topic: foo.replica    Partition: 1    Leader: 1001    Replicas: 1001,1002    Isr: 1001,1002
    Topic: foo.replica    Partition: 2    Leader: 1002    Replicas: 1002,1001    Isr: 1002,1001
Now, we will replicate another topic from a different source cluster.
First, create a new topic on the cluster source-b and add some data to it. Run the following commands to create the topic, verify it, and produce data. You should see output similar to the output for the foo topic above:
docker run \
  --net=host \
  --rm confluentinc/cp-kafka:4.1.5-SNAPSHOT \
  kafka-topics --create --topic bar --partitions 3 --replication-factor 2 --if-not-exists --zookeeper localhost:32181

docker run \
  --net=host \
  --rm confluentinc/cp-kafka:4.1.5-SNAPSHOT \
  kafka-topics --describe --topic bar --zookeeper localhost:32181

docker run \
  --net=host \
  --rm \
  confluentinc/cp-kafka:4.1.5-SNAPSHOT \
  bash -c "seq 1000 | kafka-console-producer --request-required-acks 1 --broker-list localhost:9082 --topic bar && echo 'Produced 1000 messages.'"
Now exec into the Kafka Connect container and run the Replicator connector. You should see output similar to the output for the first connector above.
Run the following to exec into the container and get the docker exec command prompt.
docker-compose exec connect-host-1 bash
Run the following commands on the docker exec command prompt.
curl -X POST \
  -H "Content-Type: application/json" \
  --data '{
    "name": "replicator-src-b-bar",
    "config": {
      "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
      "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
      "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
      "src.zookeeper.connect": "localhost:32181",
      "src.kafka.bootstrap.servers": "localhost:9082",
      "dest.zookeeper.connect": "localhost:42181",
      "topic.whitelist": "bar",
      "topic.rename.format": "${topic}.replica"}}' \
  http://localhost:28082/connectors

curl -X GET http://localhost:28082/connectors/replicator-src-b-bar/status
Exit the docker exec command prompt by typing exit on the prompt.
exit
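The two connector definitions in this tutorial differ only in the connector name, the topic, and the source cluster addresses. If you plan to create more connectors, it can help to generate the request body from those four values. The following is a sketch only: replicator_payload is a hypothetical helper, and it hard-codes the destination ZooKeeper address and rename format used in this tutorial.

```shell
# replicator_payload NAME TOPIC SRC_ZOOKEEPER SRC_BOOTSTRAP_SERVERS
# Prints a connector definition in the same shape as the request bodies
# used in this tutorial. Arguments are inserted verbatim, so they must
# not contain double quotes or backslashes.
replicator_payload() {
  cat <<EOF
{
  "name": "$1",
  "config": {
    "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
    "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
    "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
    "src.zookeeper.connect": "$3",
    "src.kafka.bootstrap.servers": "$4",
    "dest.zookeeper.connect": "localhost:42181",
    "topic.whitelist": "$2",
    "topic.rename.format": "\${topic}.replica"
  }
}
EOF
}

# Example, on the docker exec command prompt:
#   replicator_payload replicator-src-b-bar bar localhost:32181 localhost:9082 \
#     | curl -X POST -H "Content-Type: application/json" --data @- \
#         http://localhost:28082/connectors
```

The backslash before ${topic} keeps the shell from expanding it inside the here-document, so the literal placeholder reaches the connector configuration unchanged.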
Now that the second Replicator connector is up and running, it should replicate data from the bar topic on the source-b cluster to the bar.replica topic on the dest cluster.
Read data from the bar.replica topic to check that the connector is replicating data properly, then describe the topic to verify that the destination topic was created properly. You should see output similar to the output for foo.replica above.
Run the following commands in your terminal (make sure you have exited the docker exec command prompt):
docker run \
  --net=host \
  --rm \
  confluentinc/cp-kafka:4.1.5-SNAPSHOT \
  kafka-console-consumer --bootstrap-server localhost:9072 --topic bar.replica --new-consumer --from-beginning --max-messages 1000

docker run \
  --net=host \
  --rm confluentinc/cp-kafka:4.1.5-SNAPSHOT \
  kafka-topics --describe --topic bar.replica --zookeeper localhost:42181
Feel free to experiment with the Replicator connector on your own now. When you are done, use the following command to shut down all the components.
docker-compose stop
If you want to remove all the containers, run:
docker-compose rm