.. _replicator_install:

Installing and Configuring Replicator
=======================================

You can install Replicator by downloading |cp| from https://confluent.io/download/. Next, extract the contents of the archive. Then make sure you have two clusters of |ak-tm| brokers up and running. If you are not sure how to do that, refer to :ref:`replicator_quickstart`.

|crep-full| is a |kconnect-long| plugin. To run Replicator, you need to take the following steps:

* Install and configure a |kconnect-long| cluster
* Configure and run |crep-full| on the Connect cluster

This section walks you through both steps in detail and reviews the available configuration options for Replicator.

Install and Configure |kconnect-long| Cluster for Replicator
------------------------------------------------------------

Replicator runs as a plugin (connector) in |kconnect-long|, so you need to run |kconnect-long| Workers before you can run Replicator.

The :ref:`quick start <replicator_quickstart>` shows how to run Replicator in Connect's *stand-alone* mode. Stand-alone mode is recommended for proofs of concept and small deployments where the throughput from a single Replicator node is sufficient. For larger-scale production deployments, run multiple Connect Workers in *distributed mode*. Refer to the Connect documentation to learn how to run Connect in :ref:`distributed mode`.

Keep the following points in mind when configuring distributed Connect Workers for Replicator.

Configuring Origin and Destination Brokers
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Connect clusters are associated with a cluster of |ak| brokers. The brokers of the |ak| cluster are specified in the ``bootstrap.servers`` configuration parameter of the Connect Workers. If you are configuring a new Connect Worker cluster for running Replicator, make sure this parameter lists the brokers of the **destination** |ak| cluster.
If you are planning on running Replicator on an existing Connect cluster, make sure it is already associated with the **destination** brokers.

.. note:: Replicator is responsible for reading events from the origin cluster. It then passes the events to the Connect Worker, which is responsible for writing the events to the destination cluster. Therefore, you configure Replicator with information about the origin and the Worker with information about the destination.

Where to Install Connect Workers
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If you are replicating events between different data centers (rather than between two |ak| clusters in the same data center), we recommend running the Connect Workers in the **destination** data center. For example, if you are sending data from NYC to SF, Replicator should run in SF and consume data across the US from NYC.

The reason is that long-distance networks tend to be less reliable than the network inside a data center. If a network partition cuts connectivity between the data centers, a consumer that cannot connect to a cluster is much safer than a producer that cannot connect. If the consumer cannot connect, it simply cannot read events, but the events are still stored in the origin |ak| cluster and can remain there for a long time, so there is no risk of losing them. If, on the other hand, the events were already consumed and Replicator cannot produce them because of a network partition, there is a risk that these events will be lost. Remote consuming is therefore safer than remote producing.

Running Replicator on Existing Connect Cluster
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

It is OK to run Replicator on the same Connect cluster as other connectors, but in some cases it is not recommended:

* If you are replicating data between two data centers that are far apart and thus have high latency, you'll want to tune both the Connect Worker and Replicator appropriately.
  Intra-DC tuning is different from inter-DC tuning for this very reason. By giving Replicator its own Connect cluster, you can tune the Connect Workers specifically for Replicator without worrying about other connectors being affected.
* Any change to a connector causes Replicator to pause while connectors are being reassigned to Connect Workers. If you frequently start and stop connectors, you may want to run Replicator on its own cluster and allow it to run without interruptions.

Configuring Logging for Connect Cluster
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

|kconnect-long| logging is configured in the file ``etc/kafka/connect-log4j.properties``. By default, it writes to the console, but for production deployments you should log to a file. Before you start Replicator, add these lines to the ``connect-log4j.properties`` file::

    log4j.appender.file=org.apache.log4j.RollingFileAppender
    log4j.appender.file.File=logs/replicator.log
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
    log4j.appender.file.MaxFileSize=10MB
    log4j.appender.file.MaxBackupIndex=5
    log4j.appender.file.append=true

Then add the ``file`` appender that you defined above to the ``log4j.rootLogger`` parameter::

    # By default
    log4j.rootLogger=INFO, stdout, file

License Key
~~~~~~~~~~~~~~

Without a license key, you can use Replicator for a 30-day trial period. If you are a Confluent customer, you can contact customer support and ask for a Replicator license key. Then add the ``confluent.license`` setting to the Replicator configuration file (see below), with the key you received from Confluent support as its value.

Configure and run a |crep-full| on the Connect Cluster
-----------------------------------------------------------------

The :ref:`quick start <replicator_quickstart>` shows how to run Replicator in Connect's stand-alone mode. We'll now see how to run Replicator on a distributed Connect cluster.
We'll assume that at least one distributed-mode Connect Worker is already up and running. If you are not sure how to do that, review the :ref:`distributed mode documentation`.

You can check whether the Connect Worker is up and running by querying its REST API:

.. codewithvars:: bash

    $ curl http://localhost:8083/
    {"version":"|kafka_release|","commit":"078e7dc02a100018"}

If everything is fine, you will see a version number and commit hash for the version of the Connect Worker you are running.

To run Replicator, send its configuration to the Connect REST API in JSON format (note that this is different from stand-alone mode, which uses Java's properties file format). Here's an example configuration::

    {
      "name":"replicator",
      "config":{
        "connector.class":"io.confluent.connect.replicator.ReplicatorSourceConnector",
        "tasks.max":4,
        "key.converter":"io.confluent.connect.replicator.util.ByteArrayConverter",
        "value.converter":"io.confluent.connect.replicator.util.ByteArrayConverter",
        "src.kafka.bootstrap.servers":"localhost:9082",
        "src.zookeeper.connect":"localhost:2171",
        "dest.zookeeper.connect":"localhost:2181",
        "topic.whitelist":"test-topic",
        "topic.rename.format":"${topic}.replica",
        "confluent.license":"XYZ"
      }
    }

You can send this to the Connect REST API using ``curl``. This assumes the above JSON is in a file called ``example-replicator.json``::

    curl -X POST -d @example-replicator.json http://localhost:8083/connectors --header "Content-Type: application/json"

This example demonstrates the use of some important configuration parameters. You can read an explanation of all the configuration parameters in :ref:`here `.

* ``key.converter`` and ``value.converter`` - Classes used to convert |ak| records to Connect's internal format. The Connect Worker configuration specifies global converters, and those are used if you don't specify anything in the Replicator configuration. For replication, however, no conversion is necessary.
  We just want to read bytes out of the origin cluster and write them to the destination with no changes, so we override the global converters with the ``ByteArrayConverter``, which leaves the records as they are.
* ``src.kafka.bootstrap.servers`` - A list of brokers from the **origin** cluster.
* ``src.zookeeper.connect`` and ``dest.zookeeper.connect`` - Connection strings for |zk| in the origin and destination clusters respectively. These are used to replicate topic configuration from origin to destination.
* ``topic.whitelist`` - An explicit list of the topics that you want replicated. This example replicates a topic named ``test-topic``.

  .. tip:: You can also tell Replicator which topics to replicate using a regular expression with the :ref:`topic.regex parameter `. You should use a regular expression if you want Replicator to automatically start replicating new topics that match a certain pattern. For example, to replicate all production topics, including new ones, configure Replicator to replicate topics that match ``prod.*``. Note that if you add new topics to the explicit list, you must restart Replicator for the change to take effect.

* ``topic.rename.format`` - A substitution string that is used to rename topics in the destination cluster. The snippet above uses ``${topic}.replica``, where ``${topic}`` is substituted with the topic name from the origin cluster. That means that the ``test-topic`` we're replicating from the origin cluster is renamed to ``test-topic.replica`` in the destination cluster.
* ``confluent.license`` - Without the license, you can use Replicator for a 30-day trial period. If you are a Confluent customer, you can contact customer support and ask for a Replicator license. Then use it as shown in the example.
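
To make the effect of ``topic.rename.format`` concrete, the following is a minimal bash sketch of the substitution described above. Replicator performs this renaming internally; the ``rename_topic`` helper is a hypothetical name used only for illustration and is not part of Replicator or |kconnect-long|.

.. code-block:: bash

    #!/usr/bin/env bash
    # Illustration only: mimics how the ${topic} placeholder in a
    # topic.rename.format value expands to a destination topic name.
    # rename_topic is a hypothetical helper, not a Replicator command.
    rename_topic() {
      local format=$1 topic=$2
      local placeholder='${topic}'
      # Quoting "$placeholder" makes bash match it as a literal string.
      local renamed=${format//"$placeholder"/$topic}
      printf '%s\n' "$renamed"
    }

    # Same values as the example configuration.
    rename_topic '${topic}.replica' 'test-topic'   # prints: test-topic.replica

The format string is single-quoted so that the shell does not try to expand ``${topic}`` itself before the helper sees it.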