Migrate Topics Across Confluent Cloud Clusters

Confluent Cloud now offers a choice of dedicated and standard clusters.

Standard clusters do not have any hourly cost (pricing is based on usage) and can elastically scale up and down from 0 to 100 MB/s of throughput.

If you want to migrate topic data from one Confluent Cloud cluster to another, you can use Confluent Replicator to do so. This document describes how to use Replicator to migrate an existing Confluent Cloud dedicated cluster to a standard cluster with minimal application downtime.

Tip

This procedure covers topic data only. You must migrate other aspects of the cluster separately, such as running connectors, ACLs, Streams apps, KSQL streams, and schemas. This is noted again at applicable points in the workflow.

Replicator Deployment Options

Migrating topic data is achieved by running Replicator in one of three modes. They are functionally equivalent, but you might prefer one over the others based on your starting point.

  • As a connector within a distributed Connect cluster (on a VM): Ideal if you already have a Connect cluster in use with the destination cluster.
  • As a packaged executable on a VM: Isolates three easy-to-use config files (for replicator, consumer, and producer), and avoids having to explicitly configure the Connect cluster.
  • As a packaged executable on Kubernetes: Similar to the above, but might be easier to start as a single isolated task. Ideal if you are already managing tasks within Kubernetes.

Prerequisite Tasks

Before you start topic migration, be sure to plan for network and security considerations, save non-topic metadata, and gather cluster information. These prerequisites are described below.

Choose a location

The virtual machine (VM) or Kubernetes (k8s) instances must be deployed in a virtual private cloud (VPC) that has network access to your Confluent Cloud clusters.

Note

If one of your Confluent Cloud clusters is VPC-peered, then the Replicator must be deployed in one of the VPCs that is peered to Confluent Cloud.
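
To confirm that the chosen location has network access before deploying Replicator, you can open a TLS connection to each cluster's bootstrap endpoint. A minimal check, using the host and port from your bootstrap.servers URL:

openssl s_client -connect <bootstrap-host>:9092 </dev/null
# A successful handshake (a certificate chain in the output) means the
# cluster is reachable from this network.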

Determine cluster information

Replicator configuration requires that you know:

  • API keys and secrets for the source and destination clusters.

    To retrieve these, navigate to Cluster Settings -> API Access on the Confluent Cloud web UI.

  • The bootstrap.servers URL.

    To get this, navigate to Cluster Settings on the Confluent Cloud web UI, and note down the URL next to bootstrap.servers.
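
If you prefer the command line, you can also look these up with the ccloud CLI. A sketch, assuming you are logged in (flag names can vary by CLI version):

ccloud kafka cluster list                      # note the source and destination cluster IDs
ccloud kafka cluster describe <cluster-ID>     # shows the bootstrap endpoint
ccloud api-key create --resource <cluster-ID>  # creates an API key and secret for that cluster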

Save non-topic metadata and set up your new cluster before cutover

Replicator will not sync KSQL jobs, schemas, ACLs, connectors, or service accounts to your new cluster.

Make note of all such non-topic metadata that you want to migrate, and manually recreate these on the new cluster before turning on Replicator to replicate topic data.
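
For example, you can inventory service accounts and ACLs with the ccloud CLI before cutover (a sketch; connectors, schemas, and KSQL jobs must be inventoried through their own interfaces):

ccloud service-account list    # service accounts used by clients
ccloud kafka acl list          # ACLs to recreate on the destination cluster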

Configure and Run Replicator to migrate topics

After completing the prerequisites, configure and run Replicator to migrate topics.

Choose one of the following methods, based on your preferred Replicator mode.

Run Replicator as an executable

Tip

  • This applies to both the VM and Kubernetes based approaches.
  • You can run the Replicator executable on any virtual/cloud node or on a physical machine (desktop or laptop) that is connected to the network.

Configure properties

There are three config files for the executable (consumer, producer, and replication), and the minimal configuration changes for these are shown below.

  • consumer.properties

    ssl.endpoint.identification.algorithm=https
    sasl.mechanism=PLAIN
    request.timeout.ms=20000
    bootstrap.servers=<source bootstrap-server>
    retry.backoff.ms=500
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<secret>";
    security.protocol=SASL_SSL
    
  • producer.properties

    ssl.endpoint.identification.algorithm=https
    sasl.mechanism=PLAIN
    request.timeout.ms=20000
    bootstrap.servers=<destination bootstrap-server>
    retry.backoff.ms=500
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<secret>";
    security.protocol=SASL_SSL
    
  • replication.properties - No special configuration is required in replication.properties.

Run Replicator

Run the Replicator executable to migrate topics.

./bin/replicator --cluster.id replicator --consumer.config consumer.properties --producer.config producer.properties --topic.regex '.*'
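
Once Replicator is running, a quick way to confirm that topics are being created on the destination is to list them with the ccloud CLI (assuming it is pointed at the destination cluster):

ccloud kafka cluster use <ID-for-destination-cluster>
ccloud kafka topic list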

Run Replicator as a connector

The connector JSON should look like this:

{
  "name": "replicate-topic",
  "config": {
    "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
    "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
    "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
    "src.kafka.ssl.endpoint.identification.algorithm": "https",
    "src.kafka.sasl.mechanism": "PLAIN",
    "src.kafka.request.timeout.ms": "20000",
    "src.kafka.bootstrap.servers": "<source bootstrap server>",
    "src.kafka.retry.backoff.ms": "500",
    "src.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<secret>\";",
    "src.kafka.security.protocol": "SASL_SSL",
    "dest.kafka.ssl.endpoint.identification.algorithm": "https",
    "dest.kafka.sasl.mechanism": "PLAIN",
    "dest.kafka.request.timeout.ms": "20000",
    "dest.kafka.bootstrap.servers": "<destination bootstrap server>",
    "dest.kafka.retry.backoff.ms": "500",
    "dest.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<secret>\";",
    "dest.kafka.security.protocol": "SASL_SSL",
    "topic.regex": ".*"
  }
}
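
Once the Connect cluster is running (see below), submit the connector by POSTing this JSON to the worker's REST API. A minimal sketch, assuming the JSON is saved as replicate-topic.json and the worker listens on localhost:8083:

curl -X POST -H "Content-Type: application/json" \
  --data @replicate-topic.json \
  http://localhost:8083/connectors

# Check that the connector and its task are RUNNING:
curl http://localhost:8083/connectors/replicate-topic/status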

If you have not already done so, configure the distributed Connect cluster as shown below.

ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
request.timeout.ms=20000
bootstrap.servers=<dest bootstrap server>
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<secret>";
security.protocol=SASL_SSL
producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.request.timeout.ms=20000
producer.bootstrap.servers=<dest bootstrap server>
producer.retry.backoff.ms=500
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<secret>";
producer.security.protocol=SASL_SSL
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
group.id=connect-replicator
config.storage.topic=connect-configs1
offset.storage.topic=connect-offsets1
status.storage.topic=connect-statuses1
plugin.path=<confluent install dir>/share/java
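
With the worker configuration saved (for example, as connect-distributed.properties), start the distributed Connect worker before submitting the connector JSON above:

<confluent install dir>/bin/connect-distributed connect-distributed.properties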

Run Replicator Docker Container with Kubernetes

  1. Delete existing secret (if it exists).

    kubectl delete secret replicator-secret-props
    
  2. Regenerate configs, if changed. (See Quick Start.)
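
    For example, assuming the three properties files from Configure properties are staged in /tmp/replicator/, the directory the secret is created from in the next step:

    mkdir -p /tmp/replicator
    cp consumer.properties producer.properties replication.properties /tmp/replicator/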

  3. Upload the new secret.

    kubectl create secret generic replicator-secret-props --from-file=/tmp/replicator/
    
  4. Reload pods.

    kubectl apply -f container/replicator-deployment.yaml
    

    Here is an example replicator-deployment.yaml.

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: repl-exec-connect-cluster
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: replicator-app
      template:
        metadata:
          labels:
            app: replicator-app
        spec:
          containers:
            - name: confluent-replicator
              image: confluentinc/cp-enterprise-replicator-executable
              env:
                - name: CLUSTER_ID
                  value: "replicator-k8s"
                - name: CLUSTER_THREADS
                  value: "1"
                - name: CONNECT_GROUP_ID
                  value: "containerized-repl"
    
                  # Note: This path is used to avoid overlay errors. You could use /etc/replicator/ here instead.
                - name: REPLICATION_CONFIG
                  value: "/etc/replicator-config/replication.properties"
                - name: PRODUCER_CONFIG
                  value: "/etc/replicator-config/producer.properties"
                - name: CONSUMER_CONFIG
                  value: "/etc/replicator-config/consumer.properties"
              volumeMounts:
                - name: replicator-properties
                  mountPath: /etc/replicator-config/
          volumes:
            - name: replicator-properties
              secret:
                secretName: "replicator-secret-props"
                defaultMode: 0666
    
  5. Verify status.

    kubectl get pods
    kubectl logs <pod-name> -f
    

Tip

See also the examples demo of Confluent Platform deployed on Google Kubernetes Engine (GKE), which replicates data to a Confluent Cloud cluster.

Cutover clients to new cluster

After Replicator is set up, cut over clients to the new cluster as follows.

  1. Confirm that data is replicating from the source to the destination by viewing the replication lag on the Replication tab in the Confluent Cloud web UI.

  2. Stop producers.

  3. Wait for consumer lag to go to zero.

    To monitor this, run bin/kafka-consumer-groups (an example command follows this list) and/or navigate to Consumer Lag on the Confluent Cloud web UI and select a consumer group.

  4. Wait for replicator lag to go to zero by looking at the Replicator consumer group lag, then restart consumers to point at the new cluster.

    • To monitor replicator lag, run bin/kafka-consumer-groups (see the example after this list) and/or navigate to Consumer Lag on the Confluent Cloud web UI and select a consumer group.
    • After restarting consumers, use last-heartbeat-seconds-ago as described in Consumer Group Metrics to verify that consumers are running.
  5. Wait until all consumers are up.

  6. Start producers in the new cluster.

    Monitor outgoing-byte-rate as described in Producer Global Request Metrics.

  7. Monitor lag on the destination cluster.

    • To monitor this, use the activity monitor for the cluster on the Confluent Cloud web UI.

    • You can also run bin/kafka-consumer-groups on the destination cluster as described in Consumer Group Metrics.
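
The lag checks in steps 3, 4, and 7 can also be scripted. A sketch using the kafka-consumer-groups tool, assuming client.properties contains the same SASL_SSL settings shown earlier for the relevant cluster:

bin/kafka-consumer-groups --bootstrap-server <bootstrap-server> \
  --command-config client.properties \
  --describe --group <consumer-group>
# The LAG column should reach zero before you proceed to the next step.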

Quick Start

This quick start assumes:

  • You are migrating topics between two Confluent Cloud clusters on Amazon Web Services (AWS).
  • You are running Replicator on an Ubuntu VM in Amazon EC2.
  • You are running Replicator as an executable.

In real-world scenarios, you might migrate clusters from nodes on any other cloud platform (for example, Google Cloud Platform (GCP) or Microsoft Azure). The same general procedure applies.

Tip

This quick start takes you through the full manual setup and run steps. For scripted deployments on RHEL and Ubuntu, see the scripts on GitHub.

Set up a Cloud instance

  1. Install Java.

    sudo apt-get install default-jre
    
  2. Use APT to install the full Confluent Platform as described in Manual Install using Systemd on Ubuntu and Debian.
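
    For Confluent Platform 5.3 on Ubuntu, the install amounts to the following (a sketch of the steps from that guide; verify the repository URL and package name against the current documentation):

    wget -qO - https://packages.confluent.io/deb/5.3/archive.key | sudo apt-key add -
    sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.3 stable main"
    sudo apt-get update && sudo apt-get install confluent-platform-2.12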

Configure properties

There are three config files for consumer, producer, and replication. The minimal configuration changes for these are shown below.

Tip

Replace bootstrap.servers and sasl.jaas.config here with the corresponding values for the Confluent Cloud clusters.

  • consumer.properties

    bootstrap.servers=<source bootstrap server>
    ssl.endpoint.identification.algorithm=https
    sasl.mechanism=PLAIN
    request.timeout.ms=20000
    retry.backoff.ms=500
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";
    security.protocol=SASL_SSL
    
  • producer.properties

    bootstrap.servers=<destination bootstrap server>
    ssl.endpoint.identification.algorithm=https
    sasl.mechanism=PLAIN
    request.timeout.ms=20000
    retry.backoff.ms=500
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";
    security.protocol=SASL_SSL
    
  • replication.properties

    Replace “Movies” in topic.whitelist with the topics you want to replicate from the source cluster.

    topic.whitelist=Movies
    topic.rename.format=${topic}-replica
    topic.auto.create=true
    topic.timestamp.type=CreateTime
    

Tip

In the example above, topic.rename.format, topic.auto.create, and topic.timestamp.type are set to the defaults, so they are not strictly necessary to include. If you want different values, include these properties in replication.properties with your custom values.

Run Replicator

Run the Replicator executable to migrate topics.

confluent-5.3.0/bin/replicator \
  --consumer.config ./diyrepl/consumer.properties \
  --producer.config ./diyrepl/producer.properties \
  --cluster.id replicator  \
  --replication.config ./diyrepl/replication.properties

Verify Topic Migration

To verify that topic migration succeeded, log in to the Confluent Cloud CLI and run commands to confirm that your topic data has been replicated onto the target cluster.

After checking the current contents of source and destination clusters, perform the cutover of clients to the new cluster, and run consumers to read from the destination topic on the new cluster.

Check current contents of destination topic

  1. List the clusters and note the destination cluster ID.

    ccloud kafka cluster list
    
  2. Select the destination cluster.

    ccloud kafka cluster use <ID-for-destination-cluster>
    
  3. Run the consumer to read from your topic on the destination (for example, Movies-replica).

    ccloud kafka topic consume --from-beginning 'Movies-replica'
    

Check current contents of source topic

  1. Select the source cluster.

    ccloud kafka cluster use <ID-for-source-cluster>
    
  2. Run the consumer to read from your topic on the source (for example, Movies).

    ccloud kafka topic consume --from-beginning 'Movies'
    

Add something new to the source topic

Produce to the source topic.

ccloud kafka topic produce Movies
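
The produce command reads records from standard input, one per line; press Ctrl-C to stop. To send a few hypothetical test records non-interactively:

printf '%s\n' '{"title":"Dunkirk","year":2017}' '{"title":"Arrival","year":2016}' \
  | ccloud kafka topic produce Movies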

Cutover to new cluster

Switch producers and consumers to the new cluster, as described in Cutover clients to new cluster.

Check logs and destination topic

  1. Check the output of the running Replicator task for logs that indicate processing.

  2. Run the consumer to read from your destination topic again (for example, Movies-replica).

    ccloud kafka topic consume --from-beginning 'Movies-replica'