Command Reference for Cluster Linking on Confluent Platform

Confluent Enterprise now includes a feature called Cluster Linking that syncs data and metadata from Confluent Platform, Confluent Cloud, or Kafka clusters to Confluent Platform and Confluent Cloud clusters.

Cluster Linking geo-replicates data using two key abstractions:

  • a cluster link, which connects a “source cluster” and a “destination cluster”
  • mirror topics, which are topics on a cluster link’s destination cluster that are identical copies of topics on the source cluster.

Cluster links and mirror topics can be created and managed using either a REST API in the Kafka REST v3 Proxy or CLI commands in a terminal. This section describes the commands for creating and managing cluster links and mirror topics.

CLI commands

Configuration options for the cluster links are available as values for flags on the kafka-cluster-links command. Some of these options are shown below in the context of command examples. A full list is provided in reference form in Link Properties.

Use kafka-cluster-links to create and manage links across clusters.

About bootstrap-server

As with other Kafka commands in Confluent Platform, --bootstrap-server is a required flag for the kafka-cluster-links command.

--bootstrap-server

(Required) The connection string for the broker(s) in a cluster, in the form host:port (use a comma-separated list for multiple brokers). This is typically the destination cluster where you plan to create mirror topics. The destination cluster must be running Confluent Platform 7.0.0 or later, which is required to support Cluster Linking.

  • Type: string
  • Default: empty string

Specify --bootstrap-server for every use of kafka-cluster-links described in the following sections.

For Cluster Linking, --bootstrap-server should typically point to the destination cluster (with one exception, noted below). Here is a quick summary:

  • For kafka-mirrors, --bootstrap-server is always the destination cluster.
  • For kafka-cluster-links, --bootstrap-server is usually the destination cluster.

The only exception is a source-initiated link (recommended for Cluster Linking from a Confluent Platform source to a Confluent Cloud destination). In that case, you create the cluster link object twice, and the second time you run kafka-cluster-links with the source cluster as the bootstrap server.

For common use cases, see:

For examples of use cases with source-initiated links, see Tutorial: Link Confluent Platform and Confluent Cloud Clusters.

Tip

In most cases, you can provide --bootstrap-server at the beginning (immediately after kafka-cluster-links) or at the end of the command. The examples in the following sections show --bootstrap-server used immediately after kafka-cluster-links.

Creating a mirror topic

A mirror topic is a read-only topic that reflects all the data and metadata in another topic.

To create a mirror topic with the CLI, use the kafka-mirrors tool. Once the mirror topic is created, it automatically begins fetching data from the source topic.

For more information, see Mirror Topics.

Example Command

kafka-mirrors --create --mirror-topic example-topic \
--link demo-link \
--bootstrap-server localhost:9093

Example Output

Created topic example-topic.

To create a mirror topic, use kafka-mirrors --create along with --bootstrap-server and the following flags.

--mirror-topic

(Required) The name of the mirror topic to create. It must exactly match the name of the source topic to mirror over the cluster link.

  • Type: string

--link

(Required) The name of the cluster link used to pull data from the source topic.

  • Type: string

Tip

  • The value for --mirror-topic must match the name of the source topic.
  • You can list mirrored topics using kafka-cluster-links --list along with the --include-topics flag.

--command-config

A property file containing configurations to pass to the AdminClient, such as security credentials for authentication and authorization.

The following are optional configurations when creating a mirror topic:

--config

A comma-separated list of configs to override when creating the mirror topic. Each config to override should be specified as name=value. For more information about which configurations can be set on a mirror topic, see Configurations in Mirror Topics.

  • Type: string

--replication-factor

The replication factor of the mirror topic being created. If not supplied, the destination cluster's default replication factor is used, not the source topic's.

  • Type: string

--source-topic

The name of the source topic to mirror. Required if the cluster link has a prefix configured. To learn more, see Prefixing Mirror Topics and Consumer Group Names.

  • Type: string

You must have ALTER CLUSTER authorization to create a mirror topic.
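The flags above can be combined in a single create command. The following POSIX shell sketch assembles a kafka-mirrors --create command from the documented flags and prints it rather than running it, so you can review it first. The function name mirror_create_cmd and its positional-argument order are hypothetical, chosen only for illustration; they are not part of Confluent Platform.

```shell
# Hypothetical helper: print (not run) a kafka-mirrors --create command built
# from the flags documented above.
# Usage: mirror_create_cmd MIRROR_TOPIC LINK BOOTSTRAP [SOURCE_TOPIC] [CONFIGS] [REPLICATION_FACTOR]
# Pass "" to skip an optional argument.
mirror_create_cmd() {
  cmd="kafka-mirrors --create --mirror-topic $1 --link $2 --bootstrap-server $3"
  # --source-topic is required only when the cluster link has a prefix configured.
  if [ -n "${4:-}" ]; then cmd="$cmd --source-topic $4"; fi
  # --config takes comma-separated name=value overrides.
  if [ -n "${5:-}" ]; then cmd="$cmd --config $5"; fi
  # Without --replication-factor, the destination cluster's default is used.
  if [ -n "${6:-}" ]; then cmd="$cmd --replication-factor $6"; fi
  printf '%s\n' "$cmd"
}
```

For example, mirror_create_cmd example-topic demo-link localhost:9093 "" "" 3 prints the create command from the example above with --replication-factor 3 appended.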

Listing mirror topics

To list mirror topics, use kafka-mirrors --list. By default, the command lists all mirror topics across all cluster links on the cluster; add --link to list only the mirror topics on a specific cluster link.

Example Command

kafka-mirrors --list --bootstrap-server localhost:9093

Example Output

topic1
topic2
topic3
topic4

These parameters can be added to the command:

--link

(optional) The name of the cluster link to filter on. If provided, only mirror topics on this cluster link will be listed.

  • Type: string

--include-stopped

(optional) If this flag is added, the list includes any topics that were formerly mirror topics but were stopped by a promote or failover command. This flag does not take an argument.
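Comparing the output with and without --include-stopped shows which topics have stopped mirroring. The following sketch assumes that each listing has been saved to a file and contains one topic name per line, as in the example output above; stopped_mirrors is a hypothetical helper name.

```shell
# Hypothetical helper: print topics that appear in the --include-stopped listing
# (first file) but not in the regular listing (second file). These are former
# mirror topics that were stopped by promote or failover.
# Assumes one topic name per line, as in the example output above.
stopped_mirrors() {
  all_sorted=$(mktemp) || return 1
  active_sorted=$(mktemp) || { rm -f "$all_sorted"; return 1; }
  # comm requires both inputs to be sorted in the same order.
  sort -u "$1" > "$all_sorted"
  sort -u "$2" > "$active_sorted"
  # -23 suppresses lines unique to the second file and lines common to both.
  comm -23 "$all_sorted" "$active_sorted"
  rm -f "$all_sorted" "$active_sorted"
}
```

For example, redirect kafka-mirrors --list --include-stopped --bootstrap-server localhost:9093 to all.txt and kafka-mirrors --list --bootstrap-server localhost:9093 to active.txt, then run stopped_mirrors all.txt active.txt.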

Describing mirror topics

Use kafka-mirrors --describe to show the state of topics that are mirrored over a cluster link, including the lag of each partition.

Example Command

kafka-mirrors --describe --bootstrap-server pkc-nwnyk.us-west-2.aws.confluent.cloud:9092 --command-config lkc-rn220.config --link onprem-to-cloud

Example Output

Topic: web.orders.modified  LinkName: onprem-to-cloud LinkId: 2c3dee72-f54b-4bb3-9694-cf29caaf3637  MirrorTopic: web.orders.modified  State: ACTIVE StateTime: 2021-11-10 15:33:29
  Partition: 0  State: ACTIVE DestLogEndOffset: 114123  LastFetchSourceHighWatermark: 114123  Lag: 0  TimeSinceLastFetchMs: 8385101
  Partition: 1  State: ACTIVE DestLogEndOffset: 115278  LastFetchSourceHighWatermark: 115278  Lag: 0  TimeSinceLastFetchMs: 8387954
  Partition: 2  State: ACTIVE DestLogEndOffset: 112210  LastFetchSourceHighWatermark: 112210  Lag: 0  TimeSinceLastFetchMs: 8508856
  Partition: 3  State: ACTIVE DestLogEndOffset: 120887  LastFetchSourceHighWatermark: 120887  Lag: 0  TimeSinceLastFetchMs: 8389749
  Partition: 4  State: ACTIVE DestLogEndOffset: 109225  LastFetchSourceHighWatermark: 109225  Lag: 0  TimeSinceLastFetchMs: 8385101
  Partition: 5  State: ACTIVE DestLogEndOffset: 111669  LastFetchSourceHighWatermark: 111669  Lag: 0  TimeSinceLastFetchMs: 8387954
Topic: web.orders LinkName: onprem-to-cloud LinkId: 2c3dee72-f54b-4bb3-9694-cf29caaf3637  MirrorTopic: web.orders State: ACTIVE StateTime: 2021-11-10 15:33:29
  Partition: 0  State: ACTIVE DestLogEndOffset: 294760  LastFetchSourceHighWatermark: 294760  Lag: 0  TimeSinceLastFetchMs: 8387954
  Partition: 1  State: ACTIVE DestLogEndOffset: 285862  LastFetchSourceHighWatermark: 285862  Lag: 0  TimeSinceLastFetchMs: 8508856
  Partition: 2  State: ACTIVE DestLogEndOffset: 284891  LastFetchSourceHighWatermark: 284891  Lag: 0  TimeSinceLastFetchMs: 8389749
  Partition: 3  State: ACTIVE DestLogEndOffset: 285982  LastFetchSourceHighWatermark: 285982  Lag: 0  TimeSinceLastFetchMs: 8385101
  Partition: 4  State: ACTIVE DestLogEndOffset: 277379  LastFetchSourceHighWatermark: 277379  Lag: 0  TimeSinceLastFetchMs: 8387954
  Partition: 5  State: ACTIVE DestLogEndOffset: 283731  LastFetchSourceHighWatermark: 283731  Lag: 0  TimeSinceLastFetchMs: 8508856
Topic: inventory.shipments  LinkName: onprem-to-cloud LinkId: 2c3dee72-f54b-4bb3-9694-cf29caaf3637  MirrorTopic: inventory.shipments  State: ACTIVE StateTime: 2021-11-10 15:51:21

As with other Kafka commands in Confluent Platform, --bootstrap-server is a required flag for the kafka-mirrors command.

--bootstrap-server

(Required) The connection string for the broker(s) in a cluster, in the form host:port (use a comma-separated list for multiple brokers). For kafka-mirrors, this is always the destination cluster where you plan to create mirror topics. The destination cluster must be running Confluent Platform 7.0.0 or later, which is required to support Cluster Linking.

  • Type: string
  • Default: empty string

These parameters can be added to the command:

--link

(optional) The name of the cluster link to filter on. If provided, only mirror topics on this cluster link are included in the output.

  • Type: string

--include-stopped

(optional) If this flag is added, the output includes any topics that were formerly mirror topics but were stopped by a promote or failover command. This flag does not take an argument.
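The Lag value in the describe output shows how far each mirror partition is behind its source partition. As a sketch, the following awk-based function reads kafka-mirrors --describe output on stdin and prints each partition with a non-zero Lag. It assumes the "Topic: ..." and "Partition: ... Lag: N" layout shown in the example output above; mirror_lag_report is a hypothetical name, and the parsing may need adjusting if the output format differs in your version.

```shell
# Hypothetical helper: read kafka-mirrors --describe output on stdin and print
# "<topic> <partition> <lag>" for every partition whose Lag is above zero.
# Assumes the layout of the example output above.
mirror_lag_report() {
  awk '
    # Remember which topic the following partition lines belong to.
    $1 == "Topic:" { topic = $2; next }
    $1 == "Partition:" {
      # Find the "Lag:" label and read the value that follows it.
      for (i = 1; i < NF; i++) {
        if ($i == "Lag:" && $(i + 1) + 0 > 0) {
          print topic, $2, $(i + 1)
        }
      }
    }
  '
}
```

For example, pipe kafka-mirrors --describe --bootstrap-server localhost:9093 --link demo-link into mirror_lag_report. No output means every listed partition reported Lag: 0.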

Stopping mirroring on a topic

You can use either the kafka-mirrors --failover or kafka-mirrors --promote command to stop mirroring on a topic. Both commands permanently convert the mirror topic into a regular, writable topic, but each is designed for a specific use case.

  • The promote command is intended for migrations and, therefore, performs some additional verification and operations before stopping the mirroring.
  • The failover command is intended for disaster recovery, and so takes effect immediately and always succeeds. No additional operations are performed.
  • The --validate-only option (a dry run) lets you preview the results of either command before actually executing it.

You can promote or fail over multiple topics at the same time by passing a comma-separated list of topic names to the --topics flag. Even if you are promoting only one topic, you still use the --topics (plural) flag. For example:

--topics topic1

--topics topic1,topic2,topic3

More examples are shown below.
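When the topic names are held in a script, the comma-separated --topics value can be built with a small helper. This is a minimal sketch; join_topics is a hypothetical name.

```shell
# Hypothetical helper: join topic names into the comma-separated value that
# --topics expects.
join_topics() {
  # "$*" joins the arguments with the first character of IFS. Running this in a
  # subshell keeps the IFS change local to the function call.
  (IFS=,; printf '%s\n' "$*")
}
```

For example: kafka-mirrors --promote --topics "$(join_topics topic1 topic2 topic3)" --bootstrap-server localhost:9093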

Promote a topic

Use kafka-mirrors --promote to stop mirroring and convert a mirror topic to a regular topic in a graceful process typically appropriate for migration scenarios. This command checks that there is 0 lag between the source topic and the mirror topic, and does one final sync of the metadata (consumer group offsets and topic configs) before converting the mirror topic into a regular topic.

Note

  • The promote command does not stop producers from writing to the source topic. Producers can write more data to the source topic after the check for 0 lag succeeds. In that case, the two topics diverge, and that data is not mirrored to the former mirror topic.
  • If there is lag between the source topic and the mirror topic at the time that the promote command is run, the promote command will fail.
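Because promote fails when there is lag, it can help to check the describe output first. The following sketch sums the Lag values for one topic from kafka-mirrors --describe output on stdin and exits successfully only if the topic is present with a total lag of 0. As the note above explains, producers can still write to the source topic afterward, so a passing check does not guarantee that the promote succeeds. ready_to_promote is a hypothetical helper, and the parsing assumes the describe output layout shown in Describing mirror topics.

```shell
# Hypothetical helper: read kafka-mirrors --describe output on stdin, sum the
# Lag values of one topic's partitions, print the total, and exit 0 only if the
# topic was found and its total lag is 0.
# Usage: ready_to_promote TOPIC
ready_to_promote() {
  awk -v want="$1" '
    # Track the current topic; note whether the requested topic appears.
    $1 == "Topic:" { current = $2; if (current == want) found = 1; next }
    # Add up the value after each "Lag:" label for the requested topic.
    $1 == "Partition:" && current == want {
      for (i = 1; i < NF; i++) if ($i == "Lag:") total += $(i + 1)
    }
    END {
      printf "%s total lag: %d\n", want, total
      if (found && total == 0) exit 0
      exit 1
    }
  '
}
```

For example: kafka-mirrors --describe --bootstrap-server localhost:9093 | ready_to_promote example-topic && kafka-mirrors --promote --topics example-topic --bootstrap-server localhost:9093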

Examples

You can use promote to specify one topic or multiple topics on the same cluster link to stop mirroring. The command syntax is:

kafka-mirrors --promote --topics <destination-topic-1>,...,<destination-topic-n> --bootstrap-server <host:port>

Here is an example of using kafka-mirrors --promote to stop mirroring on a topic called example-topic, which uses a link named example-link. The link is inferred, so it is not included in the command.

kafka-mirrors --promote --topics example-topic --bootstrap-server localhost:9093

Failover a topic

Use kafka-mirrors --failover to stop mirroring and convert a mirror topic to a regular topic immediately. This is typically appropriate for disaster recovery, when the source cluster fails unexpectedly. The failover command does not perform additional checks or syncs; it works like a “forced” version of promote. You can specify one topic or multiple topics to stop mirroring on.

Examples

You can use failover to specify one topic or multiple topics on the same cluster link to stop mirroring. The command syntax is:

kafka-mirrors --failover --topics <destination-topic-1>,...,<destination-topic-n> --bootstrap-server <host:port>

Here is an example of using kafka-mirrors --failover to stop mirroring on a topic called example-topic which uses a link named example-link. The link will be inferred; it should not be included in the command, as shown below.

kafka-mirrors --failover --topics example-topic --bootstrap-server localhost:9093

Validate only (dry run)

To test the results of either promote or failover before executing the commands, add the --validate-only flag. For example:

kafka-mirrors --failover --topics example-topic --bootstrap-server localhost:9093 --validate-only
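A common pattern is to run the dry run first and proceed only if it succeeds. The sketch below does that for either promote or failover. It assumes that kafka-mirrors exits with a non-zero status when validation fails, which you should confirm for your version. validated_stop and the KAFKA_MIRRORS override variable are hypothetical names; KAFKA_MIRRORS lets you point the function at a different binary path.

```shell
# Hypothetical helper: run promote or failover with --validate-only first, and
# run the real command only if the dry run exits with status 0.
# Assumption: kafka-mirrors returns a non-zero status when validation fails.
# Usage: validated_stop promote|failover TOPICS BOOTSTRAP
validated_stop() {
  case "$1" in
    promote|failover) ;;
    *) echo "usage: validated_stop promote|failover TOPICS BOOTSTRAP" >&2; return 2 ;;
  esac
  # KAFKA_MIRRORS (hypothetical) overrides the tool name or path.
  tool=${KAFKA_MIRRORS:-kafka-mirrors}
  # Dry run: preview the result without stopping the mirror.
  "$tool" --"$1" --topics "$2" --bootstrap-server "$3" --validate-only || return 1
  # Real run: permanently converts the mirror topic(s) into regular topics.
  "$tool" --"$1" --topics "$2" --bootstrap-server "$3"
}
```

For example: validated_stop failover example-topic localhost:9093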

Pausing and resuming a mirror topic

You can use the pause (kafka-mirrors --pause) and resume (kafka-mirrors --unpause) commands to temporarily pause and resume mirroring.

To pause a mirror topic:

kafka-mirrors --pause --topics example-topic --bootstrap-server localhost:9093

To resume a mirror topic, use --unpause:

kafka-mirrors --unpause --topics example-topic --bootstrap-server localhost:9093

Migrating consumer groups from source to destination cluster

To migrate a consumer group across the link, set consumer.offset.sync.enable=true in your link configuration and specify a consumer group filter in a JSON file. When you create the link, pass the name of the file as the value of the --consumer-group-filters-json-file flag on kafka-cluster-links. You can also change the filters on an existing link, as step 5 below does with kafka-configs.

Note

Consumer group filters should only include groups that are not being used on the destination. This will help ensure that the system does not override offsets committed by other consumers on the destination. The system attempts to work around filters containing groups that are also used on the destination, but in these cases there are no guarantees; offsets may be overwritten. For mirror topic “promotion” to work, the system must be able to roll back offsets, which cannot be done if the group is being used by destination consumers.

This example assumes you are migrating group “someGroup” from cluster “broker-west” to cluster “broker-east”, and that before the migration you are migrating all consumer group offsets with the following filter:

{"groupFilters": [
  {
    "name": "*",
    "patternType": "LITERAL",
    "filterType": "INCLUDE"
  }
]}
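If you maintain several exclusions, as in step 5 below, you can generate the filter rather than hand-escaping it. The following sketch prints a filter that includes all groups, as the filter above does, and excludes the groups passed as arguments. group_filters_json is a hypothetical helper; it does not escape JSON special characters, so group names containing quotes or backslashes are not supported.

```shell
# Hypothetical helper: print a consumer group filter that includes every group
# ("*" with patternType LITERAL, as in the filter above) and excludes each
# group passed as an argument. Group names are not JSON-escaped.
group_filters_json() {
  printf '{"groupFilters": [{"name": "*", "patternType": "LITERAL", "filterType": "INCLUDE"}'
  for group in "$@"; do
    printf ', {"name": "%s", "patternType": "LITERAL", "filterType": "EXCLUDE"}' "$group"
  done
  printf ']}\n'
}
```

For example, printf 'consumer.offset.group.filters=%s\n' "$(group_filters_json someGroup)" > newFilters.properties writes a filter equivalent to the one in step 5, on a single line.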

To migrate a consumer group from a source cluster to a destination cluster, follow these steps.

  1. Stop the consumer on the source cluster.

  2. Wait for at least twice the interval set by consumer.offset.sync.ms.

  3. Make sure that Cluster Linking replication is beyond the latest committed offset. You can confirm this with the following commands.

    • Check the CURRENT-OFFSET on the source cluster.

      kafka-consumer-groups --bootstrap-server broker-west:19091 --describe --group someGroup
      

      Your output should resemble the following:

      GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
      someGroup       west-offsets    0          100             100             0               -               -               -
      
    • Check the LOG-END-OFFSET on the destination cluster and make sure it is equal to or greater than the CURRENT-OFFSET recorded above.

      kafka-consumer-groups --bootstrap-server broker-east:19092 --describe --group someGroup
      

      Your output should resemble the following:

      GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
      someGroup       west-offsets    0          100             100             0
      
  4. Verify that the CURRENT-OFFSET is the same on the source and destination clusters using the following commands.

    • Check the CURRENT-OFFSET on the source cluster.

      kafka-consumer-groups --bootstrap-server broker-west:19091 --describe --group someGroup
      

      Your output should resemble the following:

      GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
      someGroup       west-offsets    0          100             100             0               -               -               -
      
    • Check the CURRENT-OFFSET on the destination cluster.

      kafka-consumer-groups --bootstrap-server broker-east:19092 --describe --group someGroup
      

      Your output should resemble the following:

      GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
      someGroup       west-offsets    0          100             100             0
      
  5. Update the offset migration filters to remove the group from the migration process.

    echo "consumer.offset.group.filters={\"groupFilters\": [ \
      { \
        \"name\": \"*\", \
        \"patternType\": \"LITERAL\", \
        \"filterType\": \"INCLUDE\" \
      }, \
      { \
        \"name\": \"someGroup\", \
        \"patternType\": \"LITERAL\", \
        \"filterType\": \"EXCLUDE\" \
      } \
    ]}" > newFilters.properties
    kafka-configs --bootstrap-server broker-east:19092 --alter --cluster-link offsets-cluster-link --add-config-file newFilters.properties
    
  6. Start the consumer on the destination cluster.
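The checks in steps 3 and 4 can be scripted. The following sketch compares saved output of kafka-consumer-groups --describe --group someGroup from the source cluster (first file) and the destination cluster (second file). For each topic and partition, it reports a problem if the destination LOG-END-OFFSET is below the source CURRENT-OFFSET, if the CURRENT-OFFSET values differ, or if a source partition is missing on the destination. It assumes the column order shown in the example output above; offsets_migrated is a hypothetical helper name.

```shell
# Hypothetical helper: compare kafka-consumer-groups --describe output saved
# from the source cluster ($1) and the destination cluster ($2).
# Columns assumed: GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG ...
# Exits 0 only if every source partition passes both checks.
offsets_migrated() {
  awk '
    BEGIN { bad = 0 }
    # Skip the header row and blank or short lines.
    $1 == "GROUP" || NF < 5 { next }
    # First file (source): remember CURRENT-OFFSET per topic and partition.
    FILENAME == ARGV[1] { src[$2 " " $3] = $4; next }
    # Second file (destination).
    {
      key = $2 " " $3
      if (!(key in src)) next
      seen[key] = 1
      # Step 3: replication must have reached the committed offset.
      if ($5 + 0 < src[key] + 0) { print "not yet replicated: " key; bad = 1 }
      # Step 4: committed offsets must match.
      if ($4 != src[key]) { print "offset mismatch: " key; bad = 1 }
    }
    END {
      for (k in src) if (!(k in seen)) { print "missing on destination: " k; bad = 1 }
      exit bad
    }
  ' "$1" "$2"
}
```

For example, save the two outputs to source.txt and dest.txt and run offsets_migrated source.txt dest.txt before starting the consumer on the destination cluster.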

REST API commands

The Cluster Linking REST API is documented in the REST API Proxy v3 Documentation.