Command Reference for Cluster Linking on Confluent Platform¶
Confluent Enterprise now includes a feature called Cluster Linking that syncs data and metadata from Confluent Platform, Confluent Cloud, or Kafka clusters to Confluent Platform and Confluent Cloud clusters.
Cluster Linking geo-replicates data using two key abstractions:
- a cluster link, which connects a “source cluster” and a “destination cluster”
- mirror topics, which are topics on a cluster link’s destination cluster that are identical copies of topics on the source cluster.
Cluster links and mirror topics can be created and managed using either the REST API in the Kafka REST v3 Proxy or CLI commands in a terminal. This section describes the commands for creating and managing cluster links and mirror topics.
CLI commands¶
Configuration options for cluster links are available as values for flags on the kafka-cluster-links command. Some of these options are shown below in the context of command examples. A full list is provided in reference form in Link Properties.
Use kafka-cluster-links to create and manage links across clusters.
About bootstrap-server¶
As with other Kafka commands in Confluent Platform, --bootstrap-server is a required flag for the kafka-cluster-links command.
--bootstrap-server
(Required) The connection string for the broker(s) in a cluster, in the form host:port (which can be a comma-separated list for multiple brokers). You must specify the destination cluster where you plan to create mirror topics. The destination cluster must be running Confluent Platform 7.0.0 or later, which is required to support Cluster Linking.
- Type: string
- Default: empty string
Use --bootstrap-server with every kafka-cluster-links command described in the following sections.
For Cluster Linking, --bootstrap-server should typically point to the destination cluster (with one exception, noted below).
Here is a quick summary:
- For kafka-mirrors, --bootstrap-server is always the destination cluster.
- For kafka-cluster-links, --bootstrap-server is usually the destination cluster.
The only exception is a source-initiated link (recommended for Cluster Linking from a Confluent Platform source to a Confluent Cloud destination). In that case, you create a cluster link object twice, and the second time you run kafka-cluster-links with the source cluster as the bootstrap server.
For common use cases, see:
- Tutorial: Share Data Across Topics Using Cluster Linking for Confluent Platform
- Tutorial: Migrate Data with Cluster Linking on Confluent Platform
For examples of use cases with source-initiated links, see:
- Tutorial: Link Confluent Platform and Confluent Cloud Clusters
Tip
In most cases, you can provide --bootstrap-server at the beginning (immediately after kafka-cluster-links) or at the end of the command. The examples in the following sections show --bootstrap-server used immediately after kafka-cluster-links.
Creating a cluster link¶
Example Command
kafka-cluster-links --bootstrap-server localhost:9093 \
--create \
--link example-link \
--config-file example-link.config
Example Output
Cluster link 'example-link' creation successfully completed.
Tip
You must have ALTER CLUSTER authorization to create a cluster link, as described in Authorization (ACLs) (subsection, ACLs for link on source cluster). You can configure this using kafka-configs, as described in Altering a cluster link.
To create a cluster link, use kafka-cluster-links along with --bootstrap-server and the following flags.
--link
(Required) The name of the cluster link to create. Must be a unique cluster link name within the cluster.
- Type: string
--cluster-id
(Required) The ID of the source cluster to link to. You can find a cluster’s ID with the CLI command kafka-cluster cluster-id.
- Type: string
(Required) One of the following parameters must be provided (not both) to specify how the destination cluster should communicate with the source. The available configurations are those that would be used to configure a client, including the required bootstrap.servers and other necessary security and authorization properties.
--config
Comma-separated configurations to be applied to the cluster link on creation of the form “key=value”. When you use this flag, the configurations are specified directly on the command line (as opposed to in a file, as described for the next flag). You can use square brackets to group values that contain commas. For a full list of available configurations, see Link Properties.
- Type: string
--config-file
Property file containing configurations for the cluster link. This is the recommended way to specify cluster link configurations.
- Type: string
For example, if you specify the following configuration for a secure cluster link in a file named link-config.properties:
bootstrap.servers=example-1:9092,example-2:9092,example-3:9092
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="example-user" password="example-password";
security.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=https
Then, you can create the cluster link example-link with the following command:
kafka-cluster-links --bootstrap-server localhost:9093 --create --link example-link --config-file link-config.properties --cluster-id pz-s7W72Sdm7A11wzku9gA
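The two steps above can be combined into one script. The following is a minimal sketch that uses the example host names, credentials, and cluster ID from this page. It writes link-config.properties, runs the create command with --validate-only, and creates the link only if validation succeeds. The run helper is hypothetical: it executes the command when the Confluent Platform tools are on the PATH, and otherwise only prints it.

```shell
#!/bin/sh
# Sketch: create a secure cluster link from a properties file.
# Hosts, credentials, and cluster ID are the example values from this page.

# Hypothetical helper: run the command if the tool is installed,
# otherwise print it so the commands can be reviewed first.
run() {
  if command -v "$1" >/dev/null 2>&1; then
    "$@"
  else
    echo "[dry run] $*"
  fi
}

# Client settings the destination cluster uses to reach the source cluster.
# Note the trailing ";" that JAAS configuration requires.
cat > link-config.properties <<'EOF'
bootstrap.servers=example-1:9092,example-2:9092,example-3:9092
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="example-user" password="example-password";
security.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=https
EOF
chmod 600 link-config.properties   # the file contains a password

DEST=localhost:9093
LINK=example-link
SOURCE_CLUSTER_ID=pz-s7W72Sdm7A11wzku9gA

# Validate first; create the link only if validation succeeds.
if run kafka-cluster-links --bootstrap-server "$DEST" --create --link "$LINK" \
     --config-file link-config.properties --cluster-id "$SOURCE_CLUSTER_ID" \
     --validate-only; then
  run kafka-cluster-links --bootstrap-server "$DEST" --create --link "$LINK" \
    --config-file link-config.properties --cluster-id "$SOURCE_CLUSTER_ID"
fi
```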
Optional configurations:
--command-config
Property file containing configurations to be passed to the AdminClient. For example, with security credentials for authorization and authentication.
--consumer-group-filters-json
JSON string to use for configuration of consumer.offset.group.filters. To learn more, see Migrating consumer groups from source to destination cluster.
- Type: string
--consumer-group-filters-json-file
Path to JSON file to use for configuration of consumer.offset.group.filters. To learn more, see Migrating consumer groups from source to destination cluster.
- Type: string
--acl-filters-json-file
Path to the ACL filters JSON file to use for configuration of acl.filters. To learn more, see Migrating ACLs from Source to Destination Cluster.
- Type: string
--validate-only
If provided, validates that the cluster link can be created as specified, but does not create it.
--exclude-validate-link
If provided, creates the link without validating that the source cluster can be reached. This is helpful only if the source cluster is not yet running or reachable. If the source cluster is running and available, using this option is not recommended, as it skips helpful validations.
--topic-filters-json
JSON string to use for configuration of auto.create.mirror.topics.filters. To learn more, see Mirror Topics.
--topic-filters-json-file
Path to JSON file to use for configuration of auto.create.mirror.topics.filters. To learn more, see Mirror Topics.
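The following sketch shows the JSON-file flags. It writes a consumer group filter and a topic filter to files, then passes both at link creation time with --validate-only. The group filter uses the groupFilters format shown in Migrating consumer groups from source to destination cluster. The topicFilters shape, the orders. prefix, source-host:9092, and all file names are assumptions for illustration; check them against Mirror Topics. Offset sync and automatic mirror topic creation are assumed to be switched on through consumer.offset.sync.enable and auto.create.mirror.topics.enable in the link config. The run helper is hypothetical: it only prints the command when the tool is not installed.

```shell
#!/bin/sh
# Sketch: create a link that syncs consumer offsets and auto-creates mirror
# topics, configured through the *-json-file flags. Names are illustrative.

# Hypothetical helper: run the command if installed, otherwise print it.
run() {
  if command -v "$1" >/dev/null 2>&1; then "$@"; else echo "[dry run] $*"; fi
}

# Link config; source-host:9092 is a placeholder for the source cluster.
cat > example-link.config <<'EOF'
bootstrap.servers=source-host:9092
consumer.offset.sync.enable=true
auto.create.mirror.topics.enable=true
EOF

# Sync offsets for every consumer group except "someGroup".
cat > group-filters.json <<'EOF'
{"groupFilters": [
  {"name": "*", "patternType": "LITERAL", "filterType": "INCLUDE"},
  {"name": "someGroup", "patternType": "LITERAL", "filterType": "EXCLUDE"}
]}
EOF

# Auto-create mirror topics whose names start with "orders." (assumed format).
cat > topic-filters.json <<'EOF'
{"topicFilters": [
  {"name": "orders.", "patternType": "PREFIXED", "filterType": "INCLUDE"}
]}
EOF

SOURCE_CLUSTER_ID=pz-s7W72Sdm7A11wzku9gA   # example ID from this page

run kafka-cluster-links --bootstrap-server localhost:9093 --create \
  --link example-link --config-file example-link.config \
  --cluster-id "$SOURCE_CLUSTER_ID" \
  --consumer-group-filters-json-file group-filters.json \
  --topic-filters-json-file topic-filters.json \
  --validate-only
```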
Listing cluster links¶
Example Command
kafka-cluster-links --list --bootstrap-server localhost:9093
Example Output
Link name: 'example-link', link ID: '123-some-link-id', remote cluster ID: '123-some-cluster-id', local cluster ID: '456-some-other-cluster-id', remote cluster available: 'true'
You can list existing cluster links. For each link, the command returns the link name, the link ID (an internally allocated unique ID), the IDs of the remote (linked) and local clusters, and whether the remote cluster is available.
Tip
To list cluster links, use kafka-cluster-links along with --bootstrap-server and the following flags.
--link
If provided, only lists the specified cluster link.
- Type: string
--command-config
Property file containing configurations to be passed to the AdminClient. For example, with security credentials for authorization and authentication.
- Type: string
--include-topics
If provided, includes a list of all mirror topics on this cluster link.
- Type: string
You must have DESCRIBE CLUSTER authorization to list cluster links.
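To monitor links from a script, you can filter the text output of kafka-cluster-links --list. The following is a minimal sketch that assumes the one-line-per-link format shown above, which may differ between Confluent Platform versions. The function name unavailable_links is hypothetical. It prints the name of every link whose remote cluster is reported as unavailable.

```shell
#!/bin/sh
# Sketch: print the names of links whose remote cluster is not available.
# Reads `kafka-cluster-links --list` output on stdin (format as shown above).
unavailable_links() {
  sed -n "s/^Link name: '\([^']*\)'.*remote cluster available: 'false'.*/\1/p"
}

# Usage (requires the Confluent Platform tools):
#   kafka-cluster-links --list --bootstrap-server localhost:9093 | unavailable_links
```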
Describing a cluster link¶
Example Command
kafka-configs --bootstrap-server localhost:9093 \
--describe \
--cluster-link example-link
Example Output
Dynamic configs for cluster-link example-link are:
metadata.max.age.ms=300000 sensitive=false synonyms={}
...
To describe a cluster link, use kafka-configs along with --bootstrap-server, --describe, and these flags.
--cluster-link
(Required) The name of the cluster link to describe.
- Type: string
--command-config
Property file containing configurations to be passed to the AdminClient. For example, with security credentials for authorization and authentication.
- Type: string
You must have DESCRIBE CLUSTER authorization to describe a cluster link.
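If a script needs a single setting, you can pick it out of the describe output. The following sketch assumes the "name=value sensitive=... synonyms=..." line format shown above. The function name link_config_value is hypothetical. It handles only values without spaces, so a value such as a JAAS string would be truncated.

```shell
#!/bin/sh
# Sketch: print the value of one link config from
# `kafka-configs --describe --cluster-link ...` output read on stdin.
# $1 = config name. Assumes the value contains no spaces.
link_config_value() {
  awk -v key="$1" 'index($1, key "=") == 1 { print substr($1, length(key) + 2) }'
}

# Usage (requires the Confluent Platform tools):
#   kafka-configs --bootstrap-server localhost:9093 --describe \
#     --cluster-link example-link | link_config_value metadata.max.age.ms
```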
Altering a cluster link¶
Example Command
kafka-configs --bootstrap-server localhost:9093 \
--alter \
--cluster-link example-link \
--add-config cluster.link.retry.timeout.ms=10000 \
--delete-config request.timeout.ms
Example Output
Completed updating config for cluster-link example-link.
To alter an existing link, use kafka-configs along with --bootstrap-server and these flags.
--alter
(Required) Alter a link.
--cluster-link
(Required) The name of the cluster link to alter.
- Type: string
--command-config
Property file containing configurations to be passed to the AdminClient. For example, with security credentials for authorization and authentication.
At least one of the following must be provided:
--add-config
Configurations to add in the form of “key=value” directly on the command line. You can use square brackets to group values which contain commas. For a full list of available configurations, see Link Properties.
- Type: string
--add-config-file
Path to properties file containing configurations to add.
- Type: string
--delete-config
Comma-separated list of configuration keys to delete.
- Type: string
You must have ALTER CLUSTER authorization to modify the cluster associated with a link, as described in Authorization (ACLs).
Tip
When updating the configuration for an existing cluster link, pass in only those configs that change. When you use a config file with --add-config-file, it is easy to pass in a full set of configs by mistake, so make sure the file contains only the configs you want to update. For example, my-update-configs.txt might include:
consumer.offset.sync.ms=25384
topic.config.sync.ms=38254
You can change several aspects of a cluster link configuration, but you cannot change its source cluster (source cluster ID), prefix, or the link name.
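Following the tip above, the update file should hold only the changed keys. The following is a minimal sketch that writes the example my-update-configs.txt, applies it with --add-config-file, and then describes the link to check the persisted values. The run helper is hypothetical: it only prints the command when the tool is not installed.

```shell
#!/bin/sh
# Sketch: apply only the changed configs to an existing link.

# Hypothetical helper: run the command if installed, otherwise print it.
run() {
  if command -v "$1" >/dev/null 2>&1; then "$@"; else echo "[dry run] $*"; fi
}

# Only the keys that change (example values from the tip above).
cat > my-update-configs.txt <<'EOF'
consumer.offset.sync.ms=25384
topic.config.sync.ms=38254
EOF

run kafka-configs --bootstrap-server localhost:9093 --alter \
  --cluster-link example-link --add-config-file my-update-configs.txt

# Check the persisted values afterwards.
run kafka-configs --bootstrap-server localhost:9093 --describe \
  --cluster-link example-link
```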
Pause and resume the link to verify the new configuration¶
For dynamically updatable configurations (such as SSL), some connections may be using an older configuration while others use a newer one. The system avoids recreating connections that are not required for an action. The API and CLI output show the latest persisted configurations, but these may not reflect the configurations actually in use after a kafka-configs --alter operation.
To confirm that your new configurations have been applied, restart the link: set cluster.link.paused to pause the link, and then unset it to unpause the link.
To learn more, see cluster.link.paused in Configure Cluster Linking on Confluent Platform.
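The following sketch shows the pause-and-unpause restart. It assumes that setting cluster.link.paused to true and then back to false with kafka-configs is enough to restart the link, as described above. The names run, set_link_paused, and BOOTSTRAP are hypothetical. When the tool is not installed, run prints the command instead of executing it.

```shell
#!/bin/sh
# Sketch: restart a cluster link by pausing and unpausing it, so that
# connections pick up the latest persisted configuration.

# Hypothetical helper: run the command if installed, otherwise print it.
run() {
  if command -v "$1" >/dev/null 2>&1; then "$@"; else echo "[dry run] $*"; fi
}

# set_link_paused LINK true|false  (hypothetical wrapper around kafka-configs)
set_link_paused() {
  run kafka-configs --bootstrap-server "${BOOTSTRAP:-localhost:9093}" --alter \
    --cluster-link "$1" --add-config "cluster.link.paused=$2"
}

set_link_paused example-link true
set_link_paused example-link false
```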
Deleting a cluster link¶
Example Command
kafka-cluster-links --bootstrap-server localhost:9093 \
--delete \
--link example-link
Example Output
Cluster link 'example-link' deletion successfully completed.
To delete an existing link, use kafka-cluster-links along with --bootstrap-server and these flags.
--link
(Required) The name of the cluster link to delete.
- Type: string
--command-config
Property file containing configurations to be passed to the AdminClient. For example, with security credentials for authorization and authentication.
- Type: string
--validate-only
If provided, validates the cluster link deletion but doesn’t apply the delete.
--force
Force deletion of a link even if mirror topics are currently linked with it.
Tip
If the cluster link has outstanding mirror topics, then the request may fail unless the deletion is forced (--force). In that case, the mirror topics that were associated with the cluster link are unlinked in the background.
You must have ALTER CLUSTER authorization to delete a cluster link, as described in Authorization (ACLs).
Important
- When deleting a cluster link, first check that all mirror topics are in the STOPPED state. If any are in the PENDING_STOPPED state, deleting a cluster link can cause irrecoverable errors on those mirror topics due to a temporary limitation.
- When a cluster link is deleted, so is the history of any STOPPED topics. If you need the Last Source Fetch Offset or the Status Time of your promoted or failed-over mirror topics, make sure to save those before you delete the cluster link.
- You cannot delete a cluster link that still has mirror topics on it unless you force the deletion with --force; otherwise, the delete operation fails.
- If you are using Confluent for Kubernetes (CFK) and you delete your cluster link resource, any mirror topics still attached to that cluster link will be forcibly converted to regular topics by use of the failover API. To learn more, see Modify a mirror topic in Cluster Linking using Confluent for Kubernetes.
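The checks in the Important list above can be scripted before a deletion. The following is a minimal sketch that assumes the kafka-mirrors --describe text format shown under Describing mirror topics, which may vary by version. The function names mirror_states and all_stopped are hypothetical. It prints each topic's state and succeeds only if every topic on the link is STOPPED. Saving the describe output to a file also preserves the history that is lost when the link is deleted.

```shell
#!/bin/sh
# Sketch: refuse to delete a link unless all its mirror topics are STOPPED.
# Parses `kafka-mirrors --describe` output (format may vary by version).

# Prints "topic state" for each topic-level line read on stdin.
mirror_states() {
  awk '$1 == "Topic:" {
    for (i = 1; i < NF; i++) if ($i == "State:") { print $2, $(i + 1); break }
  }'
}

# Succeeds only if every topic read on stdin is STOPPED.
all_stopped() {
  mirror_states | awk '$2 != "STOPPED" { print "not stopped: " $1 " (" $2 ")"; bad = 1 }
                       END { exit bad }'
}

# Usage (requires the Confluent Platform tools):
#   kafka-mirrors --describe --link example-link --include-stopped \
#     --bootstrap-server localhost:9093 > example-link-mirrors.txt
#   all_stopped < example-link-mirrors.txt &&
#     kafka-cluster-links --bootstrap-server localhost:9093 --delete --link example-link
```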
Creating a mirror topic¶
A mirror topic is a read-only topic that reflects all the data and metadata in another topic. To create a mirror topic with the CLI, use the kafka-mirrors tool. Once a mirror topic is created, it automatically begins fetching data from the source topic. For more information, see Mirror Topics.
Example Command
kafka-mirrors --create --mirror-topic example-topic \
--link demo-link \
--bootstrap-server localhost:9093
Example Output
Created topic example-topic.
To create a mirror topic, use kafka-mirrors along with --bootstrap-server and the following flags.
--mirror-topic
(Required) The name of the mirror topic to create. This must match exactly the name of the source topic to mirror over the cluster link.
- Type: string
--link
(Required) The name of the cluster link used to pull data from the source topic.
- Type: string
Tip
- The value for --mirror-topic must match the name of the source topic.
- You can list mirrored topics using kafka-cluster-links --list along with the --include-topics flag.
--command-config
Property file containing configurations to be passed to the AdminClient. For example, with security credentials for authorization and authentication.
The following are optional configurations when creating a mirror topic:
--config
A comma-separated list of configs to override when creating the mirror topic. Specify each config to override as name=value. For more information about which configurations can be set on a mirror topic, see Configurations in Mirror Topics.
- Type: string
--replication-factor
The replication factor of the mirror topic being created. If not supplied, defaults to the destination cluster’s default, not the source topic’s replication factor.
- Type: string
--source-topic
The name of the source topic to mirror. Required if the cluster link has a prefix configured. To learn more, see Prefixing Mirror Topics and Consumer Group Names.
- Type: string
You must have ALTER CLUSTER authorization to create a mirror topic.
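To mirror several topics over the same link, you can loop over kafka-mirrors --create. The following sketch assumes that when the link has a prefix, the mirror topic name is the prefix followed by the source topic name, and that --source-topic names the original topic. Confirm this in Prefixing Mirror Topics and Consumer Group Names. The names create_mirror, PREFIX, LINK, and BOOTSTRAP and the topics orders and payments are hypothetical. The run helper only prints commands when the tool is not installed.

```shell
#!/bin/sh
# Sketch: create mirror topics for several source topics on one link.

# Hypothetical helper: run the command if installed, otherwise print it.
run() {
  if command -v "$1" >/dev/null 2>&1; then "$@"; else echo "[dry run] $*"; fi
}

LINK=demo-link
BOOTSTRAP=localhost:9093
PREFIX=""   # set to the link's prefix (for example "west.") if it has one

# create_mirror SOURCE_TOPIC
create_mirror() {
  if [ -n "$PREFIX" ]; then
    # With a prefix, the mirror name differs, so the source must be named.
    run kafka-mirrors --create --mirror-topic "${PREFIX}$1" --source-topic "$1" \
      --link "$LINK" --bootstrap-server "$BOOTSTRAP"
  else
    run kafka-mirrors --create --mirror-topic "$1" \
      --link "$LINK" --bootstrap-server "$BOOTSTRAP"
  fi
}

for topic in example-topic orders payments; do
  create_mirror "$topic"
done
```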
Listing mirror topics¶
To list mirror topics, use kafka-mirrors --list. This command can list either the mirror topics for a specific cluster link or all mirror topics across all cluster links on the cluster.
Example Command
kafka-mirrors --list --bootstrap-server localhost:9093
Example Output
topic1
topic2
topic3
topic4
These parameters can be added to the command:
--link
(optional) The name of the cluster link to filter on. If provided, only mirror topics on this cluster link will be listed.
- Type: string
--include-stopped
(optional) If this flag is added, the list includes any topics that were formerly mirror topics but were stopped by a promote or failover command. This flag does not take an argument.
Describing mirror topics¶
Use kafka-mirrors to describe and manage topics that are mirrored over a cluster link. For example, the following command describes the mirror topics on the link onprem-to-cloud:
kafka-mirrors --describe --bootstrap-server pkc-nwnyk.us-west-2.aws.confluent.cloud:9092 --command-config lkc-rn220.config --link onprem-to-cloud
Topic: web.orders.modified LinkName: onprem-to-cloud LinkId: 2c3dee72-f54b-4bb3-9694-cf29caaf3637 MirrorTopic: web.orders.modified State: ACTIVE StateTime: 2021-11-10 15:33:29
Partition: 0 State: ACTIVE DestLogEndOffset: 114123 LastFetchSourceHighWatermark: 114123 Lag: 0 TimeSinceLastFetchMs: 8385101
Partition: 1 State: ACTIVE DestLogEndOffset: 115278 LastFetchSourceHighWatermark: 115278 Lag: 0 TimeSinceLastFetchMs: 8387954
Partition: 2 State: ACTIVE DestLogEndOffset: 112210 LastFetchSourceHighWatermark: 112210 Lag: 0 TimeSinceLastFetchMs: 8508856
Partition: 3 State: ACTIVE DestLogEndOffset: 120887 LastFetchSourceHighWatermark: 120887 Lag: 0 TimeSinceLastFetchMs: 8389749
Partition: 4 State: ACTIVE DestLogEndOffset: 109225 LastFetchSourceHighWatermark: 109225 Lag: 0 TimeSinceLastFetchMs: 8385101
Partition: 5 State: ACTIVE DestLogEndOffset: 111669 LastFetchSourceHighWatermark: 111669 Lag: 0 TimeSinceLastFetchMs: 8387954
Topic: web.orders LinkName: onprem-to-cloud LinkId: 2c3dee72-f54b-4bb3-9694-cf29caaf3637 MirrorTopic: web.orders State: ACTIVE StateTime: 2021-11-10 15:33:29
Partition: 0 State: ACTIVE DestLogEndOffset: 294760 LastFetchSourceHighWatermark: 294760 Lag: 0 TimeSinceLastFetchMs: 8387954
Partition: 1 State: ACTIVE DestLogEndOffset: 285862 LastFetchSourceHighWatermark: 285862 Lag: 0 TimeSinceLastFetchMs: 8508856
Partition: 2 State: ACTIVE DestLogEndOffset: 284891 LastFetchSourceHighWatermark: 284891 Lag: 0 TimeSinceLastFetchMs: 8389749
Partition: 3 State: ACTIVE DestLogEndOffset: 285982 LastFetchSourceHighWatermark: 285982 Lag: 0 TimeSinceLastFetchMs: 8385101
Partition: 4 State: ACTIVE DestLogEndOffset: 277379 LastFetchSourceHighWatermark: 277379 Lag: 0 TimeSinceLastFetchMs: 8387954
Partition: 5 State: ACTIVE DestLogEndOffset: 283731 LastFetchSourceHighWatermark: 283731 Lag: 0 TimeSinceLastFetchMs: 8508856
Topic: inventory.shipments LinkName: onprem-to-cloud LinkId: 2c3dee72-f54b-4bb3-9694-cf29caaf3637 MirrorTopic: inventory.shipments State: ACTIVE StateTime: 2021-11-10 15:51:21
As with other Kafka commands in Confluent Platform, --bootstrap-server is a required flag for the kafka-mirrors command.
--bootstrap-server
(Required) The connection string for the broker(s) in a cluster, in the form host:port (which can be a comma-separated list for multiple brokers). You must specify the destination cluster where you plan to create mirror topics. The destination cluster must be running Confluent Platform 7.0.0 or later, which is required to support Cluster Linking.
- Type: string
- Default: empty string
These parameters can be added to the command:
--link
(optional) The name of the cluster link to filter on. If provided, only mirror topics on this cluster link are described.
- Type: string
--include-stopped
(optional) If this flag is added, the output includes any topics that were formerly mirror topics but were stopped by a promote or failover command. This flag does not take an argument.
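The per-partition Lag values in the describe output can be summed per topic. This is useful, for example, to confirm zero lag before a promote. The following is a minimal sketch that assumes the "Topic: ..." and "Partition: ... Lag: N ..." line format shown above, which may vary by version. The function name mirror_lag is hypothetical.

```shell
#!/bin/sh
# Sketch: print "topic total-lag" for each topic in
# `kafka-mirrors --describe` output read on stdin.
mirror_lag() {
  awk '
    $1 == "Topic:"     { topic = $2; order[++n] = topic; lag[topic] = 0 }
    $1 == "Partition:" {
      for (i = 1; i < NF; i++) if ($i == "Lag:") lag[topic] += $(i + 1)
    }
    END { for (k = 1; k <= n; k++) print order[k], lag[order[k]] }
  '
}

# Usage (requires the Confluent Platform tools):
#   kafka-mirrors --describe --link onprem-to-cloud \
#     --bootstrap-server <host:port> --command-config <file> | mirror_lag
```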
Stopping mirroring on a topic¶
You can use either the kafka-mirrors --failover or kafka-mirrors --promote command to stop mirroring on a topic. Both commands permanently convert the mirror topic into a regular, writable topic, but each is designed for a specific use case.
- The promote command is intended for migrations and, therefore, performs some additional verification and operations before stopping the mirroring.
- The failover command is intended for disaster recovery, and so takes effect immediately and always succeeds. No additional operations are performed.
- The --validate-only option (a dry run) allows you to preview the results of a command before actually executing it.
You can run promote and failover on multiple topics at the same time by passing a comma-separated list of topic names to the --topics flag. Even if you are only promoting one topic, you still use the --topics (plural) flag. For example:
--topics topic1
--topics topic1,topic2,topic3
More examples are shown below.
Promote a topic¶
Use kafka-mirrors --promote to stop mirroring and convert a mirror topic to a regular topic in a graceful process typically appropriate for migration scenarios. This command checks that there is 0 lag between the source topic and the mirror topic, and does one final sync of the metadata (consumer group offsets and topic configs) before converting the mirror topic into a regular topic.
Note
- The promote command does not stop data from being produced to the source topic. Producers can still write more data to the source topic after the check for 0 lag has succeeded. In that case, the two topics will have diverged, and that data will not be mirrored to the regular (formerly mirrored) topic.
- If there is lag between the source topic and the mirror topic at the time that the promote command is run, the promote command will fail.
Examples
You can use promote to stop mirroring on one topic or on multiple topics on the same cluster link. The command syntax is:
kafka-mirrors --promote --topics <destination-topic-1>,...,<destination-topic-n> --bootstrap-server <host:port>
Here is an example of using kafka-mirrors --promote to stop mirroring on a topic called example-topic, which uses a link named example-link.
kafka-mirrors --promote --topics example-topic --bootstrap-server localhost:9093
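Because --topics expects a single comma-separated value, it helps to build that value from a list of names when promoting many topics. The following is a minimal sketch. The function join_topics, the run helper, and the topic names are hypothetical. join_topics runs in a subshell, so changing IFS does not affect the caller.

```shell
#!/bin/sh
# Sketch: build the comma-separated --topics value from separate names.

# Join all arguments with commas; the ( ) body keeps the IFS change local.
join_topics() (
  IFS=,
  printf '%s\n' "$*"
)

# Hypothetical helper: run the command if installed, otherwise print it.
run() {
  if command -v "$1" >/dev/null 2>&1; then "$@"; else echo "[dry run] $*"; fi
}

run kafka-mirrors --promote --topics "$(join_topics topic1 topic2 topic3)" \
  --bootstrap-server localhost:9093
```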
Failover a topic¶
Use kafka-mirrors --failover to stop mirroring and convert a mirror topic to a regular topic immediately. This is typically appropriate for disaster recovery, where the source cluster fails unexpectedly. The failover command does not perform additional checks or syncs. It works like a “forced” version of promote.
Examples
You can use failover to stop mirroring on one topic or on multiple topics on the same cluster link. The command syntax is:
kafka-mirrors --failover --topics <destination-topic-1>,...,<destination-topic-n> --bootstrap-server <host:port>
Here is an example of using kafka-mirrors --failover to stop mirroring on a topic called example-topic, which uses a link named example-link. The link is inferred, so it should not be included in the command, as shown below.
kafka-mirrors --failover --topics example-topic --bootstrap-server localhost:9093
Validate only (dry run)¶
To test the results of either promote or failover before executing the commands, add the --validate-only flag. For example:
kafka-mirrors --failover --topics example-topic --bootstrap-server localhost:9093 --validate-only
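A common pattern is to run the dry run and the real command back to back, and to continue only if the dry run succeeds. The following sketch assumes that kafka-mirrors exits with a non-zero status when validation fails, so verify that against your version. The function stop_mirroring is hypothetical. It uses plain POSIX variables, so action and topics are global. When kafka-mirrors is not installed, it prints both commands instead of running them.

```shell
#!/bin/sh
# Sketch: stop_mirroring promote|failover TOPICS [extra kafka-mirrors args...]
# Runs the command with --validate-only first and proceeds only on success.
stop_mirroring() {
  action=$1; topics=$2; shift 2
  case "$action" in
    promote|failover) ;;
    *) echo "usage: stop_mirroring promote|failover TOPICS [args...]" >&2; return 2 ;;
  esac
  if ! command -v kafka-mirrors >/dev/null 2>&1; then
    echo "[dry run] kafka-mirrors --$action --topics $topics $* --validate-only"
    echo "[dry run] kafka-mirrors --$action --topics $topics $*"
    return 0
  fi
  kafka-mirrors "--$action" --topics "$topics" "$@" --validate-only &&
    kafka-mirrors "--$action" --topics "$topics" "$@"
}

stop_mirroring promote example-topic --bootstrap-server localhost:9093
```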
Pausing and resuming a mirror topic¶
You can use the pause (kafka-mirrors --pause) and resume (kafka-mirrors --unpause) commands to temporarily pause and resume mirroring.
To pause a mirror topic:
kafka-mirrors --pause --topics example-topic --bootstrap-server localhost:9093
To resume a mirror topic, use --unpause:
kafka-mirrors --unpause --topics example-topic --bootstrap-server localhost:9093
Migrating consumer groups from source to destination cluster¶
To migrate a consumer group across the link, set consumer.offset.sync.enable=true in your link configuration, specify a group filter in a JSON file, and pass the name of the file as the value for the --consumer-group-filters-json-file flag on the CLI commands. You can set this at the time you create the link, or as an update to an existing configuration.
Note
Consumer group filters should only include groups that are not being used on the destination. This will help ensure that the system does not override offsets committed by other consumers on the destination. The system attempts to work around filters containing groups that are also used on the destination, but in these cases there are no guarantees; offsets may be overwritten. For mirror topic “promotion” to work, the system must be able to roll back offsets, which cannot be done if the group is being used by destination consumers.
This example assumes that you are migrating group “someGroup” from cluster “broker-west” to cluster “broker-east”, and that before the migration you are migrating all offsets with the following filter set:
{"groupFilters": [
{
"name": "*",
"patternType": "LITERAL",
"filterType": "INCLUDE"
}
]}
To migrate a consumer group from a source cluster to a destination cluster, follow these steps.
1. Stop the consumer on the source cluster.
2. Wait for a period of 2x consumer.offset.sync.ms.
3. Make sure that Cluster Linking replication is beyond the latest committed offset. You can confirm this with the following commands.
Check the CURRENT-OFFSET on the source cluster.
kafka-consumer-groups --bootstrap-server broker-west:19091 --describe --group someGroup
Your output should resemble the following:
GROUP      TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
someGroup  west-offsets  0          100             100             0    -            -     -
Check the LOG-END-OFFSET on the destination cluster and make sure it is equal to or larger than the CURRENT-OFFSET recorded above.
kafka-consumer-groups --bootstrap-server broker-east:19092 --describe --group someGroup
Your output should resemble the following:
GROUP      TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
someGroup  west-offsets  0          100             100             0
4. Verify that the current offset is consistent in the source and destination clusters using the following commands.
Check the CURRENT-OFFSET on the source cluster.
kafka-consumer-groups --bootstrap-server broker-west:19091 --describe --group someGroup
Your output should resemble the following:
GROUP      TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
someGroup  west-offsets  0          100             100             0    -            -     -
Check the CURRENT-OFFSET on the destination cluster.
kafka-consumer-groups --bootstrap-server broker-east:19092 --describe --group someGroup
Your output should resemble the following:
GROUP      TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
someGroup  west-offsets  0          100             100             0
5. Update the offset migration filters to remove the group from the migration process.
echo 'consumer.offset.group.filters={"groupFilters": [{"name": "*", "patternType": "LITERAL", "filterType": "INCLUDE"}, {"name": "someGroup", "patternType": "LITERAL", "filterType": "EXCLUDE"}]}' > newFilters.properties
kafka-configs --bootstrap-server broker-east:19092 --alter --cluster-link offsets-cluster-link --add-config-file newFilters.properties
6. Start the consumer on the destination cluster.
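The offset comparison in step 3 can be automated once both kafka-consumer-groups --describe outputs are saved to files. The following is a minimal sketch. The function name offsets_caught_up and the file names are hypothetical. It assumes the column order shown above (GROUP, TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET, ...). For every partition, it checks that the destination LOG-END-OFFSET is at least the source CURRENT-OFFSET, and it flags partitions missing on the destination. The NR == FNR idiom assumes the source file is not empty.

```shell
#!/bin/sh
# Sketch: offsets_caught_up SOURCE_DESCRIBE_FILE DEST_DESCRIBE_FILE
# Succeeds if, for every topic/partition in the source output, the
# destination LOG-END-OFFSET >= the source CURRENT-OFFSET.
offsets_caught_up() {
  awk '
    $3 !~ /^[0-9]+$/ { next }                     # skip headers and notices
    NR == FNR { committed[$2 ":" $3] = $4; next } # first file: source
    {
      key = $2 ":" $3
      seen[key] = 1
      if ((key in committed) && $5 + 0 < committed[key] + 0) {
        print "behind: " key " LOG-END-OFFSET " $5 " < CURRENT-OFFSET " committed[key]
        bad = 1
      }
    }
    END {
      for (key in committed)
        if (!(key in seen)) { print "missing on destination: " key; bad = 1 }
      exit bad
    }
  ' "$1" "$2"
}

# Usage (requires the Confluent Platform tools):
#   kafka-consumer-groups --bootstrap-server broker-west:19091 --describe --group someGroup > west.txt
#   kafka-consumer-groups --bootstrap-server broker-east:19092 --describe --group someGroup > east.txt
#   offsets_caught_up west.txt east.txt && echo "destination has caught up"
```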
REST API commands¶
The Cluster Linking REST API is documented in the REST API Proxy v3 Documentation.