Migrate Topics on Confluent Cloud Clusters¶
Confluent Cloud now offers multiple cluster types. If you want to migrate topic data from one Confluent Cloud cluster to another, you have two different options for doing so:
- Use Cluster Linking as described in Data Migration with Cluster Linking on Confluent Cloud (preferred)
- Use Confluent Replicator as described in the sections below (legacy)
This document describes how to use Replicator to migrate an existing Confluent Cloud dedicated cluster to a standard cluster with minimal application downtime.
For more information on the available cluster types, see Confluent Cloud Features and Limits by Cluster Type.
Tip
- This procedure covers topic data only. You must migrate other aspects of the cluster separately, such as running connectors, ACLs, Streams apps, ksqlDB streams, and schemas. This is noted again at applicable points in the workflow.
- For testing purposes, you can also migrate topics from self-managed development clusters to Confluent Cloud.
Note
Newer versions of Replicator cannot be used to replicate data from early-version Kafka clusters to Confluent Cloud. Specifically, Replicator version 5.4.0 or later cannot replicate data to Confluent Cloud from clusters running Apache Kafka® v0.10.2 or earlier, or Confluent Platform v3.2.0 or earlier. If you have clusters on these earlier versions, use Replicator 5.0.x to replicate to Confluent Cloud until you can upgrade. Keep in mind the following, and plan your upgrades accordingly:
- Kafka Connect workers included in Confluent Platform 3.2 and later are compatible with any Kafka broker that is included in Confluent Platform 3.0 and later as documented in Cross-Component Compatibility.
- Confluent Platform 5.0.x has an end-of-support date of July 31, 2020 as documented in Supported Versions and Interoperability.
Replicator Deployment Options¶
Migrating topic data is achieved by running Replicator in one of three modes. They are functionally equivalent, but you might prefer one over the others based on your starting point.
| Replicator Mode | Advantages and Scenarios |
| --- | --- |
| As a connector within a distributed Connect cluster (on a VM) | Ideal if you already have a Connect cluster in use with the destination cluster. |
| As a packaged executable on a VM | Isolates three easy-to-use config files (for replicator, consumer, producer), and avoids having to explicitly configure the Connect cluster. |
| As a packaged executable on Kubernetes | Similar to the above, but might be easier to start as a single isolated task. Ideal if you are already managing tasks within Kubernetes. |
Prerequisite Tasks¶
Before you start topic migration, be sure to plan for network and security considerations, save non-topic metadata, and gather cluster information. These prerequisites are described below.
Choose a location¶
The virtual machine (VM) or Kubernetes (k8s) instances must be deployed in a virtual private cloud (VPC) that has network access to your Confluent Cloud clusters. As a best practice, run Replicator as close to the destination cluster as possible. For Confluent Cloud this means the Replicator cluster should run in the same region as the destination Confluent Cloud deployment.
Note
If one of your Confluent Cloud clusters is VPC-peered, then the Replicator must be deployed in one of the VPCs that is peered to Confluent Cloud.
Determine cluster information¶
Replicator configuration requires that you know:
- API keys and secrets for the source and destination clusters. To retrieve these, navigate to Kafka API keys on the Confluent Cloud Console.
- The bootstrap.servers URL. To get this, navigate to Cluster Settings on the Cloud Console, and note down the URL next to bootstrap.servers. (A CLI alternative is sketched after this list.)
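You can also gather the same information with the Confluent CLI. The following is a minimal sketch, assuming you are already logged in; the cluster ID is a placeholder you must fill in.

# List clusters, then show the endpoint (bootstrap servers) for one of them
confluent kafka cluster list
confluent kafka cluster describe <cluster-id>
# Create an API key and secret scoped to that cluster
confluent api-key create --resource <cluster-id>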
Save non-topic metadata and set up your new cluster before cutover¶
Replicator will not sync ksqlDB jobs, schemas, ACLs, connectors, or service accounts to your new cluster.
Make note of all such non-topic metadata that you want to migrate, and manually recreate these on the new cluster before turning on Replicator to replicate topic data.
Configure consumers with timestamp interceptor for offset translation¶
Configure Java consumer applications with ConsumerTimestampsInterceptor, which automatically translates offsets using timestamps. This enables consumers to start consuming data in the destination cluster where they left off on the origin cluster.
interceptor.classes=io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor
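As a sketch, wiring the interceptor into a programmatically configured Java consumer might look like the following; the bootstrap server, group ID, and String deserializers are illustrative placeholders, and the interceptor JAR must be on the CLASSPATH as described below.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MigratingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<source bootstrap-server>");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "<consumer-group>");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Record consumption timestamps to __consumer_timestamps so Replicator can translate offsets
        props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                "io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // ... subscribe and poll as usual ...
    }
}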
Important
- You can only set the timestamp-interceptor for consumers.
- Do not set the interceptor for Replicator.
- Use discretion when setting the interceptor on Connect workers; for the interceptor to be useful, the worker must be running sink connectors and these must use Kafka for their offset management.
This interceptor is located under kafka-connect-replicator (timestamp-interceptor-<version>.jar). This JAR must be available on the CLASSPATH of the consumer.
Tip
- The location of the JAR file depends on your platform and type of Confluent install. For example, on Mac OS installed with zip.tar, timestamp-interceptor-<version>.jar is by default in the Confluent directory under share/java/kafka-connect-replicator. A sketch of putting it on the CLASSPATH follows.
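For example, for a consumer application launched from the command line, you might make the JAR available like this; the install path and the MyConsumerApp class are hypothetical.

# Append the interceptor JAR to the consumer's CLASSPATH (path depends on your install)
export CLASSPATH=$CLASSPATH:/opt/confluent/share/java/kafka-connect-replicator/timestamp-interceptor-<version>.jar
# Launch the (hypothetical) consumer application
java MyConsumerApp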
The timestamp-interceptor
is located in the Confluent Maven repository:
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
For Maven projects, include the following dependency:
<dependency>
<groupId>io.confluent</groupId>
<artifactId>timestamp-interceptor</artifactId>
<version>current</version>
</dependency>
The ConsumerTimestampsInterceptor writes to a new topic, __consumer_timestamps, to preserve the timestamp information required for offset translation. Replicator reads from this topic and uses the timestamps to determine the correct offsets in the secondary cluster.
Important
The ConsumerTimestampsInterceptor is a producer to the __consumer_timestamps topic on the source cluster, and as such requires appropriate security configurations. Provide these with the timestamps.producer. prefix; for example, timestamps.producer.security.protocol=SSL. For more information, see the Replicator security documentation.
The interceptor also requires ACLs for the __consumer_timestamps topic: the consumer principal requires WRITE and DESCRIBE operations on it, as shown in the example below.
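Using the same CLI pattern as the Replicator ACLs shown later in this document, the grants for a consumer service account might look like this; the service account ID is a placeholder.

confluent kafka acl create --allow --service-account <consumer-service-account-id> --operation WRITE --topic __consumer_timestamps
confluent kafka acl create --allow --service-account <consumer-service-account-id> --operation DESCRIBE --topic __consumer_timestamps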
To learn more, see:
- Understanding Consumer Offset Translation
- Discussion on consumer offsets and timestamp preservation in the whitepaper on Disaster Recovery for Multi-Datacenter Apache Kafka Deployments.
Configure ACLs¶
Replicator must have authorization to read Kafka data from the origin cluster and write Kafka data in the destination Confluent Cloud cluster. Replicator should be run as a Confluent Cloud service account, not with super user credentials, so use the Confluent CLI to configure the appropriate ACLs for the service account id corresponding to Replicator in Confluent Cloud. For more details on Replicator ACLs, see Security.
confluent kafka acl create --allow --service-account <service-account-id> --operation CREATE --topic <replicated-topic>
confluent kafka acl create --allow --service-account <service-account-id> --operation WRITE --topic <replicated-topic>
confluent kafka acl create --allow --service-account <service-account-id> --operation READ --topic <replicated-topic>
confluent kafka acl create --allow --service-account <service-account-id> --operation DESCRIBE --topic <replicated-topic>
confluent kafka acl create --allow --service-account <service-account-id> --operation DESCRIBE-CONFIGS --topic <replicated-topic>
confluent kafka acl create --allow --service-account <service-account-id> --operation ALTER-CONFIGS --topic <replicated-topic>
confluent kafka acl create --allow --service-account <service-account-id> --operation DESCRIBE --cluster-scope
Additionally, you need to set the following license topic ACLs on the internal topic, _confluent-command.
confluent kafka acl create --allow --service-account <service-account-id> --operation READ --topic _confluent-command
confluent kafka acl create --allow --service-account <service-account-id> --operation WRITE --topic _confluent-command
confluent kafka acl create --allow --service-account <service-account-id> --operation DESCRIBE --topic _confluent-command
The license topic should be created by default, but if for some reason this topic does not already exist, you must set CREATE along with DESCRIBE ACLs on the cluster. (You already set DESCRIBE above; it is mentioned again here to clarify its purpose.)
Set these ACLs with --cluster-scope; this flag takes no arguments and applies the ACLs to whichever cluster is in use. To determine which cluster is in use, type confluent kafka cluster list; the cluster with the asterisk next to it is in use. To use a different cluster, type confluent kafka cluster use <cluster-id>.
confluent kafka acl create --allow --service-account <service-account-id> --operation CREATE --cluster-scope
confluent kafka acl create --allow --service-account <service-account-id> --operation DESCRIBE --cluster-scope
Tip
- If you are running Replicator as an executable, Replicator runs on an internal Connect worker so you need the ACLs described in Worker ACL Requirements.
- If you are running Replicator as a connector, any settings not explicitly specified in the Replicator configuration are inherited from the Connect cluster (which requires the same ACLs as indicated above).
Configure and Run Replicator to migrate topics¶
After accomplishing all prerequisites, configure and run Replicator to migrate topics.
Choose one of the following methods to do so, based on your preference for Replicator modes.
- Run Replicator as an executable
- Run Replicator as a connector
- Run Replicator Docker Container with Kubernetes
Tip
For any of the deployment methods listed below, Replicator can be further tuned using the guide here.
Run Replicator as an executable¶
Tip
- This applies to both the VM and Kubernetes based approaches.
- You can run the Replicator executable on any virtual/cloud node or on a physical machine (desktop or laptop) that is connected to the network.
Configure properties¶
There are three config files for the executable (consumer, producer, and replication), and the minimal configuration changes for these are shown below.
consumer.properties
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
bootstrap.servers=<source bootstrap-server>
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<secret>";
security.protocol=SASL_SSL
producer.properties
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
bootstrap.servers=<destination bootstrap-server>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<secret>";
security.protocol=SASL_SSL
replication.properties
- No special configuration is required in replication.properties.
Run Replicator¶
Run the Replicator executable to migrate topics.
./bin/replicator \
  --cluster.id replicator \
  --consumer.config consumer.properties \
  --producer.config producer.properties \
  --topic.regex '.*'
Tip
See also, Run Replicator as an executable and Command Line Parameters of Replicator Executable in the Replicator documentation.
Run Replicator as a connector¶
The connector JSON should look like this:
{
"name": "replicate-topic",
"config": {
"connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
"key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
"value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
"src.kafka.ssl.endpoint.identification.algorithm":"https",
"src.kafka.sasl.mechanism":"PLAIN",
"src.kafka.request.timeout.ms":"20000",
"src.kafka.bootstrap.servers":"<source bootstrap server>",
"src.kafka.retry.backoff.ms":"500",
"src.kafka.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<secret>\";",
"src.kafka.security.protocol":"SASL_SSL",
"dest.kafka.ssl.endpoint.identification.algorithm":"https",
"dest.kafka.sasl.mechanism":"PLAIN",
"dest.kafka.request.timeout.ms":"20000",
"dest.kafka.bootstrap.servers":"<destination bootstrap server>",
"dest.kafka.retry.backoff.ms":"500",
"dest.kafka.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<secret>\";",
"dest.kafka.security.protocol":"SASL_SSL",
"dest.topic.replication.factor":"3",
"topic.regex":".*"
}
}
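To submit the connector, save the JSON to a file and POST it to the Connect REST API. This sketch assumes a Connect worker listening on localhost:8083 and the JSON saved as replicate-topic.json (both names are placeholders for your environment).

curl -X POST -H "Content-Type: application/json" \
  --data @replicate-topic.json \
  http://localhost:8083/connectors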
If you have not already done so, configure the distributed Connect cluster as shown in the following worker properties.
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
bootstrap.servers=<dest bootstrap server>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<secret>";
security.protocol=SASL_SSL
producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.bootstrap.servers=<dest bootstrap server>
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<secret>";
producer.security.protocol=SASL_SSL
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
group.id=connect-replicator
config.storage.topic=connect-configs1
offset.storage.topic=connect-offsets1
status.storage.topic=connect-statuses1
plugin.path=<confluent install dir>/share/java
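With the worker configuration saved (the file name worker.properties is assumed here), start the distributed Connect worker using the script shipped with Confluent Platform:

<confluent install dir>/bin/connect-distributed worker.properties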
Tip
See also, Run Replicator as a Connector in the Replicator documentation.
Run Replicator Docker Container with Kubernetes¶
Delete existing secret (if it exists).
kubectl delete secret replicator-secret-props
Regenerate configs, if changed. (See Quick Start.)
Upload the new secret.
kubectl create secret generic replicator-secret-props --from-file=/tmp/replicator/
Reload pods.
kubectl apply -f container/replicator-deployment.yaml
Here is an example replicator-deployment.yaml.

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: repl-exec-connect-cluster
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: replicator-app
    spec:
      containers:
        - name: confluent-replicator
          image: confluentinc/cp-enterprise-replicator-executable
          env:
            - name: CLUSTER_ID
              value: "replicator-k8s"
            - name: CLUSTER_THREADS
              value: "1"
            - name: CONNECT_GROUP_ID
              value: "containerized-repl"
            # Note: This is to avoid overlay errors. You could use /etc/replicator/ here instead.
            - name: REPLICATION_CONFIG
              value: "/etc/replicator-config/replication.properties"
            - name: PRODUCER_CONFIG
              value: "/etc/replicator-config/producer.properties"
            - name: CONSUMER_CONFIG
              value: "/etc/replicator-config/consumer.properties"
          volumeMounts:
            - name: replicator-properties
              mountPath: /etc/replicator-config/
      volumes:
        - name: replicator-properties
          secret:
            secretName: "replicator-secret-props"
            defaultMode: 0666
Verify status.
kubectl get pods
kubectl logs <pod-name> -f
Cutover clients to new cluster¶
After Replicator is set up, cut over clients to the new cluster as follows.
Confirm that data is replicating from the source to the destination by viewing the replication lag on the Replication tab on Confluent Cloud Console.
Stop producers.
Wait for consumer lag to go to zero. To monitor this, run bin/kafka-consumer-groups as described here, and/or navigate to Consumer Lag on the Cloud Console and select a consumer group.
Wait for replicator lag to go to zero by looking at the Replicator consumer group lag, then restart consumers to point at the new cluster.
- To monitor replicator lag, run bin/kafka-consumer-groups as described here, and/or navigate to Consumer Lag on the Cloud Console and select a consumer group.
- After restarting consumers, use last-heartbeat-seconds-ago as described in Consumer Group Metrics to verify that consumers are running.
Wait until all consumers are up.
Start producers in the new cluster.
Monitor outgoing-byte-rate as described in producer outgoing-byte-rate metrics.
Monitor lag on the destination cluster. To monitor this, use the activity monitor for the cluster on the Cloud Console. You can also run bin/kafka-consumer-groups on the destination cluster as described in Consumer Group Metrics; a sample invocation is shown below.
Quick Start¶
This quick start assumes:
- You are migrating topics between two Confluent Cloud clusters on Amazon Web Services (AWS).
- You are running Replicator on an Ubuntu VM in Amazon EC2.
- You are running Replicator as an executable.
In real-world scenarios, you might migrate clusters on any other cloud platform (for example, Google Cloud Platform or Microsoft Azure). The same general procedure applies.
Set up a Cloud instance¶
Install Java.
sudo apt-get install default-jre
Use APT to install the full Confluent Platform as described in Manual Install using Systemd on Ubuntu and Debian.
Configure properties¶
There are three config files for consumer, producer, and replication. The minimal configuration changes for these are shown below.
Tip
Replace bootstrap.servers
and sasl.jaas.config
here with the corresponding values for the Confluent Cloud clusters.
consumer.properties
bootstrap.servers=<source bootstrap server>
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";
security.protocol=SASL_SSL
producer.properties
bootstrap.servers=<destination bootstrap server>
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";
security.protocol=SASL_SSL
replication.properties
Replace “Movies” in the topic.whitelist with the topics you want to replicate from the source cluster.

topic.whitelist=Movies
topic.rename.format=${topic}-replica
topic.auto.create=true
topic.timestamp.type=CreateTime
dest.topic.replication.factor=3
Tip
In the example above, topic.rename.format, topic.auto.create, and topic.timestamp.type are set to the defaults, so it is not strictly necessary to include them. If you want to change any of these, include them in replication.properties with custom values.
Run Replicator¶
Run the Replicator executable to migrate topics.
confluent-5.3.0/bin/replicator \
--consumer.config ./diyrepl/consumer.properties \
--producer.config ./diyrepl/producer.properties \
--cluster.id replicator \
--replication.config ./diyrepl/replication.properties
Verify Topic Migration¶
To verify that topic migration succeeded, log in to the Confluent CLI and run commands to confirm that your topic data has been replicated to the target cluster.
After checking the current contents of source and destination clusters, perform the cutover of clients to the new cluster, and run consumers to read from the destination topic on the new cluster.
Check current contents of destination topic¶
List the clusters and note the destination cluster ID.
confluent kafka cluster list
Select the destination cluster.
confluent kafka cluster use <ID-for-destination-cluster>
Run the consumer to read from your topic on the destination (for example, Movies-replica).
confluent kafka topic consume --from-beginning 'Movies-replica'
Check current contents of source topic¶
Select the source cluster.
confluent kafka cluster use <ID-for-source-cluster>
Run the consumer to read from your topic on the source (for example, Movies).
confluent kafka topic consume --from-beginning 'Movies'
Add something new to the source topic¶
Produce to the source topic.
confluent kafka topic produce Movies
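The produce command reads one record per line from standard input, so you can pipe in a few test values; the sample records below are made up.

printf 'The Big Lebowski\nSpirited Away\n' | confluent kafka topic produce Movies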
Cutover to new cluster¶
Switch producers and consumers to the new cluster, as described in Cutover clients to new cluster.
Check logs and destination topic¶
Check the output of the running Replicator task for log messages indicating processing.
Run the consumer to read from your destination topic again.
confluent kafka topic consume --from-beginning 'Movies-replica'
Migrate Topics from Dev Clusters to Confluent Cloud¶
Development clusters are a great place to experiment with Confluent Platform features before deploying to Confluent Cloud. However, these clusters usually have relaxed resiliency requirements that can introduce challenges when migrating to Confluent Cloud.
If you want to migrate topic data from a dev cluster to Confluent Cloud, you can use Replicator to do so. The procedure is the same as the cloud-to-cloud migration described above, but requires the following additional considerations.
Migrate Topics from Confluent Cloud¶
You can use Replicator to migrate topics and schemas from Confluent Cloud.
Important
When replicating topics from Confluent Cloud, you must pre-create the topics on the destination cluster and set topic.auto.create=false and topic.config.sync=false.
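For example, before starting Replicator, you might pre-create each destination topic with the CLI and disable auto-creation and config sync in replication.properties; the topic name and partition count are placeholders.

confluent kafka topic create <topic-name> --partitions 6

topic.auto.create=false
topic.config.sync=false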
Resiliency¶
Confluent Cloud topics are tuned for resiliency with the following configurations:
replication.factor=3
min.insync.replicas=2
As of 5.5, you should set dest.topic.replication.factor to 3 and enable topic.auto.create.
Note
By default, Replicator preserves topic configurations when replicating topic data and it is unusual
for a development cluster to meet the Confluent Cloud requirements (for instance, at least 3 brokers
are required to support a replication.factor
of 3).
To address this, disable the topic configuration synchronization and auto creation features of Replicator:
topic.config.sync=false
topic.auto.create=false
Licensing¶
By default, Replicator stores the Confluent Platform license on the destination cluster.