Tutorial: Configure and Run Replicator for Confluent Platform as an Executable or Connector¶
Confluent Replicator is available as a part of Confluent Platform. This tutorial provides download and install instructions, along with step-by-step guides on how to run Replicator as an executable or as a plugin on a Connect cluster.
Download and install Replicator¶
To get started:
Download Confluent Platform from https://confluent.io/download/.
Extract the contents of the archive.
For further install instructions, see Install and Upgrade Confluent Platform.
Start two clusters of Apache Kafka® brokers.
For more information, see Tutorial: Replicate Data Across Kafka Clusters in Confluent Platform.
Configure and Run Replicator¶
There are two ways to configure and run Replicator. Decide which method you want to use:
- Run Replicator as an executable - The Replicator executable bundles a distributed Connect Worker and a Replicator runtime in a single application. This is a quick method to get started. You configure Replicator with command line flags at runtime. (You can also run Replicator executable from a Docker image.)
- Manually configure and run Replicator on Connect clusters - Run Replicator as a Connect worker in stand-alone mode or multiple Connect workers in distributed mode. You configure the replication details in properties files on the workers.
To learn more about configuration options to troubleshoot and improve performance, see Tune Replicator for Confluent Platform and Setting compression to improve performance with increased data loads.
Run Replicator as an executable¶
Replicator Executable is a quick and easy way to configure and start Replicator within a Connect cluster. This is recommended for deployments where a pre-existing Connect cluster cannot be used. After Confluent Platform is installed, find the Replicator executable at bin/replicator (ZIP and TAR) within your installation directory. Running bin/replicator without arguments prints a list of all the available command line arguments to your terminal.
Important
The Replicator Executable uses a version of the Replicator connector that matches the installed version of Confluent Platform. Although the Replicator connector is available from Confluent Hub, do not change or upgrade the Replicator connector after installation unless instructed by Confluent.
Origin cluster configuration¶
Replicator Executable requires all configuration for the consumer that reads from the origin cluster to be placed in a properties file. Use --consumer.config to point to the file that contains these properties. For example, this could be a file named consumer.properties with the following contents:
bootstrap.servers=localhost:9082
All Kafka consumer properties are valid in this file. For a full list, see Kafka Consumer Configuration Reference for Confluent Platform.
Destination cluster configuration¶
Replicator Executable requires all configuration for the producer that writes to the destination cluster to be placed in a properties file. Use --producer.config to point to the file that contains these properties. For example, this could be a file named producer.properties containing:
bootstrap.servers=localhost:9092
All Kafka producer properties are valid in this file. For a full list, see Kafka Producer Configuration Reference for Confluent Platform.
Note
The property names defined in --consumer.config and --producer.config should not be prefixed and should exactly match those in the consumer and producer configuration references.
Replicator configuration¶
Replicator Executable allows any non-connection related Replicator properties to be overridden in a properties file referenced by --replication.config. Configuration for the following sections of Replicator properties can be placed here:
For example, this could be a file named replication.properties containing the following:
confluent.topic.bootstrap.servers=localhost:9092
offset.start=consumer
Note
The property group.id is a special case in Replicator Executable and should not be provided. For more information, see Cluster ID and Group ID.
Connect cluster configurations¶
Replicator Executable allows any Connect-related properties to be overridden in a properties file referenced by --replication.config. For a full list of Connect configurations, see Kafka Connect Configuration Reference for Confluent Platform.
For example, this could be a file named replication.properties containing:
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
config.storage.topic=connect-configs
Important
Client configurations should not be included in the file referenced by --replication.config. The following prefixes are not allowed. Provide these without prefix in a separate configuration file:
Prefix | Executable configuration |
---|---|
src.kafka. | --consumer.config |
src.consumer. | --consumer.config |
dest.kafka. | --producer.config |
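The prefix rule above can be checked mechanically before starting Replicator. The following is a minimal sketch, not part of Confluent Platform: the function names are hypothetical, and it assumes the replication file uses one key=value pair per line.

```shell
# Hypothetical helper: list keys in a replication properties file that
# start with one of the client prefixes from the table above.
find_client_prefixed_keys() {
    # -n prints line numbers; each prefix is anchored at the start of a key.
    grep -nE '^[[:space:]]*(src\.kafka\.|src\.consumer\.|dest\.kafka\.)' "$1"
}

# Succeeds when the file is clean; otherwise prints the offending lines and fails.
check_replication_config() {
    if find_client_prefixed_keys "$1"; then
        echo "Move these keys, without the prefix, to the consumer or producer file." >&2
        return 1
    fi
    return 0
}
```

For example, `check_replication_config ./replication.properties && echo clean` would print `clean` only when no prefixed client keys are present.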
Monitoring interceptor configurations¶
To enable monitoring interceptors, include their properties either in the same consumer and producer files or in separate files that you pass to Replicator Executable using the parameters --consumer.monitoring.config and --producer.monitoring.config, respectively. These properties do not require a producer. or consumer. prefix. For example, you can use interceptor.classes as opposed to producer.interceptor.classes. An example configuration could be a file named interceptors.properties containing:
interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
confluent.monitoring.interceptor.bootstrap.servers=localhost:9092
Configure replication and run¶
Replicator Executable requires the parameter --cluster.id. This parameter defines a unique identifier for the Connect cluster created when the Replicator Executable is started. Replicator Executable instances with the same cluster.id form a cluster and share Replicator workload.
Note
- All instances of the Replicator Executable with the same --cluster.id should be started with the exact same overall configuration.
- For non-executable deployments (using Connect workers), the group.id of the Connect worker(s) is used as the unique ID for the cluster, and serves the same purpose as --cluster.id. To learn more about both parameters, see Cluster ID and Group ID.
- The name of the replicator is the connector name submitted to the embedded Connect cluster. The replicator name defaults to “replicator” and is used as the consumer group.id if group.id is not specified. When running Replicator as an executable, use the replicator name (specified or default) to set consumer group names.
You can now specify the configuration properties related to data replication. There are multiple ways to do this:
- Store all the configuration in a file, and pass this file to Replicator Executable using the parameter --replication.config. For example:
replicator \
  --consumer.config ./consumer.properties \
  --producer.config ./producer.properties \
  --cluster.id replicator \
  --replication.config ./replication.properties
- Pass the replication properties from the command line using individual parameters, each corresponding to a property. For example, specify the original topics using --whitelist. The Confluent license can be passed using --confluent.license. For example:
replicator \
  --consumer.config ./consumer.properties \
  --producer.config ./producer.properties \
  --cluster.id replicator \
  --whitelist test-topic \
  --confluent.license "XYZ"
- Use a mixture of some replication properties in a file and the rest using command line arguments. For example:
replicator \
  --consumer.config ./consumer.properties \
  --producer.config ./producer.properties \
  --cluster.id replicator \
  --replication.config ./replication.properties \
  --whitelist test-topic
Configure logging¶
Replicator Executable reads logging settings from the file etc/kafka-connect-replicator/replicator-log4j.properties. By default, it writes to the console, but for production deployments you should log to a file. Before you start Replicator executable, add these lines to the replicator-log4j.properties file:
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=logs/replicator.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=5
log4j.appender.file.append=true
Add the appender file that you created above to the log4j.rootLogger
parameter:
# By default
log4j.rootLogger=INFO, stdout, file
Troubleshooting¶
On Confluent Platform versions 6.2+, 7.0+, 7.1+, 7.2+, 7.3+, 7.4+, the Replicator servlet may not start when launched in standalone mode with the REST monitoring extension enabled, and will show java.lang.NoSuchMethodError exceptions in the logs.
As a workaround, you can modify the bin/replicator script to swap the order of CLASSPATH loading. For full information about how to implement this workaround, log on to the Support portal to view the article on How to bypass the java.lang.NoSuchMethodError using Replicator.
The following describes the workaround in a nutshell.
The Replicator startup script loads the Java classes in a specific order. On line 27 of bin/replicator, you will see the classpath being built in a loop:
for library in "kafka-connect-replicator" "confluent-security/connect" "kafka" "confluent-common" \
    "kafka-serde-tools" "monitoring-interceptors" ; do
  dir="$java_base_dir/$library"
  if [ -d "$dir" ]; then
    classpath_prefix="$CLASSPATH:"
    if [ "x$CLASSPATH" = "x" ]; then
      classpath_prefix=""
    fi
    CLASSPATH="$classpath_prefix$dir/*"
  fi
done
You can change the loading order by editing the list of libraries in this loop in the bin/replicator script. Move kafka-connect-replicator to second place, after confluent-security/connect:
for library in "confluent-security/connect" "kafka-connect-replicator" "kafka" "confluent-common" \
    "kafka-serde-tools" "monitoring-interceptors"; do
  dir="$java_base_dir/$library"
  if [ -d "$dir" ]; then
    classpath_prefix="$CLASSPATH:"
    if [ "x$CLASSPATH" = "x" ]; then
      classpath_prefix=""
    fi
    CLASSPATH="$classpath_prefix$dir/*"
  fi
done
Run Replicator executable from a Docker image¶
The simplest way to run Replicator is as an executable from a script or from a Docker image. You can find the docker run
commands and configuration parameters for Replicator in that documentation.
Command line parameters of Replicator executable¶
The available command line parameters are:
Command line parameter | Value | Description |
---|---|---|
--blacklist | <Topic Blacklist> | A comma-separated list of topics that should not be replicated, even if they are included in the whitelist or matched by the regular expression. |
--cluster.id | <Replicator Cluster Id> (required) | Specifies the unique identifier for the Replicator cluster. |
--cluster.threads | <Total Replicator threads> | The total number of threads across all workers in the Replicator cluster. If this command starts another Replicator worker in an existing cluster, this can be used to change the number of threads in the whole cluster. |
--confluent.license | <Confluent License Key> | Your Confluent license key that enables you to use Replicator. Without the license key, you can use Replicator for a 30-day trial period. If you are a subscriber, contact Confluent Support for more information. |
--consumer.config | <consumer.properties> (required) | Specifies the location of the file that contains the configuration settings for the consumer reading from the origin cluster. |
--consumer.monitoring.config | <consumer-monitoring.properties> | Specifies the location of the file that contains the producer settings for the Kafka cluster where monitoring information about the Replicator consumer is to be sent. This must be specified if monitoring is to be enabled, but may point to a different Kafka cluster than the origin or destination clusters. Use the same file as --producer.config to write metrics to the destination cluster. |
-h, --help | Display help information | |
--producer.config | <producer.properties> (required) | Specifies the location of the file that contains the configuration settings for the producer writing to the destination cluster. |
--producer.monitoring.config | <producer-monitoring.properties> | Specifies the location of the file that contains the producer settings for the Kafka cluster where monitoring information about the Replicator producer is to be sent. This must be specified if monitoring is to be enabled, but may point to a different Kafka cluster than the origin or destination clusters. Use the same file as --producer.config to write metrics to the destination cluster. |
--replication.config | <replication.properties> | Specifies the location of the file that contains the configuration settings for replication. When used, any property in this file can be overridden via a command line parameter. When this is not supplied, all of the properties defining how topics are to be replicated should be specified on the command line. |
--topic.auto.create | true/false | Whether to automatically create topics in the destination cluster if required. |
--topic.config.sync | true/false | Whether to periodically sync topic configuration to the destination cluster. |
--topic.config.sync.interval.ms | <Topic Config Sync Interval(ms)> | How often to check for configuration changes when ‘topic.config.sync’ is enabled. |
--topic.create.backoff.ms | <Topic Creation Backoff(ms)> | Time to wait before retrying auto topic creation or expansion. |
--topic.poll.interval.ms | <Topic Config Sync Interval(ms)> | Specifies how frequently to poll the source cluster for new topics. |
--topic.preserve.partitions | true/false | Whether to automatically increase the number of partitions in the destination cluster to match the source cluster and ensure that messages replicated from the source cluster use the same partition in the destination cluster. |
--topic.regex | <Regular Expression to Match Topics for Replication> | A regular expression that matches the names of the topics to be replicated. Any topic that matches this expression (or is listed in the whitelist) and not in the blacklist will be replicated. |
--topic.rename.format | <Rename Format> | A format string for the topic name in the destination cluster, which may contain ${topic} as a placeholder for the originating topic name. For example, ${topic}_dc1 for the topic ‘orders’ will map to the destination topic name ‘orders_dc1’. Can be placed inside the file specified by --replication.config. |
--topic.timestamp.type | <Topic Timestamp Type> | The timestamp type for the topics in the destination cluster. |
--whitelist | <Topic Whitelist> | A comma-separated list of the names of topics that should be replicated. Any topic that is in this list and not in the blacklist will be replicated. |
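The interaction between --whitelist, --topic.regex, and --blacklist described in the table can be modeled in a few lines. This is an illustrative sketch of the rule as stated above, not Replicator's implementation: the function name is hypothetical, lists are assumed to contain no spaces, the regular expression is assumed to match the whole topic name, and grep -E syntax differs in places from Java regular expressions.

```shell
# Illustrative model only: returns 0 if a topic would be selected under the
# rules in the table, 1 otherwise. Lists are comma-separated, without spaces.
should_replicate() {
    topic=$1
    whitelist=$2
    regex=$3
    blacklist=$4

    # The blacklist always wins.
    case ",$blacklist," in
        *",$topic,"*) return 1 ;;
    esac

    # Explicitly whitelisted topics are selected.
    case ",$whitelist," in
        *",$topic,"*) return 0 ;;
    esac

    # Otherwise fall back to the regular expression, if one was given.
    # Assumption: the expression must match the whole topic name (grep -x).
    if [ -n "$regex" ] && printf '%s\n' "$topic" | grep -Eqx -- "$regex"; then
        return 0
    fi
    return 1
}
```

For example, `should_replicate prod.clicks "" 'prod.*' "prod.clicks"` fails because the topic is blacklisted, even though it matches the regular expression.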
Run Replicator as a connector¶
To run Replicator as a Connector in the recommended distributed Connect cluster, see Manually configure and run Replicator on Connect clusters.
If deploying Confluent Platform on AWS VMs, be aware that VMs with burstable CPU types (T2, T3, T3a, and T4g) will not support high throughput streaming workloads. Replicator worker nodes running on these VMs experience throughput degradation due to credits expiring, making these VMs unsuitable for Confluent Platform nodes expected to run at elevated CPU levels for a sustained period of time, and supporting workloads that are above and beyond their baseline resource rates.
Test your Replicator¶
The following is a generic Replicator testing scenario. A similar testing strategy is covered with more context as a part of the Replicator tutorial in the section Configure and run Replicator.
Create a test topic.
If you haven’t already, create a topic named test-topic in the source cluster with the following command.
./bin/kafka-topics --create --topic test-topic --replication-factor 1 \
  --partitions 4 --bootstrap-server localhost:9082
./bin/kafka-topics --describe --topic test-topic.replica --bootstrap-server localhost:9092
The kafka-topics --describe --topic step in the above command checks whether test-topic.replica exists. After verifying that the topic exists, confirm that four partitions were created. In general, Replicator makes sure that the destination topic has at least as many partitions as the source topic. It is fine if it has more, but because Replicator preserves the partition assignment of the source data, any additional partitions will not be utilized.
Send data to the source cluster.
At any time after you’ve created the topic in the source cluster, you can begin sending data to it using a Kafka producer to write to test-topic in the source cluster. You can then confirm that the data has been replicated by consuming from test-topic.replica in the destination cluster.
For example, to send a sequence of numbers using Kafka’s console producer, you can use the following command.
seq 10000 | ./bin/kafka-console-producer --topic test-topic --broker-list localhost:9082
Run a consumer to confirm that the destination cluster got the data.
You can then confirm delivery in the destination cluster using the console consumer.
./bin/kafka-console-consumer --from-beginning --topic test-topic.replica \
  --bootstrap-server localhost:9092
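Rather than reading the consumer output by eye, you can count the replicated messages. The following is a minimal sketch under stated assumptions: verify_count is a hypothetical helper, each message occupies exactly one line (true for the seq data above), and the consumer is stopped with its --timeout-ms option once no more messages arrive.

```shell
# Hypothetical helper: read messages from stdin and compare the line count
# with the number of records that were produced.
verify_count() {
    expected=$1
    actual=$(wc -l | tr -d '[:space:]')
    if [ "$actual" -eq "$expected" ]; then
        echo "OK: $actual messages"
    else
        echo "MISMATCH: expected $expected, got $actual" >&2
        return 1
    fi
}

# Usage against the clusters in this tutorial (not run here):
#   ./bin/kafka-console-consumer --from-beginning --topic test-topic.replica \
#       --bootstrap-server localhost:9092 --timeout-ms 10000 | verify_count 10000
```

A mismatch shortly after producing may only mean that replication has not caught up yet, so it is worth repeating the check before investigating further.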
Convert Replicator connector configurations to Replicator executable configurations¶
A Replicator connector configuration can be converted to a Replicator executable configuration. One of the key differences between the two is that the Connect configuration has two configuration files (a worker properties file and a connector properties or JSON file) while Replicator executable has three configuration files (a consumer, a producer, and a replication properties file). It’s helpful to think about this in the following way:
- The consumer configuration file contains all the properties you need to configure the consumer embedded within Replicator that consumes from the source cluster. This would include any special configurations you want to use to tune the source consumer, in addition to the necessary security and connection details needed for the consumer to connect to the source cluster.
- The producer configuration file contains all the properties you need to configure the producer embedded within Replicator that produces to the destination cluster. This would include any special configurations you want to use to tune the destination producer, in addition to the necessary security and connection details needed for the producer to connect to the destination cluster.
- The replication configuration file contains all the properties you need to configure the actual Replicator that does the work of taking the data from the source consumer and passing it to the destination producer. This would include all Connect-specific configurations needed for Replicator as well as any necessary Replicator configurations.
If you have the following worker properties:
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
connect.protocol=eager
connector.client.config.override.policy=All
bootstrap.servers=destination-cluster:9092
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"destUser\" password=\"destPassword\";
And the following Replicator JSON:
{
"connector.class":"io.confluent.connect.replicator.ReplicatorSourceConnector",
"tasks.max":4,
"topic.whitelist":"test-topic",
"topic.rename.format":"${topic}.replica",
"confluent.license":"XYZ",
"name": "replicator",
"header.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
"key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
"value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
"src.consumer.max.poll.records":"10000",
"producer.override.linger.ms":"10",
"producer.override.compression.type":"lz4",
"src.kafka.bootstrap.servers": "source-cluster:9092",
"src.kafka.ssl.endpoint.identification.algorithm": "https",
"src.kafka.security.protocol": "SASL_SSL",
"src.kafka.sasl.mechanism": "PLAIN",
"src.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"sourceUser\" password=\"sourcePassword\";",
"dest.kafka.bootstrap.servers": "destination-cluster:9092",
"dest.kafka.ssl.endpoint.identification.algorithm": "https",
"dest.kafka.security.protocol": "SASL_SSL",
"dest.kafka.sasl.mechanism": "PLAIN",
"dest.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"destUser\" password=\"destPassword\";"
}
You can convert the configuration shown above in these two configuration files to the following Consumer, Producer, and Replication configurations needed to use Replicator executable:
Consumer Configurations:
bootstrap.servers=source-cluster:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"sourceUser\" password=\"sourcePassword\";
max.poll.records=10000
For the consumer configurations, strip the src.kafka. and src.consumer. prefixes and simply list the actual configuration you want for the source consumer. Because these properties are in the consumer configuration file, the Replicator executable applies them to the source consumer that polls the source cluster.
Producer Configurations:
bootstrap.servers=destination-cluster:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"destUser\" password=\"destPassword\";
linger.ms=10
compression.type=lz4
For the producer configurations, strip the dest.kafka. and producer.override. prefixes and simply list the actual configuration you want for the destination producer. Because these properties are in the producer configuration file, the Replicator executable applies them to the destination producer that writes to the destination cluster.
Replication Configurations:
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
connect.protocol=eager
tasks.max=4
topic.whitelist=test-topic
topic.rename.format=${topic}.replica
confluent.license=XYZ
name=replicator
header.converter=io.confluent.connect.replicator.util.ByteArrayConverter
key.converter=io.confluent.connect.replicator.util.ByteArrayConverter
value.converter=io.confluent.connect.replicator.util.ByteArrayConverter
For the replication configurations, only include the configurations that are important for Replicator or Connect. It’s important to note that here you don’t need the connector.client.config.override.policy configuration anymore, as the Replicator executable directly passes in the producer configurations specified in the configuration file.
This makes it easier to think about configuring the important consumers and producers for replication, rather than incorporating an extra Connect configuration.
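The prefix-stripping steps above can be sketched as a small script. This is an illustration only, not a Confluent tool: split_connector_props is a hypothetical name, and it assumes the connector settings have first been written as a flat file of key=value lines (one per JSON entry). Like the example above, it leaves out connector.class. Worker-level settings such as config.storage.replication.factor still need to be added to the replication file by hand.

```shell
# Hypothetical helper: split a flat key=value copy of the connector settings
# into the three files used by Replicator executable.
split_connector_props() {
    in=$1
    out=$2

    # Consumer: keys under src.kafka. and src.consumer., prefix removed.
    sed -n -e 's/^src\.kafka\.//p' -e 's/^src\.consumer\.//p' "$in" \
        > "$out/consumer.properties"

    # Producer: keys under dest.kafka. and producer.override., prefix removed.
    sed -n -e 's/^dest\.kafka\.//p' -e 's/^producer\.override\.//p' "$in" \
        > "$out/producer.properties"

    # Replication: everything else, minus connector.class, comments, and blank lines.
    grep -vE '^(src\.kafka\.|src\.consumer\.|dest\.kafka\.|producer\.override\.|connector\.class=|[[:space:]]*#|[[:space:]]*$)' \
        "$in" > "$out/replication.properties" || true
}
```

Review the generated files before use; in particular, confirm that the producer file does not contain conflicting values for a key that appeared under both dest.kafka. and producer.override.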
Manually configure and run Replicator on Connect clusters¶
After downloading and installing Confluent Platform, per Download and install Replicator, bring up two clusters of Kafka brokers.
To learn more, refer to Tutorial: Replicate Data Across Kafka Clusters in Confluent Platform.
Replicator is a Kafka Connect Plugin. To run Replicator, you need to take the following steps:
- Install and Configure the Connect Cluster
- Configure and run a Confluent Replicator on the Connect Cluster
This section walks you through both these steps in detail, and reviews the available configuration options for Replicator.
Configure the Connect cluster for Replicator¶
Replicator runs as a plugin (Connector) in Connect, so you’ll need to run Connect Workers before you can run Replicator. The quick start shows how to run Replicator in a single node Connect cluster.
Refer to Connect documentation to learn how to run Connect in distributed mode.
Keep in mind the following recommendations and best practices when configuring distributed Connect Workers for Replicator.
Configuring origin and destination brokers¶
Connect clusters are associated with a cluster of Kafka brokers. The brokers of the Kafka cluster are specified in the bootstrap.servers configuration parameter of the Connect Workers.
If you are configuring a new Connect Worker cluster for running Replicator, make sure this parameter contains the brokers of the destination Kafka cluster.
If you are planning to run Replicator on an existing Connect cluster, make sure it is already associated with the destination brokers.
Note
Replicator is responsible for reading events from the origin cluster. It then passes the events to the Connect Worker responsible for writing the events to the destination cluster. Therefore, you configure Replicator with information about the origin and the Worker with information about the destination.
Where to install Connect workers¶
If you are replicating events between different datacenters (rather than between two Kafka clusters in the same datacenter), best practice is to run the Connect Workers in the destination datacenter. For example, if you are sending data from New York to San Francisco, Replicator should run in SF and consume data across the US from NYC. The reason for this is that long distance networks can be a bit less reliable than inside a datacenter. If there is a network partition and you lose connectivity between the datacenters, having a consumer that is unable to connect to a cluster is less disruptive than a producer that cannot connect. Remote consuming tends to be a better model than remote producing. That said, there is no inherent risk in running Replicator at the origin datacenter. Replicator will capture and forward all events, including in cases of connectivity loss.
Running Replicator on existing Connect cluster¶
You can run Replicator on the same Connect cluster as other connectors, but in some cases it is not recommended:
- If you are replicating data between two datacenters that are far apart and thus have high latency, you’ll want to tune both the Connect Worker and Replicator appropriately. Intra-DC tuning is different from inter-DC tuning for this very reason. By giving Replicator its own Connect cluster, you can tune the Connect Workers specifically for Replicator without worrying about other connectors being affected.
- Any changes to a connector will cause Replicator to pause while connectors are being re-assigned to Connect Workers. If you frequently start and stop connectors, you may want to run Replicator on its own cluster and allow it to run without interruptions.
Configuring logging for Connect cluster¶
Connect logging is configured in the file etc/kafka/connect-log4j.properties. By default, it writes to the console, but for production deployments you should log to a file. Before you start Replicator, add these lines to the connect-log4j.properties file:
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=logs/replicator.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=5
log4j.appender.file.append=true
Add the appender file that you just created to the log4j.rootLogger
parameter:
# By default
log4j.rootLogger=INFO, stdout, file
Configure and run Replicator on the Connect cluster¶
You should have at least one distributed mode Connect Worker already up and running. To learn more, review the distributed mode documentation.
You can check if the Connect Worker is up and running by checking its REST API:
curl http://localhost:8083/
{"version":"7.8.0-ccs","commit":"078e7dc02a100018"}
If everything is fine, you will see a version number and commit hash for the version of the Connect Worker you are running.
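If you want to use this check in a script, you can extract the version field from the response. The following is a minimal sketch that assumes the response has the shape shown above; connect_version is a hypothetical helper and uses sed so that no JSON tool is required.

```shell
# Hypothetical helper: extract the "version" field from the Connect root
# endpoint response read on stdin.
connect_version() {
    sed -n 's/.*"version":[[:space:]]*"\([^"]*\)".*/\1/p'
}

# Usage against a running worker (not run here):
#   curl -s http://localhost:8083/ | connect_version
```

An empty result means that the worker did not return the expected JSON, for example because it is still starting up.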
Run Replicator by sending the Connect REST API its configuration file in JSON format. Here’s an example configuration:
{
"name":"replicator",
"config":{
"connector.class":"io.confluent.connect.replicator.ReplicatorSourceConnector",
"tasks.max":4,
"key.converter":"io.confluent.connect.replicator.util.ByteArrayConverter",
"value.converter":"io.confluent.connect.replicator.util.ByteArrayConverter",
"src.kafka.bootstrap.servers":"localhost:9082",
"topic.whitelist":"test-topic",
"topic.rename.format":"${topic}.replica",
"confluent.license":"XYZ"
}
}
You can send this to the Connect REST API using curl. This assumes the above JSON is in a file called example-replicator.json:
curl -X POST -d @example-replicator.json http://localhost:8083/connectors --header "Content-Type: application/json"
This example demonstrates use of some important configuration parameters. For an explanation of all configuration parameters, see Replicator Configuration Reference for Confluent Platform.
- key.converter and value.converter - Classes used to convert Kafka records to Connect’s internal format. The Connect Worker configuration specifies global converters and those will be used if you don’t specify anything in the Replicator configuration. For replication, however, no conversion is necessary. You just want to read bytes out of the origin cluster and write them to the destination with no changes. Therefore, you can override the global converters with the ByteArrayConverter, which leaves the records as is.
- src.kafka.bootstrap.servers - A list of brokers from the origin cluster.
- topic.whitelist - An explicit list of the topics that you want replicated. The quick start replicates a topic named test-topic.
Tip
You can also tell Replicator which topics to replicate using a regular expression with the topic.regex parameter. You should use a regular expression if you want Replicator to automatically start replicating new topics if they match a certain pattern. For example, to replicate all production topics, including new ones, configure Replicator to replicate topics that match prod.*. If you add new topics to the list, you must bounce Replicator for the change to take effect.
- topic.rename.format - A substitution string that is used to rename topics in the destination cluster. The snippet above uses ${topic}.replica, where ${topic} will be substituted with the topic name from the origin cluster. That means that the test-topic being replicated from the origin cluster will be renamed to test-topic.replica in the destination cluster.
- confluent.license - Without the license, you can use Replicator for a 30-day trial period. If you are a Confluent customer, you can contact customer support and ask for a Replicator license. Then use it as shown in the example.
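To preview the destination name that a given topic.rename.format value produces, you can expand the placeholder yourself. This is an illustrative sketch only: rename_topic is a hypothetical helper that imitates the ${topic} substitution described above, and it assumes topic names contain only letters, digits, periods, underscores, and hyphens.

```shell
# Illustrative only: expand a topic.rename.format string for one topic name.
rename_topic() {
    format=$1
    topic=$2
    # [$] matches a literal dollar sign, so every ${topic} placeholder is replaced.
    printf '%s\n' "$format" | sed 's/[$]{topic}/'"$topic"'/g'
}

rename_topic '${topic}.replica' test-topic   # prints test-topic.replica
rename_topic '${topic}_dc1' orders           # prints orders_dc1
```

The single quotes around the format string matter: they stop the shell from expanding ${topic} as a shell variable before the helper sees it.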
Run Replicator on the source cluster¶
Replicator should be run on the destination cluster if possible. If this is not practical, it is possible to run Replicator on the source cluster from Confluent Platform 5.4.0 onwards. Make the following changes to run Replicator in this way:
- connector.client.config.override.policy must be set to All in the Connect worker configuration, or in --replication.config if using Replicator Executable.
- bootstrap.servers in the Connect worker configuration should point to the source cluster (for Replicator Executable specify this in --producer.config).
- Any client configurations (security etc.) for the source cluster should be provided in the Connect worker configuration (for Replicator Executable specify these in --producer.config).
- producer.override.bootstrap.servers in the connector configuration should point to the destination cluster (for Replicator Executable specify this in --replication.config).
- Any client configurations (security etc.) for the destination cluster should be provided in the connector configuration with prefix producer.override. (for Replicator Executable specify these in --replication.config).
- Configurations with the prefix src.kafka. and dest.kafka. should be provided as usual.
An example configuration for Replicator running as a connector on the source cluster can be seen below:
{
"connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
"name": "replicator",
"producer.override.ssl.endpoint.identification.algorithm": "https",
"producer.override.sasl.mechanism": "PLAIN",
"producer.override.request.timeout.ms": 20000,
"producer.override.bootstrap.servers": "destination-cluster:9092",
"producer.override.retry.backoff.ms": 500,
"producer.override.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"someUser\" password=\"somePassword\";",
"producer.override.security.protocol": "SASL_SSL",
"key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
"value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
"topic.whitelist": "someTopic",
"src.kafka.bootstrap.servers": "source-cluster:9092",
"dest.kafka.bootstrap.servers": "destination-cluster:9092",
"dest.kafka.ssl.endpoint.identification.algorithm": "https",
"dest.kafka.security.protocol": "SASL_SSL",
"dest.kafka.sasl.mechanism": "PLAIN",
"dest.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"someUser\" password=\"somePassword\";"
}
In this configuration, Replicator is producing between clusters rather than consuming, and the default producer configurations are not optimal for this. Consider adjusting the following configurations to increase the throughput of the producer flow:
producer.override.linger.ms=500
producer.override.batch.size=600000
These values are provided as a starting point only and should be further tuned to your environment and use case.
For more detail on running Replicator on the source cluster when the destination is Confluent Cloud, see Confluent Replicator to Confluent Cloud Configurations.
License key¶
Without the license key, you can use Replicator Executable for a 30-day trial period. If you are a Confluent customer, you can contact customer support and ask for a Replicator license key. Then, use the key you received from Confluent support with the --confluent.license command line parameter or by adding it to the confluent.license property within the replication configuration file you pass to --replication.config.
Important
From 5.5.0 onwards, Replicator will fail immediately after the license key expires, even if Replicator is running.