Configure and Manage Cluster Links on Confluent Cloud¶
You can create, view, manage, and delete cluster links using the unified Confluent CLI and the Confluent Cloud Cluster Linking (v3) REST API.
Requirements to Create a Cluster Link¶
To create a cluster link, you will need:
- Access to the Destination cluster. Cluster links are created on the destination cluster. For the specific security authorization needed, see Manage Security for Cluster Linking on Confluent Cloud.
- The name you wish to give the cluster link.
- The Source cluster’s bootstrap server and cluster ID.
- If the Source cluster is a Confluent Cloud cluster, you can get those using these commands:
confluent kafka cluster list
to get the Source cluster’s ID. For example,lkc-12345
confluent kafka cluster describe <source-cluster-id>
to get the bootstrap server. It will be the value of the row calledEndpoint
. For example,SASL_SSL://pkc-12345.us-west-2.aws.confluent.cloud:9092
- If the Source cluster is a Confluent Platform or Apache Kafka® cluster, your system administrator can give you your bootstrap server.
You can get your cluster ID by doing
confluent cluster describe
orzookeeper-shell <bootstrap-server> get /cluster/id 2> /dev/null
. For example,0gXf5xg8Sp2G4FlxNriNaA
. - Authentication credentials for the cluster link to use with the Source cluster. These can be updated, allowing you to rotate credentials used by your cluster link. See the section on configuring security credentials for more details.
- If the Source cluster is a Confluent Cloud cluster, you can get those using these commands:
- (Optional) Any additional configuration parameters you wish to add to the cluster link. See the list of possible configurations below.
You will then pass this information into the CLI or API command to create a cluster link.
Note
You cannot update the cluster link’s name, Source cluster ID, or Destination cluster ID. These values are only set at creation.
Managing Cluster Links with the CLI¶
To create or configure a cluster link with the Confluent Cloud CLI, you will need to create
a config file with the configurations you wish to set. Each configuration should be
on one line in the file, with the format key=value
, where key
is the name of the
config, and value
is the value you wish to set on that cluster link. In the following
commands, the location of this file is referred to as <config-file>
.
Create a cluster link¶
To create a cluster link with the confluent
CLI, use this command on the Destination cluster:
confluent kafka link create <link-name> \
--source-cluster <source-cluster-id> \
--source-bootstrap-server <source-bootstrap-server> \
--config-file <config-file> \
--cluster <destination-cluster-id>
Tip
--source-cluster-id
was replaced with --source-cluster
in version 3 of
confluent CLI,
as described in the command reference for confluent kafka link create.
Update a cluster link¶
To update the configuration for an existing cluster link with the cloud
CLI,
use this command on the Destination cluster:
confluent kafka link update <link-name> --config-file my-update-configs.txt --cluster <destination-cluster-id>
Tip
When updating the configuration for an existing cluster link, pass in only those configs that change. Be especially mindful when you are using a
--config-file
(where it would be easy to pass in a full set of configs) that it contains only the configs you want to update. For example,my-update-configs.txt
might include:consumer.offset.sync.ms=25384 topic.config.sync.ms=38254
You can change several aspects of a cluster link configuration, but you cannot change its source cluster (source cluster ID), prefix, or the link name.
Examples of creating and configuring cluster links with the CLI are shown in the Cluster Linking Quick Start, and the tutorials for Share Data Across Clusters, Regions, and Clouds using Confluent Cloud and Cluster Linking Disaster Recovery and Failover on Confluent Cloud.
List all cluster links going to the destination cluster¶
To see a list of all cluster links going to a Destination cluster, use the command:
confluent kafka link list --cluster <destination-cluster-id>
View the configuration for a given cluster link¶
To view the configuration of a specific cluster link , use the command:
confluent kafka link describe <link-name> --cluster <destination-cluster-id>
Delete a cluster link¶
To delete a cluster link, use the command:
confluent kafka link delete <link-name> --cluster <destination-cluster-id>
Tip
Using the command confluent kafka cluster use <destination-cluster-id>
sets the Destination cluster
to the active cluster, so you won’t need to specify --cluster <destination-cluster-id>
in any commands.
Important
- When deleting a cluster link, first check that all mirror topics are in the
STOPPED
state. If any are in thePENDING_STOPPED
state, deleting a cluster link can cause irrecoverable errors on those mirror topics due to a temporary limitation. - When a cluster link is deleted, the history of any
STOPPED
topics is also deleted. If you need theLast Source Fetch Offset
or theStatus Time
of your promoted or failed-over mirror topics, make sure to save those before you delete the cluster link. - You cannot delete a cluster link that still has mirror topics on it (the delete operation will fail).
- If you are using Confluent for Kubernetes (CFK), and you delete your cluster link resource, any mirror topics still attached to that cluster link
will be forcibly converted to regular topics by use of the
failover
API. To learn more, see Modify a mirror topic in Cluster Linking using Confluent for Kubernetes. - Cluster links are automatically deleted if the destination cluster is deleted.
Managing Cluster Links with the REST API¶
REST API calls and examples are provided in the reference documentation for the Confluent Cloud Cluster Linking (v3) REST API.
Tip
To learn how to authenticate with the REST API, see the REST API documentation.
To use the Confluent Cloud REST API to configure cluster links, you can pass in configs in JSON format. Each config will take this shape:
{
"name": "<config-name>",
"value": <config-value>
}
Since this is JSON, if your config value is a string, surround it in double quotes. Then, pass in the name/value pairs
as an array to the configs
argument of the JSON payload.
For example, this JSON enables consumer offset sync for all consumer groups and ACL sync for all ACLs.
{
"configs": [
{
"name": "consumer.offset.sync.enable",
"value": "true"
},
{
"name": "consumer.offset.sync.filters",
"value": "{\"groupFilters\": [{\"name\": \"*\",\"patternType\": \"LITERAL\",\"filterType\": \"INCLUDE\"}]}"
},
{
"name": "acl.sync.enable",
"value": "true"
},
{
"name": "acl.filters",
"value": "{ \"aclFilters\": [ { \"resourceFilter\": { \"resourceType\": \"any\", \"patternType\": \"any\" }, \"accessFilter\": { \"operation\": \"any\", \"permissionType\": \"any\" } } ] }"
}
]
}
Creating a Cluster Link through the REST API¶
See Create a cluster link in the Confluent Cloud REST API documentation.
To create a cluster link using the REST API, the JSON payload needs:
an entry called
source_cluster_id
set to your Source cluster’s IDa config in the
configs
array calledbootstrap.servers
set to your Source cluster’s bootstrap serveryour cluster link’s security configuration listed as configs in the
configs
arrayTip
The security configuration must include the API key and secret associated with the cluster resource against which you are running the REST API commands, not the Cloud user API key. For more information on creating a resource-specific key, see Create a resource-specific API key.
(optional) additional configurations in the
configs
array
Example JSON payload for a request to create a cluster link that syncs consumers offsets for all consumer groups, and syncs all ACLs:
{
"source_cluster_id": "<source-cluster-id>",
"configs": [
{
"name": "security.protocol",
"value": "SASL_SSL"
},
{
"name": "bootstrap.servers",
"value": "<source-bootstrap-server>"
},
{
"name": "sasl.mechanism",
"value": "PLAIN"
},
{
"name": "sasl.jaas.config",
"value": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<source-api-key>\" password=\"<source-api-secret>\";"
}
]
}
Include this JSON payload in a POST request to:
<REST-Endpoint>/kafka/v3/clusters/<cluster-ID>/links/?link_name=<link-name>
Where you replace the following:
<REST-Endpoint>
is your cluster’s REST url. You can find this withconfluent kafka cluster describe <cluster-ID>
. It wil look like:https://pkc-XXXXX.us-west1.gcp.confluent.cloud:443
<cluster-ID>
is your destination cluster’s ID. You can find this withconfluent kafka cluster list
. It will look likelkc-XXXXX
.<link-name>
is the name of your cluster link. You can name it whatever you choose.
Updating and Viewing Cluster Link Configs with the REST API¶
Update an existing cluster link¶
See Alter the config under a cluster link n the Confluent Cloud REST API documentation.
To update an existing link, you will send the JSON payload described above as a PUT request
to <REST-Endpoint>/clusters/<cluster-ID>/links/<link-name>/configs:alter
.
Get the value a cluster link configuration¶
See Describe a cluster link in the Confluent Cloud REST API documentation.
To get the value of your cluster link’s configurations using the REST API, use one of these options:
- GET
<REST-Endpoint>/clusters/<cluster-ID>/links/<link-name>/configs
to get the full list of configs. - GET
<REST-Endpoint>/clusters/<cluster-ID>/links/<link-name>/<config-name>
to get the value of a specific config.
List all cluster links going to the destination cluster¶
See List all cluster links in the destination cluster in the Confluent Cloud REST API documentation.
To view a list of all cluster links going to a Destination cluster, send a GET request to
<REST-Endpoint>/clusters/<cluster-ID>/links
.
Delete a cluster link¶
See Delete the cluster link in the Confluent Cloud REST API documentation.
To delete a cluster link, send a DELETE request to <REST-Endpoint>/clusters/<cluster-ID>/links/<link-name>
.
Important
- When deleting a cluster link, first check that all mirror topics are in the
STOPPED
state. If any are in thePENDING_STOPPED
state, deleting a cluster link can cause irrecoverable errors on those mirror topics due to a temporary limitation. - When a cluster link is deleted, the history of any
STOPPED
topics is also deleted. If you need theLast Source Fetch Offset
or theStatus Time
of your promoted or failed-over mirror topics, make sure to save those before you delete the cluster link. - You can’t directly call the “delete cluster link” command on a cluster link that still has mirror topics on it (it will fail).
- If you are using Confluent for Kubernetes (CFK), and you delete your cluster link resource, any mirror topics still attached to that cluster link
will be forcibly converted to regular topics by use of the
failover
API. To learn more, see Modify a mirror topic in Cluster Linking using Confluent for Kubernetes. - Cluster links are automatically deleted if the destination cluster is deleted.
Configuring Cluster Link Behavior¶
These properties are available to control the behavior of a cluster link that goes to a Confluent Cloud destination cluster.
If you disable a feature that has filters (ACL sync, consumer offset sync, auto create mirror topics) after having it enabled initially, then any existing filters will be cleared (deleted) from the cluster link.
acl.filters
JSON string that lists the ACLs to migrate. Define the ACLs in a file,
acl.filters.json
, and pass the file name as an argument to--acl-filters-json-file
.- Type: string
- Default: “”
Note
- Populate
acl.filters
by passing a JSON file on the command line that specifies the ACLs. - Do not include in the filter ACLs that are managed independently on the destination cluster. This is to prevent cluster link migration from deleting ACLs that were added specifically on the destination and should not be deleted. See also, ACL Syncing.
acl.sync.enable
Whether or not to migrate ACLs. See also, Syncing ACLs from Source to Destination Cluster and ACL Syncing.
- Type: boolean
- Default: false
acl.sync.ms
How often to refresh the ACLs, in milliseconds (if ACL migration is enabled). The default is 5000 milliseconds (5 seconds).
- Type: int
- Default: 5000
auto.create.mirror.topics.enable
- Whether or not to auto-create mirror topics based on topics on the source cluster. When set to “true”, mirror topics will be auto-created. Setting this option to “false” disables mirror topic creation and clears any existing filters. For details on this option, see Auto-create Mirror Topics.
auto.create.mirror.topics.filters
- A JSON object with one property,
topicFilters
, that contains an array of filters to apply to indicate which topics should be mirrored. For details on this option, see Auto-create Mirror Topics.
cluster.link.prefix
A prefix that is applied to the names of the mirror topics. The same prefix is applied to consumer groups when consumer.group.prefix is set to
true
. To learn more, see “Prefixing Mirror Topics and Consumer Group Names” in Mirror Topics.Note
The prefix cannot be changed after the cluster link is created.
- Type: string
- Default: null
consumer.group.prefix.enable
When set to
true
, the prefix specified for the cluster link prefix is also applied to the names of consumer groups. The cluster link prefix must be specified in order for the consumer group prefix to be applied. To learn more, see “Prefixing Mirror Topics and Consumer Group Names” in Mirror Topics.- Type: boolean
- Default: false
consumer.offset.group.filters
JSON to denote the list of consumer groups to be migrated. To learn more, see Migrating Consumer Groups from Source to Destination Cluster.
- Type: string
- Default: “”
consumer.offset.sync.enable
Whether or not to migrate consumer offsets from the source cluster.
- Type: boolean
- Default: false
consumer.offset.sync.ms
How often to sync consumer offsets, in milliseconds, if enabled.
- Type: int
- Default: 30000
topic.config.sync.ms
How often to refresh the topic configs, in milliseconds.
- Type: int
- Default: 5000
Syncing ACLs from Source to Destination Cluster¶
Cluster Linking can sync some or all of the ACLs on the source cluster to the destination cluster. This is critical for Disaster Recovery (DR) architectures, so that when your client applications failover to the DR cluster, they have the same access that they had on their original cluster. When you create the cluster link, you can specify which ACLs to sync, filtering down by the name of the resource, principal (service account), operation, and/or permission. You can update this setting on the cluster link at any time. The cluster link will copy the initial set of matching ACLs, and then continuously sync any changes, additions, or deletions of matching ACLs from the source cluster to the destination cluster.
Note
- In Confluent Cloud, ACL sync is only supported between two Confluent Cloud clusters that belong to the same Confluent Cloud organization. ACL sync is not supported between two Confluent Cloud clusters in different organizations, or between a Confluent Platform and a Confluent Cloud cluster. Do not include in the sync filter ACLs that are managed independently on the destination cluster. Before configuring and deploying with ACL sync, see ACL Syncing limitations in the overview for full details.
- You can also migrate consumer group offsets from the source to destination cluster.
Prerequisites¶
You must have the appropriate authorizations:
- DESCRIBE Cluster ACLs (
DescribeAcls
API) on the source cluster - ALTER Cluster ACLs (
CreateAcls
/DeleteAcls
APIs) on the destination cluster
Configurations for ACL Sync¶
To enable ACL sync on a cluster link, specify these properties:
acl.sync.enable=true
This turns on ACL sync. Updating this config to
false
will turn off ACL sync.If you set this up and run Cluster Linking, then later disable it, the filters will be cleared (deleted) from the cluster link.
acl.filters
- A JSON object with one property,
aclFilters
, that contains an array of filters to include specific ACLs to sync. Examples are below. acl.sync.ms (optional)
- How frequently to sync ACLs. The default is 5000 (5 seconds). Minimum is 1000.
You can configure ACLs at the time you create the cluster link, or as an update to an existing configuration.
Here is an example of setting up a cluster link with ACL migration when you
create the cluster link. Note that the link configurations (including ACL
migration properties) are defined in a file, link-configs.properties
, rather
than specified directly on the command line.
confluent kafka link create from-aws-basic \
--source-cluster lkc-12345 \
--source-bootstrap-server SASL_SSL://pkc-abcde.us-west1.gcp.confluent.cloud:9092 \
--source-api-key 1L1NK2RUL3TH3M4LL \
--source-api-secret ******* \
--config-file link-configs.properties
Here is an example of what you might have in link-configs.properties
:
acl.sync.enable=true
acl.sync.ms=1000
acl.filters={ "aclFilters": [ { "resourceFilter": { "resourceType": "any", "patternType": "any" }, "accessFilter": { "operation": "any", "permissionType": "any" } } ] }
The following sections provide examples of how you might define the actual ACLs
in acls.filters
.
Examples¶
The following examples show how to configure the JSON for various types of ACL migration, with granularity on a topic, a resource, or a mixed set.
Migrate all ACLs from source to destination cluster¶
To migrate all ACLs from the source to the destination cluster, provide the
following for acl.filters
.
acl.filters={ \
"aclFilters": [ \
{ \
"resourceFilter": { \
"resourceType": "any", \
"patternType": "any" \
}, \
"accessFilter": { \
"operation": "any", \
"permissionType": "any" \
} \
} \
] \
}
Each field in the JSON has the following options:
resourceType
: Can beANY
(meaning any kind of resource) ortopic
,cluster
,group
, ortransactionalID
, which are the resources specified in Use Access Control Lists (ACLs) for Confluent Cloud.patternType
: Can beANY
,LITERAL
,PREFIXED
, orMATCH
.name
: Name of resource.- If left empty, will default to the
*
wildcard, which will match all names of the specifiedresourceType
. - If
patternType
is"LITERAL"
then any resources with this exact name will be included. - If
patternType
is"PREFIXED"
, then any resources with a name that has this as a prefix will be included. - If
patternType
is"MATCH"
, then any resources that match a given pattern will be included. For example, a literal, wildcard (*
), or prefixed pattern of the same name and type.
- If left empty, will default to the
principal
: (optional) Name of principal specified on the ACL. If left empty will default to the*
wildcard, which will match all principals with the specifiedoperation
andpermissionType
.- For example, a service account with ID 12345 has this principal:
User:sa-12345
- For example, a service account with ID 12345 has this principal:
operation
: Can either be any or one of those specified in Operations under the “Operation” column.permissionType
: Can beany
,allow
, ordeny
Sync all ACLs specific to a topic¶
To sync all ACLs specific to a single topic (in this case topic pineapple
),
provide the following configurations in acls.filters
:
acl.filters={ \
"aclFilters": [ \
{ \
"resourceFilter": { \
"resourceType": "topic", \
"patternType": "literal", \
"name": "pineapple” \
}, \
"accessFilter": { \
"operation": "any", \
"permissionType": "any" \
} \
} \
] \
}
Sync ACLs specific to a principal or permission¶
To sync all ACLs specific to a service account with ID 12345 and also all ACLs
that deny access to the cluster, provide the following configurations in acls.filters
:
acl.filters={ \
"aclFilters": [ \
{ \
"resourceFilter": { \
"resourceType": "any", \
"patternType": "any" \
}, \
"accessFilter": { \
“principal”: “User:sa-12345”, \
"operation": "any", \
"permissionType": "any" \
} \
}, \
{ \
"resourceFilter": { \
"resourceType": "any", \
"patternType": "any" \
}, \
"accessFilter": { \
"operation": "any", \
"permissionType": "deny" \
} \
} \
] \
}
Configuring Security Credentials¶
When you create a cluster link, you will need to give the cluster link credentials with which to authenticate into the Source cluster. Read the Cluster Linking security overview for more information about Cluster Linking security requirements, including the required ACLs.
Configuring Security Credentials for Confluent Cloud Source Clusters¶
If your Source cluster is a Confluent Cloud cluster, your cluster link will need these properties configured:
security.protocol
set toSASL_SSL
sasl.mechanism
set toPLAIN
sasl.jaas.config
set toorg.apache.kafka.common.security.plain.PlainLoginModule required username="<source-api-key>" password="<source-api-secret>";
- Replace
<source-api-key>
and<source-api-secret>
with the API key to use on your Source cluster. - Be careful not to forget the
"
characters and the ending;
- Replace
Configuring Security Credentials for Confluent Platform and Kafka Source Clusters¶
Since cluster links consume from the source cluster using the same mechanism as a regular Kafka consumer, the cluster link’s authentication configuration is identical to the one needed by Kafka consumers.
Note
For Confluent Cloud cluster links, if your source cluster uses SSL (also known as TLS) and you need to pass a keystore and truststore; you must do so in-line, rather than with a keystore and truststore location. To learn more, see the security section on Mutual TLS (mTLS).
The following properties are supported by cluster links going to Confluent Destination clusters:
sasl.client.callback.handler.class
sasl.jaas.config
sasl.kerberos.kinit.cmd
sasl.kerberos.min.time.before.relogin
sasl.kerberos.service.name
sasl.kerberos.ticket.renew.jitter
sasl.kerberos.ticket.renew.window.factor
sasl.login.callback.handler.class
sasl.login.class
sasl.login.refresh.buffer.seconds
sasl.login.refresh.min.period.seconds
sasl.login.refresh.window.factor
sasl.login.refresh.window.jitter
sasl.mechanism
security.protocol
ssl.cipher.suites
ssl.enabled.protocols
ssl.endpoint.identification.algorithm
ssl.engine.factory.class
ssl.key.password
ssl.keymanager.algorithm
ssl.keystore.location
ssl.keystore.password
ssl.keystore.type
ssl.keystore.certificate.chain
ssl.protocol
ssl.provider
ssl.truststore.certificates