Example: Use Confluent Replicator to Copy Kafka Data to Confluent Cloud

Whether you are migrating from on-premises to cloud or have a persistent “bridge to cloud” strategy, you can use Confluent Replicator to copy Kafka data to Confluent Cloud. Learn the different ways to configure Replicator and Kafka Connect.

[Diagram: Confluent Replicator copying Kafka data to Confluent Cloud]

Important

Confluent Cloud examples that use actual Confluent Cloud resources may be billable. An example may create a new Confluent Cloud environment, Kafka cluster, topics, ACLs, and service accounts, as well as resources that have hourly charges, such as connectors and ksqlDB applications. To avoid unexpected charges, carefully evaluate the cost of resources before you start. After you are done running a Confluent Cloud example, destroy all Confluent Cloud resources to avoid accruing hourly charges, and verify that they have been deleted.

Concepts

Before diving into the different ways to configure Replicator, first review some basic concepts about Replicator and Kafka Connect. These concepts explain the logic behind the configurations: the way the Kafka Connect cluster is configured dictates how Replicator should be configured.

Replicator is a Kafka connector and runs on Connect workers. Even the Replicator executable has a bundled Connect worker inside.
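
For illustration, a minimal invocation of the Replicator executable might look like the following sketch; the cluster ID, property file names, and topic list are placeholders:

replicator \
  --cluster.id <replicator-cluster-id> \
  --consumer.config consumer.properties \
  --producer.config producer.properties \
  --whitelist <topics-to-replicate>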

Replicator has an embedded consumer that reads data from the origin cluster, and the Connect worker has an embedded producer that copies that data to the destination cluster, which in this case is Confluent Cloud. To configure the connection information that Replicator uses to interact with the origin cluster, use the src. prefix. Replicator also has an admin client for interacting with the destination cluster, which is configured with the dest. prefix.
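
As a minimal sketch using the same placeholders as the examples below, the two prefixes look like this:

# Embedded consumer reads from the origin cluster
src.kafka.bootstrap.servers=<bootstrap-servers-origin>

# Admin client interacts with the destination cluster
dest.kafka.bootstrap.servers=<bootstrap-servers-destination>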

A Connect worker also has an admin client that creates the Kafka topics used for its own management (offset.storage.topic, config.storage.topic, and status.storage.topic); these topics live in the Kafka cluster that backs the Connect worker. The Kafka Connect embedded producer can be configured directly on the Connect worker or overridden by any connector, including Replicator.
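
For reference, these management topics are set on the worker as follows; the topic names shown are common conventions, not requirements:

offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status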

Configuration Types

In the first configuration type, Replicator runs on a self-managed Connect cluster that is backed to the destination Confluent Cloud cluster. This allows Replicator, which is a source connector, to leverage the default behavior of the Connect worker’s admin client and embedded producer.

[Diagram: Replicator on a Connect cluster backed to the destination Confluent Cloud cluster]

For this case where Replicator runs on a Connect Cluster Backed to Destination, there are two configuration examples:

  • On-premises to Confluent Cloud with Connect Backed to Destination
  • Confluent Cloud to Confluent Cloud with Connect Backed to Destination

There are scenarios in which your self-managed Connect cluster cannot be backed to the destination Confluent Cloud cluster. For example, some highly secure clusters block all incoming network connections and allow only outbound (push) connections; in that case, an incoming connection into the origin cluster from Replicator running on a Connect cluster backed to the destination would fail. Instead, you can back the Connect cluster to the origin cluster and push the replicated data to the destination cluster. This second configuration type is more complex because you need to configure overrides.

[Diagram: Replicator on a Connect cluster backed to the origin cluster]

For this case where Replicator runs on a Connect Cluster Backed to Origin, there are two configuration examples:

  • On-premises to Confluent Cloud with Connect Backed to Origin
  • Confluent Cloud to Confluent Cloud with Connect Backed to Origin

Connect Cluster Backed to Destination

For this case where Replicator runs on a Connect Cluster Backed to Destination, there are two configuration examples:

  • On-premises to Confluent Cloud with Connect Backed to Destination
  • Confluent Cloud to Confluent Cloud with Connect Backed to Destination

On-premises to Confluent Cloud with Connect Backed to Destination

In this example, Replicator copies data from an on-premises Kafka cluster to Confluent Cloud, and Replicator runs on a Connect cluster backed to the destination Confluent Cloud cluster.

See also

There are many other configuration parameters for the Connect worker and Replicator, but these examples show you only the significant ones. For an example with complete configurations, refer to the Connect worker backed to destination Docker configuration and the Replicator reading from an on-premises cluster configuration file.

[Diagram: on-premises Kafka to Confluent Cloud with Connect backed to destination]

Configure Kafka Connect

Set the management topics to a replication factor of 3, as required by Confluent Cloud.

replication.factor=3
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3

The Connect worker’s admin client requires connection information to the destination Confluent Cloud.

# Configuration for embedded admin client
bootstrap.servers=<bootstrap-servers-destination>
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key-destination>" password="<api-secret-destination>";

The Connect worker’s embedded producer requires connection information to the destination Confluent Cloud.

# Configuration for embedded producer
producer.sasl.mechanism=PLAIN
producer.security.protocol=SASL_SSL
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key-destination>" password="<api-secret-destination>";

If you are using Confluent Control Center for stream monitoring, then the embedded producer’s monitoring interceptors require connection information to the destination Confluent Cloud cluster.

# Configuration for embedded producer.confluent.monitoring.interceptor
producer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
producer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
producer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key-destination>" password="<api-secret-destination>";

Configure Replicator

The origin cluster in this case is your on-premises Kafka cluster, and Replicator needs to know how to connect to it; set this connection information by using the src.kafka. prefix for these configuration parameters. The origin cluster can have a varied set of security features enabled, but for simplicity this example shows no security configurations, just PLAINTEXT (see the Replicator security documentation for more configuration options).

src.kafka.bootstrap.servers=<bootstrap-servers-origin>
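
Not shown in these excerpts are the basic settings that every Replicator instance declares. As a minimal sketch, with the topic list as a placeholder, they look like this:

connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector
key.converter=io.confluent.connect.replicator.util.ByteArrayConverter
value.converter=io.confluent.connect.replicator.util.ByteArrayConverter
topic.whitelist=<topics-to-replicate>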

The destination cluster is your Confluent Cloud cluster, and Replicator needs to know how to connect to it. Use the prefix dest. to set these configuration parameters.

# Confluent Replicator license topic must have a replication factor of 3 for Confluent Cloud
confluent.topic.replication.factor=3

# New user topics that Confluent Replicator creates must have a replication factor of 3 for Confluent Cloud
dest.topic.replication.factor=3

If your deployment has Confluent Control Center end-to-end stream monitoring set up to gather data in Confluent Cloud, then you also need to set up the Confluent Monitoring Interceptors to send data to your Confluent Cloud cluster. This also requires appropriate connection information for the embedded consumer, set with the src.consumer. prefix.

src.consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
src.consumer.confluent.monitoring.interceptor.bootstrap.servers=<bootstrap-servers-destination>
src.consumer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
src.consumer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
src.consumer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key-destination>" password="<api-secret-destination>";

Configure ACLs

Replicator must have authorization to read Kafka data from the origin cluster and write Kafka data to the destination Confluent Cloud cluster. Run Replicator with a Confluent Cloud service account, not super-user credentials, and use the Confluent CLI to configure appropriate ACLs for the service account ID corresponding to Replicator in Confluent Cloud.

For details on how to configure these ACLs for Replicator, see Security and ACL Configurations.
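
As an illustration only, granting the Replicator service account read access to a topic with the Confluent CLI might look like the following sketch. The service account ID is hypothetical, and flag names vary across CLI versions, so verify them with confluent kafka acl create --help:

# Hypothetical service account ID; verify flag names for your CLI version
confluent kafka acl create --allow --service-account sa-123456 --operation READ --topic <topic-to-replicate>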

Confluent Cloud to Confluent Cloud with Connect Backed to Destination

In this example, Replicator copies data from one Confluent Cloud cluster to another Confluent Cloud cluster, and Replicator runs on a Connect cluster backed to the destination Confluent Cloud cluster.

See also

There are many other configuration parameters for the Connect worker and Replicator, but these examples show you only the significant ones. For an example with complete configurations, refer to the Connect worker backed to destination Docker configuration and the Replicator reading from Confluent Cloud configuration file.

[Diagram: Confluent Cloud to Confluent Cloud with Connect backed to destination]

Configure Kafka Connect

Set the management topics to a replication factor of 3, as required by Confluent Cloud.

replication.factor=3
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3

The Connect worker’s admin client requires connection information to the destination Confluent Cloud.

# Configuration for embedded admin client
bootstrap.servers=<bootstrap-servers-destination>
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key-destination>" password="<api-secret-destination>";

The Connect worker’s embedded producer requires connection information to the destination Confluent Cloud.

# Configuration for embedded producer
producer.sasl.mechanism=PLAIN
producer.security.protocol=SASL_SSL
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key-destination>" password="<api-secret-destination>";

If you are using Confluent Control Center for stream monitoring, then the embedded producer’s monitoring interceptors require connection information to the destination Confluent Cloud cluster.

# Configuration for embedded producer.confluent.monitoring.interceptor
producer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
producer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
producer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key-destination>" password="<api-secret-destination>";

Configure Replicator

The origin cluster in this case is a Confluent Cloud cluster, and Replicator’s admin client needs to know how to connect to this origin cluster; configure this connection information by using the src.kafka. prefix for these configuration parameters.

src.kafka.bootstrap.servers=<bootstrap-servers-origin>
src.kafka.security.protocol=SASL_SSL
src.kafka.sasl.mechanism=PLAIN
src.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key-origin>" password="<api-secret-origin>";

The destination cluster is your Confluent Cloud cluster, and Replicator needs to know how to connect to it. Use the prefix dest. to set these configuration parameters.

# Confluent Replicator license topic must have a replication factor of 3 for Confluent Cloud
confluent.topic.replication.factor=3

# New user topics that Confluent Replicator creates must have a replication factor of 3 for Confluent Cloud
dest.topic.replication.factor=3

If your deployment has Confluent Control Center end-to-end stream monitoring set up to gather data in Confluent Cloud, then you also need to set up the Confluent Monitoring Interceptors to send data to your Confluent Cloud cluster. This also requires appropriate connection information for the embedded consumer, set with the src.consumer. prefix.

src.consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
src.consumer.confluent.monitoring.interceptor.bootstrap.servers=<bootstrap-servers-destination>
src.consumer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
src.consumer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
src.consumer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key-destination>" password="<api-secret-destination>";

Configure ACLs

Replicator must have authorization to read Kafka data from the origin cluster and write Kafka data to the destination Confluent Cloud cluster. Run Replicator with a Confluent Cloud service account, not super-user credentials, and use the Confluent CLI to configure appropriate ACLs. Since the origin and destination clusters in this example are both Confluent Cloud clusters, configure appropriate ACLs for the service account IDs corresponding to Replicator in both Confluent Cloud clusters.

For details on how to configure these ACLs for Replicator, see Security and ACL Configurations.

Connect Cluster Backed to Origin

For this case where Replicator runs on a Connect Cluster Backed to Origin, there are two configuration examples:

  • On-premises to Confluent Cloud with Connect Backed to Origin
  • Confluent Cloud to Confluent Cloud with Connect Backed to Origin

On-premises to Confluent Cloud with Connect Backed to Origin

In this example, Replicator copies data from an on-premises Kafka cluster to Confluent Cloud, and Replicator runs on a Connect cluster backed to the origin on-premises cluster.

See also

There are many other configuration parameters for the Connect worker and Replicator, but these examples show you only the significant ones. For an example with complete configurations, refer to the Connect worker backed to origin Docker configuration and the Replicator reading from an on-premises cluster configuration file.

[Diagram: on-premises Kafka to Confluent Cloud with Connect backed to origin]

Configure Kafka Connect

The Connect worker is backed to the origin on-premises Kafka cluster, so set the replication factor required for the origin on-premises cluster:

replication.factor=<replication-factor-origin>
config.storage.replication.factor=<replication-factor-origin>
offset.storage.replication.factor=<replication-factor-origin>
status.storage.replication.factor=<replication-factor-origin>

The origin on-premises Kafka cluster can have a varied set of security features enabled, but for simplicity this example shows no security configurations, just PLAINTEXT (see the Replicator security documentation for more configuration options). The Connect worker’s admin client requires connection information to the on-premises cluster.

bootstrap.servers=<bootstrap-servers-origin>

Finally, configure the Connect worker to allow per-connector client overrides, because Replicator needs to override the default behavior of the Connect worker’s embedded producer.

connector.client.config.override.policy=All

Configure Replicator

The origin cluster in this case is your on-premises Kafka cluster, and Replicator needs to know how to connect to it; set this connection information by using the src.kafka. prefix for these configuration parameters. The origin cluster can have a varied set of security features enabled, but for simplicity this example shows no security configurations, just PLAINTEXT (see the Replicator security documentation for more configuration options).

src.kafka.bootstrap.servers=<bootstrap-servers-origin>

The destination cluster is your Confluent Cloud cluster, and Replicator needs to know how to connect to it. Use the prefix dest. to set these configuration parameters.

# Confluent Replicator license topic must have a replication factor of 3 for Confluent Cloud
confluent.topic.replication.factor=3

# New user topics that Confluent Replicator creates must have a replication factor of 3 for Confluent Cloud
dest.topic.replication.factor=3

# Connection information to Confluent Cloud
dest.kafka.bootstrap.servers=<bootstrap-servers-destination>
dest.kafka.security.protocol=SASL_SSL
dest.kafka.sasl.mechanism=PLAIN
dest.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key-destination>" password="<api-secret-destination>";

If your deployment has Confluent Control Center end-to-end stream monitoring set up to gather data in Confluent Cloud, then you also need to set up the Confluent Monitoring Interceptors to send data to your Confluent Cloud cluster. This also requires appropriate connection information for the embedded consumer, set with the src.consumer. prefix.

src.consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
src.consumer.confluent.monitoring.interceptor.bootstrap.servers=<bootstrap-servers-destination>
src.consumer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
src.consumer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
src.consumer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key-destination>" password="<api-secret-destination>";

Since the Connect workers are backed to the origin cluster, their embedded producers would write to the origin cluster, which is not desired in this case. To override the embedded producers, configure Replicator to write to the destination Confluent Cloud cluster by adding connection information to Confluent Cloud with the producer.override. prefix:

producer.override.bootstrap.servers=<bootstrap-servers-destination>
producer.override.security.protocol=SASL_SSL
producer.override.sasl.mechanism=PLAIN
producer.override.sasl.login.callback.handler.class=org.apache.kafka.common.security.authenticator.AbstractLogin$DefaultLoginCallbackHandler
producer.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key-destination>" password="<api-secret-destination>";

Configure ACLs

Replicator must have authorization to read Kafka data from the origin cluster and write Kafka data to the destination Confluent Cloud cluster. Run Replicator with a Confluent Cloud service account, not super-user credentials, and use the Confluent CLI to configure appropriate ACLs for the service account ID corresponding to Replicator in Confluent Cloud.

For details on how to configure these ACLs for Replicator, see Security and ACL Configurations.

Confluent Cloud to Confluent Cloud with Connect Backed to Origin

In this example, Replicator copies data from one Confluent Cloud cluster to another Confluent Cloud cluster, and Replicator runs on a Connect cluster backed to the origin Confluent Cloud cluster.

See also

There are many other configuration parameters for the Connect worker and Replicator, but these examples show you only the significant ones. For an example with complete configurations, refer to the Connect worker backed to origin Docker configuration and the Replicator reading from Confluent Cloud configuration file.

[Diagram: Confluent Cloud to Confluent Cloud with Connect backed to origin]

Configure Kafka Connect

Set the management topics to a replication factor of 3, as required by Confluent Cloud.

replication.factor=3
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3

The Connect worker’s admin client requires connection information to the origin Confluent Cloud.

# Configuration for embedded admin client
bootstrap.servers=<bootstrap-servers-origin>
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key-origin>" password="<api-secret-origin>";

The Connect worker’s embedded producer requires connection information to the origin Confluent Cloud.

# Configuration for embedded producer
producer.sasl.mechanism=PLAIN
producer.security.protocol=SASL_SSL
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key-origin>" password="<api-secret-origin>";

If you are using Confluent Control Center for stream monitoring, then the embedded producer’s monitoring interceptors require connection information to the origin Confluent Cloud cluster.

# Configuration for embedded producer.confluent.monitoring.interceptor
producer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
producer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
producer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key-origin>" password="<api-secret-origin>";

Finally, configure the Connect worker to allow per-connector client overrides, because Replicator needs to override the default behavior of the Connect worker’s embedded producer.

connector.client.config.override.policy=All

Configure Replicator

The origin cluster in this case is a Confluent Cloud cluster, and Replicator’s admin client needs to know how to connect to this origin cluster; configure this connection information by using the src.kafka. prefix for these configuration parameters.

src.kafka.bootstrap.servers=<bootstrap-servers-origin>
src.kafka.security.protocol=SASL_SSL
src.kafka.sasl.mechanism=PLAIN
src.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key-origin>" password="<api-secret-origin>";

The destination cluster is your Confluent Cloud cluster, and Replicator needs to know how to connect to it. Use the prefix dest. to set these configuration parameters.

# Confluent Replicator license topic must have a replication factor of 3 for Confluent Cloud
confluent.topic.replication.factor=3

# New user topics that Confluent Replicator creates must have a replication factor of 3 for Confluent Cloud
dest.topic.replication.factor=3

# Connection information to Confluent Cloud
dest.kafka.bootstrap.servers=<bootstrap-servers-destination>
dest.kafka.security.protocol=SASL_SSL
dest.kafka.sasl.mechanism=PLAIN
dest.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key-destination>" password="<api-secret-destination>";

If your deployment has Confluent Control Center end-to-end stream monitoring set up to gather data in Confluent Cloud, then you also need to set up the Confluent Monitoring Interceptors to send data to your Confluent Cloud cluster. This also requires appropriate connection information for the embedded consumer, set with the src.consumer. prefix.

src.consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
src.consumer.confluent.monitoring.interceptor.bootstrap.servers=<bootstrap-servers-destination>
src.consumer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
src.consumer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
src.consumer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key-destination>" password="<api-secret-destination>";

Since the Connect workers are backed to the origin cluster, their embedded producers would write to the origin cluster, which is not desired in this case. To override the embedded producers, configure Replicator to write to the destination Confluent Cloud cluster by adding connection information to Confluent Cloud with the producer.override. prefix:

producer.override.bootstrap.servers=<bootstrap-servers-destination>
producer.override.security.protocol=SASL_SSL
producer.override.sasl.mechanism=PLAIN
producer.override.sasl.login.callback.handler.class=org.apache.kafka.common.security.authenticator.AbstractLogin$DefaultLoginCallbackHandler
producer.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key-destination>" password="<api-secret-destination>";

Configure ACLs

Replicator must have authorization to read Kafka data from the origin cluster and write Kafka data to the destination Confluent Cloud cluster. Run Replicator with a Confluent Cloud service account, not super-user credentials, and use the Confluent CLI to configure appropriate ACLs. Since the origin and destination clusters in this example are both Confluent Cloud clusters, configure appropriate ACLs for the service account IDs corresponding to Replicator in both Confluent Cloud clusters.

For details on how to configure these ACLs for Replicator, see Security and ACL Configurations.

Any Confluent Cloud example uses real Confluent Cloud resources. After you are done running a Confluent Cloud example, manually verify that all Confluent Cloud resources are destroyed to avoid unexpected charges.

Additional Resources

  • For additional considerations on running Replicator to Confluent Cloud, refer to Migrate Topics on Confluent Cloud Clusters.
  • To run a Confluent Cloud demo that showcases a hybrid Kafka deployment, where one cluster is a self-managed Kafka cluster running locally and the other is a Confluent Cloud cluster, see cp-demo.
  • To find additional Confluent Cloud demos, see Confluent Cloud Tutorials.
  • For a practical guide to configuring, monitoring, and optimizing your Kafka client applications, see Developing Client Applications on Confluent Cloud.
  • To run a Replicator tutorial with an active-active multi-datacenter design, with two instances of Confluent Replicator that copy data bidirectionally between the datacenters, see Replicator Demo on Docker.