Tutorial: Use Cluster Linking to Share Data Across Topics¶
This tutorial provides a hands-on look at how to use Cluster Linking for a topic data sharing use case.
What the Tutorial Covers¶
By the end of this tutorial, you will have configured two clusters and successfully used Cluster Linking to create a mirror topic and share topic data across the clusters. You will also learn how to stop mirroring to make the topic writable, and verify that the two topics have diverged.
About Prerequisites and Command Examples¶
- These instructions assume you have a local installation of Confluent Platform 7.0.0 or later, and Java 1.8 or 1.11 (needed for Confluent Platform). If you are new to Confluent Platform, work through the Quick Start for Confluent Platform first, and then return to these tutorials.
- The examples assume that your properties files are in the default locations on your Confluent Platform installation, except as otherwise noted. This should make it easier to copy/paste example commands directly into your terminal in most cases.
- With Confluent Platform installed, Confluent CLI commands themselves (kafka-topics, kafka-console-producer, kafka-console-consumer) can be run from any directory, but for commands that access properties files in $CONFLUENT_HOME (kafka-server-start), the examples show running these from within that directory.
- Confluent CLI commands can specify the bootstrap server at the beginning or end of the command: kafka-topics --list --bootstrap-server localhost:9092 is the same as kafka-topics --bootstrap-server localhost:9092 --list. In these tutorials, the target bootstrap server is specified at the end of commands.
The rest of the tutorial refers to $CONFLUENT_HOME, which represents your Confluent Platform install directory (the properties files live under etc/kafka within it). Set this as an environment variable, for example:
export CONFLUENT_HOME=$HOME/confluent-7.3.10
PATH=$CONFLUENT_HOME/bin:$PATH
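Before going further, it can help to confirm that the variable points at a directory with the layout the rest of the tutorial expects. The following is a minimal sketch; the function name check_confluent_home is hypothetical and not part of Confluent Platform.

```shell
# Sketch: confirm that an install directory has the layout this tutorial relies on.
# check_confluent_home is a hypothetical helper, not a Confluent tool.
check_confluent_home() {
  home="${1:-$CONFLUENT_HOME}"
  # The tutorial runs tools from bin/ and edits properties files in etc/kafka/.
  if [ -x "$home/bin/kafka-topics" ] && [ -d "$home/etc/kafka" ]; then
    echo "ok: $home"
  else
    echo "missing bin/kafka-topics or etc/kafka under: $home" >&2
    return 1
  fi
}
```

Call it with no arguments to check the current $CONFLUENT_HOME, or pass a directory explicitly.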
Ports and Configuration Mapping¶
The example deployment in this tutorial uses the following port and feature configurations, and assumes that services will run on localhost.
Component | Source | Destination |
---|---|---|
Kafka Brokers | 9092 (location of original demo topic) | 9093 (location of demo mirror topic) |
ZooKeeper | 2181 | 2182 |
HTTP listeners | 8090 | 8091 |
Cluster link | | Cluster link is enabled on the destination |
Configure Kafka and ZooKeeper files¶
In $CONFLUENT_HOME/etc/kafka/, configure the following files to set up the source and destination clusters. You can copy and modify the existing zookeeper.properties and server.properties files to use as a starting point. Be sure to uncomment any line for which you change a value.
Note that you must add confluent.http.server.listeners and configure it to be unique to each broker.
Files | Configurations |
---|---|
zookeeper-src.properties |
|
zookeeper-dst.properties |
|
server-src.properties |
Add the following listener configuration to specify the REST endpoint unique to this broker:
|
server-dst.properties |
Add the following listener configuration to specify the REST endpoint unique to this broker:
|
Tip
Confluent Platform releases before 7.0.0 required adding confluent.cluster.link.enable=true to the destination server properties file to enable Cluster Linking.
Starting with Confluent Platform 7.0.0, Cluster Linking is enabled by default, so this setting is no longer needed and is not shown in the configs above.
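As one way to produce the four files listed in the table, the sketch below copies the stock properties files and appends overrides instead of editing lines in place. It relies on later entries overriding earlier ones when a Java properties file is loaded. The ports come from the port table earlier in this tutorial; the /tmp data directories and the function name make_cluster_configs are assumptions chosen for illustration.

```shell
# Sketch: derive the per-cluster files from the stock ones by appending overrides.
# Ports follow the tutorial's port table; the /tmp paths are assumed values.
make_cluster_configs() {
  dir="$1"   # for example: "$CONFLUENT_HOME/etc/kafka"
  for side in src dst; do
    if [ "$side" = src ]; then
      zk_port=2181; broker_port=9092; http_port=8090
    else
      zk_port=2182; broker_port=9093; http_port=8091
    fi
    # ZooKeeper: unique client port and data directory per cluster.
    cp "$dir/zookeeper.properties" "$dir/zookeeper-$side.properties"
    printf '%s\n' "" \
      "clientPort=$zk_port" \
      "dataDir=/tmp/zookeeper-$side" \
      >> "$dir/zookeeper-$side.properties"
    # Broker: unique listener, log directory, ZooKeeper address, and REST endpoint.
    cp "$dir/server.properties" "$dir/server-$side.properties"
    printf '%s\n' "" \
      "listeners=PLAINTEXT://:$broker_port" \
      "log.dirs=/tmp/kafka-logs-$side" \
      "zookeeper.connect=localhost:$zk_port" \
      "confluent.http.server.listeners=http://localhost:$http_port" \
      >> "$dir/server-$side.properties"
  done
}
# Usage (on a real install): make_cluster_configs "$CONFLUENT_HOME/etc/kafka"
```

Appending keeps the original files untouched and makes the per-cluster differences easy to see at the end of each generated file.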
Start the source cluster¶
Run each of the following commands in separate command windows from $CONFLUENT_HOME.
Start the ZooKeeper server for the source cluster.
./bin/zookeeper-server-start etc/kafka/zookeeper-src.properties
Start a Kafka broker for the source cluster.
./bin/kafka-server-start etc/kafka/server-src.properties
Note
The source cluster must be Confluent Platform 5.4 or later (Apache Kafka® 2.4).
Start the destination cluster¶
Run each of the following commands in separate command windows from $CONFLUENT_HOME.
Start the ZooKeeper server for the destination cluster.
./bin/zookeeper-server-start etc/kafka/zookeeper-dst.properties
Start a Kafka broker for the destination cluster.
./bin/kafka-server-start etc/kafka/server-dst.properties
Note
The destination cluster must be Confluent Platform 7.0.0 or later.
Populate the source cluster¶
Create a topic on the source cluster.
kafka-topics --create --topic demo --bootstrap-server localhost:9092
You should get confirmation that the topic was successfully created.
Created topic demo.
Tip
You can get a list of existing topics as follows:
kafka-topics --list --bootstrap-server localhost:9092
And get detailed information on a topic with the --describe option:
kafka-topics --describe --topic demo --bootstrap-server localhost:9092
Run the producer in a new command window to send some messages to the demo topic on the source cluster, and fill it with data.
kafka-console-producer --topic demo --bootstrap-server localhost:9092
When the producer starts, you will get a prompt (>).
Type some messages at the prompt. Press return after each message. The producer window will look like this:
>first
>second
>third
Tip
Do not copy-paste the above example into your producer, but rather type the messages and press return after each message.
Consume from the topic on the source cluster.
In a new terminal, run a consumer to consume messages from the demo topic.
kafka-console-consumer --topic demo --from-beginning --bootstrap-server localhost:9092
If the consumer successfully reads the messages, your output will be:
first
second
third
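If you would rather script this step than type at the prompt, the console producer also reads messages from standard input, one per line. The sketch below is a non-interactive variant of the produce and consume steps above; the function names are hypothetical, and the bootstrap server defaults to the source cluster used in this tutorial.

```shell
# Sketch: populate and read back the demo topic without typing at the prompt.
# Function names are illustrative only.
demo_messages() {
  # One message per line, matching the messages typed in the steps above.
  printf '%s\n' first second third
}

produce_demo() {
  # kafka-console-producer treats each line on standard input as one message.
  demo_messages | kafka-console-producer --topic demo \
    --bootstrap-server "${1:-localhost:9092}"
}

consume_demo() {
  # --max-messages makes the consumer exit after three records instead of waiting.
  kafka-console-consumer --topic demo --from-beginning --max-messages 3 \
    --bootstrap-server "${1:-localhost:9092}"
}
```

With the source cluster running, call produce_demo and then consume_demo. Passing localhost:9093 to consume_demo reads from the destination cluster instead.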
Create the cluster link and the mirror topic¶
Create the cluster link on the destination cluster.
kafka-cluster-links --bootstrap-server localhost:9093 \
  --create --link demo-link --config bootstrap.servers=localhost:9092
The confirmation message looks like this:
Cluster link 'demo-link' creation successfully completed.
Tip
Cluster Linking must be enabled on the destination cluster (in $CONFLUENT_HOME/etc/kafka/server-dst.properties), or this will fail.
Initialize the mirror topic.
The following command establishes a mirror topic of the source demo topic, using the cluster link demo-link.
kafka-mirrors --create --mirror-topic demo --link demo-link \
  --bootstrap-server localhost:9093
The confirmation message looks like this:
Created topic demo.
Tip
The mirror topic name must match its source topic name. To learn more, see Mirror Topic Creation.
Consume from the mirror topic on the destination cluster to verify it.
In a new terminal, run a consumer to consume messages from the mirror topic.
kafka-console-consumer --topic demo --from-beginning --bootstrap-server localhost:9093
Your output should be:
first
second
third
Check the replica status on the destination¶
Run the following command to monitor the replicas on the destination cluster.
kafka-replica-status --topics demo --include-linked --bootstrap-server localhost:9093
Your output should resemble the following example.
Topic Partition Replica ClusterLink IsLeader IsObserver IsIsrEligible IsInIsr IsCaughtUp LastCaughtUpLagMs LastFetchLagMs LogStartOffset LogEndOffset
demo 0 0 - true false true true true 0 0 0 3
demo 0 0 demo-link true false true true true 2 2 0 3
The output shows both the source and destination’s replicas as distinguished by the ClusterLink field, where - means local and the link name (demo-link in this case) indicates the replica(s) over the cluster link.
Run the following command to populate values for “MirrorState”, “MirrorLastFetchTimeMs”, “MirrorLastFetchHighWatermark”:
kafka-replica-status --topics demo --include-mirror --bootstrap-server localhost:9093
Your output should resemble the following:
Topic Partition Replica IsLeader IsObserver IsIsrEligible IsInIsr IsCaughtUp LastCaughtUpLagMs LastFetchLagMs LogStartOffset LogEndOffset LeaderEpoch MirrorState MirrorLastFetchTimeMs MirrorLastFetchHighWatermark
demo 0 1 true false true true true 0 0 0 3 0 ACTIVE -1 -1
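To check the replica status from a script instead of reading the table by eye, you can look up the IsCaughtUp column by name and test every row. This is a minimal sketch that assumes the whitespace-separated layout shown in the examples above; the function name all_caught_up is hypothetical.

```shell
# Sketch: succeed only if every replica row reports IsCaughtUp = true.
# Assumes whitespace-separated output with a header row, as in the examples above.
all_caught_up() {
  awk '
    NF == 0 { next }                       # skip blank lines
    !col {                                 # first non-blank line: locate the column
      for (i = 1; i <= NF; i++) if ($i == "IsCaughtUp") col = i
      next
    }
    $col != "true" { lagging++ }           # count replicas that are behind
    END { exit (lagging > 0 || !col) }     # fail if any lag or no header was found
  '
}
# Usage (requires the running clusters):
# kafka-replica-status --topics demo --include-linked --bootstrap-server localhost:9093 | all_caught_up
```

Locating the column from the header means the same function works for both the --include-linked and --include-mirror layouts, which have different column counts.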
Verify that the mirror topic is read-only¶
Attempt to produce messages to the mirror topic on the destination as a way of verifying that the mirror topic is read-only.
Start a producer, targeting the mirror topic.
kafka-console-producer --topic demo --bootstrap-server localhost:9093
Type a message at the > prompt, and press return.
You should get an error similar to the following example because the mirror topic is not writable.
>hi
[2020-08-13 16:55:53,571] ERROR Error when sending message to topic demo with key: null, value: 2 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
Change the Source Topic Configuration¶
Describe the mirror topic to verify the destination and the mirror topic configs.
kafka-configs --describe --topic demo --bootstrap-server localhost:9093
The output will show parameters that are set for strictly mirrored configs.
Dynamic configs for topic demo are:
compression.type=producer sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:compression.type=producer, DEFAULT_CONFIG:compression.type=producer}
cleanup.policy=delete sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:cleanup.policy=delete, DEFAULT_CONFIG:log.cleanup.policy=delete}
segment.bytes=1073741824 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:segment.bytes=1073741824, STATIC_BROKER_CONFIG:log.segment.bytes=1073741824, DEFAULT_CONFIG:log.segment.bytes=1073741824}
max.message.bytes=1048588 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:max.message.bytes=1048588, DEFAULT_CONFIG:message.max.bytes=1048588}
Modify the source topic to set retention.ms.
This is one of the configurations that will mirror only if it is explicitly set on the source. (Note that retention.ms was not included in the output for the previous step.)
kafka-configs --alter --topic demo --add-config retention.ms=123456890 --bootstrap-server localhost:9092
The confirmation message looks like this:
Completed updating config for topic demo.
Rerun the kafka-configs command to verify that retention.ms is now on the mirror topic.
kafka-configs --describe --topic demo --bootstrap-server localhost:9093
The dynamic configurations now include retention.ms.
Dynamic configs for topic demo are:
compression.type=producer sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:compression.type=producer, DEFAULT_CONFIG:compression.type=producer}
cleanup.policy=delete sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:cleanup.policy=delete, DEFAULT_CONFIG:log.cleanup.policy=delete}
segment.bytes=1073741824 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:segment.bytes=1073741824, STATIC_BROKER_CONFIG:log.segment.bytes=1073741824, DEFAULT_CONFIG:log.segment.bytes=1073741824}
retention.ms=123456890 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=123456890}
max.message.bytes=1048588 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:max.message.bytes=1048588, DEFAULT_CONFIG:message.max.bytes=1048588}
Tip
It may take up to 5 seconds to propagate the change made in the previous step to set retention.ms on the source topic.
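Because the config change does not appear on the mirror topic instantly, a script that verifies it needs to retry for a short while. The sketch below shows a generic polling helper and one way it could be applied to this step; both function names are hypothetical, and the 30-second timeout in the usage comment is an arbitrary choice.

```shell
# Sketch: retry a command until it succeeds or a timeout (in seconds) elapses.
wait_until() {
  timeout="$1"; interval="$2"; shift 2
  elapsed=0
  until "$@"; do
    if [ "$elapsed" -ge "$timeout" ]; then
      return 1                      # give up: the condition never became true
    fi
    sleep "$interval"
    elapsed=$((elapsed + interval))
  done
}

# Condition for this step: the mirror topic on the destination shows retention.ms.
mirror_has_retention() {
  kafka-configs --describe --topic demo --bootstrap-server localhost:9093 \
    | grep -q 'retention.ms=123456890'
}
# Usage (requires the running clusters): wait_until 30 1 mirror_has_retention
```

The helper takes the condition as a command, so it can be reused for other checks in this tutorial that depend on propagation delay.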
Change the Source Topic’s Partitions¶
Alter the number of partitions on the source topic.
kafka-topics --alter --topic demo --partitions 8 --bootstrap-server localhost:9092
Verify the change on the source topic.
kafka-topics --describe --topic demo --bootstrap-server localhost:9092
The output will be:
Topic: demo PartitionCount: 8 ReplicationFactor: 1 Configs: segment.bytes=1073741824,retention.ms=123456890
Topic: demo Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Offline:
Topic: demo Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Offline:
Topic: demo Partition: 2 Leader: 0 Replicas: 0 Isr: 0 Offline:
Topic: demo Partition: 3 Leader: 0 Replicas: 0 Isr: 0 Offline:
Topic: demo Partition: 4 Leader: 0 Replicas: 0 Isr: 0 Offline:
Topic: demo Partition: 5 Leader: 0 Replicas: 0 Isr: 0 Offline:
Topic: demo Partition: 6 Leader: 0 Replicas: 0 Isr: 0 Offline:
Topic: demo Partition: 7 Leader: 0 Replicas: 0 Isr: 0 Offline:
Verify the change on the mirror topic.
kafka-topics --describe --topic demo --bootstrap-server localhost:9093
The output of this same command for the mirror topic should match the original topic exactly.
Topic: demo PartitionCount: 8 ReplicationFactor: 1 Configs: segment.bytes=1073741824,retention.ms=123456890
Topic: demo Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Offline:
Topic: demo Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Offline:
Topic: demo Partition: 2 Leader: 0 Replicas: 0 Isr: 0 Offline:
Topic: demo Partition: 3 Leader: 0 Replicas: 0 Isr: 0 Offline:
Topic: demo Partition: 4 Leader: 0 Replicas: 0 Isr: 0 Offline:
Topic: demo Partition: 5 Leader: 0 Replicas: 0 Isr: 0 Offline:
Topic: demo Partition: 6 Leader: 0 Replicas: 0 Isr: 0 Offline:
Topic: demo Partition: 7 Leader: 0 Replicas: 0 Isr: 0 Offline:
Tip
This change may take a few minutes to propagate because the default metadata refresh interval is 5 minutes. You can shorten the delay by setting metadata.max.age.ms when creating the cluster link. For example:
./bin/kafka-cluster-links --bootstrap-server localhost:9093 --create --link demo-link \
  --config bootstrap.servers=localhost:9092,metadata.max.age.ms=5000
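Since the partition change can take a while to reach the mirror topic, comparing the two partition counts is a quick way to tell whether it has arrived. The sketch below extracts PartitionCount from the kafka-topics --describe output shown above; the function name partition_count is hypothetical.

```shell
# Sketch: print the PartitionCount value from `kafka-topics --describe` output.
partition_count() {
  awk '{
    for (i = 1; i < NF; i++)
      if ($i == "PartitionCount:") { print $(i + 1); exit }
  }'
}
# Usage (requires the running clusters):
# src=$(kafka-topics --describe --topic demo --bootstrap-server localhost:9092 | partition_count)
# dst=$(kafka-topics --describe --topic demo --bootstrap-server localhost:9093 | partition_count)
# [ "$src" = "$dst" ] && echo "partition counts match: $src"
```

If the counts differ, wait for the next metadata refresh on the link and compare again.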
List mirror topics¶
To list mirror topics, run kafka-cluster-links --list with the --include-topics option.
kafka-cluster-links --list --link demo-link --include-topics --bootstrap-server localhost:9093
Your output should resemble the following.
Link name: 'demo-link', link ID: '123-some-link-id', remote cluster ID: '123-some-cluster-id', local cluster ID: '456-some-other-cluster-id', remote cluster available: 'true', topics: [demo]
Cut over the mirror topic to make it writable¶
Stop the mirror topic for the demo topic on the destination.
kafka-mirrors --promote --topics demo --bootstrap-server localhost:9093
You will get the following confirmation.
Calculating max offset and ms lag for mirror topics: [demo]
Finished calculating max offset lag and max lag ms for mirror topics: [demo]
Request for stopping topic demo's mirror was successfully scheduled. Please use the describe command with the --pending-stopped-only option to monitor progress.
Verify that the mirror topic has stopped mirroring.
kafka-mirrors --describe --topics demo --pending-stopped-only --bootstrap-server localhost:9093
You will get the following confirmation that the topic has entered the STOPPED state.
Topic: demo LinkName: demo-link LinkId: 123-some-link-id MirrorTopic: demo State: STOPPED StateTime: 2021-11-15 14:06:26
Produce to both topics to verify divergence¶
Run a producer to send messages to the original topic on the source. (Or switch back to this command window if you still have this producer running.)
kafka-console-producer --topic demo --bootstrap-server localhost:9092
Type the message old at the prompt.
> old
Run a producer to send messages to the topic on the destination, which is no longer a mirror topic.
kafka-console-producer --topic demo --bootstrap-server localhost:9093
Type the message new at the prompt.
> new
Run consumers to read from both topics. Your output will show they have diverged.
Run a consumer to read messages from the original topic on the source. (Or simply view the output in this window, if you still have this consumer running.)
kafka-console-consumer --topic demo --from-beginning --bootstrap-server localhost:9092
The output on the source topic now includes the old message.
first
second
third
old
Run a consumer to read messages from the topic on the destination. (Or simply view the output on this window, if you still have this consumer running.)
kafka-console-consumer --topic demo --from-beginning --bootstrap-server localhost:9093
The output on the destination topic includes the new message.
first
second
third
new
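The divergence can also be checked mechanically by capturing each consumer's output to a file and comparing the two. The sketch below sorts both captures before comparing, because the topic now has several partitions and the console consumer does not guarantee ordering across partitions. The function name show_divergence and the file names in the usage comment are hypothetical.

```shell
# Sketch: report messages that appear in only one of two consumer captures.
# $1 and $2 are files holding one consumed message per line.
show_divergence() {
  a=$(mktemp); b=$(mktemp)
  # Sort first: message order across partitions is not guaranteed.
  sort "$1" > "$a"
  sort "$2" > "$b"
  if cmp -s "$a" "$b"; then
    echo "topics are identical"
  else
    echo "topics have diverged:"
    # Lines starting with "<" exist only in $1, lines starting with ">" only in $2.
    diff "$a" "$b" | grep '^[<>]'
  fi
  rm -f "$a" "$b"
}
# Usage (requires the running clusters; --timeout-ms lets each consumer exit when idle):
# kafka-console-consumer --topic demo --from-beginning --timeout-ms 10000 --bootstrap-server localhost:9092 > src.txt
# kafka-console-consumer --topic demo --from-beginning --timeout-ms 10000 --bootstrap-server localhost:9093 > dst.txt
# show_divergence src.txt dst.txt
```

For the messages produced in this section, the report would list old as present only on the source and new as present only on the destination.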
Delete the link¶
List cluster links.
kafka-cluster-links --list --bootstrap-server localhost:9093
The output should resemble the following example.
Link name: 'demo-link', link ID: '123-some-link-id', cluster ID: '123-some-cluster-id'
Delete the cluster link.
kafka-cluster-links --bootstrap-server localhost:9093 --delete --link demo-link
If the command is successful, the output is:
Cluster link 'demo-link' deletion successfully completed.
Tip
If there are mirror topics using the link you want to delete, you must either first unlink the topics by stopping the mirror for each one, or use the --force option with the kafka-cluster-links --delete --link command.
Teardown¶
Run shutdown and cleanup tasks.
- Stop consumers and producers with Ctrl-C in their respective command windows.
- Stop all of the other components with Ctrl-C in their respective command windows, in the reverse of the order in which you started them. For example, stop the Kafka brokers, then their respective ZooKeepers.