Installing and Configuring Replicator

To install Confluent Replicator, download Confluent Platform from http://confluent.io/download/ and extract the contents of the archive. Then make sure you have two clusters of Apache Kafka® brokers up and running. If you are not sure how to set them up, refer to Tutorial: Replicating Data Between Clusters.

Replicator is a Kafka Connect plugin. To run Replicator, take the following steps:

  • Install and configure a Connect cluster
  • Configure and run Confluent Replicator on the Connect cluster

This section walks you through both these steps in detail, and reviews the available configuration options for Replicator.

Install and Configure Kafka Connect Cluster for Replicator

Replicator runs as a plugin (connector) in Connect, so you need to run Connect Workers before you can run Replicator. The quick start shows how to run Replicator in Connect's stand-alone mode.

Stand-alone mode is recommended for proofs of concept, tests, and small deployments where the throughput from a single Replicator node is sufficient. For larger-scale production deployments, run multiple Connect Workers in distributed mode.

Refer to the Connect documentation to learn how to run Connect in distributed mode.

Keep in mind the following recommendations and best practices when configuring distributed Connect Workers for Replicator.

Configuring Origin and Destination Brokers

Each Connect cluster is associated with a cluster of Kafka brokers. The brokers of that Kafka cluster are specified in the bootstrap.servers configuration parameter of the Connect Workers. If you are configuring a new Connect Worker cluster for running Replicator, make sure this parameter lists the brokers of the destination Kafka cluster. If you are planning to run Replicator on an existing Connect cluster, make sure it is already associated with the destination brokers.

Note

Replicator is responsible for reading events from the origin cluster. It then passes the events to the Connect Worker which is responsible for writing the events to the destination cluster. Therefore, you configure Replicator with information about the origin and the Worker with information about the destination.
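The split described in this note can be sketched in a few lines of Python. This is only an illustration, not part of Replicator: the broker host names and the helper functions are hypothetical, while bootstrap.servers (Worker) and src.kafka.bootstrap.servers (Replicator) are the parameters named in this document.

```python
# Hypothetical broker addresses, used only for illustration.
ORIGIN_BROKERS = "origin-broker-1:9092,origin-broker-2:9092"
DESTINATION_BROKERS = "dest-broker-1:9092,dest-broker-2:9092"


def worker_settings(destination_brokers):
    """Settings that belong in the Connect Worker properties file.

    The Worker writes to the destination cluster, so its
    bootstrap.servers must list the destination brokers.
    """
    return {"bootstrap.servers": destination_brokers}


def replicator_settings(origin_brokers):
    """Settings that belong in the Replicator connector configuration.

    Replicator reads from the origin cluster, so the origin brokers
    go in src.kafka.bootstrap.servers.
    """
    return {"src.kafka.bootstrap.servers": origin_brokers}


def to_properties(settings):
    """Render a dict as Java properties lines (key=value), sorted by key."""
    return "\n".join(f"{key}={value}" for key, value in sorted(settings.items()))
```

Printing to_properties(worker_settings(DESTINATION_BROKERS)) yields the single line that would go into the Worker properties file; the Replicator settings would instead be merged into the connector configuration.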

Where to Install Connect Workers

If you are replicating events between different datacenters (rather than between two Kafka clusters in the same datacenter), we recommend running the Connect Workers in the destination datacenter. For example, if you are sending data from New York to San Francisco, Replicator should run in San Francisco and consume data across the US from New York. The reason is that long-distance networks tend to be less reliable than the network inside a datacenter. If a network partition cuts connectivity between the datacenters, a consumer that cannot reach a cluster is less disruptive than a producer that cannot connect, so remote consuming tends to be a better model than remote producing. That said, there is no inherent risk in running Replicator at the origin datacenter: Replicator will capture and forward all events, including in cases of connectivity loss.

Running Replicator on Existing Connect Cluster

You can run Replicator on the same Connect cluster as other connectors, but in some cases it is not recommended:

  • If you are replicating data between two datacenters that are far apart and thus have high latency, you’ll want to tune both the Connect Worker and Replicator appropriately. Intra-DC tuning is different from inter-DC tuning for this very reason. By giving Replicator its own Connect cluster, you can tune the Connect Workers specifically for Replicator without worrying about other connectors being affected.
  • Any changes to a connector will cause Replicator to pause while connectors are being re-assigned to Connect Workers. If you frequently start and stop connectors, you may want to run Replicator on its own cluster and allow it to run without interruptions.

Configuring Logging for Connect Cluster

Connect logging is configured in the file etc/kafka/connect-log4j.properties. By default, it writes to the console, but for production deployments you should log to a file. Before you start Replicator, add these lines to the connect-log4j.properties file:

log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=logs/replicator.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=5
log4j.appender.file.append=true

Then add the newly defined appender, named file, to the log4j.rootLogger parameter:

# The default is INFO, stdout; add the file appender
log4j.rootLogger=INFO, stdout, file

License Key

Without the license key, you can use Replicator for a 30-day trial period. If you are a Confluent customer, you can contact customer support and ask for a Replicator license key. Then add the confluent.license parameter to the Replicator configuration file (see below), set to the key you received from Confluent support.

Configure and Run Replicator on the Connect Cluster

The quick start shows how to run Replicator in Connect's stand-alone mode. This section shows how to run Replicator on a distributed Connect cluster.

You should have at least one distributed mode Connect Worker already up and running. If you are not sure how to do that, review the distributed mode documentation.

You can check if the Connect Worker is up and running by checking its REST API:

$ curl http://localhost:8083/
{"version":"2.3.0-ccs","commit":"078e7dc02a100018"}

If everything is fine, you will see a version number and commit hash for the version of the Connect Worker you are running.
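The same check can be scripted. The following is a minimal sketch using only the Python standard library; the function names are hypothetical, and it assumes the Worker's REST API is reachable at http://localhost:8083/ as in the curl example above.

```python
import json
import urllib.request


def parse_worker_info(body):
    """Extract (version, commit) from the JSON returned by the Worker root URL."""
    info = json.loads(body)
    return info["version"], info["commit"]


def fetch_worker_info(base_url="http://localhost:8083/", timeout=5):
    """Call the Worker's REST API root and return (version, commit).

    Raises urllib.error.URLError if the Worker cannot be reached.
    """
    with urllib.request.urlopen(base_url, timeout=timeout) as response:
        return parse_worker_info(response.read().decode("utf-8"))
```

Calling fetch_worker_info() against a running Worker returns the version and commit hash as a tuple; a URLError indicates that the Worker is not up or is not listening on that address.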

Run Replicator by sending its configuration to the Connect REST API in JSON format (note that this differs from stand-alone mode, which uses Java's properties file format). Here's an example configuration:

{
        "name":"replicator",
        "config":{
                "connector.class":"io.confluent.connect.replicator.ReplicatorSourceConnector",
                "tasks.max":4,
                "key.converter":"io.confluent.connect.replicator.util.ByteArrayConverter",
                "value.converter":"io.confluent.connect.replicator.util.ByteArrayConverter",
                "src.kafka.bootstrap.servers":"localhost:9082",
                "src.zookeeper.connect":"localhost:2171",
                "dest.zookeeper.connect":"localhost:2181",
                "topic.whitelist":"test-topic",
                "topic.rename.format":"${topic}.replica",
                "confluent.license":"eyJhbGciOiJSUzI1NiJ9.eyJpc3MiOiJDb25mbHVlbnQiLCJhdWQiOiJDMDAwMDAiLCJleHAiOjE0OTk0NzIwMDAsImp0aSI6ImpJNFpCM0RjNlNoUDJXejVHd04xY2ciLCJpYXQiOjE0OTkyNjk4OTEsIm5iZiI6MTQ5OTI2OTc3MSwic3ViIjoiY29udHJvbC1jZW50ZXIiLCJtb25pdG9yaW5nIjp0cnVlfQ.dnFkb9BS95Bv47HVlpI1OhSxrbK0nWOD0eRqPCcOgWrh5Pp6H-NQOlt5qtECgPfxMkV-Z5xAdf7l6p3-Ou3q7wWVmFAb8zkUrXnz_TmCkBN117fbUsZ0WZ1GAKxU1CsACsZ5rARSicGFJ54MuibwCCcHtAEOV5_Sv39t-cTRTw-cSE_NWpYyg77V7AAIirFVDMTZTFUg9RBCVEWu59UF1iYgkvlmN4qC0TdchfnTS4XQDuJlM_opYUEbZZoFxj8UY-dMyi136DFGaVF37LSaJguXCAm3KjCar8ipvyX5oLGmHhekw9b-xoEr-j4VTW_9QSfOHNDJ_ssGIISOJjBCPA"
        }
}

You can send this to the Connect Worker using curl. This assumes the above JSON is in a file called example-replicator.json:

$ curl -X POST -H "Content-Type: application/json" -d @example-replicator.json http://localhost:8083/connectors
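If you prefer to script this step, the sketch below builds the same POST request with the Python standard library and adds a status lookup. The helper names are hypothetical; the base URL is the one used in this document, and the /connectors/<name>/status path is the Connect REST API's connector status endpoint.

```python
import json
import urllib.request


def build_create_request(connector, base_url="http://localhost:8083"):
    """Build a POST /connectors request from a connector definition (a dict)."""
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def build_status_request(name, base_url="http://localhost:8083"):
    """Build a GET /connectors/<name>/status request."""
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/status", method="GET"
    )


def send(request, timeout=10):
    """Send a request to the Connect REST API and decode the JSON reply."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

For example, load example-replicator.json with json.load, pass the resulting dict to build_create_request, and hand the request to send. Afterwards, send(build_status_request("replicator")) reports the state of the connector and its tasks.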

This example demonstrates the use of some important configuration parameters. You can read an explanation of all the configuration parameters here.

  • key.converter and value.converter - Classes used to convert Kafka records to Connect's internal format. The Connect Worker configuration specifies global converters, which are used if you don't specify anything in the Replicator configuration. For replication, however, no conversion is necessary: you just want to read bytes out of the origin cluster and write them to the destination unchanged. You can therefore override the global converters with the ByteArrayConverter, which leaves the records as is.

  • src.kafka.bootstrap.servers - A list of brokers from the origin cluster.

  • src.zookeeper.connect and dest.zookeeper.connect - Connection strings for ZooKeeper in the origin and destination clusters respectively. These are used to replicate topic configuration from origin to destination.

  • topic.whitelist - An explicit list of the topics that you want replicated. This example replicates a topic named test-topic.

    Tip

    You can also tell Replicator which topics to replicate using a regular expression with the topic.regex parameter. Use a regular expression if you want Replicator to automatically start replicating new topics that match a certain pattern. For example, to replicate all production topics, including new ones, configure Replicator to replicate topics that match prod.*. Note that if you add new topics to the list, you must restart Replicator for the change to take effect.

  • topic.rename.format - A substitution string that is used to rename topics in the destination cluster. The snippet above uses ${topic}.replica, where ${topic} will be substituted with the topic name from the origin cluster. That means that the test-topic being replicated from the origin cluster will be renamed to test-topic.replica in the destination cluster.

  • confluent.license - Without the license, you can use Replicator for a 30-day trial period. If you are a Confluent customer, you can contact customer support and ask for a Replicator license. Then use it as shown in the example.
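To make the topic selection and renaming parameters above concrete, here is a small Python sketch of their effect. It is an approximation for illustration only, not Replicator's implementation: the function names are hypothetical, Python's regular expression syntax stands in for Replicator's, and the sketch assumes that the topic.regex pattern must match the whole topic name.

```python
import re


def rename_topic(rename_format, origin_topic):
    """Apply a topic.rename.format string by substituting ${topic}."""
    return rename_format.replace("${topic}", origin_topic)


def select_topics(origin_topics, whitelist=(), regex=None):
    """Return the topics named in the whitelist or matching the regex.

    Assumption: the regex must match the whole topic name.
    """
    pattern = re.compile(regex) if regex else None
    return [
        topic
        for topic in origin_topics
        if topic in whitelist or (pattern and pattern.fullmatch(topic))
    ]
```

With the configuration shown earlier, rename_topic("${topic}.replica", "test-topic") gives test-topic.replica, and a pattern such as prod.* would also pick up a topic like prod.orders created after Replicator has started.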

Suggested Reading