Tutorial: Configure and Run Replicator for Confluent Platform as an Executable or Connector

Confluent Replicator is available as a part of Confluent Platform. This tutorial provides download and install instructions, along with step-by-step guides on how to run Replicator as an executable or as a plugin on a Connect cluster.

Download and install Replicator

To get started:

  1. Download Confluent Platform from https://confluent.io/download/.

  2. Extract the contents of the archive.

    For further install instructions, see Install and Upgrade Confluent Platform.

  3. Start two clusters of Apache Kafka® brokers.

    For more information, see Tutorial: Replicate Data Across Kafka Clusters in Confluent Platform.

Configure and Run Replicator

There are two ways to configure and run Replicator. Decide which method you want to use:

  • Run Replicator as an executable
  • Run Replicator as a connector

To learn more about configuration options to troubleshoot and improve performance, see Tune Replicator for Confluent Platform and Setting compression to improve performance with increased data loads.

Run Replicator as an executable

Replicator Executable is a quick and easy way to configure and start Replicator within its own embedded Connect cluster. This is recommended for deployments where a pre-existing Connect cluster cannot be used. After Confluent Platform is installed, find the Replicator executable at bin/replicator (ZIP and TAR) within your installation directory. Running bin/replicator without arguments prints a list of all the available command line arguments to your terminal.

Important

The Replicator Executable uses a version of the Replicator connector that matches the installed version of Confluent Platform. Although the Replicator connector is available from Confluent Hub, do not change or upgrade the Replicator connector after installation unless instructed by Confluent.

Origin cluster configuration

Replicator Executable requires all configurations appropriate to the consumer from the origin cluster to be placed in a properties file. Use --consumer.config to point to the file that contains these properties. For example, this could be a file named consumer.properties with the following contents:

bootstrap.servers=localhost:9082

All Kafka consumer properties are valid in this file. For a full list, see Kafka Consumer Configuration Reference for Confluent Platform.

Destination cluster configuration

Replicator Executable requires all configurations appropriate to the producer writing to the destination cluster to be placed in a properties file. Use --producer.config to point to the file that contains these properties. For example, this could be a file named producer.properties containing:

bootstrap.servers=localhost:9092

All Kafka producer properties are valid in this file. For a full list, see Kafka Producer Configuration Reference for Confluent Platform.

Note

The property names defined in --consumer.config and --producer.config should not be prefixed and should match exactly those in the consumer/producer configurations referenced.

Replicator configuration

Replicator Executable allows any non-connection related Replicator properties to be overridden in a properties file referenced by --replication.config. Configuration for the following sections of Replicator properties can be placed here:

For example, this could be a file named replication.properties containing the following:

confluent.topic.bootstrap.servers=localhost:9092
offset.start=consumer

Note

The property group.id is a special case in Replicator Executable and should not be provided. For more information see Cluster ID and Group ID.

Connect cluster configurations

Replicator Executable allows any Connect-related properties to be overridden in a properties file referenced by --replication.config. For a full list of Connect configurations, see Kafka Connect Configuration Reference for Confluent Platform.

For example, this could be a file named replication.properties containing:

offset.storage.topic=connect-offsets
status.storage.topic=connect-status
config.storage.topic=connect-configs

Important

Client configurations should not be included in the file referenced by --replication.config. The following prefixes are not allowed. Provide these without prefix in a separate configuration file:

Prefix          Executable configuration
src.kafka.      --consumer.config
src.consumer.   --consumer.config
dest.kafka.     --producer.config
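
The restriction above can be checked before Replicator is started. The following is a minimal sketch and is not part of Confluent Platform: the function name check_replication_config is hypothetical, and it assumes a flat key=value properties file. It reports any line that uses one of the disallowed prefixes, as well as a group.id entry, which the earlier note says should not be provided.

```shell
#!/bin/sh
# Hypothetical helper (not shipped with Confluent Platform): scans the file
# passed to --replication.config for keys that should not appear in it.
check_replication_config() {
    config_file="$1"
    if [ ! -r "$config_file" ]; then
        echo "ERROR: cannot read $config_file" >&2
        return 2
    fi
    # Client prefixes belong, unprefixed, in --consumer.config or
    # --producer.config; group.id should not be set for Replicator Executable.
    if grep -n -E '^[[:space:]]*(src\.kafka\.|src\.consumer\.|dest\.kafka\.|group\.id[[:space:]]*=)' "$config_file"; then
        echo "ERROR: $config_file contains keys that are not allowed here" >&2
        return 1
    fi
    echo "OK: $config_file has no disallowed keys"
}

# Example use before launching Replicator:
#   check_replication_config ./replication.properties || exit 1
```

Offending lines are printed with their line numbers, so they can be moved, without the prefix, into the consumer or producer properties file.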

Monitoring interceptor configurations

To enable monitoring interceptors, you may include their properties in the same files, or in separate ones, that you pass to Replicator Executable using the parameters --consumer.monitoring.config and --producer.monitoring.config respectively. These properties do not require a producer. or consumer. prefix. For example, you can use interceptor.classes as opposed to producer.interceptor.classes. An example configuration could be a file named interceptors.properties containing:

interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
confluent.monitoring.interceptor.bootstrap.servers=localhost:9092

Configure replication and run

Replicator Executable requires the parameter --cluster.id. This parameter defines a unique identifier for the Connect cluster created when the Replicator Executable is started. Replicator Executable instances with the same cluster.id form a cluster and share Replicator workload.

Note

  • All instances of the Replicator Executable with the same --cluster.id should be started with the exact same overall configuration.
  • For non-executable deployments (using Connect workers), the group.id of the Connect worker(s) is used as the unique ID for the cluster, and serves the same purpose as --cluster.id. To learn more about both parameters, see Cluster ID and Group ID.
  • The name of the replicator is the connector name submitted to the embedded Connect cluster. The replicator name defaults to “replicator” and is used as the consumer group.id if group.id is not specified. When running Replicator as an executable, use replicator name (specified or default) to set consumer group names.

You can now specify the configuration properties related to data replication. There are multiple ways to do this:

  • Store all the configuration in a file, and pass this file to Replicator Executable using the parameter --replication.config. For example:

    replicator \
     --consumer.config ./consumer.properties \
     --producer.config ./producer.properties \
     --cluster.id replicator \
     --replication.config ./replication.properties
    
  • Pass the replication properties from the command line using individual parameters, each corresponding to a property. For example, specify the original topics using --whitelist. Confluent license can be passed using --confluent.license. For example:

    replicator \
    --consumer.config ./consumer.properties \
    --producer.config ./producer.properties \
    --cluster.id replicator \
    --whitelist test-topic \
    --confluent.license "XYZ"
    
  • Use a mixture of some replication properties in a file and the rest using command line arguments. For example:

    replicator \
     --consumer.config ./consumer.properties \
     --producer.config ./producer.properties \
     --cluster.id replicator \
     --replication.config ./replication.properties \
     --whitelist test-topic
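
As a rough sketch of the file-based variants above, the wrapper below checks that each properties file is readable and then prints the command it would run. The function name and the dry-run behaviour are assumptions made for illustration; only the replicator flags come from this tutorial.

```shell
#!/bin/sh
# Hypothetical wrapper: verifies that the properties files exist, then prints
# the replicator command line instead of running it (dry run).
replicator_dry_run() {
    cluster_id="$1"
    consumer_cfg="$2"
    producer_cfg="$3"
    replication_cfg="$4"
    for f in "$consumer_cfg" "$producer_cfg" "$replication_cfg"; do
        if [ ! -r "$f" ]; then
            echo "ERROR: cannot read $f" >&2
            return 1
        fi
    done
    echo "replicator --consumer.config $consumer_cfg --producer.config $producer_cfg --cluster.id $cluster_id --replication.config $replication_cfg"
}

# Example use:
#   replicator_dry_run replicator ./consumer.properties ./producer.properties ./replication.properties
```

Because every instance that shares a --cluster.id should be started with the same overall configuration, printing the command first makes it easier to compare what each host is about to run.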
    

Configure logging

Replicator Executable reads logging settings from the file etc/kafka-connect-replicator/replicator-log4j.properties. By default, it writes to the console, but for production deployments you should log to a file. Before you start Replicator executable, add these lines to the replicator-log4j.properties file:

log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=logs/replicator.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=5
log4j.appender.file.append=true

Add the appender file that you created above to the log4j.rootLogger parameter:

# By default
log4j.rootLogger=INFO, stdout, file
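
If you script this change, it helps to make it safe to run more than once. The sketch below is one hypothetical way to do that: the function name enable_file_appender is an assumption, and it expects the path of the log4j properties file as its only argument. It appends the appender lines shown above only if they are missing, and adds file to log4j.rootLogger only if it is not already listed.

```shell
#!/bin/sh
# Hypothetical helper: adds the file appender to a log4j properties file
# only if it is not already there, so repeated runs do not duplicate lines.
enable_file_appender() {
    props="$1"
    if ! grep -q '^log4j\.appender\.file=' "$props"; then
        cat >> "$props" <<'EOF'
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=logs/replicator.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=5
log4j.appender.file.append=true
EOF
    fi
    # Append ", file" to the rootLogger line unless "file" is already listed.
    awk '
        /^log4j\.rootLogger=/ && $0 !~ /[ ,=]file[ \t]*(,|$)/ {
            sub(/[ \t]*$/, "")
            $0 = $0 ", file"
        }
        { print }
    ' "$props" > "$props.tmp" && mv "$props.tmp" "$props"
}

# Example use:
#   enable_file_appender etc/kafka-connect-replicator/replicator-log4j.properties
```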

Troubleshooting

On Confluent Platform versions 6.2+, 7.0+, 7.1+, 7.2+, 7.3+, 7.4+, the Replicator servlet may not start when launched in standalone mode with the REST monitoring extension enabled, and will show java.lang.NoSuchMethodError exceptions in the logs.

As a workaround, you can modify the bin/replicator script to swap the order of CLASSPATH loading. For full information about how to implement this workaround, log on to the Support portal to view the article on How to bypass the java.lang.NoSuchMethodError using Replicator.

The following describes the workaround in a nutshell.

The Replicator startup script loads the Java classes in a specific order. On line 27 of bin/replicator you will see the loop that builds the CLASSPATH:

for library in "kafka-connect-replicator" "confluent-security/connect" "kafka" "confluent-common" \
    "kafka-serde-tools" "monitoring-interceptors" ; do
  dir="$java_base_dir/$library"
  if [ -d "$dir" ]; then
    classpath_prefix="$CLASSPATH:"
    if [ "x$CLASSPATH" = "x" ]; then
      classpath_prefix=""
    fi
    CLASSPATH="$classpath_prefix$dir/*"
  fi
done

You can change the class loading order by modifying the bin/replicator script to swap the order of the CLASSPATH entries. Move kafka-connect-replicator to second place, after confluent-security/connect:

for library in "confluent-security/connect" "kafka-connect-replicator" "kafka" "confluent-common" \
    "kafka-serde-tools" "monitoring-interceptors"; do
  dir="$java_base_dir/$library"
  if [ -d "$dir" ]; then
    classpath_prefix="$CLASSPATH:"
    if [ "x$CLASSPATH" = "x" ]; then
      classpath_prefix=""
    fi
    CLASSPATH="$classpath_prefix$dir/*"
  fi
done
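
To see why the order in the for list matters, the loop can be run in isolation. The following sketch wraps the same logic in a function (the name build_classpath and the temporary directory tree are assumptions for illustration, standing in for $java_base_dir) and prints the resulting CLASSPATH for both orderings. Entries appear in the same order as the library names, so whichever library is listed first comes first on the classpath.

```shell
#!/bin/sh
# Illustration only: reproduces the loop from bin/replicator as a function so
# the effect of the library order on CLASSPATH can be seen in isolation.
build_classpath() {
    java_base_dir="$1"
    shift
    CLASSPATH=""
    for library in "$@"; do
        dir="$java_base_dir/$library"
        if [ -d "$dir" ]; then
            classpath_prefix="$CLASSPATH:"
            if [ "x$CLASSPATH" = "x" ]; then
                classpath_prefix=""
            fi
            CLASSPATH="$classpath_prefix$dir/*"
        fi
    done
    echo "$CLASSPATH"
}

# Stand-in directory tree used in place of a real $java_base_dir.
demo_base=$(mktemp -d)
mkdir -p "$demo_base/kafka-connect-replicator" \
         "$demo_base/confluent-security/connect" \
         "$demo_base/kafka"

echo "original:   $(build_classpath "$demo_base" kafka-connect-replicator confluent-security/connect kafka)"
echo "workaround: $(build_classpath "$demo_base" confluent-security/connect kafka-connect-replicator kafka)"
```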

Run Replicator as a connector

To run Replicator as a Connector in the recommended distributed Connect cluster, see Manually configure and run Replicator on Connect clusters.

If deploying Confluent Platform on AWS VMs, be aware that VMs with burstable CPU types (T2, T3, T3a, and T4g) do not support high-throughput streaming workloads. Replicator worker nodes running on these VMs experience throughput degradation when their CPU credits expire. This makes these VMs unsuitable for Confluent Platform nodes that are expected to run at elevated CPU levels for sustained periods, or to support workloads above their baseline resource rates.

Test your Replicator

Following is a generic Replicator testing scenario. A similar testing strategy is covered with more context as a part of the Replicator tutorial in the section Configure and run Replicator.

  1. Create a test topic.

    If you haven’t already, create a topic named test-topic in the source cluster with the following command.

    ./bin/kafka-topics --create --topic test-topic --replication-factor \
    1 --partitions 4 --bootstrap-server localhost:9082
    
    ./bin/kafka-topics --describe --topic test-topic.replica --bootstrap-server localhost:9092
    

    The kafka-topics --describe --topic step in the above commands checks whether test-topic.replica exists. After verifying that the topic exists, confirm that four partitions were created. In general, Replicator makes sure that the destination topic has at least as many partitions as the source topic. It is fine if it has more, but because Replicator preserves the partition assignment of the source data, any additional partitions will not be utilized.

  2. Send data to the source cluster.

    At any time after you’ve created the topic in the source cluster, you can begin sending data to it using a Kafka producer to write to test-topic in the source cluster. You can then confirm that the data has been replicated by consuming from test-topic.replica in the destination cluster.

    For example, to send a sequence of numbers using Kafka’s console producer, you can use the following command.

    seq 10000 | ./bin/kafka-console-producer --topic test-topic --bootstrap-server localhost:9082
    
  3. Run a consumer to confirm that the destination cluster got the data.

    You can then confirm delivery in the destination cluster using the console consumer.

    ./bin/kafka-console-consumer --from-beginning --topic test-topic.replica \
    --bootstrap-server localhost:9092
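
The partition check from step 1 can also be scripted. The sketch below is a hypothetical helper pair, not a Confluent tool: partition_count assumes that the text printed by kafka-topics --describe includes a PartitionCount: field, and enough_partitions compares two counts. The sample line is illustrative text rather than captured cluster output.

```shell
#!/bin/sh
# Hypothetical helper: extracts the partition count from the text that
# `kafka-topics --describe` prints (read from standard input).
partition_count() {
    sed -n 's/.*PartitionCount:[[:space:]]*\([0-9][0-9]*\).*/\1/p' | head -n 1
}

# Succeeds when the destination has at least as many partitions as the source.
enough_partitions() {
    src_count="$1"
    dest_count="$2"
    [ "$dest_count" -ge "$src_count" ]
}

# Example with an illustrative summary line:
sample='Topic: test-topic.replica PartitionCount: 4 ReplicationFactor: 1 Configs:'
echo "$sample" | partition_count   # prints 4

# Against live clusters, the counts could be gathered like this:
#   src=$(./bin/kafka-topics --describe --topic test-topic --bootstrap-server localhost:9082 | partition_count)
#   dest=$(./bin/kafka-topics --describe --topic test-topic.replica --bootstrap-server localhost:9092 | partition_count)
#   enough_partitions "$src" "$dest" && echo "partition counts look fine"
```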
    

Run Replicator executable from a Docker image

The simplest way to run Replicator is as an executable from a script or from a Docker image. You can find the docker run commands and configuration parameters for Replicator in that documentation.

Command line parameters of Replicator executable

The available command line parameters are:

Command line parameter Value Description
--blacklist <Topic Blacklist> A comma-separated list of topics that should not be replicated, even if they are included in the whitelist or matched by the regular expression.
--cluster.id <Replicator Cluster Id> (required) Specifies the unique identifier for the Replicator cluster.
--cluster.threads <Total Replicator threads> The total number of threads across all workers in the Replicator cluster. If this command starts another Replicator worker in an existing cluster, this can be used to change the number of threads in the whole cluster.
--confluent.license <Confluent License Key> Your Confluent license key that enables you to use Replicator. Without the license key, you can use Replicator for a 30-day trial period. If you are a subscriber, contact Confluent Support for more information.
--consumer.config <consumer.properties> (required) Specifies the location of the file that contains the configuration settings for the consumer reading from the origin cluster.
--consumer.monitoring.config <consumer-monitoring.properties> Specifies the location of the file that contains the producer settings for the Kafka cluster where monitoring information about the Replicator consumer is to be sent. This must be specified if monitoring is to be enabled, but may point to a different Kafka cluster than the origin or destination clusters. Use the same file as --producer.config to write metrics to the destination cluster.
-h, --help   Display help information
--producer.config <producer.properties> (required) Specifies the location of the file that contains the configuration settings for the producer writing to the destination cluster.
--producer.monitoring.config <producer-monitoring.properties> Specifies the location of the file that contains the producer settings for the Kafka cluster where monitoring information about the Replicator producer is to be sent. This must be specified if monitoring is to be enabled, but may point to a different Kafka cluster than the origin or destination clusters. Use the same file as --producer.config to write metrics to the destination cluster.
--replication.config <replication.properties> Specifies the location of the file that contains the configuration settings for replication. When used, any property in this file can be overridden via a command line parameter. When this is not supplied, all of the properties defining how topics are to be replicated should be specified on the command line.
--topic.auto.create true/false Whether to automatically create topics in the destination cluster if required.
--topic.config.sync true/false Whether to periodically sync topic configuration to the destination cluster.
--topic.config.sync.interval.ms <Topic Config Sync Interval(ms)> How often to check for configuration changes when ‘topic.config.sync’ is enabled.
--topic.create.backoff.ms <Topic Creation Backoff(ms)> Time to wait before retrying auto topic creation or expansion.
--topic.poll.interval.ms <Topic Config Sync Interval(ms)> Specifies how frequently to poll the source cluster for new topics
--topic.preserve.partitions true/false Whether to automatically increase the number of partitions in the destination cluster to match the source cluster and ensure that messages replicated from the source cluster use the same partition in the destination cluster.
--topic.regex <Regular Expression to Match Topics for Replication> A regular expression that matches the names of the topics to be replicated. Any topic that matches this expression (or is listed in the whitelist) and not in the blacklist will be replicated.
--topic.rename.format <Rename Format> A format string for the topic name in the destination cluster, which may contain ${topic} as a placeholder for the originating topic name. For example, ${topic}_dc1 for the topic ‘orders’ will map to the destination topic name ‘orders_dc1’. Can be placed inside the file specified by --replication.config.
--topic.timestamp.type <Topic Timestamp Type> The timestamp type for the topics in the destination cluster.
--whitelist <Topic Whitelist> A comma-separated list of the names of topics that should be replicated. Any topic that is in this list and not in the blacklist will be replicated.

Convert Replicator connector configurations to Replicator executable configurations

A Replicator connector configuration can be converted to a Replicator executable configuration. One key difference is that the Connect deployment uses two configuration files (a worker properties file and a connector properties or JSON file), while Replicator executable uses three (a consumer, a producer, and a replication properties file). It’s helpful to think about this in the following way:

  • The consumer configuration file contains all the properties you need to configure the consumer embedded within Replicator that consumes from the source cluster. This would include any special configurations you want to use to tune the source consumer, in addition to the necessary security and connection details needed for the consumer to connect to the source cluster.
  • The producer configuration file contains all the properties you need to configure the producer embedded within Replicator that produces to the destination cluster. This would include any special configurations you want to use to tune the destination producer, in addition to the necessary security and connection details needed for the producer to connect to the destination cluster.
  • The replication configuration file contains all the properties you need to configure the actual Replicator that does the work of taking the data from the source consumer and passing it to the destination producer. This would include all Connect-specific configurations needed for Replicator as well as any necessary Replicator configurations.

If you have the following worker properties:

config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
connect.protocol=eager
connector.client.config.override.policy=All
bootstrap.servers=destination-cluster:9092
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"destUser\" password=\"destPassword\";

And the following Replicator JSON:

{
  "connector.class":"io.confluent.connect.replicator.ReplicatorSourceConnector",
  "tasks.max":4,
  "topic.whitelist":"test-topic",
  "topic.rename.format":"${topic}.replica",
  "confluent.license":"XYZ",
  "name": "replicator",
  "header.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
  "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
  "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
  "src.consumer.max.poll.records":"10000",
  "producer.override.linger.ms":"10",
  "producer.override.compression.type":"lz4",
  "src.kafka.bootstrap.servers": "source-cluster:9092",
  "src.kafka.ssl.endpoint.identification.algorithm": "https",
  "src.kafka.security.protocol": "SASL_SSL",
  "src.kafka.sasl.mechanism": "PLAIN",
  "src.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"sourceUser\" password=\"sourcePassword\";",
  "dest.kafka.bootstrap.servers": "destination-cluster:9092",
  "dest.kafka.ssl.endpoint.identification.algorithm": "https",
  "dest.kafka.security.protocol": "SASL_SSL",
  "dest.kafka.sasl.mechanism": "PLAIN",
  "dest.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"destUser\" password=\"destPassword\";"
}

You can convert the configuration shown above in these two configuration files to the following Consumer, Producer, and Replication configurations needed to use Replicator executable:

Consumer Configurations:

bootstrap.servers=source-cluster:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"sourceUser\" password=\"sourcePassword\";
max.poll.records=10000

For the consumer configurations, strip the src.kafka and src.consumer prefixes and simply list the actual configuration you want for the source consumer. The Replicator executable will know that because this has been placed in the consumer configuration, it needs to apply these configurations to the source consumer that will poll the source cluster.

Producer Configurations:

bootstrap.servers=destination-cluster:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"destUser\" password=\"destPassword\";
linger.ms=10
compression.type=lz4

For the producer configurations, strip the dest.kafka. and producer.override. prefixes and simply list the actual configuration you want for the destination producer. The Replicator executable will know that because this has been placed in the producer configuration, it needs to apply these configurations to the destination producer that will write to the destination cluster.

Replication Configurations:

config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
connect.protocol=eager
tasks.max=4
topic.whitelist=test-topic
topic.rename.format=${topic}.replica
confluent.license=XYZ
name=replicator
header.converter=io.confluent.connect.replicator.util.ByteArrayConverter
key.converter=io.confluent.connect.replicator.util.ByteArrayConverter
value.converter=io.confluent.connect.replicator.util.ByteArrayConverter

For the replication configurations, only include the configurations that are important for Replicator or Connect. It’s important to note that here you don’t need the connector.client.config.override.policy configuration anymore, as the Replicator executable directly passes in the producer configurations specified in the configuration file. This makes it easier to think about configuring the important consumers and producers for replication, rather than incorporating an extra Connect configuration.
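
The prefix rules described above can be sketched as a small script. This is an illustration under stated assumptions, not a Confluent tool: the function name split_connector_config is hypothetical, and it expects the connector configuration as a flat key=value properties file rather than JSON. It strips src.kafka. and src.consumer. into consumer.properties, strips dest.kafka. and producer.override. into producer.properties, drops connector.class and connector.client.config.override.policy, and writes everything else to replication.properties.

```shell
#!/bin/sh
# Hypothetical helper: splits a flat connector properties file ($1) into the
# three files used by Replicator executable, written to directory $2.
split_connector_config() {
    mkdir -p "$2" || return 1
    awk -v out="$2" '
        /^[ \t]*(#|$)/ { next }          # skip comments and blank lines
        !/=/           { next }          # skip anything that is not key=value
        {
            key = $0; sub(/=.*/, "", key)        # text before the first "="
            val = $0; sub(/^[^=]*=/, "", val)    # text after the first "="
            if (key == "connector.class" || key == "connector.client.config.override.policy") next
            if (sub(/^src\.kafka\./, "", key) || sub(/^src\.consumer\./, "", key))
                file = "consumer.properties"
            else if (sub(/^dest\.kafka\./, "", key) || sub(/^producer\.override\./, "", key))
                file = "producer.properties"
            else
                file = "replication.properties"
            print key "=" val > (out "/" file)
        }
    ' "$1"
}

# Example use:
#   split_connector_config ./connector.properties ./executable-config
```

The sketch does not merge worker properties or remove duplicate keys, so review the generated files before passing them to --consumer.config, --producer.config, and --replication.config.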

Manually configure and run Replicator on Connect clusters

After downloading and installing Confluent Platform, per Download and install Replicator, bring up two clusters of Kafka brokers.

To learn more, refer to Tutorial: Replicate Data Across Kafka Clusters in Confluent Platform.

Replicator is a Kafka Connect Plugin. To run Replicator, you need to take the following steps:

  • Install and Configure the Connect Cluster
  • Configure and run a Confluent Replicator on the Connect Cluster

This section walks you through both these steps in detail, and reviews the available configuration options for Replicator.

Configure the Connect cluster for Replicator

Replicator runs as a plugin (Connector) in Connect, so you’ll need to run Connect Workers before you can run Replicator. The quick start shows how to run Replicator in a single-node Connect cluster.

Refer to the Connect documentation to learn how to run Connect in distributed mode.

Keep in mind the following recommendations and best practices when configuring distributed Connect Workers for Replicator.

Configuring origin and destination brokers

Connect clusters are associated with a cluster of Kafka brokers. The brokers of the Kafka cluster are specified in the bootstrap.servers configuration parameter of the Connect Workers. If you are configuring a new Connect Worker cluster for running Replicator, make sure this parameter contains the brokers of the destination Kafka cluster. If you are planning to run Replicator on an existing Connect cluster, make sure it is already associated with the destination brokers.

Note

Replicator is responsible for reading events from the origin cluster. It then passes the events to the Connect Worker responsible for writing the events to the destination cluster. Therefore, you configure Replicator with information about the origin and the Worker with information about the destination.

Where to install Connect workers

If you are replicating events between different datacenters (rather than between two Kafka clusters in the same datacenter), best practice is to run the Connect Workers in the destination datacenter. For example, if you are sending data from New York to San Francisco, Replicator should run in SF and consume data across the US from NYC. The reason for this is that long distance networks can be a bit less reliable than inside a datacenter. If there is a network partition and you lose connectivity between the datacenters, having a consumer that is unable to connect to a cluster is less disruptive than a producer that cannot connect. Remote consuming tends to be a better model than remote producing. That said, there is no inherent risk in running Replicator at the origin datacenter. Replicator will capture and forward all events, including in cases of connectivity loss.

Running Replicator on existing Connect cluster

You can run Replicator on the same Connect cluster as other connectors, but in some cases it is not recommended:

  • If you are replicating data between two datacenters that are far apart and thus have high latency, you’ll want to tune both the Connect Worker and Replicator appropriately. Intra-DC tuning is different from inter-DC tuning for this very reason. By giving Replicator its own Connect cluster, you can tune the Connect Workers specifically for Replicator without worrying about other connectors being affected.
  • Any changes to a connector will cause Replicator to pause while connectors are being re-assigned to Connect Workers. If you frequently start and stop connectors, you may want to run Replicator on its own cluster and allow it to run without interruptions.

Configuring logging for Connect cluster

Connect logging is configured in the file etc/kafka/connect-log4j.properties. By default, it writes to the console, but for production deployments you should log to a file. Before you start Replicator, add these lines to the connect-log4j.properties file:

log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=logs/replicator.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=5
log4j.appender.file.append=true

Add the appender file that you just created to the log4j.rootLogger parameter:

# By default
log4j.rootLogger=INFO, stdout, file

Configure and run Replicator on the Connect cluster

You should have at least one distributed mode Connect Worker already up and running. To learn more, review the distributed mode documentation.

You can check if the Connect Worker is up and running by checking its REST API:

curl http://localhost:8083/
{"version":"7.5.7-ccs","commit":"078e7dc02a100018"}

If everything is fine, you will see a version number and commit hash for the version of the Connect Worker you are running.
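
In scripts, the same check can be repeated until the worker answers. The following is a minimal sketch: the function name wait_for_connect and its default attempt count and delay are assumptions chosen for illustration, and the URL is the one used in the example above.

```shell
#!/bin/sh
# Hypothetical helper: polls the Connect REST endpoint until it answers or
# the attempts run out. The defaults are arbitrary starting values.
wait_for_connect() {
    url="${1:-http://localhost:8083/}"
    attempts="${2:-30}"
    delay="${3:-2}"
    i=1
    while [ "$i" -le "$attempts" ]; do
        # -f: fail on HTTP errors, -s: no progress output,
        # --max-time: give up on a single request after 5 seconds.
        if curl -fs --max-time 5 "$url" >/dev/null 2>&1; then
            echo "Connect worker is up at $url"
            return 0
        fi
        sleep "$delay"
        i=$((i + 1))
    done
    echo "Connect worker did not respond at $url" >&2
    return 1
}

# Example use:
#   wait_for_connect http://localhost:8083/ 30 2 || exit 1
```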

Run Replicator by sending the Connect REST API its configuration file in JSON format. Here’s an example configuration:

{
        "name":"replicator",
        "config":{
                "connector.class":"io.confluent.connect.replicator.ReplicatorSourceConnector",
                "tasks.max":4,
                "key.converter":"io.confluent.connect.replicator.util.ByteArrayConverter",
                "value.converter":"io.confluent.connect.replicator.util.ByteArrayConverter",
                "src.kafka.bootstrap.servers":"localhost:9082",
                "topic.whitelist":"test-topic",
                "topic.rename.format":"${topic}.replica",
                "confluent.license":"XYZ"
        }
}

You can send this to the Connect REST API using curl. This assumes the above JSON is in a file called example-replicator.json:

curl -X POST -H "Content-Type: application/json" -d @example-replicator.json http://localhost:8083/connectors

This example demonstrates use of some important configuration parameters. For an explanation of all configuration parameters, see Replicator Configuration Reference for Confluent Platform.

  • key.converter and value.converter - Classes used to convert Kafka records to Connect’s internal format. The Connect Worker configuration specifies global converters and those will be used if you don’t specify anything in the Replicator configuration. For Replication, however, no conversion is necessary. You just want to read bytes out of the origin cluster and write them to the destination with no changes. Therefore, you can override the global converters with the ByteArrayConverter, which leaves the records as is.

  • src.kafka.bootstrap.servers - A list of brokers from the origin cluster

  • topic.whitelist - An explicit list of the topics that you want replicated. The quick start replicates a topic named test-topic.

    Tip

    You can also tell Replicator which topics to replicate using a regular expression with the topic.regex parameter. You should use a regular expression if you want Replicator to automatically start replicating new topics if they match a certain pattern. For example, to replicate all production topics, including new ones, configure Replicator to replicate topics that match prod.*. If you instead add new topics to the topic.whitelist list, you must restart Replicator for the change to take effect.

  • topic.rename.format - A substitution string that is used to rename topics in the destination cluster. The snippet above uses ${topic}.replica, where ${topic} will be substituted with the topic name from the origin cluster. That means that the test-topic being replicated from the origin cluster will be renamed to test-topic.replica in the destination cluster.

  • confluent.license - Without the license, you can use Replicator for a 30-day trial period. If you are a Confluent customer, you can contact customer support and ask for a Replicator license. Then use it as shown in the example.
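
To preview which destination topic name a given topic.rename.format value produces, the substitution can be imitated in the shell. This is an illustration only: rename_topic is a hypothetical function that mimics the placeholder substitution described above, and Replicator performs the actual renaming internally.

```shell
#!/bin/sh
# Illustration only: mimics how a topic.rename.format value maps an origin
# topic name to a destination topic name.
rename_topic() {
    format="$1"
    topic="$2"
    # Kafka topic names contain only letters, digits, '.', '_' and '-',
    # so the name is safe to use as a sed replacement string.
    printf '%s\n' "$format" | sed "s/\${topic}/$topic/g"
}

rename_topic '${topic}.replica' 'test-topic'   # prints test-topic.replica
rename_topic '${topic}_dc1' 'orders'           # prints orders_dc1
```

The format strings are single-quoted so that the shell passes ${topic} through literally instead of expanding it as a shell variable.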

Run Replicator on the source cluster

Replicator should be run on the destination cluster if possible. If this is not practical, it is possible to run Replicator on the source cluster from Confluent Platform 5.4.0 onwards. Make the following changes to run Replicator in this way:

  • Set connector.client.config.override.policy to All in the Connect worker configuration, or in --replication.config if using Replicator Executable.
  • Point bootstrap.servers in the Connect worker configuration to the source cluster (for Replicator Executable, specify this in --producer.config).
  • Provide any client configurations (such as security settings) for the source cluster in the Connect worker configuration (for Replicator Executable, specify these in --producer.config).
  • Point producer.override.bootstrap.servers in the connector configuration to the destination cluster (for Replicator Executable, specify this in --replication.config).
  • Provide any client configurations (such as security settings) for the destination cluster in the connector configuration with the prefix producer.override. (for Replicator Executable, specify these in --replication.config).
  • Provide configurations with the prefixes src.kafka. and dest.kafka. as usual.

An example configuration for Replicator running as a connector on the source cluster can be seen below:

{
  "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
  "name": "replicator",
  "producer.override.ssl.endpoint.identification.algorithm": "https",
  "producer.override.sasl.mechanism": "PLAIN",
  "producer.override.request.timeout.ms": 20000,
  "producer.override.bootstrap.servers": "destination-cluster:9092",
  "producer.override.retry.backoff.ms": 500,
  "producer.override.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"someUser\" password=\"somePassword\";",
  "producer.override.security.protocol": "SASL_SSL",
  "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
  "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
  "topic.whitelist": "someTopic",
  "src.kafka.bootstrap.servers": "source-cluster:9092",
  "dest.kafka.bootstrap.servers": "destination-cluster:9092",
  "dest.kafka.ssl.endpoint.identification.algorithm": "https",
  "dest.kafka.security.protocol": "SASL_SSL",
  "dest.kafka.sasl.mechanism": "PLAIN",
  "dest.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"someUser\" password=\"somePassword\";"
}

In this configuration Replicator is producing between clusters rather than consuming and the default producer configurations are not optimal for this. Consider adjusting the following configurations to increase the throughput of the producer flow:

  • producer.override.linger.ms=500
  • producer.override.batch.size=600000

These values are provided as a starting point only and should be further tuned to your environment and use case.

For more detail on running Replicator on the source cluster when the destination is Confluent Cloud, see Confluent Replicator to Confluent Cloud Configurations.

License key

Without the license key, you can use Replicator Executable for a 30-day trial period. If you are a Confluent customer, you can contact customer support and ask for a Replicator license key. Then, use the key you received from Confluent support with the --confluent.license command line parameter, or add it to the confluent.license property within the replication configuration file you pass to --replication.config.

Important

From 5.5.0 onwards, Replicator will fail immediately after the license key expires, even if Replicator is running.