Commands for Cluster Linking¶
Looking for Confluent Cloud Cluster Linking docs? You are currently viewing Confluent Platform documentation. If you are looking for Confluent Cloud docs, check out Cluster Linking on Confluent Cloud.
Important
This feature is available as a preview feature. A preview feature is a component of Confluent Platform that is being introduced to gain early feedback from developers. This feature can be used for evaluation and non-production testing purposes or to provide feedback to Confluent.
Confluent Enterprise 6.0.0 includes a new Kafka command, kafka-cluster-links. In order for this command to work, the destination cluster must be running Confluent Platform 6.0.0 or later, and Cluster Linking must be enabled on the destination cluster.
This section describes the Cluster Linking commands and examples, along with CLI Tools for Confluent Platform relevant to Cluster Linking.
Configuration options for the cluster links are available as values for flags on the kafka-cluster-links command. Some of these options are shown below in the context of command examples. A full list is provided in reference form in Link Properties.
kafka-cluster-links command¶
Use kafka-cluster-links to create and manage links across clusters.
As with other Kafka commands in Confluent Platform, --bootstrap-server is a required flag for the kafka-cluster-links command.
--bootstrap-server
(Required) The connection string for the broker(s) in a cluster in the form host:port (which can be a comma-separated list for multiple brokers). You must specify the destination cluster where you plan to create mirror topics. The destination cluster must be running Confluent Platform 6.0.0 or later, which is required to support Cluster Linking.
- Type: string
- Default: empty string
Use --bootstrap-server in all of the following invocations of kafka-cluster-links.
Tip
In most cases, you can provide --bootstrap-server at the beginning (immediately after kafka-cluster-links) or at the end of the command.
The examples in the following sections show --bootstrap-server used immediately after kafka-cluster-links.
The examples in Cluster Linking Tutorial show it at the end of the commands. An exception to this is the command to create a cluster link, which requires specifying at least two different values for --bootstrap-server.
Creating a Cluster Link¶
Example Command
kafka-cluster-links --bootstrap-server localhost:9092 \
--create \
--link-name example-link \
--config bootstrap.servers=example-1:9092,example-2:9092,example-3:9092
Example Output
Cluster link 'example-link' creation successfully completed.
To create a cluster link, use kafka-cluster-links along with --bootstrap-server and the following flags.
--link-name
(Required) The name of the cluster link to create. Must be a unique name within the cluster.
- Type: string
(Required) One of the following parameters must be provided (not both) to specify how the destination cluster should communicate with the source. The available configurations are those that would be used to configure a client, including the required bootstrap.servers and other necessary security and authorization properties.
--config
Comma-separated configurations to be applied to the cluster link on creation of the form “key=value”. When you use this flag, the configurations are specified directly on the command line (as opposed to in a file, as described for the next flag). You can use square brackets to group values that contain commas. For a full list of available configurations, see Link Properties.
- Type: string
--config-file
Property file containing configurations for the cluster link. This is the recommended way to specify cluster link configurations.
- Type: string
--consumer-group-filters-json-file
Path to JSON file to use for configuration of consumer.offset.group.filters. To learn more, see Migrating Consumer Groups from Source to Destination Cluster.
- Type: string
You must have ALTER CLUSTER authorization to create a cluster link, as described in Authorization (ACLs) (subsection, ACLs for Link on Source Cluster).
--validate-only
- If provided, validates that the cluster link can be created as specified, but does not create it.
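The square-bracket grouping described for --config can be sketched as follows. This is an illustration under assumptions, not output from a real cluster: the two-broker list and the metadata.max.age.ms value are placeholder values, and the command is printed rather than executed.

```shell
# Placeholder values: two source brokers plus one extra link property.
# Square brackets keep the comma inside bootstrap.servers from being read
# as a separator between key=value pairs. Single quotes stop the shell
# from treating the brackets as a glob pattern.
LINK_CONFIG='bootstrap.servers=[example-1:9092,example-2:9092],metadata.max.age.ms=300000'

# Printed rather than executed so the snippet is safe to paste anywhere;
# remove "echo" to run it against a real destination cluster.
echo kafka-cluster-links --bootstrap-server localhost:9092 \
  --create \
  --link-name example-link \
  --config "$LINK_CONFIG"
```

Without the brackets, the second broker address would be parsed as the start of a new key=value pair.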
For example, if you specify the following configuration for a secure cluster link in a file named link-config.properties:
bootstrap.servers=example-1:9092,example-2:9092,example-3:9092
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="example-user" password="example-password";
security.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=https
Then, you can create the cluster link example-link with the following command:
kafka-cluster-links --bootstrap-server localhost:9092 --create --link-name example-link --config-file link-config.properties
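One cautious way to combine --config-file with --validate-only is to validate first and create only if validation succeeds. The following is a minimal sketch of that workflow, not an official procedure. The run helper and the DRY_RUN variable are hypothetical conveniences (not part of Confluent Platform), and the broker addresses and credentials are the placeholder values used above. With the default DRY_RUN=1 the commands are only printed.

```shell
# DRY_RUN and run() are hypothetical helpers for this sketch only.
# DRY_RUN=1 (the default) prints each command; DRY_RUN=0 executes it.
DRY_RUN="${DRY_RUN:-1}"
run() {
  if [ "$DRY_RUN" = "1" ]; then
    echo "+ $*"
  else
    "$@"
  fi
}

CONFIG_FILE="link-config.properties"

# Write the link configuration. The umask keeps a newly created file
# private, because it contains credentials.
(
  umask 077
  cat > "$CONFIG_FILE" <<'EOF'
bootstrap.servers=example-1:9092,example-2:9092,example-3:9092
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="example-user" password="example-password";
security.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=https
EOF
)

# Step 1: check that the link could be created, without creating it.
# Step 2: create the link, but only if step 1 succeeded.
run kafka-cluster-links --bootstrap-server localhost:9092 \
    --create --link-name example-link \
    --config-file "$CONFIG_FILE" --validate-only \
&& run kafka-cluster-links --bootstrap-server localhost:9092 \
    --create --link-name example-link \
    --config-file "$CONFIG_FILE"
```

To execute the commands against a real destination cluster, set DRY_RUN=0 before running the script.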
Listing Cluster Links¶
Example Command
kafka-cluster-links --list --bootstrap-server localhost:9092
Example Output
Link name: 'example-link', link ID: '123-some-link-id', cluster ID: '123-some-cluster-id'
You can list existing cluster links. The command returns the link name, link ID (an internally allocated unique ID), and cluster ID of the linked cluster.
Tip
To list cluster links, use kafka-cluster-links along with --bootstrap-server and the following flags.
--link-name
If provided, only lists the specified cluster link.
- Type: string
--command-config
Property file containing configurations to be passed to the AdminClient.
- Type: string
--include-topics
If provided, includes a list of all topics that are linked (mirror topics).
- Type: string
You must have DESCRIBE CLUSTER authorization to list cluster links.
Listing Mirror Topics¶
Example Command
kafka-cluster-links --list --link-name example-link --include-topics --bootstrap-server localhost:9092
Example Output
Link name: 'example-link', link ID: '123-some-link-id', cluster ID: '123-some-cluster-id', topics: [example-topic]
You can use kafka-cluster-links --list to get a list of mirror topics. The options are described in the previous section.
This example uses the --include-topics flag to list the mirror topics on the specified cluster link.
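If you need the mirror topic names in a script, one option is to extract them from the listing. The sketch below assumes the output has exactly the shape of the example line above and that multiple topics, if present, are separated by commas inside the brackets; both are assumptions about the format, and mirror_topics is a hypothetical helper name.

```shell
# Sample line copied from the example output above. In practice this would
# come from: kafka-cluster-links --list --link-name ... --include-topics ...
sample="Link name: 'example-link', link ID: '123-some-link-id', cluster ID: '123-some-cluster-id', topics: [example-topic]"

# mirror_topics: read listing lines on stdin, print one topic name per line.
# Assumes topics appear as "topics: [a, b, c]" at the end of each line.
mirror_topics() {
  sed -n 's/.*topics: \[\(.*\)].*/\1/p' \
    | tr ',' '\n' \
    | sed 's/^ *//; s/ *$//' \
    | sed '/^$/d'
}

printf '%s\n' "$sample" | mirror_topics
```

Lines without a topics field are skipped, so the helper can also be fed the output of a plain --list.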
Describing a Cluster Link¶
Example Command
kafka-configs --bootstrap-server localhost:9092 \
--describe \
--cluster-link example-link
Example Output
Dynamic configs for cluster-link example-link are:
metadata.max.age.ms=300000 sensitive=false synonyms={}
...
To describe a cluster link, use kafka-configs along with --bootstrap-server and these flags.
--cluster-link
(Required) The name of the cluster link to describe.
- Type: string
--command-config
Property file containing configurations to be passed to the AdminClient.
- Type: string
You must have DESCRIBE CLUSTER authorization to describe a cluster link.
Altering a Cluster Link¶
Example Command
kafka-configs --bootstrap-server localhost:9092 \
--alter \
--cluster-link example-link \
--add-config cluster.link.retry.timeout.ms=10000 \
--delete-config request.timeout.ms
Example Output
Completed updating config for cluster-link example-link.
To alter an existing link, use kafka-configs along with --bootstrap-server and these flags.
--alter
- (Required) Alter a link.
--cluster-link
(Required) The name of the cluster link to alter.
- Type: string
At least one of the following must be provided:
--add-config
Configurations to add in the form of “key=value” directly on the command line. You can use square brackets to group values which contain commas. For a full list of available configurations, see Link Properties.
- Type: string
--add-config-file
Path to properties file containing configurations to add.
- Type: string
--delete-config
Comma-separated list of configuration keys to delete.
- Type: string
You must have ALTER CLUSTER authorization to modify the cluster associated with a link, as described in Authorization (ACLs).
Pause and resume the link to verify the new configuration¶
For dynamically updatable configurations (such as SSL), it is possible that some connections are using an older configuration while others are using a newer one. The system avoids recreating connections that are not required for an action. The API and CLI output show the latest persisted configurations, but these may not represent the true state of the configurations after a kafka-configs --alter operation.
To make sure that your new configurations have been applied, restart the link by using cluster.link.paused to pause and then unpause it.
To learn more, see cluster.link.paused in Configuration Options for Cluster Linking.
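The pause-and-resume cycle can be sketched with two kafka-configs --alter calls. This sketch assumes that cluster.link.paused accepts the values true and false; restart_link is a hypothetical helper name, and the commands are printed rather than executed.

```shell
# restart_link LINK_NAME: print the pause command, then the resume command.
# Assumption: cluster.link.paused takes true (paused) or false (running).
# Remove "echo" to run the commands against a real destination cluster.
restart_link() {
  for paused in true false; do
    echo kafka-configs --bootstrap-server localhost:9092 \
      --alter \
      --cluster-link "$1" \
      --add-config "cluster.link.paused=$paused"
  done
}

restart_link example-link
```

Running the two alterations in this order pauses the link first and resumes it second, so the link picks up the persisted configuration when it restarts.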
Deleting a Cluster Link¶
Example Command
kafka-cluster-links --bootstrap-server localhost:9092 \
--delete \
--link-name example-link
Example Output
Cluster link 'example-link' deletion successfully completed.
To delete an existing link, use kafka-cluster-links along with --bootstrap-server and these flags.
--link-name
(Required) The name of the cluster link to delete.
- Type: string
--command-config
Property file containing configurations to be passed to the AdminClient.
- Type: string
--validate-only
- If provided, validates the cluster link deletion but doesn’t apply the delete.
--force
- Force deletion of a link even if topics are currently linked with it.
Tip
If the cluster link has outstanding linked topics, then the request may fail unless the deletion is forced (--force), in which case the topics that were linked will be unlinked in the background.
You must have ALTER CLUSTER authorization to delete a cluster link, as described in Authorization (ACLs).
Creating a Mirror Topic¶
A mirror topic is a read-only topic that reflects all the data and metadata in another topic.
Creating a mirror topic is similar to creating a normal topic, but requires extra flags. Once a mirror topic is created, the mirror automatically begins fetching data from the source and continues to do so for as long as mirroring is enabled on the topic.
If the source topic is deleted while mirroring is ongoing, then the mirror topic enters into a failed state, where mirroring must either be stopped or the topic deleted.
Example Command
kafka-topics --bootstrap-server localhost:9092 \
--create \
--topic example-topic \
--link-name example-link \
--mirror-topic example-topic
Example Output
Created topic example-topic.
To create a mirror topic, use kafka-topics along with --bootstrap-server and the following flags.
--topic
(Required) The name of the topic to create.
- Type: string
--link-name
(Required) The name of the cluster link used to pull data from the source topic.
- Type: string
--mirror-topic
(Required) The name of the source topic to mirror over the cluster link.
- Type: string
Tip
- For the preview, the value for --mirror-topic must match the original --topic name.
- You can list mirrored topics using kafka-cluster-links --list along with the --include-topics flag.
You must have ALTER CLUSTER authorization to create a mirror topic.
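To mirror several topics over the same link, the create command can be repeated in a loop. The following is a sketch only: the topic names orders and payments are hypothetical, mirror_commands is a hypothetical helper name, and the commands are printed rather than executed. Each topic name is passed to both --topic and --mirror-topic because the preview requires the two values to match.

```shell
# mirror_commands LINK_NAME TOPIC...: print one create command per topic.
# --topic and --mirror-topic get the same value, as the preview requires.
# Remove "echo" to run the commands against a real destination cluster.
mirror_commands() {
  link_name="$1"
  shift
  for topic in "$@"; do
    echo kafka-topics --bootstrap-server localhost:9092 \
      --create \
      --topic "$topic" \
      --link-name "$link_name" \
      --mirror-topic "$topic"
  done
}

# Hypothetical source topic names.
mirror_commands example-link orders payments
```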
Stopping Mirroring on a Topic¶
Example Command
kafka-topics --bootstrap-server localhost:9092 \
--alter --mirror-action stop \
--topic example-topic
Example Output
Topic 'example-topic's mirror was successfully stopped.
To stop mirroring a topic, use kafka-topics along with --bootstrap-server and the following flags.
--topic
(Required) The name of the mirror topic on which to stop mirroring.
- Type: string
You must have ALTER CLUSTER authorization to stop mirroring a topic.
Migrating Consumer Groups from Source to Destination Cluster¶
To migrate a consumer group across the link, set consumer.offset.sync.enable=true in your link configuration, specify a group filter in a JSON file, and pass the name of the file as the value for the --consumer-group-filters-json-file flag on the kafka-cluster-links command.
You can set this at the time you create the link, or as an update to an existing configuration.
This example assumes you are migrating group “someGroup” from cluster “broker-west” to cluster “broker-east”, and that before the migration you are migrating all offsets with the following filter set.
{"groupFilters": [
{
"name": "*",
"patternType": "LITERAL",
"filterType": "INCLUDE"
}
]}
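Setting this up at link creation time might look like the following sketch. The file name group-filters.json is hypothetical, the link name and broker addresses are the ones used in the commands later in this section, and the create command is printed rather than executed.

```shell
# Hypothetical file name for the group filter definition.
FILTERS_FILE="group-filters.json"

# Write the filter shown above: include every consumer group.
cat > "$FILTERS_FILE" <<'EOF'
{"groupFilters": [
  {
    "name": "*",
    "patternType": "LITERAL",
    "filterType": "INCLUDE"
  }
]}
EOF

# Create the link on the destination (broker-east) with offset sync enabled
# and the filter file attached. The source cluster (broker-west) is named in
# bootstrap.servers. Remove "echo" to run against a real cluster.
echo kafka-cluster-links --bootstrap-server broker-east:19092 \
  --create \
  --link-name offsets-cluster-link \
  --config "bootstrap.servers=broker-west:19091,consumer.offset.sync.enable=true" \
  --consumer-group-filters-json-file "$FILTERS_FILE"
```

A secured source cluster would also need the security properties described in Creating a Cluster Link, in which case --config-file is likely the more practical choice.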
To migrate a consumer group from a source cluster to a destination cluster, follow these steps.
Stop the consumer on the source cluster.
Wait for a period of twice the value of consumer.offset.sync.ms.
Make sure that Cluster Linking replication is beyond the latest committed offset. You can confirm this with the following commands.
Check the CURRENT-OFFSET on the source cluster.
kafka-consumer-groups --bootstrap-server broker-west:19091 --describe --group someGroup
Your output should resemble the following:
GROUP      TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
someGroup  west-offsets  0          100             100             0    -            -     -
Check the LOG-END-OFFSET on the destination cluster and ensure it is equal or larger than the CURRENT-OFFSET recorded above.
kafka-consumer-groups --bootstrap-server broker-east:19092 --describe --group someGroup
Your output should resemble the following:
GROUP      TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
someGroup  west-offsets  0          100             100             0
Verify the current offset is consistent in source and destination using the following commands.
Check the CURRENT-OFFSET on the source cluster.
kafka-consumer-groups --bootstrap-server broker-west:19091 --describe --group someGroup
Your output should resemble the following:
GROUP      TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
someGroup  west-offsets  0          100             100             0    -            -     -
Check the CURRENT-OFFSET on the destination cluster.
kafka-consumer-groups --bootstrap-server broker-east:19092 --describe --group someGroup
Your output should resemble the following:
GROUP      TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
someGroup  west-offsets  0          100             100             0
Update the offset migration filters to remove the group from the migration process.
echo "consumer.offset.group.filters={\"groupFilters\": [ \
  { \
    \"name\": \"*\", \
    \"patternType\": \"LITERAL\", \
    \"filterType\": \"INCLUDE\" \
  }, \
  { \
    \"name\": \"someGroup\", \
    \"patternType\": \"LITERAL\", \
    \"filterType\": \"EXCLUDE\" \
  } \
]}" > newFilters.properties

kafka-configs --bootstrap-server broker-east:19092 --alter --cluster-link offsets-cluster-link --add-config-file newFilters.properties
Start the consumer on the destination cluster.
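The two offset checks in the steps above can be scripted by comparing columns of the kafka-consumer-groups output. The sketch below works on sample text copied from this section instead of calling a live cluster; column_for is a hypothetical helper name, and the column positions (4 for CURRENT-OFFSET, 5 for LOG-END-OFFSET) assume the header layout shown above.

```shell
# Sample output copied from the steps above. In practice, capture each one
# with kafka-consumer-groups --describe --group someGroup against the
# source (broker-west) and destination (broker-east) clusters.
src_output='GROUP      TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
someGroup  west-offsets  0          100             100             0'
dst_output="$src_output"

# column_for GROUP TOPIC PARTITION COLUMN: read describe output on stdin and
# print the requested column of the row that matches group/topic/partition.
column_for() {
  awk -v g="$1" -v t="$2" -v p="$3" -v c="$4" \
    '$1 == g && $2 == t && $3 == p { print $c }'
}

src_committed=$(printf '%s\n' "$src_output" | column_for someGroup west-offsets 0 4)
dst_log_end=$(printf '%s\n' "$dst_output" | column_for someGroup west-offsets 0 5)
dst_committed=$(printf '%s\n' "$dst_output" | column_for someGroup west-offsets 0 4)

# Replication must have reached the committed offset, and the synced
# committed offset on the destination must match the source.
if [ "$dst_log_end" -ge "$src_committed" ] && [ "$dst_committed" -eq "$src_committed" ]; then
  status="safe to switch consumers"
else
  status="not caught up yet"
fi
echo "someGroup/west-offsets/0: $status"
```

A group that consumes from several topics or partitions would need the same comparison repeated for each row.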
Suggested Reading¶
- Blog post: How Krake Makes Floating Workloads on Confluent Platform
- Configuration Options for Cluster Linking
- CLI Tools for Confluent Platform
  - Changing Broker Configurations Dynamically (to learn more about using the kafka-configs command)
  - Kafka Consumer Group Command Tool (to learn more about using the kafka-consumer-groups command)
- Cluster Linking Tutorial
- Cluster Linking Security