By the end of this tutorial, you will have configured two clusters and
successfully used Cluster Linking to create a mirror topic and share topic data
across the clusters. You will also learn how to stop mirroring to make the topic
writable, and verify that the two topics have diverged.
These instructions assume you have a local installation of Confluent Platform 7.0.0 or later, the Confluent CLI, and Java 8, 11, or 17 (recommended).
For details on Java requirements, see Java in System Requirements for Confluent Platform.
If you are new to Confluent Platform, you may want to work through the Quick Start for Confluent Platform first, and then return to this tutorial.
The examples assume that your properties files are in the default locations on your Confluent Platform installation, except as otherwise noted.
This should make it easier to copy/paste example commands directly into your terminal in most cases.
Once Confluent Platform is installed, Kafka CLI commands such as kafka-topics, kafka-console-producer, and kafka-console-consumer can be run from any directory,
but for commands that access properties files in $CONFLUENT_HOME (such as kafka-server-start), the examples show running these from within that directory.
A reference for these open source utilities is provided in Kafka Command-Line Interface (CLI) Tools.
A reference for Confluent premium command line tools and utilities is provided in CLI Tools for Confluent Platform.
Confluent CLI commands can specify the bootstrap server at the beginning or end of the command: kafka-topics --list --bootstrap-server localhost:9092 is the same
as kafka-topics --bootstrap-server localhost:9092 --list. In these tutorials, the target bootstrap server is specified at the end of commands.
The rest of the tutorial refers to $CONFLUENT_HOME to indicate your Confluent Platform install directory.
Set this as an environment variable, for example:
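For example (the install path below is a placeholder; substitute the actual location on your system):

```shell
# Set CONFLUENT_HOME to your Confluent Platform install directory.
# /opt/confluent is a placeholder path; adjust it for your system.
export CONFLUENT_HOME=/opt/confluent
echo "CONFLUENT_HOME is set to: $CONFLUENT_HOME"
```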
As of Confluent Platform 7.5, ZooKeeper is deprecated for new deployments. Confluent recommends KRaft mode for new deployments.
To learn more about running Kafka in KRaft mode, see KRaft Overview for Confluent Platform
and the KRaft steps in the Platform Quick Start.
This tutorial provides examples for both KRaft mode and ZooKeeper mode.
For KRaft, the examples show a combined mode configuration, where for each cluster the broker and controller run on the same server.
Currently, combined mode is not intended for production use but is shown here to simplify the tutorial.
If you want to run controllers and brokers on separate servers, use KRaft in isolated mode. To learn more, see KRaft Overview for Confluent Platform
and KRaft mode under Configure Confluent Platform for production.
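In combined mode, each cluster's properties file assigns both roles to a single node. A minimal sketch, using standard KRaft settings (process.roles, controller.quorum.voters) with the source cluster's ports from this tutorial; the node ID is illustrative:

```properties
# Combined mode: one process acts as both broker and KRaft controller.
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9094
listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9094
```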
The example deployment in this tutorial uses the following port and feature configurations, and assumes that services will run on localhost.
KRaft mode

                      Source                                     Destination
  Kafka brokers       9092 (location of original demo topic)     9093 (location of demo mirror topic)
  KRaft controllers   9094                                       9095
  HTTP listeners      8090                                       8091
  Cluster link        Cluster link is enabled on the destination

ZooKeeper mode

                      Source                                     Destination
  Kafka brokers       9092 (location of original demo topic)     9093 (location of demo mirror topic)
  ZooKeeper           2181                                       2182
  HTTP listeners      8090                                       8091
  Cluster link        Cluster link is enabled on the destination
Configure Kafka brokers, controllers, and ZooKeeper files
Configure the following files to set up the source and destination clusters. You can copy and
modify the existing properties files to use as a starting point.
Tip
Note that you must add confluent.http.server.listeners and configure it to be unique to each broker.
In KRaft mode, you must set confluent.cluster.link.enable=true on the destination, as shown in the examples for configuring the destination cluster.
Confluent Platform releases before 7.0.0 required adding confluent.cluster.link.enable=true to the destination server properties file to enable Cluster Linking in ZooKeeper mode.
Starting with Confluent Platform 7.0.0, Cluster Linking is enabled by default in ZooKeeper mode, so this setting is no longer needed and is not shown in the configurations above.
The source cluster configurations (ports, data directories, and so on) are largely based on the defaults for the
template properties files, with only a few changes. The source cluster must be Confluent Platform 5.4 or later (Apache Kafka® 2.4).
Change directories to the location where Confluent Platform is installed:
cd $CONFLUENT_HOME
Create a directory to be used for all of your example files:
mkdir my-examples
Copy etc/kafka/kraft/server.properties into the examples directory and rename it to match its purpose:
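For example, naming the copy server-src.properties (the file name that later steps reference):

```shell
cp etc/kafka/kraft/server.properties my-examples/server-src.properties
```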
While you configured the source cluster to run on default ports, you must set up
the destination cluster on different ports to avoid collisions, as summarized in
ports mapping. Copy the configuration
files to your examples directory and modify them as shown below.
The destination cluster must be Confluent Platform 7.0.0 or later.
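A sketch of the destination overrides, using the ports from the tutorial's port table; the node ID and log directory are illustrative, confluent.http.server.listeners and confluent.cluster.link.enable are the settings called out earlier, and the rest are standard Kafka/KRaft properties:

```properties
# server-dst.properties — destination cluster overrides (KRaft combined mode).
node.id=2
process.roles=broker,controller
controller.quorum.voters=2@localhost:9095
listeners=PLAINTEXT://localhost:9093,CONTROLLER://localhost:9095
advertised.listeners=PLAINTEXT://localhost:9093
confluent.http.server.listeners=http://localhost:8091
confluent.cluster.link.enable=true
log.dirs=/tmp/kraft-combined-logs-dst
```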
Change directories to the location where Confluent Platform is installed:
cd $CONFLUENT_HOME
Copy my-examples/server-src.properties (the file you just created) within the examples directory, and rename the copy to match its purpose:
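For example:

```shell
cp my-examples/server-src.properties my-examples/server-dst.properties
```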
Add the following line to server-dst.properties to set the replication factor to 1 for the cluster link topic metadata.
(This configuration is set to simplify this demo. The recommended replication factor in production is 3, which is the default.)
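Assuming the property name used by current releases is confluent.cluster.link.metadata.topic.replication.factor (verify against the documentation for your version), the line is:

```properties
# Demo only: replication factor 1 for cluster link topic metadata.
# Production recommendation is 3 (the default).
confluent.cluster.link.metadata.topic.replication.factor=1
```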
The kafka-storage command is run only once per broker/controller. You cannot use this command to update an existing cluster.
If you make a mistake in configurations at this point, you must recreate the directories from scratch, and work through the steps again.
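In KRaft mode, the formatting step looks roughly like this (the generated cluster ID will differ on every run):

```shell
# Generate a cluster ID, then format the storage directories declared in the
# properties file. Run once per cluster; it cannot update an existing cluster.
KAFKA_CLUSTER_ID="$(bin/kafka-storage random-uuid)"
bin/kafka-storage format -t "$KAFKA_CLUSTER_ID" -c my-examples/server-src.properties
```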
Start the server for the source cluster by running this command in its own terminal.
Cluster Linking must be enabled on the destination cluster (in my-examples/server-dst.properties), or this will fail.
In ZooKeeper mode on Confluent Platform 7.0.0 and later, Cluster Linking is enabled by default. In KRaft mode, you must explicitly enable it,
as shown in the example configurations.
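Assuming the properties file created earlier, starting the source server looks like this (run from $CONFLUENT_HOME):

```shell
# Source cluster, in its own terminal:
bin/kafka-server-start my-examples/server-src.properties
```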
Initialize the mirror topic.
The following command establishes a mirror topic of the source demo topic, using the cluster link demo-link.
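If your Confluent Platform version matches the documented flags for the kafka-mirrors tool (verify with kafka-mirrors --help), the command takes roughly this shape:

```shell
# Create a mirror of the source topic "demo" on the destination cluster,
# over the existing cluster link demo-link.
kafka-mirrors --create --mirror-topic demo --link demo-link \
  --bootstrap-server localhost:9093
```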
The output shows both the source and destination’s replicas as distinguished by the ClusterLink field,
where - means local and the link name (demo-link in this case) indicates the replica(s) over the cluster link.
Run the following command to populate values for “MirrorState”, “MirrorLastFetchTimeMs”, “MirrorLastFetchHighWatermark”:
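A describe against the destination along these lines should surface those fields (exact flags may vary by version; check kafka-mirrors --help):

```shell
kafka-mirrors --describe --topics demo --bootstrap-server localhost:9093
```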
This is one of the configurations that will mirror only if it is explicitly set on the source. (Note that retention.ms was not included in the output for the previous step.)
This change may take a few minutes to propagate, because the default metadata refresh interval is 5 minutes.
You can speed this up by setting metadata.max.age.ms when creating the cluster link. For example:
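One way to do this (the file name CL.properties is a choice for this sketch; the kafka-cluster-links tool reads link settings from a config file):

```shell
# CL.properties would contain the link settings, for example:
#   bootstrap.servers=localhost:9092
#   metadata.max.age.ms=5000
kafka-cluster-links --bootstrap-server localhost:9093 --create --link demo-link \
  --config-file my-examples/CL.properties
```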
Calculating max offset and ms lag for mirror topics: [demo]
Finished calculating max offset lag and max lag ms for mirror topics: [demo]
Request for stopping topic inventory.arrivals's mirror was successfully scheduled.
Use the describe command with the --pending-stopped-only option to monitor progress.
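For example (exact required flags may vary by version):

```shell
kafka-mirrors --describe --pending-stopped-only \
  --bootstrap-server localhost:9093
```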
Verify that the mirror topic has stopped mirroring.
Run consumers to read from both topics. Your output will show they have diverged.
Run a consumer to read messages from the original topic on the source. (Or simply view the output in this window, if you still have this consumer running.)
The output on the source topic now includes the old message.
first
second
third
old
Run a consumer to read messages from the topic on the destination. (Or simply view the output on this window, if you still have this consumer running.)
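A consumer against the destination cluster (port 9093, per the port table) looks like:

```shell
kafka-console-consumer --topic demo --from-beginning \
  --bootstrap-server localhost:9093
```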
If there are mirror topics using the link you want to delete, you must either first unlink the topics by stopping the
mirror for each one, or use the --force option with the kafka-cluster-links --delete --link command.
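For example, forcing deletion of the link used in this tutorial:

```shell
kafka-cluster-links --bootstrap-server localhost:9093 \
  --delete --link demo-link --force
```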
Stop all of the other components with Ctrl-C in their respective command windows, in the reverse order in which you started them.
Stop consumers and producers.
For KRaft mode, stop the destination server first, then stop the source server.
For ZooKeeper mode, stop the Kafka brokers, then their respective ZooKeepers.
Delete the log directories from /tmp.
This will clear out the metadata from your system and enable you to configure and run new local deployments
with no collisions with the legacy metadata.