Commands for Cluster Linking¶
Important
This feature is available as a preview feature. A preview feature is a component of Confluent Platform that is being introduced to gain early feedback from developers. This feature can be used for evaluation and non-production testing purposes or to provide feedback to Confluent.
Confluent Enterprise 6.0.0 includes a new Kafka command, kafka-cluster-links. In order for this command to work, the destination cluster must be running Confluent Platform 6.0.0 or later, and Cluster Linking must be enabled on the destination cluster.
This section describes the Cluster Linking commands and examples, along with CLI Tools for Confluent Platform relevant to Cluster Linking.
Configuration options for cluster links are available as values for flags on the kafka-cluster-links command. Some of these options are shown below in the context of command examples. A full list is provided in reference form in Link Properties.
kafka-cluster-links command¶
Use kafka-cluster-links
to create and manage links across clusters.
As with other Kafka commands in Confluent Platform, --bootstrap-server
is a required flag for the kafka-cluster-links
command.
--bootstrap-server
(Required) The connection string for the broker(s) in a cluster, in the form host:port (which can be a comma-separated list for multiple brokers). You must specify the destination cluster where you plan to create mirror topics. The destination cluster must be running Confluent Platform 6.0.0 or later, which is required to support Cluster Linking.
- Type: string
- Default: empty string
Use --bootstrap-server
in all of the following implementations of kafka-cluster-links
.
Tip
In most cases, you can provide --bootstrap-server
at the beginning (immediately after kafka-cluster-links
) or at the end of the command.
The examples in the following sections show --bootstrap-server
used immediately after kafka-cluster-links
.
The examples in Tutorial: Using Cluster Linking for Topic Data Sharing show it at the end of the commands. An exception to this is
the command to create a cluster link, which requires specifying at least two different values for --bootstrap-server
.
Creating a Cluster Link¶
Example Command
kafka-cluster-links --bootstrap-server localhost:9092 \
--create \
--link-name example-link \
--config bootstrap.servers=example-1:9092,example-2:9092,example-3:9092
Example Output
Cluster link 'example-link' creation successfully completed.
To create a cluster link, use kafka-cluster-links
along with bootstrap-server and the following flags.
--link-name
(Required) The name of the cluster link to create. Must be a unique name within the cluster.
- Type: string
(Required) One of the following parameters must be provided (not both) to specify how the destination cluster
should communicate with the source. The available configurations are those that would be used to configure a client,
including the required bootstrap.servers
and other necessary security and authorization properties.
--config
Comma-separated configurations to be applied to the cluster link on creation of the form “key=value”. When you use this flag, the configurations are specified directly on the command line (as opposed to in a file, as described for the next flag). You can use square brackets to group values that contain commas. For a full list of available configurations, see Link Properties.
- Type: string
--config-file
Property file containing configurations for the cluster link. This is the recommended way to specify cluster link configurations.
- Type: string
--consumer-group-filters-json-file
Path to the JSON file to use for configuration of consumer.offset.group.filters. To learn more, see Migrating Consumer Groups from Source to Destination Cluster.
- Type: string
You must have ALTER CLUSTER
authorization to create a cluster link, as described in Authorization (ACLs) (subsection, ACLs for Link on Source Cluster).
--validate-only
- If provided, validates that the cluster link can be created as specified, but does not create it.
For example, if you specify the following configuration for a secure cluster link in a file named link-config.properties
:
bootstrap.servers=example-1:9092,example-2:9092,example-3:9092
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="example-user" password="example-password";
security.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=https
Then, you can create the cluster link example-link
with the following command:
kafka-cluster-links --bootstrap-server localhost:9092 --create --link-name example-link --config-file link-config.properties
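The --validate-only flag described above can be combined with --config-file to check a link definition before creating it. The following is a minimal sketch, not an official procedure: the run helper and the create_link function are hypothetical names introduced here, and run executes a command only if the CLI is found on the PATH, so the sketch can be stepped through on a machine without Confluent Platform installed. It also assumes that a failed validation returns a non-zero exit status.

```shell
# Print each command; execute it only when the CLI is actually installed.
run() {
  printf '+ %s\n' "$*"
  if command -v "$1" >/dev/null 2>&1; then
    "$@"
  fi
}

DEST_BOOTSTRAP="localhost:9092"   # destination cluster, as in the example above
LINK_NAME="example-link"
CONFIG_FILE="link-config.properties"

# Write the link configuration to a properties file.
# Note that the JAAS entry is terminated by a semicolon.
cat > "$CONFIG_FILE" <<'EOF'
bootstrap.servers=example-1:9092,example-2:9092,example-3:9092
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="example-user" password="example-password";
security.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=https
EOF

# Hypothetical wrapper: any extra arguments (such as --validate-only)
# are appended to the create command.
create_link() {
  run kafka-cluster-links --bootstrap-server "$DEST_BOOTSTRAP" \
    --create --link-name "$LINK_NAME" \
    --config-file "$CONFIG_FILE" "$@"
}

# Validate first; create the link only if validation succeeded
# (assumes the CLI signals failure with a non-zero exit status).
create_link --validate-only && create_link
```

Keeping the configuration in a file rather than on the command line also keeps credentials out of shell history, which fits the recommendation above to prefer --config-file.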
Listing Cluster Links¶
Example Command
kafka-cluster-links --list --bootstrap-server localhost:9092
Example Output
Link name: 'example-link', link ID: '123-some-link-id', cluster ID: '123-some-cluster-id'
You can list existing cluster links. The command returns the link name, link ID (an internally allocated unique ID), and cluster ID of the linked cluster.
Tip
To list cluster links, use kafka-cluster-links
along with bootstrap-server and the following flags.
--link-name
If provided, only lists the specified cluster link.
- Type: string
--command-config
Property file containing configurations to be passed to the AdminClient.
- Type: string
--include-topics
If provided, includes a list of all topics that are linked (mirror topics).
- Type: string
You must have DESCRIBE CLUSTER
authorization to list cluster links.
Listing Mirror Topics¶
Example Command
kafka-cluster-links --list --link-name example-link --include-topics --bootstrap-server localhost:9092
Example Output
Link name: 'example-link', link ID: '123-some-link-id', cluster ID: '123-some-cluster-id', topics: [example-topic]
You can use kafka-cluster-links --list to derive a list of mirror topics. The options are described in the previous section.
This example uses the --include-topics flag to list each cluster link together with its mirror topics.
Describing a Cluster Link¶
Example Command
kafka-configs --bootstrap-server localhost:9092 \
--describe \
--cluster-link example-link
Example Output
Dynamic configs for cluster-link example-link are:
metadata.max.age.ms=300000 sensitive=false synonyms={}
...
To describe a cluster link, use kafka-configs
along with bootstrap-server and these flags.
--cluster-link
(Required) The name of the cluster link to describe.
- Type: string
--command-config
Property file containing configurations to be passed to the AdminClient.
- Type: string
You must have DESCRIBE CLUSTER
authorization to describe a cluster link.
Altering a Cluster Link¶
Example Command
kafka-configs --bootstrap-server localhost:9092 \
--alter \
--cluster-link example-link \
--add-config cluster.link.retry.timeout.ms=10000 \
--delete-config request.timeout.ms
Example Output
Completed updating config for cluster-link example-link.
To alter an existing link, use kafka-configs
along with bootstrap-server and these flags.
--alter
- (Required) Alter a link.
--cluster-link
(Required) The name of the cluster link to alter.
- Type: string
At least one of the following must be provided:
--add-config
Configurations to add in the form of “key=value” directly on the command line. You can use square brackets to group values which contain commas. For a full list of available configurations, see Link Properties.
- Type: string
--add-config-file
Path to properties file containing configurations to add.
- Type: string
--delete-config
Comma-separated list of configuration keys to delete.
- Type: string
You must have ALTER CLUSTER
authorization to modify the cluster associated with a link, as described in Authorization (ACLs).
Tip
You can change several aspects of a cluster link configuration, but you cannot change its source cluster (bootstrap servers, ID, and so on) or the link name.
Pause and resume the link to verify the new configuration¶
For dynamically updatable configurations (such as SSL), it is possible that some connections are using an older configuration while others are using a newer one. The system avoids recreating connections that are not required for an action. The API and CLI output show the latest persisted configurations, but these may not represent the true state of the configurations after a kafka-configs --alter operation.
To confirm that your new configurations have been applied, use cluster.link.paused
to pause and unpause the link to restart it.
To learn more, see cluster.link.paused
in Configuration Options for Cluster Linking.
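One way to pause and then resume a link is to set cluster.link.paused with the same kafka-configs --alter syntax shown earlier in this section. This is a sketch under assumptions: it assumes the property takes the values true and false, and the run helper and set_link_paused function are hypothetical names introduced for illustration. The run helper executes a command only if the CLI is found on the PATH.

```shell
# Print each command; execute it only when the CLI is actually installed.
run() {
  printf '+ %s\n' "$*"
  if command -v "$1" >/dev/null 2>&1; then
    "$@"
  fi
}

DEST_BOOTSTRAP="localhost:9092"   # destination cluster
LINK_NAME="example-link"

# $1 is "true" to pause the link or "false" to resume it
# (assumed boolean values for cluster.link.paused).
set_link_paused() {
  run kafka-configs --bootstrap-server "$DEST_BOOTSTRAP" \
    --alter --cluster-link "$LINK_NAME" \
    --add-config "cluster.link.paused=$1"
}

set_link_paused true    # pause the link
set_link_paused false   # resume it, so connections pick up the new configuration
```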
Deleting a Cluster Link¶
Example Command
kafka-cluster-links --bootstrap-server localhost:9092 \
--delete \
--link-name example-link
Example Output
Cluster link 'example-link' deletion successfully completed.
To delete an existing link, use kafka-cluster-links
along with bootstrap-server and these flags.
--link-name
(Required) The name of the cluster link to delete.
- Type: string
--command-config
Property file containing configurations to be passed to the AdminClient.
- Type: string
--validate-only
- If provided, validates the cluster link deletion but doesn’t apply the delete.
--force
- Force deletion of a link even if topics are currently linked with it.
Tip
If the cluster link has outstanding linked topics, then the request may fail
unless the deletion is forced (--force
), in which case the topics that were
linked will be unlinked in the background.
You must have ALTER CLUSTER
authorization to delete a cluster link, as described in Authorization (ACLs).
Note
When a cluster link is deleted, so is the history of any STOPPED
topics. If you need the Last Source Fetch Offset
or the Status Time
of your promoted or failed-over mirror topics, make sure to save those before you delete the cluster link.
Creating a Mirror Topic¶
A mirror topic is a read-only topic that reflects all the data and metadata in another topic.
Creating a mirror topic is similar to creating a normal topic, but requires extra flags. Once a mirror topic is created, the mirror automatically begins fetching data from the source and continues to do so for as long as mirroring is enabled on the topic.
If the source topic is deleted while mirroring is ongoing, then the mirror topic enters into a failed state, where mirroring must either be stopped or the topic deleted.
Example Command
kafka-topics --bootstrap-server localhost:9092 \
--create \
--topic example-topic \
--link-name example-link \
--mirror-topic example-topic
Example Output
Created topic example-topic.
To create a mirror topic, use kafka-topics
along with bootstrap-server and the following flags.
--topic
(Required) The name of the topic to create.
- Type: string
--link-name
(Required) The name of the cluster link used to pull data from the source topic.
- Type: string
--mirror-topic
(Required) The name of the source topic to mirror over the cluster link.
- Type: string
Tip
- For the preview, the value for --mirror-topic must match the original --topic name.
- You can list mirrored topics using kafka-cluster-links --list along with the --include-topics flag.
You must have ALTER CLUSTER
authorization to create a mirror topic.
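To mirror several topics over the same link, the kafka-topics command above can be repeated once per topic. The sketch below is illustrative only: the topic names orders and payments are made up, and the run helper and create_mirror_topics function are hypothetical names introduced here. The run helper executes a command only if the CLI is found on the PATH. Because the preview requires --mirror-topic to match --topic, the same name is passed to both flags.

```shell
# Print each command; execute it only when the CLI is actually installed.
run() {
  printf '+ %s\n' "$*"
  if command -v "$1" >/dev/null 2>&1; then
    "$@"
  fi
}

DEST_BOOTSTRAP="localhost:9092"   # destination cluster
LINK_NAME="example-link"

# Create one mirror topic per argument, using the same name on both clusters.
create_mirror_topics() {
  for topic in "$@"; do
    run kafka-topics --bootstrap-server "$DEST_BOOTSTRAP" \
      --create \
      --topic "$topic" \
      --link-name "$LINK_NAME" \
      --mirror-topic "$topic"
  done
}

create_mirror_topics orders payments   # hypothetical topic names
```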
kafka-mirrors command¶
Use kafka-mirrors
to describe and manage topics that are mirrored over a cluster link.
As with other Kafka commands in Confluent Platform, --bootstrap-server
is a required flag for the kafka-mirrors
command.
--bootstrap-server
(Required) The connection string for the broker(s) in a cluster, in the form host:port (which can be a comma-separated list for multiple brokers). You must specify the destination cluster where you plan to create mirror topics. The destination cluster must be running Confluent Platform 6.0.0 or later, which is required to support Cluster Linking.
- Type: string
- Default: empty string
Use --bootstrap-server
in all of the following implementations of kafka-mirrors
.
Stopping Mirroring on a Topic¶
You can use either the kafka-mirrors --failover
or kafka-mirrors --promote
command to stop mirroring on a topic. Both commands convert the mirror topic
into a regular, writable topic, but each is designed for a specific use case.
- The promote command is intended for migrations and, therefore, performs some additional operations before stopping the mirroring.
- The failover command is intended for disaster recovery, and so takes effect immediately and succeeds automatically.
- The dry-run option allows you to preview the results of a command before actually executing it.
Examples are shown below.
Tip
kafka-topics --alter --mirror-action stop
is deprecated in Confluent Platform 6.2.0
in favor of promote
and failover
, and will not be available starting
in Confluent Platform 7.0.0. Previous versions of the documentation show the command
syntax for this, but you are encouraged to use promote
and failover
instead.
Promote a topic¶
Use kafka-mirrors --promote
to stop mirroring and convert a mirror topic to a
regular topic in a graceful process typically appropriate for migration scenarios.
This command checks that there is 0 lag between the source topic and
the mirror topic, and does one final sync of the metadata (consumer group offsets
and topic configs) before converting the mirror topic into a regular topic.
Note
The promote
command does not stop data from being produced to the source topic.
It is possible for producers to produce more data to the source topic after the
check for 0 lag succeeded. In that case, the two topics will have diverged, and
that data will not be mirrored to the regular topic (formerly mirrored).
Examples
You can use promote
to specify one topic or multiple topics on the same cluster link
to stop mirroring. The command syntax is:
kafka-mirrors --promote --topics <destination-topic-1> .. <destination-topic-n> \
--link <link name> --bootstrap-server <host:port>
Here is an example of using kafka-mirrors --promote
to stop mirroring on
a topic called example-topic
which uses a link named example-link
.
kafka-mirrors --promote --topics example-topic --link example-link --bootstrap-server localhost:9092
Failover a topic¶
Use kafka-mirrors --failover
to stop mirroring and convert a mirror topic to a
regular topic immediately. This is typically appropriate for disaster recovery
where the source cluster fails unexpectedly. The failover
command does not perform
additional checks or syncs. It works like a “forced” version of promote
.
You can specify one topic or multiple topics to stop mirroring on.
Examples
You can use failover
to specify one topic or multiple topics on the same cluster link
to stop mirroring. The command syntax is:
kafka-mirrors --failover --topics <destination-topic-1> .. <destination-topic-n> \
--link <link name> --bootstrap-server <host:port>
Here is an example of using kafka-mirrors --failover
to stop mirroring on
a topic called example-topic
which uses a link named example-link
.
kafka-mirrors --failover --topics example-topic --link example-link --bootstrap-server localhost:9092
Dry run¶
To test the results of either promote
or failover
before executing the commands,
add the --dry-run
flag. For example:
kafka-mirrors --failover --topics example-topic --link example-link --bootstrap-server localhost:9092 --dry-run
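A dry run can be chained with the real command so that mirroring is stopped only when the preview succeeds. The following sketch assumes that kafka-mirrors returns a non-zero exit status when the dry run reports a problem, which is not confirmed by this page. The run helper and stop_mirroring function are hypothetical names introduced for illustration, and run executes a command only if the CLI is found on the PATH.

```shell
# Print each command; execute it only when the CLI is actually installed.
run() {
  printf '+ %s\n' "$*"
  if command -v "$1" >/dev/null 2>&1; then
    "$@"
  fi
}

DEST_BOOTSTRAP="localhost:9092"   # destination cluster
LINK_NAME="example-link"

# $1 is the action flag (--promote or --failover); $2 is the mirror topic name.
stop_mirroring() {
  # Preview first; run the real command only if the preview succeeded
  # (assumes a failed dry run exits with a non-zero status).
  run kafka-mirrors "$1" --topics "$2" --link "$LINK_NAME" \
    --bootstrap-server "$DEST_BOOTSTRAP" --dry-run &&
  run kafka-mirrors "$1" --topics "$2" --link "$LINK_NAME" \
    --bootstrap-server "$DEST_BOOTSTRAP"
}

stop_mirroring --promote example-topic
```

In an actual disaster recovery scenario you may prefer to review the dry-run output by hand before running failover, since failover takes effect immediately.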
Pausing and Resuming a Mirror Topic¶
You can use the pause (kafka-mirrors --pause
) and resume (kafka-mirrors --unpause
)
commands to temporarily pause and resume mirroring.
To pause a mirror topic:
kafka-mirrors --pause --topics example-topic --link example-link --bootstrap-server localhost:9092
To resume a mirror topic use --unpause
:
kafka-mirrors --unpause --topics example-topic --link example-link --bootstrap-server localhost:9092
Migrating Consumer Groups from Source to Destination Cluster¶
To migrate a consumer group across the link, set consumer.offset.sync.enable=true
in your link configuration, specify a group filter in a JSON file, and pass the name of the
file as the value for the --consumer-group-filters-json-file
flag on the kafka-cluster-links command.
You can set this at the time you create the link, or as an update to an existing configuration.
Note
Consumer group filters should only include groups that are not being used on the destination. This will help ensure that the system does not override offsets committed by other consumers on the destination. The system attempts to work around filters containing groups that are also used on the destination, but in these cases there are no guarantees; offsets may be overwritten. For mirror topic “promotion” to work, the system must be able to roll back offsets, which cannot be done if the group is being used by destination consumers.
This example assumes you are migrating the group “someGroup” from cluster “broker-west” to cluster “broker-east”. Before the migration, the link is migrating all offsets with the following filter set.
{"groupFilters": [
{
"name": "*",
"patternType": "LITERAL",
"filterType": "INCLUDE"
}
]}
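The starting state described above could be set up roughly as follows when the link is created. This is a sketch under assumptions: the file names all-groups-filter.json and offsets-link.properties are made up, the link name and broker addresses are taken from the commands later in this section, and a real link would normally need additional security properties. The run helper is a hypothetical wrapper that executes a command only if the CLI is found on the PATH.

```shell
# Print each command; execute it only when the CLI is actually installed.
run() {
  printf '+ %s\n' "$*"
  if command -v "$1" >/dev/null 2>&1; then
    "$@"
  fi
}

# Group filter that includes all consumer groups (same content as shown above).
cat > all-groups-filter.json <<'EOF'
{"groupFilters": [
  {
    "name": "*",
    "patternType": "LITERAL",
    "filterType": "INCLUDE"
  }
]}
EOF

# Link configuration: source cluster address plus consumer offset sync.
cat > offsets-link.properties <<'EOF'
bootstrap.servers=broker-west:19091
consumer.offset.sync.enable=true
EOF

# Create the link on the destination cluster (broker-east) with both files.
run kafka-cluster-links --bootstrap-server broker-east:19092 \
  --create --link-name offsets-cluster-link \
  --config-file offsets-link.properties \
  --consumer-group-filters-json-file all-groups-filter.json
```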
To migrate a consumer group from a source cluster to a destination cluster, follow these steps.
Stop the consumer on the source cluster.
Wait for a period of 2x consumer.offset.sync.ms.
Make sure that Cluster Linking replication is beyond the latest committed offset. You can confirm this with the following commands.
Check the CURRENT-OFFSET on the source cluster.
kafka-consumer-groups --bootstrap-server broker-west:19091 --describe --group someGroup
Your output should resemble the following:
GROUP      TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
someGroup  west-offsets  0          100             100             0    -            -     -
Check the LOG-END-OFFSET on the destination cluster and ensure it is equal to or greater than the CURRENT-OFFSET recorded above.
kafka-consumer-groups --bootstrap-server broker-east:19092 --describe --group someGroup
Your output should resemble the following:
GROUP      TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
someGroup  west-offsets  0          100             100             0
Verify the current offset is consistent in source and destination using the following commands.
Check the CURRENT-OFFSET on the source cluster.
kafka-consumer-groups --bootstrap-server broker-west:19091 --describe --group someGroup
Your output should resemble the following:
GROUP      TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
someGroup  west-offsets  0          100             100             0    -            -     -
Check the CURRENT-OFFSET on the destination cluster.
kafka-consumer-groups --bootstrap-server broker-east:19092 --describe --group someGroup
Your output should resemble the following:
GROUP      TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
someGroup  west-offsets  0          100             100             0
Update the offset migration filters to remove the group from the migration process.
echo "consumer.offset.group.filters={\"groupFilters\": [ \
  { \
    \"name\": \"*\", \
    \"patternType\": \"LITERAL\", \
    \"filterType\": \"INCLUDE\" \
  }, \
  { \
    \"name\": \"someGroup\", \
    \"patternType\": \"LITERAL\", \
    \"filterType\": \"EXCLUDE\" \
  } \
]}" > newFilters.properties

kafka-configs --bootstrap-server broker-east:19092 --alter --cluster-link offsets-cluster-link --add-config-file newFilters.properties
Start the consumer on the destination cluster.
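The two offset checks in the steps above can be scripted by pulling columns out of the kafka-consumer-groups --describe output. The sketch below is an illustration, not part of the product: offset_field is a hypothetical helper, and it assumes the column order shown in the example output (GROUP, TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET). To stay runnable without a cluster, it reads the sample rows from this page instead of live command output.

```shell
# Print one column for the row matching a group, topic, and partition.
# $1 group, $2 topic, $3 partition, $4 column number
# (4 = CURRENT-OFFSET, 5 = LOG-END-OFFSET in the assumed layout).
offset_field() {
  awk -v g="$1" -v t="$2" -v p="$3" -v c="$4" \
    '$1 == g && $2 == t && $3 == p { print $c }'
}

# Sample output copied from the examples above (source, then destination).
SOURCE_OUT='GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
someGroup west-offsets 0 100 100 0 - - -'
DEST_OUT='GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
someGroup west-offsets 0 100 100 0'

src_committed=$(printf '%s\n' "$SOURCE_OUT" | offset_field someGroup west-offsets 0 4)
dst_log_end=$(printf '%s\n' "$DEST_OUT" | offset_field someGroup west-offsets 0 5)
dst_committed=$(printf '%s\n' "$DEST_OUT" | offset_field someGroup west-offsets 0 4)

# Replication must have reached the committed offset, and the synced
# offset on the destination must match the source.
if [ "$dst_log_end" -ge "$src_committed" ] && [ "$dst_committed" -eq "$src_committed" ]; then
  echo "someGroup is ready to move"
else
  echo "someGroup is not caught up yet"
fi
```

Against live clusters, the same helper could read from a pipe, for example kafka-consumer-groups --bootstrap-server broker-west:19091 --describe --group someGroup | offset_field someGroup west-offsets 0 4. A group with several partitions would need one check per partition.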
Suggested Reading¶
- Blog post: How Krake Makes Floating Workloads on Confluent Platform
- Configuration Options for Cluster Linking
- CLI Tools for Confluent Platform
- Changing Broker Configurations Dynamically (to learn more about using the kafka-configs command)
- Kafka Consumer Group Command Tool (to learn more about using the kafka-consumer-groups command)
- Tutorial: Using Cluster Linking for Topic Data Sharing
- Cluster Linking Security