.. _replicator_quickstart:

Tutorial: Replicating Data Between Clusters
===========================================

.. This is all copy-paste from the Replicator repository to avoid refactoring that repository at the same time I'm building
   these docs. Once we figure out the structure here, we can decide what to do with existing Replicator docs (either refactor
   them and have the build script copy them into place, or simply delete them from Replicator repository completely)

This guide describes how to start two |ak-tm| clusters and then a Replicator process to replicate data between them. For tutorial purposes, both clusters run on the same machine, so we take a few extra steps to give each cluster its own ports and data directories. You do not need to make these changes to the |zk| and broker configuration in a normal environment where each cluster has its own servers.

.. figure:: replicator-quickstart-configuration.png
    :align: center

    Replicator Quick Start Configuration

Start the destination cluster
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

First, start a |zk| server. This guide assumes that all services run on ``localhost``.

.. codewithvars:: bash

    # Start ZooKeeper. Run this command in its own terminal.
    /bin/zookeeper-server-start /etc/kafka/zookeeper.properties

.. include:: ../includes/installation-types-zip-tar.rst

Next, start a |ak| broker that will serve as our single-node |ak| cluster.

.. codewithvars:: bash

    # Start Kafka. Run this command in its own terminal.
    /bin/kafka-server-start /etc/kafka/server.properties

For complete details on getting these services up and running see the :ref:`quick start ` instructions for |cp|.

.. note:: The destination cluster should run the same version of Confluent Platform as the origin cluster, or a higher one. Replicator runs within a Connect cluster linked to the destination cluster and reads messages from the origin cluster, so it cannot interpret the origin's message format if the destination is running an older version.

Start the origin cluster
~~~~~~~~~~~~~~~~~~~~~~~~

The destination cluster runs on the default ports, so the origin cluster must use different ports to avoid collisions. In the origin cluster, the |ak| broker is configured on port 9082 and |zk| on port 2171. Copy the configuration files to a temporary location and modify them so they do not conflict with the destination cluster.

.. codewithvars:: bash

    # Copy the config files to /tmp
    cp /etc/kafka/zookeeper.properties /tmp/zookeeper_origin.properties
    cp /etc/kafka/server.properties /tmp/server_origin.properties

    # Update the port numbers
    # Note: "sed -i ''" is the BSD/macOS form. With GNU sed (most Linux systems), drop the empty '' argument.
    sed -i '' -e "s/2181/2171/g" /tmp/zookeeper_origin.properties
    sed -i '' -e "s/9092/9082/g" /tmp/server_origin.properties
    sed -i '' -e "s/2181/2171/g" /tmp/server_origin.properties
    sed -i '' -e "s/#listen/listen/g" /tmp/server_origin.properties

    # Update data directories
    sed -i '' -e "s/zookeeper/zookeeper_origin/g" /tmp/zookeeper_origin.properties
    sed -i '' -e "s/kafka-logs/kafka-logs-origin/g" /tmp/server_origin.properties

From here, you can start up the origin cluster.

.. codewithvars:: bash

    # Start ZooKeeper. Run this command in its own terminal.
    /bin/zookeeper-server-start /tmp/zookeeper_origin.properties

    # Start Kafka. Run this command in its own terminal.
    /bin/kafka-server-start /tmp/server_origin.properties

Create a topic
~~~~~~~~~~~~~~~

Now, let's create a topic named "test-topic" in the origin cluster with the following command::

    /bin/kafka-topics --create --topic test-topic --replication-factor 1 --partitions 1 --zookeeper localhost:2171

Once we configure and run Replicator, this topic will be replicated to the destination cluster with the exact configuration we defined above. For the sake of this example, we created a topic with just one partition, but Replicator works with any number of topics and partitions.

.. _config-and-run-replicator:

Configure and run Replicator
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

|crep-full| runs as a Connector in the |kconnect-long| framework. In this quick start we will start a stand-alone Connect Worker process that runs Replicator as a Connector. For complete details on Connect see :ref:`kafka_connect`.

The script that runs the stand-alone Connect Worker takes two configuration files. The first is the configuration for the Connect Worker itself and the second is the configuration for Replicator.

.. note:: Replicator is responsible for reading events from the origin cluster. It then passes the events to the Connect Worker, which is responsible for writing the events to the destination cluster. Therefore we configure Replicator with information about the origin and the Worker with information about the destination.

We'll start by configuring the Connect Worker, and then configure Replicator.

The Worker configuration file is ``/etc/kafka/connect-standalone.properties``. Edit the file and make sure it contains the addresses of brokers from the **destination** cluster.
The default broker list will match the destination cluster we started earlier::

    # Connect Standalone Worker configuration
    bootstrap.servers=localhost:9092

Next, we will look at the Replicator configuration file, ``/etc/kafka-connect-replicator/quickstart-replicator.properties``::

    name=replicator
    connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector
    tasks.max=4

    key.converter=io.confluent.connect.replicator.util.ByteArrayConverter
    value.converter=io.confluent.connect.replicator.util.ByteArrayConverter

    src.kafka.bootstrap.servers=localhost:9082

    src.zookeeper.connect=localhost:2171
    dest.zookeeper.connect=localhost:2181

    topic.whitelist=test-topic
    topic.rename.format=${topic}.replica

.. note:: Starting with version 4.1.0, configuring direct connections to |zk| through Replicator properties is optional. If they are omitted, Replicator contacts |ak| for topic management and metadata refresh, as well as for storing the Confluent license. The changes required to use this functionality are minimal, and it can be enabled selectively for the origin, the destination, both, or neither.

The following example shows how Replicator's properties change when Replicator is configured to use |ak| as its direct point of contact in both the origin and the destination. To use Replicator this way, edit ``/etc/kafka-connect-replicator/quickstart-replicator.properties`` to reflect the alternative configuration as follows::

    name=replicator-sans-zk
    connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector
    tasks.max=4

    key.converter=io.confluent.connect.replicator.util.ByteArrayConverter
    value.converter=io.confluent.connect.replicator.util.ByteArrayConverter

    src.kafka.bootstrap.servers=localhost:9082
    dest.kafka.bootstrap.servers=localhost:9092

    # Store license, trial or regular, in Kafka instead of ZooKeeper.
    # Default: confluent.topic=_command-topic
    # Default: confluent.topic.replication.factor=3
    # The replication factor may not be larger than the number of Kafka brokers in the destination cluster.
    # Here we set this to '1' for demonstration purposes. Always use at least '3' in production configurations.
    confluent.topic.replication.factor=1

    topic.whitelist=test-topic
    topic.rename.format=${topic}.replica

In both examples, a few of the configuration parameters are important to understand, and we explain them here. For an explanation of all the configuration parameters, see :ref:`here `.

* ``key.converter`` and ``value.converter`` - Classes used to convert |ak| records to Connect's internal format. The Connect Worker configuration specifies global converters, and the default is JsonConverter. For replication, however, no conversion is necessary: we just want to read bytes out of the origin cluster and write them to the destination with no changes. So we override the global converters with the ByteArrayConverter, which leaves the records as is.
* ``src.zookeeper.connect`` and ``dest.zookeeper.connect`` - Connection strings for |zk| in the origin and destination clusters respectively. These are used to replicate topic configuration from origin to destination. Since version 4.1.0 these properties are optional.
* ``src.kafka.bootstrap.servers`` and ``dest.kafka.bootstrap.servers`` - A list of brokers in the origin and destination clusters respectively. Only the **origin** brokers are always required. The destination brokers should be set if ``dest.zookeeper.connect`` is unset.
* ``topic.whitelist`` - An explicit list of the topics you want replicated. In this quick start, we will replicate the topic named "test-topic."
* ``topic.rename.format`` - A substitution string that is used to rename topics in the destination cluster. In the snippet above, we have used ``${topic}.replica``, where ``${topic}`` will be substituted with the topic name from the origin cluster. That means that the ``test-topic`` we're replicating from the origin cluster will be renamed to ``test-topic.replica`` in the destination cluster.
* ``confluent.topic`` and ``confluent.topic.replication.factor`` - If Replicator does not connect directly to |zk| in the destination, Replicator will use |ak| to store your Confluent license. By default, the name of this topic is ``_command-topic`` and its replication factor is ``3``. If you use Replicator against a small destination |ak| cluster, such as in a development environment or proof of concept, you must set ``confluent.topic.replication.factor`` to a value that is no larger than the number of brokers in the cluster. For example, if the cluster has only one broker, you must set ``confluent.topic.replication.factor=1`` as in the example above.

Once you update the quick start configuration, you can run the connector in a standalone |kconnect-long| Worker::

    /bin/connect-standalone /etc/kafka/connect-standalone.properties \
        /etc/kafka-connect-replicator/quickstart-replicator.properties

When the connector has finished initialization, it will check the origin cluster for topics that need to be replicated. In this case, it will find ``test-topic`` and will try to create the corresponding topic in the destination cluster. You can check this with the following command::

    /bin/kafka-topics --describe --topic test-topic.replica --zookeeper localhost:2181

Note that we're checking the existence of ``test-topic.replica`` since ``test-topic`` was renamed according to our configuration. After verifying the topic's existence, you should confirm that it has one partition, matching the origin topic we created earlier. In general, Replicator will ensure that the destination topic has as many partitions as the origin topic.

If this topic has Avro records used with |sr|, and the destination uses Kafka Connect sink connectors or has KSQL applications, :ref:`register the existing schema ` used by ``test-topic`` for ``test-topic.replica``.

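
The partition check described above can also be scripted. The following is a minimal sketch and not part of the original quick start: ``partition_count`` is a hypothetical helper name, and it assumes that ``kafka-topics --describe`` prints a summary line containing ``PartitionCount:<n>``, a format that may differ between versions.

```shell
# Hypothetical helper: print the partition count found in
# `kafka-topics --describe` output read from stdin.
# Assumption: the output has a summary line containing "PartitionCount:<n>",
# with or without whitespace after the colon.
partition_count() {
  sed -n 's/.*PartitionCount:[[:space:]]*\([0-9][0-9]*\).*/\1/p' | head -n 1
}

# Usage (requires both clusters from this tutorial to be running):
#   origin=$(/bin/kafka-topics --describe --topic test-topic --zookeeper localhost:2171 | partition_count)
#   dest=$(/bin/kafka-topics --describe --topic test-topic.replica --zookeeper localhost:2181 | partition_count)
#   [ "$origin" = "$dest" ] && echo "partition counts match: $dest"
```

Comparing the two counts, rather than hard-coding a number, keeps the check valid if you recreate the origin topic with a different number of partitions.
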
At any time after you've created the topic in the origin cluster, you can begin sending data to it using a |ak| producer to write to ``test-topic`` in the origin cluster. You can then confirm that the data has been replicated by consuming from ``test-topic.replica`` in the destination cluster. For example, to send a sequence of numbers using |ak|'s console producer, you can use the following command::

    seq 10000 | /bin/kafka-console-producer --topic test-topic --broker-list localhost:9082

You can then confirm delivery in the destination cluster using the console consumer::

    /bin/kafka-console-consumer --from-beginning --topic test-topic.replica --bootstrap-server localhost:9092

If the numbers 1 to 10,000 appear in the consumer output, you have successfully set up multi-cluster replication.
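
Rather than scanning the consumer output by eye, this final check can be automated. The following is a minimal sketch, assuming the single-partition topic and the ``seq 10000`` input from this tutorial. ``verify_sequence`` is a hypothetical helper, and the usage comment relies on the console consumer's ``--max-messages`` option so that the consumer exits after reading the expected number of records.

```shell
# Hypothetical helper: succeed only if stdin contains exactly the
# integers 1..N, one per line, where N is the first argument.
# Lines are sorted numerically first, so arrival order does not matter.
verify_sequence() {
  sort -n | awk -v n="$1" '
    $0 != NR { bad = 1 }                      # line k must hold the value k
    END { if (bad || NR != n + 0) exit 1 }    # and there must be exactly N lines
  '
}

# Usage (requires the tutorial clusters and Replicator to be running):
#   /bin/kafka-console-consumer --from-beginning --topic test-topic.replica \
#       --bootstrap-server localhost:9092 --max-messages 10000 \
#     | verify_sequence 10000 && echo "replication verified"
```

If records are missing, the consumer waits for more input instead of exiting, so a check that never returns is itself a sign that replication is incomplete.
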