By the end of this tutorial, you will have configured two clusters and
successfully used Cluster Linking to create a mirror topic and share topic data
across the clusters. You will also learn how to stop mirroring to make the topic
writable, and verify that the two topics have diverged.
These instructions assume you have a local installation of Confluent Platform 7.0.0 or later,
and Java 1.8 or 1.11 (needed for Confluent Platform). If you are new to Confluent Platform, work through the Quick Start for Confluent Platform first, and then return to these tutorials.
The examples assume that your properties files are in the default locations on your Confluent Platform installation, except as otherwise noted.
This should make it easier to copy/paste example commands directly into your terminal in most cases.
With Confluent Platform is installed, Confluent CLI commands themselves can be run from any directory (kafka-topics, kafka-console-producer, kafka-console-consumer),
but for commands that access properties files in $CONFLUENT_HOME (kafka-server-start), the examples show running these from within that directory.
Confluent CLI commands can specify the bootstrap server at the beginning or end of the command: kafka-topics--list--bootstrap-serverlocalhost:9092 is the same
as kafka-topics--bootstrap-serverlocalhost:9092--list. In these tutorials, the target bootstrap server is specified at the end of commands.
The rest of the tutorial refers to $CONFLUENT_HOME, which represents etc/kafka
within your Confluent Platform install directory. Set this as an environment variable, for example:
In $CONFLUENT_HOME/etc/kafka/, configure the following files to set up the source and destination clusters. You can copy
and modify the existing zookeeper.properties and server.properties files to use as a starting point. Be sure to
uncomment any line for which you change a value.
Note that you must add confluent.http.server.listeners and configure it to be unique to each broker.
Files
Configurations
zookeeper-src.properties
dataDir=/tmp/zookeeper-1
clientPort=2181 (this is the default, no changes needed)
zookeeper-dst.properties
dataDir=/tmp/zookeeper-2
clientPort=2182
server-src.properties
listeners=PLAINTEXT://:9092 (this is the default, leave commented out, no changes needed)
log.dirs=/tmp/kafka-logs-1
zookeeper.connect=localhost:2181 (this is the default, no changes needed)
Add the following listener configuration to specify the REST endpoint unique to this broker:
Pre 7.0.0 Confluent Platform releases required adding confluent.cluster.link.enable=true) to the destination server properties file to enable Cluster Linking.
Starting with Confluent Platform 7.0.0, Cluster Linking is enabled by default, so this setting is no longer needed and not shown in the configs above.
The output shows both the source and destination’s replicas as distinguished by the ClusterLink field,
where - means local and the link name (demo-link in this case) indicates the replica(s) over the cluster link.
Run the following command to populate values for “MirrorState”, “MirrorLastFetchTimeMs”, “MirrorLastFetchHighWatermark”:
This is one of the configurations that will mirror only if it is explicitly set on the source. (Note that retention.ms was not included in the output for the previous step.)
This change may a few minutes to propagate because the default metadata refresh is at 5 minutes.
This can be improved by setting config metadata.max.age.ms when creating the cluster link. For example:
Calculating max offset and ms lag for mirror topics: [demo]
Finished calculating max offset lag and max lag ms for mirror topics: [demo]
Request for stopping topic inventory.arrivals's mirror was successfully scheduled.
Use the describe command with the --pending-stopped-only option to monitor progress.
Copy
Verify that the mirror topic has stopped mirroring.
Run consumers to read from both topics. Your output will show they have diverged.
Run a consumer to read messages from the original topic on the source. (Or simply view the output on this window, if you still have this consumer running)
The output on the source topic now includes the old message.
first
second
third
old
Copy
Run a consumer to read messages from the topic on the destination. (Or simply view the output on this window, if you still have this consumer running.)
If there are mirror topics using the link you want to delete, you must either first unlink the topics by stopping the
mirror for each one, or use the --force option with the kafka-cluster-linksdelete--link command.
Stop consumers and producers with Ctl-C in their respective command windows.
Stop all of the other components with Ctl-C in their respective command windows, in reverse order in which you started them. For example, stop the Kafka brokers, then their respective ZooKeepers.