Tutorial: Use Cluster Linking to Share Data Across Topics

This tutorial provides a hands-on look at how to use Cluster Linking for a topic data sharing use case.

What the Tutorial Covers

By the end of this tutorial, you will have configured two clusters and successfully used Cluster Linking to create a mirror topic and share topic data across the clusters. You will also learn how to stop mirroring to make the topic writable, and verify that the two topics have diverged.


About Prerequisites and Command Examples

  • These instructions assume you have a local installation of Confluent Platform 7.0.0 or later, and Java 8 or 11 (needed for Confluent Platform). If you are new to Confluent Platform, work through the Quick Start for Confluent Platform first, and then return to these tutorials.
  • The examples assume that your properties files are in the default locations on your Confluent Platform installation, except as otherwise noted. This should make it easier to copy/paste example commands directly into your terminal in most cases.
  • With Confluent Platform installed, Confluent CLI commands themselves can be run from any directory (kafka-topics, kafka-console-producer, kafka-console-consumer), but for commands that access properties files in $CONFLUENT_HOME (kafka-server-start), the examples show running these from within that directory.
  • Confluent CLI commands can specify the bootstrap server at the beginning or end of the command: kafka-topics --list --bootstrap-server localhost:9092 is the same as kafka-topics --bootstrap-server localhost:9092 --list. In these tutorials, the target bootstrap server is specified at the end of commands.

The rest of the tutorial refers to $CONFLUENT_HOME, which represents your Confluent Platform install directory (the properties files live under etc/kafka within it). Set this as an environment variable, for example:

export CONFLUENT_HOME=$HOME/confluent-7.3.10
PATH=$CONFLUENT_HOME/bin:$PATH
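
Before continuing, it can help to confirm that the environment matches what the rest of the tutorial expects. The following is a minimal sketch, not something shipped with Confluent Platform: the function name check_confluent_env is hypothetical, and it only checks that $CONFLUENT_HOME/etc/kafka exists and that the command-line tools named above resolve on your PATH.

```shell
# Hypothetical helper (not part of Confluent Platform): verify the
# environment assumptions used throughout this tutorial.
check_confluent_env() {
  # CONFLUENT_HOME must be set and contain the etc/kafka directory.
  if [ -z "${CONFLUENT_HOME:-}" ] || [ ! -d "$CONFLUENT_HOME/etc/kafka" ]; then
    echo "CONFLUENT_HOME is unset or has no etc/kafka directory" >&2
    return 1
  fi
  # Each command-line tool should be reachable through PATH.
  for tool in kafka-topics kafka-console-producer kafka-console-consumer; do
    if ! command -v "$tool" >/dev/null 2>&1; then
      echo "$tool not found on PATH" >&2
      return 1
    fi
  done
  echo "environment looks OK"
}
```

Run check_confluent_env after setting the two variables; a non-zero return status means one of the assumptions does not hold.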

Ports and Configuration Mapping

The example deployment in this tutorial uses the following port and feature configurations, and assumes that services will run on localhost.

                 Source                                    Destination
Kafka Brokers    9092 (location of original demo topic)    9093 (location of demo mirror topic)
ZooKeeper        2181                                      2182
HTTP listeners   8090                                      8091
Cluster link                                               Cluster link is enabled on the destination

Configure Kafka and ZooKeeper files

In $CONFLUENT_HOME/etc/kafka/, configure the following files to set up the source and destination clusters. You can copy and modify the existing zookeeper.properties and server.properties files to use as a starting point. Be sure to uncomment any line for which you change a value.

Note that you must add confluent.http.server.listeners and configure it to be unique to each broker.

Files Configurations
zookeeper-src.properties

dataDir=/tmp/zookeeper-1

clientPort=2181 (this is the default, no changes needed)

zookeeper-dst.properties

dataDir=/tmp/zookeeper-2

clientPort=2182

server-src.properties

listeners=PLAINTEXT://:9092 (this is the default, leave commented out, no changes needed)

log.dirs=/tmp/kafka-logs-1

zookeeper.connect=localhost:2181 (this is the default, no changes needed)

Add the following listener configuration to specify the REST endpoint unique to this broker:

confluent.http.server.listeners=http://localhost:8090

server-dst.properties

listeners=PLAINTEXT://localhost:9093 (uncomment and update)

log.dirs=/tmp/kafka-logs-2

zookeeper.connect=localhost:2182

Add the following listener configuration to specify the REST endpoint unique to this broker:

confluent.http.server.listeners=http://localhost:8091

Tip

Confluent Platform releases before 7.0.0 required adding confluent.cluster.link.enable=true to the destination server properties file to enable Cluster Linking. Starting with Confluent Platform 7.0.0, Cluster Linking is enabled by default, so this setting is no longer needed and is not shown in the configs above.
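
If you prefer to script the edits rather than make them by hand, the file settings listed above could be generated along these lines. This is a sketch under assumptions: set_prop and make_cluster_configs are hypothetical helper names, and the script assumes the stock zookeeper.properties and server.properties exist in the directory you pass in. It leaves listeners commented out in server-src.properties, as the settings above describe.

```shell
# Hypothetical helpers (not part of Confluent Platform) that derive the four
# tutorial property files from the stock defaults in a given directory.

# set_prop FILE KEY VALUE: drop any existing (possibly commented-out) line
# for KEY, then append KEY=VALUE.
set_prop() {
  sed -i.bak -E "/^#?[[:space:]]*$2=/d" "$1" && rm -f "$1.bak"
  printf '%s=%s\n' "$2" "$3" >> "$1"
}

# make_cluster_configs DIR: DIR is "$CONFLUENT_HOME/etc/kafka" in this tutorial.
make_cluster_configs() {
  cfg_dir="$1"

  # ZooKeeper for the source and destination clusters.
  cp "$cfg_dir/zookeeper.properties" "$cfg_dir/zookeeper-src.properties"
  set_prop "$cfg_dir/zookeeper-src.properties" dataDir /tmp/zookeeper-1
  set_prop "$cfg_dir/zookeeper-src.properties" clientPort 2181

  cp "$cfg_dir/zookeeper.properties" "$cfg_dir/zookeeper-dst.properties"
  set_prop "$cfg_dir/zookeeper-dst.properties" dataDir /tmp/zookeeper-2
  set_prop "$cfg_dir/zookeeper-dst.properties" clientPort 2182

  # Source broker: the default listener (9092) is left untouched.
  cp "$cfg_dir/server.properties" "$cfg_dir/server-src.properties"
  set_prop "$cfg_dir/server-src.properties" log.dirs /tmp/kafka-logs-1
  set_prop "$cfg_dir/server-src.properties" zookeeper.connect localhost:2181
  set_prop "$cfg_dir/server-src.properties" confluent.http.server.listeners http://localhost:8090

  # Destination broker: listener, log directory, ZooKeeper, and REST port differ.
  cp "$cfg_dir/server.properties" "$cfg_dir/server-dst.properties"
  set_prop "$cfg_dir/server-dst.properties" listeners PLAINTEXT://localhost:9093
  set_prop "$cfg_dir/server-dst.properties" log.dirs /tmp/kafka-logs-2
  set_prop "$cfg_dir/server-dst.properties" zookeeper.connect localhost:2182
  set_prop "$cfg_dir/server-dst.properties" confluent.http.server.listeners http://localhost:8091
}
```

A call such as make_cluster_configs "$CONFLUENT_HOME/etc/kafka" overwrites any existing files with these four names, so review the results before starting the clusters.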

Start the source cluster

Run each of the following commands in separate command windows from $CONFLUENT_HOME.

  1. Start the ZooKeeper server for the source cluster.

    ./bin/zookeeper-server-start etc/kafka/zookeeper-src.properties
    
  2. Start a Kafka broker for the source cluster.

    ./bin/kafka-server-start etc/kafka/server-src.properties
    

Note

The source cluster must be Confluent Platform 5.4 or later (Apache Kafka® 2.4).

Start the destination cluster

Run each of the following commands in separate command windows from $CONFLUENT_HOME.

  1. Start the ZooKeeper server for the destination cluster.

    ./bin/zookeeper-server-start etc/kafka/zookeeper-dst.properties
    
  2. Start a Kafka broker for the destination cluster.

    ./bin/kafka-server-start etc/kafka/server-dst.properties
    

Note

The destination cluster must be Confluent Platform 7.0.0 or later.

Populate the source cluster

  1. Create a topic on the source cluster.

    kafka-topics --create --topic demo --bootstrap-server localhost:9092
    

    You should get confirmation that the topic was successfully created.

    Created topic demo.
    

    Tip

    You can get a list of existing topics as follows:

    kafka-topics --list --bootstrap-server localhost:9092
    

    And get detailed information on a topic with the --describe option:

    kafka-topics --describe --topic demo --bootstrap-server localhost:9092
    
  2. Run the producer in a new command window to send some messages to the demo topic on the source cluster, and fill it with data.

    kafka-console-producer --topic demo --bootstrap-server localhost:9092
    

    When the producer starts, you will get a prompt >.

    Type some messages at the prompt. Press return after each message. The producer window will look like this:

    >first
    >second
    >third
    

    Tip

    Do not copy-paste the above example into your producer, but rather type the messages and press return after each message.

  3. Consume from the topic on the source cluster.

    In a new terminal, run a consumer to consume messages from the demo topic.

    kafka-console-consumer --topic demo --from-beginning --bootstrap-server localhost:9092
    

    If the consumer successfully reads the messages, your output will be:

    first
    second
    third
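
The sections that follow refer to a cluster link named demo-link and a mirror topic named demo on the destination, so both need to exist before you continue. A minimal sketch of creating them is shown here, assuming the link is created on the destination (localhost:9093) and points at the source broker (localhost:9092). The wrapper name create_demo_mirror is hypothetical, and the kafka-mirrors --create options are an assumption based on the Confluent Platform 7.x command-line tools, so check kafka-mirrors --help on your installation.

```shell
# Hypothetical wrapper: create the cluster link, then the mirror topic.
create_demo_mirror() {
  # Create the link on the destination cluster, pointing at the source broker.
  kafka-cluster-links --create --link demo-link \
    --config bootstrap.servers=localhost:9092 \
    --bootstrap-server localhost:9093 || return 1

  # Create a read-only mirror of the source topic "demo" over that link.
  kafka-mirrors --create --mirror-topic demo --link demo-link \
    --bootstrap-server localhost:9093
}
```

Run create_demo_mirror once both clusters are up and the demo topic exists on the source.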
    

Check the replica status on the destination

  1. Run the following command to monitor the replicas on the destination cluster.

    kafka-replica-status --topics demo --include-linked --bootstrap-server localhost:9093
    

    Your output should resemble the following example.

    Topic Partition Replica ClusterLink IsLeader IsObserver IsIsrEligible IsInIsr IsCaughtUp LastCaughtUpLagMs LastFetchLagMs LogStartOffset LogEndOffset
    demo  0         0       -           true     false      true          true    true       0                 0              0              3
    demo  0         0       demo-link   true     false      true          true    true       2                 2              0              3
    

    The output shows both the source and destination’s replicas as distinguished by the ClusterLink field, where - means local and the link name (demo-link in this case) indicates the replica(s) over the cluster link.

  2. Run the following command to populate values for “MirrorState”, “MirrorLastFetchTimeMs”, “MirrorLastFetchHighWatermark”:

    kafka-replica-status --topics demo --include-mirror --bootstrap-server localhost:9093
    

    Your output should resemble the following:

    Topic Partition Replica IsLeader IsObserver IsIsrEligible IsInIsr IsCaughtUp LastCaughtUpLagMs LastFetchLagMs LogStartOffset LogEndOffset LeaderEpoch MirrorState MirrorLastFetchTimeMs MirrorLastFetchHighWatermark
    demo  0         1       true     false      true          true    true       0                 0              0              3            0           ACTIVE      -1                    -1
    

Verify that the mirror topic is read-only

Attempt to produce messages to the mirror topic on the destination as a way of verifying that the mirror topic is read-only.

  1. Start a producer, targeting the mirror topic.

    kafka-console-producer --topic demo --bootstrap-server localhost:9093
    
  2. Type a message at the > prompt, and press return.

    You should get an error similar to the following example because the mirror topic is not writable.

    >hi
    [2020-08-13 16:55:53,571] ERROR Error when sending message to topic demo with key: null, value: 2 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
    

Change the Source Topic Configuration

  1. Describe the mirror topic to verify the destination and the mirror topic configs.

    kafka-configs --describe --topic demo --bootstrap-server localhost:9093
    

    The output will show parameters that are set for strictly mirrored configs.

    Dynamic configs for topic demo are:
    compression.type=producer sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:compression.type=producer, DEFAULT_CONFIG:compression.type=producer}
    cleanup.policy=delete sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:cleanup.policy=delete, DEFAULT_CONFIG:log.cleanup.policy=delete}
    segment.bytes=1073741824 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:segment.bytes=1073741824, STATIC_BROKER_CONFIG:log.segment.bytes=1073741824, DEFAULT_CONFIG:log.segment.bytes=1073741824}
    max.message.bytes=1048588 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:max.message.bytes=1048588, DEFAULT_CONFIG:message.max.bytes=1048588}
    
  2. Modify the source topic configuration to set retention.ms.

    This is one of the configurations that will mirror only if it is explicitly set on the source. (Note that retention.ms was not included in the output for the previous step.)

    kafka-configs --alter --topic demo --add-config retention.ms=123456890 --bootstrap-server localhost:9092
    

    The confirmation message looks like this:

    Completed updating config for topic demo.
    
  3. Rerun the kafka-configs command to verify that retention.ms is now on the mirror topic.

    kafka-configs --describe --topic demo --bootstrap-server localhost:9093
    

    The dynamic configurations now include retention.ms.

    Dynamic configs for topic demo are:
    compression.type=producer sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:compression.type=producer, DEFAULT_CONFIG:compression.type=producer}
    cleanup.policy=delete sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:cleanup.policy=delete, DEFAULT_CONFIG:log.cleanup.policy=delete}
    segment.bytes=1073741824 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:segment.bytes=1073741824, STATIC_BROKER_CONFIG:log.segment.bytes=1073741824, DEFAULT_CONFIG:log.segment.bytes=1073741824}
    retention.ms=123456890 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=123456890}
    max.message.bytes=1048588 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:max.message.bytes=1048588, DEFAULT_CONFIG:message.max.bytes=1048588}
    

    Tip

    It may take up to 5 seconds to propagate the change made in the previous step to set retention.ms on the source topic.
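
Because of that propagation delay, a scripted check may need to retry rather than describe the mirror topic once. The following sketch polls the destination until the setting appears. The function name wait_for_mirror_config and its retry parameters are hypothetical, and the match relies on the describe output format shown above (key=value followed by a space).

```shell
# Hypothetical helper: poll the mirror topic until KEY=VALUE shows up in its
# dynamic configs, or give up after a number of attempts.
# Usage: wait_for_mirror_config KEY VALUE [ATTEMPTS] [SLEEP_SECONDS]
wait_for_mirror_config() {
  cfg_key="$1"
  cfg_value="$2"
  attempts="${3:-10}"
  pause="${4:-1}"
  tries=0
  while [ "$tries" -lt "$attempts" ]; do
    # Describe the mirror topic on the destination and look for the setting.
    if kafka-configs --describe --topic demo --bootstrap-server localhost:9093 \
        | grep -qF "$cfg_key=$cfg_value "; then
      echo "$cfg_key=$cfg_value is set on the mirror topic"
      return 0
    fi
    tries=$((tries + 1))
    sleep "$pause"
  done
  echo "$cfg_key=$cfg_value not seen after $attempts attempts" >&2
  return 1
}
```

For the change made in this section, the call would be wait_for_mirror_config retention.ms 123456890.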

Change the Source Topic’s Partitions

  1. Alter the number of partitions on the source topic.

    kafka-topics --alter --topic demo --partitions 8 --bootstrap-server localhost:9092
    
  2. Verify the change on the source topic.

    kafka-topics --describe --topic demo --bootstrap-server localhost:9092
    

    The output will be:

    Topic: demo       PartitionCount: 8       ReplicationFactor: 1    Configs: segment.bytes=1073741824,retention.ms=123456890
      Topic: demo     Partition: 0    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 1    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 2    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 3    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 4    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 5    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 6    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 7    Leader: 0       Replicas: 0     Isr: 0  Offline:
    
  3. Verify the change on the mirror topic.

    kafka-topics --describe --topic demo --bootstrap-server localhost:9093
    

    The output of this same command for the mirror topic should match the original topic exactly.

    Topic: demo       PartitionCount: 8       ReplicationFactor: 1    Configs: segment.bytes=1073741824,retention.ms=123456890
      Topic: demo     Partition: 0    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 1    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 2    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 3    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 4    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 5    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 6    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 7    Leader: 0       Replicas: 0     Isr: 0  Offline:
    

    Tip

    This change may take a few minutes to propagate because the default metadata refresh interval is 5 minutes. You can shorten this by setting metadata.max.age.ms when creating the cluster link. For example:

    ./bin/kafka-cluster-links --bootstrap-server localhost:9093 --create --link demo-link \
    --config bootstrap.servers=localhost:9092,metadata.max.age.ms=5000
    

List mirror topics

To list mirror topics, run kafka-cluster-links --list with the --include-topics option.

kafka-cluster-links --list --link demo-link --include-topics --bootstrap-server localhost:9093

Your output should resemble the following.

Link name: 'demo-link', link ID: '123-some-link-id', remote cluster ID: '123-some-cluster-id', local cluster ID: '456-some-other-cluster-id', remote cluster available: 'true', topics: [demo]

Cut over the mirror topic to make it writable

  1. Stop the mirror topic for the demo topic on the destination.

    kafka-mirrors --promote --topics demo --bootstrap-server localhost:9093
    

    You will get the following confirmation.

    Calculating max offset and ms lag for mirror topics: [demo]
    Finished calculating max offset lag and max lag ms for mirror topics: [demo]
    Request for stopping topic demo's mirror was successfully scheduled.
    Please use the describe command with the --pending-stopped-only option to monitor progress.
    
  2. Verify that the mirror topic has stopped mirroring.

    kafka-mirrors --describe --topics demo --pending-stopped-only --bootstrap-server localhost:9093
    

    You will get the following confirmation that the topic has entered the STOPPED state.

    Topic: demo  LinkName: demo-link  LinkId: 123-some-link-id MirrorTopic: demo  State: STOPPED StateTime: 2021-11-15 14:06:26
    

Produce to both topics to verify divergence

  1. Run a producer to send messages to the original topic on the source. (Or switch back to this command window if you still have this producer running.)

    kafka-console-producer --topic demo --bootstrap-server localhost:9092
    

    Type the message old at the prompt.

    > old
    
  2. Run a producer to send messages to the topic on the destination, which is no longer a mirror topic.

    kafka-console-producer --topic demo --bootstrap-server localhost:9093
    

    Type the message new at the prompt.

    > new
    
  3. Run consumers to read from both topics. Your output will show they have diverged.

    • Run a consumer to read messages from the original topic on the source. (Or simply view the output on this window, if you still have this consumer running.)

      kafka-console-consumer --topic demo --from-beginning --bootstrap-server localhost:9092
      

      The output on the source topic now includes the old message.

      first
      second
      third
      old
      
    • Run a consumer to read messages from the topic on the destination. (Or simply view the output on this window, if you still have this consumer running.)

      kafka-console-consumer --topic demo --from-beginning --bootstrap-server localhost:9093
      

      The output on the destination topic includes the new message.

      first
      second
      third
      new
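
The same comparison can be made without reading two terminal windows side by side. The sketch below dumps both topics to temporary files and diffs them. The function name compare_demo_topics is hypothetical, and it assumes the console consumer's --timeout-ms option is available so that each consumer exits once no more messages arrive. The output is sorted before comparing because the topic now has eight partitions and ordering is only guaranteed within a partition.

```shell
# Hypothetical helper: dump the demo topic from both clusters and diff them.
compare_demo_topics() {
  src_out=$(mktemp)
  dst_out=$(mktemp)

  # Read everything from the source topic; exit after 5s without new messages.
  kafka-console-consumer --topic demo --from-beginning --timeout-ms 5000 \
    --bootstrap-server localhost:9092 2>/dev/null | sort > "$src_out"

  # Same for the former mirror topic on the destination.
  kafka-console-consumer --topic demo --from-beginning --timeout-ms 5000 \
    --bootstrap-server localhost:9093 2>/dev/null | sort > "$dst_out"

  # Lines prefixed with "<" exist only on the source, ">" only on the destination.
  if diff "$src_out" "$dst_out"; then
    echo "topics are identical"
  else
    echo "topics have diverged"
  fi
  rm -f "$src_out" "$dst_out"
}
```

After producing old to the source and new to the destination, the diff should list those two messages and the function should report that the topics have diverged.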
      

Teardown

Run shutdown and cleanup tasks.

  • Stop consumers and producers with Ctrl-C in their respective command windows.
  • Stop all of the other components with Ctrl-C in their respective command windows, in the reverse order in which you started them. For example, stop the Kafka brokers, then their respective ZooKeepers.
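
Once everything is stopped, you may also want to remove the data directories so that a later run starts from a clean state. The following is a sketch, not an official cleanup procedure: cleanup_tutorial_data is a hypothetical helper, and it assumes the dataDir and log.dirs values configured earlier in this tutorial (all under /tmp).

```shell
# Hypothetical helper: remove the ZooKeeper and Kafka data directories used in
# this tutorial. ROOT defaults to /tmp, matching dataDir and log.dirs above.
cleanup_tutorial_data() {
  root="${1:-/tmp}"
  for d in zookeeper-1 zookeeper-2 kafka-logs-1 kafka-logs-2; do
    if [ -d "$root/$d" ]; then
      rm -rf "$root/$d"
      echo "removed $root/$d"
    fi
  done
}
```

Calling cleanup_tutorial_data with no arguments deletes the four directories under /tmp, which permanently discards the demo topics and the cluster link metadata.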