Transfer Data into Confluent Cloud¶
You can transfer data to Confluent Cloud from existing datacenters or cloud instances. This topic provides general guidelines for transferring your data.
Determining What Data is Important¶
Confluent Cloud subscriptions are sized for a specified retention time. Determine how much storage is allocated to your subscription level, and then determine how much data you want to copy.
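As a rough sizing sketch, required storage is approximately average produce throughput × retention period × replication factor. The figures below are illustrative assumptions, not values from any particular subscription:

```shell
# Hypothetical sizing sketch: estimate storage needed for a retention window.
# All figures are example assumptions; substitute your own measurements.
THROUGHPUT_MB_PER_SEC=5   # assumed average produce rate across all topics
RETENTION_DAYS=7          # assumed retention period
REPLICATION_FACTOR=3      # typical replication factor
SECONDS_PER_DAY=86400

STORAGE_GB=$(( THROUGHPUT_MB_PER_SEC * SECONDS_PER_DAY * RETENTION_DAYS * REPLICATION_FACTOR / 1024 ))
echo "Approximate storage required: ${STORAGE_GB} GB"
```

Compare this estimate against the storage allocated to your subscription before deciding how much historical data to copy.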
Transfer Using Confluent Replicator¶
Confluent Cloud does not expose the ZooKeeper configuration to users; this is not an obstacle, because Confluent Replicator does not require a direct connection to Apache ZooKeeper™.
In this example, the source is the current cluster and the destination is Confluent Cloud. A separate node called a worker node is also configured to run Confluent Replicator.
Configure the destination endpoint using the following configurations:

- Use confluent.topic.replication.factor=3 to define the replication factor for confluent-licensing.
- Use the confluent.topic. prefix to pass any required properties to the admin client used by confluent-licensing (for example, security properties).
- Send the required bootstrap server information to the admin client used by Replicator for the Confluent Cloud destination cluster. Add the dest.kafka. prefix to the bootstrap.servers parameter. For example:

  dest.kafka.bootstrap.servers=[your Confluent Cloud bootstrap broker list]

- Add the following required security parameters. These allow Confluent Replicator to communicate with the Confluent Cloud destination cluster.

  ssl.endpoint.identification.algorithm=https
  sasl.mechanism=PLAIN
  security.protocol=SASL_SSL
  # REQUIRED: Specifies Confluent Cloud authentication. Refer to the SASL
  # properties in ``~/.ccloud/config.json`` to get this info.
  sasl.jaas.config=<sasl.jaas.config>
  request.timeout.ms=20000
  retry.backoff.ms=500
  producer.ssl.endpoint.identification.algorithm=https
  producer.sasl.mechanism=PLAIN
  producer.security.protocol=SASL_SSL
  # REQUIRED: Specifies Confluent Cloud authentication. Refer to the SASL
  # properties in ``~/.ccloud/config.json`` to get this info.
  producer.sasl.jaas.config=<sasl.jaas.config>
  producer.request.timeout.ms=20000
  producer.retry.backoff.ms=500
Configure the origin endpoint. Decide where the Confluent Replicator metadata will be stored. This will largely be determined by the version of Confluent Platform or Apache Kafka® that the origin endpoint is using.
- If the origin endpoint is using Confluent Platform 3.3.1 or earlier, or Kafka 0.11.0 or earlier, the Replicator metadata must be stored in Apache ZooKeeper™. Add the src.zookeeper prefixed configurations in addition to the src.kafka.bootstrap.servers parameter, as shown in the Confluent Replicator Quick Start.
- If the origin endpoint is Confluent Platform 4.0.0 or later, or Kafka 1.0.0 or later, the Replicator metadata can be stored directly in Kafka. Use the src.kafka.bootstrap.servers configuration to connect to the origin.
- If the origin endpoint is secure, configure access to the origin cluster by following the Confluent Replicator documentation.
Here is an example configuration file with a Confluent Platform or Kafka origin cluster:
name=replicator-source
connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector
tasks.max=4
confluent.topic.replication.factor=3
key.converter=io.confluent.connect.replicator.util.ByteArrayConverter
value.converter=io.confluent.connect.replicator.util.ByteArrayConverter
src.kafka.bootstrap.servers=localhost:9082
dest.kafka.bootstrap.servers=bootstrap.confluent.cloud:9092
topic.whitelist=test-topic
topic.rename.format=${topic}.replica
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
# REQUIRED: Specifies Confluent Cloud authentication. Refer to the SASL
# properties in ``~/.ccloud/config.json`` to get this info.
security.protocol=SASL_SSL
sasl.jaas.config=<sasl.jaas.config>
request.timeout.ms=20000
retry.backoff.ms=500
producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.security.protocol=SASL_SSL
# REQUIRED: Specifies Confluent Cloud authentication. Refer to the SASL
# properties in ``~/.ccloud/config.json`` to get this info.
producer.sasl.jaas.config=<sasl.jaas.config>
producer.request.timeout.ms=20000
producer.retry.backoff.ms=500
(Optional) Add configs for Confluent Cloud Schema Registry per the example in replicator-to-ccloud-producer.delta on GitHub at ccloud/examples/template_delta_configs.
# Confluent Schema Registry configuration
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>
schema.registry.url=https://<SCHEMA_REGISTRY_ENDPOINT>
Run Confluent Replicator until all past data has been copied. Confluent Replicator reports lag in the JMX metrics exposed by the Replicator worker JVMs. Each consumer client exposes its lag through an MBean with the following name:
kafka.consumer:type=consumer-fetch-manager-metrics,client-id={client.id}
Within the MBean, the following metrics measure lag:
{topic}-{partition number}.records-lag
The lag metrics should gradually approach zero and stabilize as Confluent Replicator runs.
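One way to watch these metrics from the command line is the JmxTool class that ships with Kafka. In this sketch, the JMX port and the Replicator client ID are illustrative assumptions, and the worker JVM must have been started with remote JMX enabled:

```shell
# Hypothetical sketch: poll the Replicator worker's consumer lag over JMX.
# Assumes JMX is enabled on port 9999 and the consumer's client.id is
# "replicator-0"; adjust both to match your deployment.
bin/kafka-run-class.sh kafka.tools.JmxTool \
  --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
  --object-name 'kafka.consumer:type=consumer-fetch-manager-metrics,client-id=replicator-0' \
  --reporting-interval 10000
```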
At this point, the destination cluster is caught up to the origin cluster, and Confluent Replicator is copying data that is currently being produced to your origin cluster in near real-time. You can begin transitioning your workload to Confluent Cloud as described below.
Transfer Using Kafka MirrorMaker¶
If you are running an older version of Confluent Platform, or migrating data from a Kafka version without access to Replicator, you can use Kafka MirrorMaker.
Manually create the topics to be migrated in Confluent Cloud, with exactly the same configs as in the origin cluster. For more information on how to do this, see Confluent Cloud CLI Quick Start.
If you use ACLs on topics, make sure you set those in the Confluent Cloud cluster as well.
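Topic creation might look like the following sketch with the Confluent Cloud CLI. The topic name, partition count, and config values are illustrative assumptions, and the exact flags can differ between CLI versions:

```shell
# Hypothetical sketch: recreate an origin topic in Confluent Cloud with
# matching partition count and configuration (values are examples only).
ccloud kafka topic create test-topic \
  --partitions 6 \
  --config retention.ms=604800000 \
  --config cleanup.policy=delete
```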
Run Kafka MirrorMaker.
- Upload kafka-<version>.tgz to the origin instance of Kafka that you will use to run MirrorMaker. Place the file in /tmp.
- Extract kafka-<version>.tgz to /usr/local:

  tar xvzf /tmp/kafka-<version>.tgz -C /usr/local

- Change ownership of the new Kafka installation to the non-root user ID that will be used to run Kafka (represented here by user1):

  sudo chown -R user1:user1 /usr/local/kafka_<version>
Define MirrorMaker’s producer and consumer configs. The consumer config should include the security credentials that correspond to your origin cluster, while the producer config should include your credentials in Confluent Cloud.
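The two files might look like the following sketch. The endpoints, group ID, and file paths are placeholder assumptions; this example assumes an unsecured origin cluster, so only the Confluent Cloud side carries SASL_SSL settings:

```properties
# /home/user1/mm-consumer.config -- reads from the origin cluster
# (add the origin cluster's security settings here if it is secured)
bootstrap.servers=localhost:9082
group.id=KafkaMirror
auto.offset.reset=earliest

# /home/user1/mm-producer.config -- writes to Confluent Cloud
bootstrap.servers=bootstrap.confluent.cloud:9092
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=<sasl.jaas.config>
```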
Run Kafka MirrorMaker directly with this command.
/usr/local/kafka_<version>/bin/kafka-mirror-maker.sh \
  --consumer.config /home/user1/mm-consumer.config \
  --producer.config /home/user1/mm-producer.config \
  --whitelist <your-topics>
Tip

- Change user1 to the appropriate local Kafka user ID and specify the topics to be copied in --whitelist.
- If you are transferring a large amount of data, you can create a long-running systemd service to manage the copying.
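Such a systemd unit might look like the following sketch; the unit file path, user, and config paths match the assumptions used earlier in this section:

```ini
# /etc/systemd/system/kafka-mirror-maker.service -- hypothetical unit file
[Unit]
Description=Kafka MirrorMaker copy to Confluent Cloud
After=network.target

[Service]
User=user1
ExecStart=/usr/local/kafka_<version>/bin/kafka-mirror-maker.sh \
  --consumer.config /home/user1/mm-consumer.config \
  --producer.config /home/user1/mm-producer.config \
  --whitelist <your-topics>
Restart=on-failure

[Install]
WantedBy=multi-user.target
```

Enable it with `systemctl enable --now kafka-mirror-maker` so the copy survives reboots and restarts on failure.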
Wait for Kafka MirrorMaker to copy all of the data. You can use the following command to track the progress of the copy:
bin/kafka-consumer-offset-checker.sh --group KafkaMirror \
  --bootstrap-server bootstrap.confluent.cloud:9092 --topic test-topic
As MirrorMaker runs, the numbers in the Lag column should gradually approach zero and stabilize. For more information, see the Apache Kafka MirrorMaker documentation.
At this point, the destination cluster is caught up to the origin cluster, and Kafka MirrorMaker is copying data that is currently being produced to your origin cluster in near real-time. You can begin transitioning your workload to Confluent Cloud as described below.
Transitioning to Confluent Cloud¶
After your existing data has been replicated to your new Confluent Cloud instance, you must point your existing applications at that instance.
Validate that the data was copied correctly.
- Review the previous steps to determine the lag, but do not rely on those statistics alone.
- Use console consumer commands to validate that the data you expect has been copied to Confluent Cloud.
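A spot check with the console consumer might look like the following sketch. The topic name and properties file path are illustrative assumptions, and the .replica suffix applies only if you used topic.rename.format with Replicator:

```shell
# Hypothetical spot check: read a few records from a replicated topic in
# Confluent Cloud. ccloud.properties is assumed to hold the security
# settings shown earlier (SASL_SSL, PLAIN, and your API key credentials).
bin/kafka-console-consumer.sh \
  --bootstrap-server bootstrap.confluent.cloud:9092 \
  --consumer.config /home/user1/ccloud.properties \
  --topic test-topic.replica \
  --from-beginning --max-messages 10
```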
Bring down producers in the origin cluster and take note of the time. This time is used in a later step.
Stop the consumers. Do this soon after stopping the producers, once the consumers have processed the last written messages. At this point, no new data is being sent to the origin cluster.
Reset the offsets for all of the consumer groups for your applications in Confluent Cloud, based on the producer stop timestamp. You can do this by using the kafka-consumer-groups command. For more information, see KIP-122.

Confirm that the offsets were reset successfully by using the kafka-consumer-groups command.
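A sketch of such a reset follows; the group name, topic, timestamp, and properties file path are illustrative assumptions:

```shell
# Hypothetical sketch: reset one consumer group's offsets in Confluent
# Cloud to the time the origin producers were stopped (KIP-122 tooling).
bin/kafka-consumer-groups.sh \
  --bootstrap-server bootstrap.confluent.cloud:9092 \
  --command-config /home/user1/ccloud.properties \
  --group my-consumer-group --topic test-topic \
  --reset-offsets --to-datetime 2019-06-01T12:00:00.000 \
  --execute

# Verify the new offsets for the group:
bin/kafka-consumer-groups.sh \
  --bootstrap-server bootstrap.confluent.cloud:9092 \
  --command-config /home/user1/ccloud.properties \
  --group my-consumer-group --describe
```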
Start the consumers and point them towards Confluent Cloud.
Start the producers and point them towards Confluent Cloud.
Check either Confluent Replicator or Kafka MirrorMaker to ensure that all data has been copied.
Stop either Confluent Replicator or Kafka MirrorMaker.
Stop the origin cluster.
Dual Production of Data¶
If you don’t want to copy the existing data to the Confluent Cloud cluster, you can continue running your origin cluster and produce messages to both clusters. You can run your origin cluster for at least as long as the Confluent Cloud retention time (specified during provisioning). After all producers and consumers have been transitioned, you can then shut down the origin cluster.
Important
- Connect your consumers to only one cluster at a time.
- Understand your environment and test before committing to this process.
Suggested Reading¶
- For more about Replicator, see Replicator for Multi-Datacenter Replication.
- To view a working example of hybrid Apache Kafka® clusters from self-hosted to Confluent Cloud, see the Confluent Cloud demo.
- For example configs for all Confluent Platform components and clients connecting to Confluent Cloud, see template examples for components.
- To look at all the code used in the Confluent Cloud demo, see the Confluent Cloud demo examples.