Running Replicator as an Executable

You can use Replicator Executable to run Confluent Replicator from the command line. Replicator Executable bundles a distributed Kafka Connect worker and a Replicator connector in a single application.

Download and Install

Download and install Replicator Executable from http://confluent.io/download/. Extract the contents of the archive or install Confluent Platform. After Confluent Platform is installed, you can find Replicator Executable as bin/replicator (ZIP and TAR) within your installation directory. Running bin/replicator without arguments prints a list of all the available command line arguments.

Important

The Replicator Executable uses a version of the Replicator connector that matches the installed version of Confluent Platform. Although the Replicator connector is available from Confluent Hub, do not change or upgrade the Replicator connector after installation unless instructed by Confluent.

Create Consumer and Producer Configs

Replicator Executable expects the properties required to establish a connection with the origin and destination clusters to be passed in files. Use --consumer.config to point to the file containing the properties that allow Replicator to connect to the origin cluster. For example, a file named consumer.properties with the contents:

zookeeper.connect=localhost:2171
bootstrap.servers=localhost:9082

Next, place the producer properties for the destination cluster in a file and pass this file via the --producer.config argument. For example, point to a file called producer.properties containing:

zookeeper.connect=localhost:2181
bootstrap.servers=localhost:9092

Note

The ZooKeeper properties are optional.

The properties specified here do not require the origin-, destination-, producer-, or consumer-specific prefixes (for example, bootstrap.servers as opposed to dest.kafka.bootstrap.servers) that the Replicator connector configuration requires; Replicator Executable infers the role of each file from the command line parameter it is passed with. The parameters --consumer.config and --producer.config are required in every call to bin/replicator.
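For example, where the Replicator connector configuration would require a prefixed property for the destination cluster, the file passed to --producer.config uses the bare property name (hostnames and ports here are illustrative):

    # Replicator connector configuration requires a prefix:
    #   dest.kafka.bootstrap.servers=localhost:9092
    # Replicator Executable producer.properties needs no prefix:
    bootstrap.servers=localhost:9092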

To enable monitoring interceptors, include their properties either in the same files or in separate files, and pass them to Replicator Executable using the parameters --consumer.monitoring.config and --producer.monitoring.config, respectively. These properties do not require a producer. or consumer. prefix; for example, use interceptor.classes as opposed to producer.interceptor.classes.
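As a sketch, a monitoring configuration file could contain the standard Confluent monitoring interceptor and the cluster that should receive the metrics (the bootstrap address is illustrative):

    # consumer-monitoring.properties
    bootstrap.servers=localhost:9092
    interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor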

Configure Replication and Run

The other required parameter for Replicator Executable is --cluster.id. This parameter defines a unique identifier for the cluster that is formed when several instances of Replicator Executable start with the same --cluster.id.

Note

All instances of the Replicator Executable with the same --cluster.id should be started with the exact same overall configuration.
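For example, to scale out a running Replicator cluster, you might start a second instance on another host with the same configuration files and the same --cluster.id, using --cluster.threads (described below) to adjust the total thread count across the cluster. The file names and thread count here are illustrative:

    replicator \
     --consumer.config ./consumer.properties \
     --producer.config ./producer.properties \
     --cluster.id replicator \
     --replication.config ./replication.properties \
     --cluster.threads 8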

You can now specify the configuration properties related to data replication. There are multiple ways to do this:

  • Store all the configuration in a file, and pass this file to Replicator Executable using the parameter --replication.config. For example:

    replicator \
     --consumer.config ./consumer.properties \
     --producer.config ./producer.properties \
     --cluster.id replicator \
     --replication.config ./replication.properties
    
  • Pass the replication properties from the command line using individual parameters, each corresponding to a property. For example, specify the origin topics using --topic.whitelist and pass the Confluent license using --confluent.license:

    replicator \
    --consumer.config ./consumer.properties \
    --producer.config ./producer.properties \
    --cluster.id replicator \
    --topic.whitelist test-topic \
    --confluent.license "XYZ"
    
  • Use a mixture, with some replication properties in a file and the rest passed as command line arguments. For example:

    replicator \
     --consumer.config ./consumer.properties \
     --producer.config ./producer.properties \
     --cluster.id replicator \
     --replication.config ./replication.properties \
     --topic.whitelist test-topic
    

Configure Logging

Replicator Executable reads logging settings from the file etc/kafka-connect-replicator/replicator-log4j.properties. By default, it writes to the console, but for production deployments you should log to a file. Before you start Replicator Executable, add these lines to the replicator-log4j.properties file:

log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=logs/replicator.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=5
log4j.appender.file.append=true

Then add the newly defined file appender to the log4j.rootLogger parameter:

log4j.rootLogger=INFO, stdout, file

Run Replicator as a Connector

  1. Make sure that the Connect worker configuration matches the settings in the destination cluster:

    # Connect standalone worker configuration
    bootstrap.servers=localhost:9092
    
    The topic.whitelist setting is an explicit list of the topics you want
    replicated. In this tutorial, test-topic is replicated. The
    topic.rename.format setting provides the capability to rename topics in the
    destination cluster. In quickstart-replicator.properties,
    ${topic}.replica is used, where ${topic} will be substituted with the
    topic name from the source cluster. That means that the test-topic we're
    replicating from the source cluster will be renamed to test-topic.replica in
    the destination cluster.
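
    The renaming described above corresponds to lines in
    quickstart-replicator.properties along these lines (a sketch showing only
    the topic-selection and renaming settings):

      topic.whitelist=test-topic
      topic.rename.format=${topic}.replica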
    
  2. Create a topic named test-topic in the source cluster with the following command:

    ./bin/kafka-topics --create --topic test-topic \
    --replication-factor 1 --partitions 4 --zookeeper localhost:2171
    
  3. Update the quick start configuration and then run the connector in a standalone Kafka Connect worker:

    ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties \
    ./etc/kafka-connect-replicator/quickstart-replicator.properties
    

    When the connector has finished initialization, it will check the source cluster for topics that need to be replicated. In this case, it will find test-topic and will try to create the corresponding topic in the destination cluster.

Test Your Replicator

  1. Create a test topic.

    If you haven't already, create a topic named test-topic in the source cluster with the following command:

      ./bin/kafka-topics --create --topic test-topic \
      --replication-factor 1 --partitions 4 --zookeeper localhost:2171

    Once Replicator has created the replicated topic, verify that it exists in the destination cluster:

      ./bin/kafka-topics --describe --topic test-topic.replica --zookeeper localhost:2181
    
    Note that the existence of test-topic.replica is being checked.
    After verifying the topic's existence, you should confirm that four
    partitions were created. In general, the replicator will ensure that
    the destination topic has at least as many partitions as the source
    topic. It is fine if it has more, but since the replicator preserves
    the partition assignment of the source data, any additional partitions
    will not be utilized.
    
  2. Send data to the source cluster.

    At any time after you've created the topic in the source cluster, you can begin sending data to it using a Kafka producer to write to test-topic in the source cluster. You can then confirm that the data has been replicated by consuming from test-topic.replica in the destination cluster.

    For example, to send a sequence of numbers using Kafka's console producer, you can use the following command:

    seq 10000 | ./bin/kafka-console-producer --topic test-topic --broker-list localhost:9082
    
  3. Run a consumer to confirm that the destination cluster got the data.

    You can then confirm delivery in the destination cluster using the console consumer:

    ./bin/kafka-console-consumer --from-beginning --topic test-topic.replica \
    --bootstrap-server localhost:9092
    

Run Replicator Executable from a Docker Image

The simplest way to run Replicator is as an executable from a script or from a Docker image. See the Replicator Docker image documentation for the docker run commands and configuration parameters.
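As a sketch, a docker run invocation could look like the following. The image name and environment variable names follow Confluent's published Replicator Executable image, but treat them as assumptions and verify them against the image documentation for your version; the paths and topic name are illustrative:

    docker run -d \
      --name replicator \
      -v $(pwd)/config:/etc/replicator \
      -e CLUSTER_ID=replicator \
      -e CONSUMER_CONFIG=/etc/replicator/consumer.properties \
      -e PRODUCER_CONFIG=/etc/replicator/producer.properties \
      -e WHITELIST=test-topic \
      confluentinc/cp-enterprise-replicator-executable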

License Key

Without a license key, you can use Replicator Executable for a 30-day trial period. If you are a Confluent customer, contact customer support to obtain a Replicator license key. Then pass the key with the --confluent.license command line parameter, or add it to the confluent.license property within the replication configuration file you pass to --replication.config.

Command Line Parameters of Replicator Executable

The available command line parameters are:

--blacklist <Topic Blacklist>
    A comma-separated list of topics that should not be replicated, even if they are included in the whitelist or matched by the regular expression.

--cluster.id <Replicator Cluster Id> (required)
    Specifies the unique identifier for the Replicator cluster.

--cluster.threads <Total Replicator threads>
    The total number of threads across all workers in the Replicator cluster. If this command starts another Replicator worker in an existing cluster, this can be used to change the number of threads in the whole cluster.

--confluent.license <Confluent License Key>
    Your Confluent license key that enables you to use Replicator. Without the license key, you can use Replicator for a 30-day trial period. If you are a subscriber, contact Confluent Support for more information.

--consumer.config <consumer.properties> (required)
    Specifies the location of the file that contains the configuration settings for the consumer reading from the origin cluster.

--consumer.monitoring.config <consumer-monitoring.properties>
    Specifies the location of the file that contains the producer settings for the Apache Kafka® cluster where monitoring information about the Replicator consumer is to be sent. This must be specified if monitoring is to be enabled, but may point to a different Kafka cluster than the origin or destination clusters. Use the same file as --producer.config to write metrics to the destination cluster.

-h, --help
    Display help information.

--producer.config <producer.properties> (required)
    Specifies the location of the file that contains the configuration settings for the producer writing to the destination cluster.

--producer.monitoring.config <producer-monitoring.properties>
    Specifies the location of the file that contains the producer settings for the Kafka cluster where monitoring information about the Replicator producer is to be sent. This must be specified if monitoring is to be enabled, but may point to a different Kafka cluster than the origin or destination clusters. Use the same file as --producer.config to write metrics to the destination cluster.

--replication.config <replication.properties>
    Specifies the location of the file that contains the configuration settings for replication. When used, any property in this file can be overridden via a command line parameter. When this is not supplied, all of the properties defining how topics are to be replicated should be specified on the command line.

--topic.auto.create
    Whether to automatically create topics in the destination cluster if required.

--topic.config.sync
    Whether to periodically sync topic configuration to the destination cluster.

--topic.config.sync.interval.ms <Topic Config Sync Interval(ms)>
    How often to check for configuration changes when topic.config.sync is enabled.

--topic.create.backoff.ms <Topic Creation Backoff(ms)>
    Time to wait before retrying auto topic creation or expansion.

--topic.poll.interval.ms <Topic Poll Interval(ms)>
    Specifies how frequently to poll the source cluster for new topics.

--topic.preserve.partitions
    Whether to automatically increase the number of partitions in the destination cluster to match the source cluster and ensure that messages replicated from the source cluster use the same partition in the destination cluster.

--topic.regex <Regular Expression to Match Topics for Replication>
    A regular expression that matches the names of the topics to be replicated. Any topic that matches this expression (or is listed in the whitelist) and not in the blacklist will be replicated.

--topic.rename.format <Rename Format>
    A format string for the topic name in the destination cluster, which may contain ${topic} as a placeholder for the originating topic name. For example, ${topic}_dc1 for the topic 'orders' will map to the destination topic name 'orders_dc1'. Can be placed inside the file specified by --replication.config.

--topic.timestamp.type <Topic Timestamp Type>
    The timestamp type for the topics in the destination cluster.

--whitelist <Topic Whitelist>
    A comma-separated list of the names of topics that should be replicated. Any topic that is in this list and not in the blacklist will be replicated.
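
Putting several of these parameters together, a fuller invocation might look like the following (the regular expression, blacklist entry, and rename format are illustrative):

    replicator \
     --consumer.config ./consumer.properties \
     --producer.config ./producer.properties \
     --cluster.id replicator \
     --topic.regex 'orders.*' \
     --blacklist orders-internal \
     --topic.rename.format '${topic}_dc1' \
     --topic.poll.interval.ms 120000

Here any topic whose name matches orders.* is replicated, except orders-internal, and each replicated topic is renamed with an _dc1 suffix in the destination cluster.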