Looking for Confluent Cloud Cluster Linking docs? This page describes Cluster Linking on Confluent Platform. If you are looking for Confluent Cloud documentation, check out Cluster Linking on Confluent Cloud.

Commands for Cluster Linking

Important

This feature is available as a preview feature. A preview feature is a component of Confluent Platform that is being introduced to gain early feedback from developers. This feature can be used for evaluation and non-production testing purposes or to provide feedback to Confluent.

Confluent Enterprise 6.0.0 includes a new Kafka command, kafka-cluster-links. For this command to work, the destination cluster must be running Confluent Platform 6.0.0 or later, and Cluster Linking must be enabled on the destination cluster.

This section describes the Cluster Linking commands and examples, along with CLI Tools for Confluent Platform relevant to Cluster Linking.

Configuration options for the cluster links are available as values for flags on the kafka-cluster-links command. Some of these options are shown below in the context of command examples. A full list is provided in reference form in Link Properties.

kafka-mirrors command

Use kafka-mirrors to describe and manage topics that are mirrored over a cluster link.

As with other Kafka commands in Confluent Platform, --bootstrap-server is a required flag for the kafka-mirrors command.

--bootstrap-server

(Required) The connection string for the broker(s) in the cluster, in the form host:port. For multiple brokers, use a comma-separated list. You must specify the destination cluster where you plan to create mirror topics. The destination cluster must be running Confluent Platform 6.0.0 or later, which is required to support Cluster Linking.

  • Type: string
  • Default: empty string

Use --bootstrap-server in all of the following implementations of kafka-mirrors.
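
As a minimal sketch, the shell function below checks that a value has the host:port shape described above (optionally repeated in a comma-separated list) before you pass it to --bootstrap-server. The function name is_valid_bootstrap and the BOOTSTRAP_SERVER variable are illustrative assumptions, not part of Confluent Platform, and the pattern does not handle IPv6 literals.

```shell
# Hypothetical helper (not shipped with Confluent Platform): succeeds only if
# the argument is a comma-separated list of host:port pairs.
is_valid_bootstrap() {
  printf '%s\n' "$1" |
    grep -Eq '^[A-Za-z0-9.-]+:[0-9]{1,5}(,[A-Za-z0-9.-]+:[0-9]{1,5})*$'
}

# Example value only; substitute the brokers of your destination cluster.
BOOTSTRAP_SERVER="localhost:9092"

if is_valid_bootstrap "$BOOTSTRAP_SERVER"; then
  echo "bootstrap string looks well-formed: $BOOTSTRAP_SERVER"
else
  echo "expected host:port[,host:port...], got: $BOOTSTRAP_SERVER" >&2
fi
```

The check is purely syntactic. It does not confirm that the brokers are reachable or that they belong to the destination cluster.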

Stopping Mirroring on a Topic

You can use either the kafka-mirrors --failover or kafka-mirrors --promote command to stop mirroring on a topic. Both commands convert the mirror topic into a regular, writable topic, but each is designed for a specific use case.

  • The promote command is intended for migrations and, therefore, performs some additional operations before stopping the mirroring.
  • The failover command is intended for disaster recovery, and so takes effect immediately and succeeds automatically.
  • The dry-run option allows you to preview the results of a command before actually executing it.

Examples are shown below.

Tip

kafka-topics --alter --mirror-action stop is deprecated in Confluent Platform 6.2.0 in favor of promote and failover, and will not be available starting in Confluent Platform 7.0.0. Previous versions of the documentation show the command syntax for this, but you are encouraged to use promote and failover instead.

Promote a topic

Use kafka-mirrors --promote to stop mirroring and convert a mirror topic to a regular topic in a graceful process typically appropriate for migration scenarios. This command checks that there is 0 lag between the source topic and the mirror topic, and does one final sync of the metadata (consumer group offsets and topic configs) before converting the mirror topic into a regular topic.

Note

The promote command does not stop data from being produced to the source topic. Producers can write more data to the source topic after the check for 0 lag has succeeded. In that case, the two topics diverge, and the new data is not mirrored to the promoted topic (the former mirror topic).

Examples

You can use promote to specify one topic or multiple topics on the same cluster link to stop mirroring. The command syntax is:

kafka-mirrors --promote --topics <destination-topic-1> .. <destination-topic-n> \
--link <link name> --bootstrap-server <host:port>

Here is an example of using kafka-mirrors --promote to stop mirroring on a topic called example-topic which uses a link named example-link.

kafka-mirrors --promote --topics example-topic --link example-link --bootstrap-server localhost:9092

Fail over a topic

Use kafka-mirrors --failover to stop mirroring and convert a mirror topic to a regular topic immediately. This is typically appropriate for disaster recovery, where the source cluster fails unexpectedly. The failover command does not perform additional checks or syncs. It works like a “forced” version of promote.

Examples

You can use failover to specify one topic or multiple topics on the same cluster link to stop mirroring. The command syntax is:

kafka-mirrors --failover --topics <destination-topic-1> .. <destination-topic-n> \
--link <link name> --bootstrap-server <host:port>

Here is an example of using kafka-mirrors --failover to stop mirroring on a topic called example-topic which uses a link named example-link.

kafka-mirrors --failover --topics example-topic --link example-link --bootstrap-server localhost:9092

Dry run

To test the results of either promote or failover before executing the commands, add the --dry-run flag. For example:

kafka-mirrors --failover --topics example-topic --link example-link --bootstrap-server localhost:9092 --dry-run
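
One way to put the dry run to use is a small wrapper that previews first and runs the real command only if the preview succeeds. The following is a sketch, not a Confluent-provided tool. The stop_mirroring function and the KAFKA_MIRRORS variable are hypothetical, and the sketch assumes that a dry run that reports a problem exits with a non-zero status, which you should confirm in your environment.

```shell
# Command to invoke. Set KAFKA_MIRRORS=echo to rehearse the wrapper and print
# the arguments instead of contacting a cluster.
KAFKA_MIRRORS="${KAFKA_MIRRORS:-kafka-mirrors}"

# Usage: stop_mirroring <promote|failover> <topic> <link> <bootstrap-server>
stop_mirroring() {
  action=$1
  topic=$2
  link=$3
  bootstrap=$4

  # Only the two documented ways of stopping mirroring are accepted.
  case "$action" in
    promote|failover) ;;
    *) echo "action must be promote or failover" >&2; return 2 ;;
  esac

  # Step 1: preview. Assumption: a non-zero exit status signals a problem.
  "$KAFKA_MIRRORS" "--$action" --topics "$topic" --link "$link" \
    --bootstrap-server "$bootstrap" --dry-run || return 1

  # Step 2: run the same command for real, without --dry-run.
  "$KAFKA_MIRRORS" "--$action" --topics "$topic" --link "$link" \
    --bootstrap-server "$bootstrap"
}

# Example call (commented out so that sourcing this file changes nothing):
# stop_mirroring failover example-topic example-link localhost:9092
```

Passing the action as an argument keeps the preview and the real invocation identical apart from --dry-run, so the command you previewed is the command that runs.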

Pausing and Resuming a Mirror Topic

You can use the pause (kafka-mirrors --pause) and resume (kafka-mirrors --unpause) commands to temporarily pause and resume mirroring.

To pause a mirror topic:

kafka-mirrors --pause --topics example-topic --link example-link --bootstrap-server localhost:9092

To resume a mirror topic, use --unpause:

kafka-mirrors --unpause --topics example-topic --link example-link --bootstrap-server localhost:9092
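
If you pause mirroring around a task, it is easy to forget the matching resume when the task fails. The sketch below pairs the two commands so that --unpause runs whether or not the task succeeds. The with_mirror_paused function and the KAFKA_MIRRORS variable are hypothetical names used for illustration; only the --pause and --unpause invocations come from this page.

```shell
# Command to invoke. Set KAFKA_MIRRORS=echo to print the arguments instead of
# contacting a cluster.
KAFKA_MIRRORS="${KAFKA_MIRRORS:-kafka-mirrors}"

# Usage: with_mirror_paused <topic> <link> <bootstrap-server> <command> [args...]
with_mirror_paused() {
  topic=$1
  link=$2
  bootstrap=$3
  shift 3

  # Pause mirroring; give up if the pause itself fails.
  "$KAFKA_MIRRORS" --pause --topics "$topic" --link "$link" \
    --bootstrap-server "$bootstrap" || return 1

  # Run the caller's task and remember its exit status.
  status=0
  "$@" || status=$?

  # Resume mirroring regardless of how the task ended.
  "$KAFKA_MIRRORS" --unpause --topics "$topic" --link "$link" \
    --bootstrap-server "$bootstrap" || return 1

  return "$status"
}

# Example call (commented out so that sourcing this file changes nothing):
# with_mirror_paused example-topic example-link localhost:9092 sleep 60
```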

Migrating Consumer Groups from Source to Destination Cluster

To migrate a consumer group across the link, you specify a group filter in a JSON file and pass the name of the file as the value for the --consumer-group-filters-json-file flag on the kafka-cluster-links command. You can set this at the time you create the link, or as an update to an existing configuration.

Note

Consumer group filters should only include groups that are not being used on the destination. This will help ensure that the system does not override offsets committed by other consumers on the destination. The system attempts to work around filters containing groups that are also used on the destination, but in these cases there are no guarantees; offsets may be overwritten. For mirror topic “promotion” to work, the system must be able to roll back offsets, which cannot be done if the group is being used by destination consumers.

This example assumes you are migrating the group “someGroup” from cluster “broker-west” to cluster “broker-east”, and that before the migration you are migrating all consumer offsets with the following filter:

{"groupFilters": [
  {
    "name": "*",
    "patternType": "LITERAL",
    "filterType": "INCLUDE"
  }
]}
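
As a sketch of how such a filter might be put on disk, the snippet below writes the JSON shown above to a file. The file name consumer-group-filters.json and the FILTER_FILE variable are assumptions chosen for illustration. The resulting path is what you would supply as the value of --consumer-group-filters-json-file.

```shell
# Hypothetical file name; any path readable by the CLI works.
FILTER_FILE="${FILTER_FILE:-consumer-group-filters.json}"

# The quoted heredoc delimiter keeps the JSON exactly as written.
cat > "$FILTER_FILE" <<'EOF'
{"groupFilters": [
  {
    "name": "*",
    "patternType": "LITERAL",
    "filterType": "INCLUDE"
  }
]}
EOF

echo "Wrote $FILTER_FILE; pass this path to --consumer-group-filters-json-file"
```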

To migrate a consumer group from a source cluster to a destination cluster, follow these steps.

  1. Stop the consumer on the source cluster.

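  2. Wait for at least twice the interval configured in consumer.offset.sync.ms, so that the final committed offsets have time to sync to the destination.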

  3. Make sure that Cluster Linking has replicated data up to or beyond the latest committed offset. You can confirm this with the following commands.

    • Check the CURRENT-OFFSET on the source cluster.

      kafka-consumer-groups --bootstrap-server broker-west:19091 --describe --group someGroup
      

      Your output should resemble the following:

      GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
      someGroup       west-offsets    0          100             100             0               -               -               -
      
    • Check the LOG-END-OFFSET on the destination cluster and ensure it is equal to or greater than the CURRENT-OFFSET recorded above.

      kafka-consumer-groups --bootstrap-server broker-east:19092 --describe --group someGroup
      

      Your output should resemble the following:

      GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
      someGroup       west-offsets    0          100             100             0
      
  4. Verify that the CURRENT-OFFSET is the same on the source and destination clusters using the following commands.

    • Check the CURRENT-OFFSET on the source cluster.

      kafka-consumer-groups --bootstrap-server broker-west:19091 --describe --group someGroup
      

      Your output should resemble the following:

      GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
      someGroup       west-offsets    0          100             100             0               -               -               -
      
    • Check the CURRENT-OFFSET on the destination cluster.

      kafka-consumer-groups --bootstrap-server broker-east:19092 --describe --group someGroup
      

      Your output should resemble the following:

      GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
      someGroup       west-offsets    0          100             100             0
      
  5. Update the offset migration filters to remove the group from the migration process.

    echo "consumer.offset.group.filters={\"groupFilters\": [ \
      { \
        \"name\": \"*\", \
        \"patternType\": \"LITERAL\", \
        \"filterType\": \"INCLUDE\" \
      }, \
      { \
        \"name\": \"someGroup\", \
        \"patternType\": \"LITERAL\", \
        \"filterType\": \"EXCLUDE\" \
      } \
    ]}" > newFilters.properties
    kafka-configs --bootstrap-server broker-east:19092 --alter --cluster-link offsets-cluster-link --add-config-file newFilters.properties
    
  6. Start the consumer on the destination cluster.
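
The wait in step 2 and the comparisons in steps 3 and 4 can be sketched in shell. The example below is illustrative only: wait_seconds and offset_field are hypothetical helpers, the value 30000 is an example setting for consumer.offset.sync.ms rather than a value read from your link, and the sample text stands in for real kafka-consumer-groups --describe output. It assumes the column layout shown above, with CURRENT-OFFSET in column 4 and LOG-END-OFFSET in column 5.

```shell
# Twice the sync interval (given in milliseconds), rounded up to whole seconds.
wait_seconds() {
  echo $(( ($1 * 2 + 999) / 1000 ))
}

# Reads `kafka-consumer-groups --describe` output on stdin and prints one
# column of the row that matches the given topic and partition.
# Usage: offset_field <topic> <partition> <column-number>
offset_field() {
  awk -v topic="$1" -v part="$2" -v col="$3" \
    '$2 == topic && $3 == part { print $col }'
}

# Sample output copied from the steps above. In practice, capture it with:
#   kafka-consumer-groups --bootstrap-server <host:port> --describe --group someGroup
src_output='GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
someGroup       west-offsets    0          100             100             0'
dst_output='GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
someGroup       west-offsets    0          100             100             0'

src_committed=$(printf '%s\n' "$src_output" | offset_field west-offsets 0 4)
dst_committed=$(printf '%s\n' "$dst_output" | offset_field west-offsets 0 4)
dst_log_end=$(printf '%s\n' "$dst_output" | offset_field west-offsets 0 5)

echo "step 2: wait $(wait_seconds 30000) seconds before checking offsets"

# Step 3: destination log end must reach the source committed offset.
# Step 4: committed offsets must match on both clusters.
if [ "$dst_log_end" -ge "$src_committed" ] && [ "$dst_committed" -eq "$src_committed" ]; then
  echo "offsets are in sync; continue with step 5"
else
  echo "offsets are not in sync yet; wait and check again" >&2
fi
```

For a topic with several partitions, repeat the comparison for each partition number, because the group is safe to move only when every partition passes both checks.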