Use Replicator to Migrate Topics on Confluent Cloud Clusters

Confluent Cloud offers multiple Kafka cluster types (Basic, Standard, Enterprise, and Dedicated).

If you want to migrate topic data from one Confluent Cloud cluster to another, you can do so with Replicator, using any of the deployment options described below.

Replicator Deployment Options

Migrating topic data is achieved by running Replicator in one of the following modes. They are functionally equivalent, but you might prefer one over another based on your starting point.

  • As a connector within a distributed Connect cluster (on a VM): Ideal if you already have a Connect cluster in use with the destination cluster.

  • As a packaged executable on a VM: Isolates three easy-to-use config files (for replicator, consumer, and producer), and avoids having to explicitly configure the Connect cluster. The Quick Start walks through an example of running Replicator as this type of executable.

  • As a packaged executable on Kubernetes: Similar to the above, but might be easier to start as a single isolated task. Ideal if you are already managing tasks within Kubernetes.

  • As a fully-managed custom connector for Confluent Cloud: Runs on Confluent-managed infrastructure, so you do not provision or operate a VM or Kubernetes deployment yourself. To learn more, see the Confluent Cloud documentation on custom connectors.

Prerequisite Considerations

  • Confluent Platform is required for these workflows because Replicator is bundled with Confluent Platform.
  • Before you start topic migration, be sure to plan for network and security considerations, save non-topic metadata, and gather cluster information. These prerequisites are described below.
  • These procedures cover topic data only. You must migrate other aspects of the cluster separately, such as running connectors, ACLs, Kafka Streams applications, ksqlDB streams, and schemas. This is noted again at applicable points in the workflow.

Choose a location

The virtual machine (VM) or Kubernetes (k8s) instances must be deployed in a virtual private cloud (VPC) that has network access to your Confluent Cloud clusters. As a best practice, run Replicator as close to the destination cluster as possible. For Confluent Cloud this means the Replicator cluster should run in the same region as the destination Confluent Cloud deployment.

If one of your Confluent Cloud clusters is VPC-peered, Replicator must be deployed in one of the VPCs that is peered to Confluent Cloud.

Determine cluster information

Replicator configuration requires that you know:

  • API keys and secrets for the source and destination clusters.

  • The bootstrap.servers URL.

    To get this, navigate to Cluster Settings on the Cloud Console, and note down the URL next to bootstrap.servers.
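
If you use the Confluent CLI, you can also look up the endpoint from the command line. A minimal sketch, assuming the same CLI commands used for ACL setup later in this guide (the cluster ID is a placeholder):

# List clusters and note the ID of the cluster you want
confluent kafka cluster list

# The Endpoint field in the output is the bootstrap.servers value
confluent kafka cluster describe <cluster-id>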

Save non-topic metadata and set up your new cluster before cutover

Replicator will not sync ksqlDB jobs, schemas, ACLs, connectors, or service accounts to your new cluster.

Make note of all such non-topic metadata that you want to migrate, and manually recreate these on the new cluster before turning on Replicator to replicate topic data.
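
For example, here is a sketch of capturing topic and ACL metadata from the source cluster with the Confluent CLI (the cluster ID and topic name are placeholders):

# Record topic names and per-topic configuration on the source cluster
confluent kafka topic list --cluster <source-cluster-id>
confluent kafka topic describe <topic-name> --cluster <source-cluster-id>

# Record existing ACLs so they can be recreated on the destination
confluent kafka acl list --cluster <source-cluster-id>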

Configure consumers with timestamp interceptor for offset translation

Configure Java consumer applications with the ConsumerTimestampsInterceptor, which automatically translates offsets using timestamps. This enables consumers to start consuming data in the destination cluster at the point where they left off on the origin cluster:

interceptor.classes=io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor
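
In context, a minimal configuration for a consumer application reading from the source cluster might look like the following sketch; the security settings mirror the consumer.properties example later in this guide, and the group ID is a placeholder:

bootstrap.servers=<source bootstrap-server>
group.id=<consumer-group-id>
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<secret>";
interceptor.classes=io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor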

Important

  • You can only set the timestamp-interceptor for consumers.
  • Do not set the interceptor for Replicator.
  • Use discretion when setting the interceptor on Connect workers; for the interceptor to be useful, the worker must be running sink connectors and these must use Kafka for their offset management.

This interceptor is packaged under kafka-connect-replicator as timestamp-interceptor-<release>.jar, where <release> is the Confluent Platform version. This JAR must be available on the CLASSPATH of the consumer.

The location of the JAR file depends on your platform and the type of Confluent install. For example, with a macOS ZIP or TAR install, timestamp-interceptor-<release>.jar is by default in share/java/kafka-connect-replicator under the Confluent installation directory.
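
For example, a sketch of making the JAR visible to a consumer application for that install layout (the install directory is a placeholder):

export CLASSPATH="$CLASSPATH:<confluent-install-dir>/share/java/kafka-connect-replicator/timestamp-interceptor-<release>.jar"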

The timestamp-interceptor is located in the Confluent Maven repository:

<repository>
  <id>confluent</id>
  <url>https://packages.confluent.io/maven/</url>
</repository>

For Maven projects, include the following dependency, where release is the Confluent Platform version:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>timestamp-interceptor</artifactId>
    <version>release</version>
</dependency>

For example, for Confluent Platform version 7.5.0:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>timestamp-interceptor</artifactId>
    <version>7.5.0</version>
</dependency>

The ConsumerTimestampsInterceptor writes to a new topic, __consumer_timestamps, to preserve the timestamp information required for offset translation. Replicator reads from this topic and uses the timestamps to determine the correct offsets in the destination cluster.

Important

The ConsumerTimestampsInterceptor is a producer to the __consumer_timestamps topic on the source cluster and as such requires appropriate security configurations. Provide these with the timestamps.producer. prefix, for example, timestamps.producer.security.protocol=SSL.

The interceptor also requires ACLs for the __consumer_timestamps topic. The consumer principal requires WRITE and DESCRIBE operations on the __consumer_timestamps topic.
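
A sketch of granting these with the Confluent CLI, using the same syntax as the ACL commands in the next section (the service account stands in for the consumer principal):

confluent kafka acl create --allow --service-account <service-account-id> --operations write --topic __consumer_timestamps
confluent kafka acl create --allow --service-account <service-account-id> --operations describe --topic __consumer_timestamps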


Configure ACLs

Replicator must be authorized to read Kafka data from the origin cluster and write Kafka data to the destination Confluent Cloud cluster. Run Replicator as a Confluent Cloud service account, not with super user credentials, and use the Confluent CLI to configure the appropriate ACLs for the service account ID corresponding to Replicator in Confluent Cloud. For more details on Replicator ACLs, see Security.

confluent kafka acl create --allow --service-account <service-account-id> --operations create --topic <replicated-topic>
confluent kafka acl create --allow --service-account <service-account-id> --operations write --topic <replicated-topic>
confluent kafka acl create --allow --service-account <service-account-id> --operations read --topic <replicated-topic>
confluent kafka acl create --allow --service-account <service-account-id> --operations describe --topic <replicated-topic>
confluent kafka acl create --allow --service-account <service-account-id> --operations describe-configs --topic <replicated-topic>
confluent kafka acl create --allow --service-account <service-account-id> --operations alter-configs --topic <replicated-topic>
confluent kafka acl create --allow --service-account <service-account-id> --operations describe --cluster-scope

Additionally, you need to set the following license topic ACLs on the internal topic, _confluent-command.

confluent kafka acl create --allow --service-account <service-account-id> --operations read --topic _confluent-command
confluent kafka acl create --allow --service-account <service-account-id> --operations write --topic _confluent-command
confluent kafka acl create --allow --service-account <service-account-id> --operations describe --topic _confluent-command

The license topic should be created by default, but if for some reason it does not already exist, you must set CREATE and DESCRIBE ACLs on the cluster. You already set DESCRIBE above; it is repeated here to clarify its purpose. Set these ACLs with --cluster-scope; this flag takes no arguments and applies the ACLs to whichever cluster is in use. (To determine which cluster is in use, type confluent kafka cluster list; the cluster marked with an asterisk is in use. To switch clusters, type confluent kafka cluster use <cluster-id>.)

confluent kafka acl create --allow --service-account <service-account-id> --operations create --cluster-scope
confluent kafka acl create --allow --service-account <service-account-id> --operations describe --cluster-scope

Tip

  • If you are running Replicator as an executable, Replicator runs on an internal Connect worker, so you need the ACLs described in Worker ACL Requirements.
  • If you are running Replicator as a connector, any configuration not explicitly specified is inherited from the Connect cluster (which requires the same ACLs as indicated above).

Configure and Run Replicator to migrate topics

After completing all prerequisites, configure and run Replicator to migrate topics.

Choose one of the following methods, based on your preferred Replicator mode.

For any of the deployment methods listed below, Replicator can be further tuned using the guide here.

Run Replicator as an executable

This applies to both the VM and Kubernetes based approaches.

You can run the Replicator executable on any virtual/cloud node or on a physical machine (desktop or laptop) that is connected to the network.

Configure properties

There are three config files for the executable (consumer, producer, and replication), and the minimal configuration changes for these are shown below.

  • consumer.properties

    ssl.endpoint.identification.algorithm=https
    sasl.mechanism=PLAIN
    bootstrap.servers=<source bootstrap-server>
    retry.backoff.ms=500
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<secret>";
    security.protocol=SASL_SSL
    
  • producer.properties

    ssl.endpoint.identification.algorithm=https
    sasl.mechanism=PLAIN
    bootstrap.servers=<destination bootstrap-server>
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<secret>";
    security.protocol=SASL_SSL
    
  • replication.properties - No special configuration is required in replication.properties.

Run Replicator

Run the Replicator executable to migrate topics.

./bin/replicator --cluster.id replicator --consumer.config consumer.properties --producer.config producer.properties --topic.regex '.*'

To learn more, see Run Replicator as an executable and Command Line Parameters of Replicator Executable in the Replicator documentation.

Run Replicator as a connector

The connector JSON should look like this:

{
"name": "replicate-topic",
"config": {
    "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
    "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
    "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
    "src.kafka.ssl.endpoint.identification.algorithm":"https",
    "src.kafka.sasl.mechanism":"PLAIN",
    "src.kafka.request.timeout.ms":"20000",
    "src.kafka.bootstrap.servers":"<source bootstrap server>",
    "src.kafka.retry.backoff.ms":"500",
    "src.kafka.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<secret>\";",
    "src.kafka.security.protocol":"SASL_SSL",
    "dest.kafka.ssl.endpoint.identification.algorithm":"https",
    "dest.kafka.sasl.mechanism":"PLAIN",
    "dest.kafka.request.timeout.ms":"20000",
    "dest.kafka.bootstrap.servers":"<destination bootstrap server>",
    "dest.kafka.retry.backoff.ms":"500",
    "dest.kafka.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<secret>\";",
    "dest.kafka.security.protocol":"SASL_SSL",
    "dest.topic.replication.factor":"3",
    "topic.regex":".*"
    }
}
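
To submit this configuration, you can POST it to the Connect REST API of the distributed Connect cluster. A minimal sketch, assuming the JSON above is saved as replicate-topic.json and the worker listens on the default port 8083:

curl -X POST -H "Content-Type: application/json" \
  --data @replicate-topic.json \
  http://localhost:8083/connectors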

If you have not already done so, configure the distributed Connect cluster as shown below.

ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
bootstrap.servers=<dest bootstrap server>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<secret>";
security.protocol=SASL_SSL
producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.bootstrap.servers=<dest bootstrap server>
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<secret>";
producer.security.protocol=SASL_SSL
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
group.id=connect-replicator
config.storage.topic=connect-configs1
offset.storage.topic=connect-offsets1
status.storage.topic=connect-statuses1
plugin.path=<confluent install dir>/share/java

To learn more, see Run Replicator as a Connector in the Replicator documentation.

Run Replicator Docker Container with Kubernetes

  1. Delete existing secret (if it exists).

    kubectl delete secret replicator-secret-props
    
  2. Regenerate configs, if changed. (See the Quick Start.)

  3. Upload the new secret.

    kubectl create secret generic replicator-secret-props --from-file=/tmp/replicator/
    
  4. Reload pods.

    kubectl apply -f container/replicator-deployment.yaml
    

    Here is an example replicator-deployment.yaml.

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: repl-exec-connect-cluster
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: replicator-app
      template:
        metadata:
          labels:
            app: replicator-app
        spec:
          containers:
            - name: confluent-replicator
              image: confluentinc/cp-enterprise-replicator-executable
              env:
                - name: CLUSTER_ID
                  value: "replicator-k8s"
                - name: CLUSTER_THREADS
                  value: "1"
                - name: CONNECT_GROUP_ID
                  value: "containerized-repl"
    
                  # Note: This is to avoid overlay errors. You could use /etc/replicator/ here instead.
                - name: REPLICATION_CONFIG
                  value: "/etc/replicator-config/replication.properties"
                - name: PRODUCER_CONFIG
                  value: "/etc/replicator-config/producer.properties"
                - name: CONSUMER_CONFIG
                  value: "/etc/replicator-config/consumer.properties"
              volumeMounts:
                - name: replicator-properties
                  mountPath: /etc/replicator-config/
          volumes:
            - name: replicator-properties
              secret:
                secretName: "replicator-secret-props"
                defaultMode: 0666
    
  5. Verify status.

    kubectl get pods
    kubectl logs <pod-name> -f
    

Cutover clients to new cluster

After Replicator is set up, cut over clients to the new cluster as follows.

  1. Confirm that data is replicating from the source to the destination by viewing the replication lag on the Replication tab on Confluent Cloud Console.

  2. Stop producers.

  3. Wait for consumer lag to go to zero.

    To monitor this, run bin/kafka-consumer-groups (a sample command is shown at the end of this section) and/or navigate to Consumer Lag on the Cloud Console and select a consumer group.

  4. Wait for replicator lag to go to zero by looking at the Replicator consumer group lag, then restart consumers to point at the new cluster.

    • To monitor Replicator lag, run bin/kafka-consumer-groups (see the sample command at the end of this section) and/or navigate to Consumer Lag on the Cloud Console and select a consumer group.
    • After restarting consumers, use last-heartbeat-seconds-ago as described in Consumer Group Metrics to verify that consumers are running.
  5. Wait until all consumers are up.

  6. Start producers in the new cluster.

    Monitor the outgoing-byte-rate metric as described in the producer metrics documentation.

  7. Monitor lag on the destination cluster.

    • To monitor this, use the activity monitor for the cluster on the Cloud Console.

    • You can also run bin/kafka-consumer-groups on the destination cluster as described in Consumer Group Metrics.
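
Here is a sketch of the lag check referenced in the steps above, assuming a client.properties file (a placeholder name) that contains the same SASL_SSL settings as the consumer.properties example earlier:

# Describe the consumer group; the LAG column shows outstanding records per partition
bin/kafka-consumer-groups --bootstrap-server <bootstrap-server> \
  --command-config client.properties \
  --describe --group <consumer-group-id>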