Manage Cluster Linking for Confluent Platform Using Confluent for Kubernetes¶
The Cluster Linking feature allows you to directly connect Kafka clusters together and mirror topics from one cluster (the source) to another (the destination). You can use the feature in multi-datacenter, multi-cluster, and hybrid cloud deployments to seamlessly move data between Kafka clusters.
Confluent for Kubernetes (CFK) provides a declarative API, the ClusterLink custom resource definition (CRD), for defining, updating, and managing cluster links.
For use case scenarios for configuring cluster links in CFK, refer to Cluster Linking Use Cases in the CFK example GitHub repo.
The following types of cluster links are supported in CFK. Select a type of cluster link based on where you want to move data to and from, and configure the ClusterLink CR accordingly:
Confluent Platform to Confluent Platform cluster links
To move data from an on-prem Confluent Platform to another on-prem Confluent Platform or from Confluent Cloud to Confluent Cloud, create a cluster link on the destination with connection and security information for the destination to talk to the source. No changes are required on the source cluster.
The destination cluster initiates the connection to the source cluster.
You can also use this type of cluster link to migrate data from Apache Kafka® or from other Kafka deployments, such as Amazon MSK, to on-prem Confluent Platform.
In this document, this type of cluster link is referred to as destination-initiated. Note that this is not a term used in the other cluster linking documentation.
Confluent Platform to Confluent Cloud cluster links
To move data from an on-prem Confluent Platform to Confluent Cloud in a hybrid environment, use the source-initiated type of cluster links.
For source-initiated cluster links, you configure connection and security information on the source cluster to talk to the destination.
The source cluster (on-prem Confluent Platform) initiates the connection to the destination cluster (Confluent Cloud).
For source-initiated cluster links, cluster links need to be configured both on the source and destination clusters.
For information on how to create each type of cluster link, see Create a cluster link.
Requirements¶
Before a cluster link is created, the following requirements must be satisfied:
Requirements for destination-initiated cluster linking¶
- The destination cluster must be a Confluent Platform or a Confluent Cloud cluster.
- Cluster links are created and managed using Admin REST APIs, so the Admin REST APIs service must be running on the destination cluster. Set up the REST Class as described in Manage Confluent Admin REST Class for Confluent Platform Using Confluent for Kubernetes.
- The destination Kafka is a consumer to the source Kafka. Therefore, the destination cluster needs to configure authentication/encryption to talk to the source cluster if the source Kafka has security settings enabled. For the required step, see Configure destination Kafka security.
Requirements for source-initiated cluster linking¶
- The source cluster and the destination cluster must be a Confluent Platform or a Confluent Cloud cluster.
- Cluster links are created and managed using Admin REST APIs, so the Admin REST service must be running on the source and destination clusters. Set up the Confluent REST Class as described in Manage Confluent Admin REST Class for Confluent Platform Using Confluent for Kubernetes.
- The source Kafka cluster needs to configure authentication and encryption to communicate with the destination cluster if the destination Kafka has security settings enabled. For the required step, see Configure destination Kafka security.
Create a cluster link¶
Create a cluster link by using a new ClusterLink custom resource (CR) and apply the CR with the kubectl apply -f <Cluster Link CR> command.
- To create a destination-initiated cluster link: Create a cluster link on the destination cluster and configure authentication and encryption for the source cluster.
- To create a source-initiated cluster link: Create two cluster links with the same cluster link name, one on the source cluster and one on the destination cluster.
mirrorTopics, mirrorTopicOptions, aclFilters, consumerGroupFilters, and other cluster link configs must be defined in the ClusterLink CR on the destination cluster, for both destination-initiated and source-initiated cluster links.
kind: ClusterLink
metadata:
  name: clusterlink                --- [1]
  namespace:                       --- [2]
spec:
  name:                            --- [3]
  sourceInitiatedLink:             --- [4]
    linkMode:                      --- [5]
  destinationKafkaCluster:         --- [6]
  sourceKafkaCluster:              --- [7]
  consumerGroupFilters:            --- [8]
  aclFilters:                      --- [9]
  configs:                         --- [10]
  mirrorTopics:                    --- [11]
  mirrorTopicOptions:              --- [12]
[1] Required. The name of the ClusterLink CR.
[2] Optional. The namespace of the ClusterLink CR. If omitted, the same namespace as this CR is assumed.
[3] Optional. The name of the cluster link. If not defined, metadata.name ([1]) is used.
[4] Optional. Configure if this cluster link is a source-initiated cluster link.
[5] Required under sourceInitiatedLink. Specify whether this source-initiated cluster link CR is on the source cluster or on the destination cluster. Valid values are Source and Destination.
[6] Required. The information about the destination cluster. See Configure the destination-initiated cluster link and Configure the source-initiated cluster link on the destination cluster.
[7] Required. The information about the source cluster. See Configure the destination-initiated cluster link and Configure the source-initiated cluster link on the source cluster.
[8] Optional. An array of consumer groups to be migrated from the source cluster to the destination cluster. See Define consumer group filters.
[9] Optional. An array of Access Control Lists (ACLs) to be migrated from the source cluster to the destination cluster. See Define ACL filters.
[10] Optional. A map of additional configurations for creating a cluster link.
For example:
spec:
  configs:
    connections.max.idle.ms: "620000"
    cluster.link.retry.timeout.ms: "200000"
This setting can be used in all types and modes of ClusterLink CRs.
For the list of optional configurations, see Cluster Linking config options.
[11] Optional. An array of mirror topics. See Create a mirror topic.
[12] Optional. Configuration options for mirror topics. See Configure mirror topic options.
Configure the destination-initiated cluster link¶
On the destination cluster, configure the information about the destination and source Kafka clusters in the ClusterLink CR:
spec:
  destinationKafkaCluster:
    kafkaRestClassRef:             --- [1]
      name:                        --- [2]
      namespace:                   --- [3]
  sourceKafkaCluster:
    bootstrapEndpoint:             --- [4]
    clusterID:                     --- [5]
    kafkaRestClassRef:             --- [6]
      name:                        --- [7]
      namespace:                   --- [8]
[1] Required. The reference to the KafkaRestClass application resource which defines the Kafka REST Class connection information.
[2] Required under [1]. The name of the KafkaRestClass CR on the destination cluster.
[3] Optional. The namespace of the KafkaRestClass CR. If omitted, the same namespace as this CR is assumed.
[4] Required. The bootstrap endpoint where the source Kafka is running.
[5] The cluster ID of the source Kafka. If both clusterID and Kafka REST class name ([7]) are specified, this clusterID value takes precedence over the Kafka REST class name ([7]).
You can get the cluster ID using the curl or kafka-cluster command with the proper flags. For example:
curl https://<cluster url>:8090/kafka/v3/clusters/ -kv
kafka-cluster cluster-id --bootstrap-server kafka.operator.svc.cluster.local:9092 \
  --config /tmp/kafka.properties
[6] The reference to the KafkaRestClass application resource which defines the Kafka REST Class connection information.
[7] Required under [6]. The name of the KafkaRestClass CR on the source Kafka cluster.
[8] Optional. The namespace of the KafkaRestClass CR. If omitted, the same namespace as this CR is assumed.
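As an illustration of how the fields above fit together, the following is a minimal sketch of a destination-initiated ClusterLink CR. The apiVersion is assumed to be the standard CFK API group and version, and every resource name, namespace, and endpoint is a hypothetical placeholder, not a value from this document.

```yaml
# Sketch only: names, namespaces, and the endpoint below are hypothetical.
apiVersion: platform.confluent.io/v1beta1   # assumed CFK API group/version
kind: ClusterLink
metadata:
  name: clusterlink-demo
  namespace: destination
spec:
  destinationKafkaCluster:
    kafkaRestClassRef:
      name: destination-kafka-rest          # KafkaRestClass CR for the destination cluster
      namespace: destination
  sourceKafkaCluster:
    # Bootstrap endpoint where the source Kafka is running
    bootstrapEndpoint: kafka.source.svc.cluster.local:9092
    kafkaRestClassRef:
      name: source-kafka-rest               # KafkaRestClass CR for the source cluster
      namespace: source
```

If you set clusterID under sourceKafkaCluster as well, that value takes precedence over the Kafka REST class name, as described in [5].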
SASL/PLAIN authentication to source Kafka cluster¶
For the destination-initiated cluster link to authenticate to the source Kafka cluster using SASL/PLAIN, configure the following in the ClusterLink CR:
spec:
  sourceKafkaCluster:
    authentication:
      type: plain                  --- [1]
      jaasConfig:
        secretRef:                 --- [2]
      jaasConfigPassThrough:       --- [3]
        secretRef:                 --- [4]
[1] Required.
[2] Optional. Provide the user names and passwords in the referenced secret.
[3] Optional. If you have customizations, such as using custom login handlers, you can bypass the CFK automation and provide the configuration directly.
[4] Set to the Kubernetes Secret name used to authenticate to Kafka.
jaasConfigPassThrough does not support the directoryPathInContainer feature where required authentication credentials are injected by Vault.
See Client-side SASL/PLAIN authentication for Kafka for more information on jaasConfig and jaasConfigPassThrough.
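For example, a destination-initiated link that authenticates with credentials stored in a Kubernetes Secret might look like the sketch below. The Secret name source-kafka-credential and the bootstrap endpoint are hypothetical, and the sketch assumes you use jaasConfig and not jaasConfigPassThrough.

```yaml
# Sketch only: the Secret name and endpoint are hypothetical.
spec:
  sourceKafkaCluster:
    bootstrapEndpoint: kafka.source.svc.cluster.local:9092
    authentication:
      type: plain
      jaasConfig:
        # Secret that holds the user name and password for the source Kafka
        secretRef: source-kafka-credential
```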
mTLS authentication to Kafka cluster¶
For the destination-initiated cluster link to authenticate to the source Kafka cluster using mTLS, configure the following in the ClusterLink CR:
spec:
  sourceKafkaCluster:
    authentication:
      type: mtls                   --- [1]
    tls:
      enabled: true                --- [2]
      secretRef:                   --- [3]
[1] Required.
[2] Required.
[3] Required. The name of the secret that contains the certificates.
See Provide TLS keys and certificates in PEM format for the expected keys in the TLS secret. Only the PEM format is supported for ClusterLink CRs.
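A filled-in version of the mTLS settings above could look like the following sketch. The Secret name source-tls-secret and the endpoint are hypothetical; the Secret is assumed to hold the PEM-format keys and certificates.

```yaml
# Sketch only: the Secret name and endpoint are hypothetical.
spec:
  sourceKafkaCluster:
    bootstrapEndpoint: kafka.source.svc.cluster.local:9092
    authentication:
      type: mtls
    tls:
      enabled: true
      # Secret that contains the PEM-format certificates
      secretRef: source-tls-secret
```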
TLS encryption for Kafka cluster¶
For the destination-initiated cluster link to encrypt data to the source Kafka cluster using TLS, configure the following in the ClusterLink CR:
spec:
  sourceKafkaCluster:
    tls:
      enabled: true                --- [1]
      secretRef:                   --- [2]
[1] Required.
[2] Required. The name of the secret that contains the certificates.
See Provide TLS keys and certificates in PEM format for the expected keys in the TLS secret. Only the PEM format is supported for the ClusterLink CRs.
Configure the source-initiated cluster link on the source cluster¶
For a source-initiated cluster link, configure the cluster information in the Source mode ClusterLink CR:
spec:
  sourceInitiatedLink:
    linkMode: Source               --- [1]
  destinationKafkaCluster:
    bootstrapEndpoint:             --- [2]
    clusterID:                     --- [3]
    kafkaRestClassRef:             --- [4]
      name:                        --- [5]
      namespace:                   --- [6]
  sourceKafkaCluster:
    kafkaRestClassRef:             --- [7]
      name:                        --- [8]
      namespace:                   --- [9]
[1] Required.
[2] Required. The bootstrap endpoint where the destination Kafka is running.
[3] The cluster ID of the destination Kafka cluster. If both clusterID and Kafka REST class name ([5]) are specified, this clusterID value takes precedence over the Kafka REST class name ([5]).
You can get the cluster ID using the curl or kafka-cluster command with the proper flags. For example:
curl https://<cluster url>:8090/kafka/v3/clusters/ -kv
kafka-cluster cluster-id --bootstrap-server kafka.operator.svc.cluster.local:9092 \
  --config /tmp/kafka.properties
[4] Optional. The reference to the KafkaRestClass application custom resource (CR) which defines the Kafka REST Class connection information.
[5] Required under [4]. The name of the KafkaRestClass CR on the destination cluster.
[6] Optional. The namespace of the KafkaRestClass CR. If omitted, the same namespace as this CR is assumed.
[7] Required. The reference to the KafkaRestClass application custom resource (CR) which defines the Kafka REST Class connection information.
[8] Required. The name of the KafkaRestClass CR on the source Kafka cluster.
[9] Optional. The namespace of the KafkaRestClass CR. If omitted, the same namespace as this CR is assumed.
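Put together, a Source mode ClusterLink CR might look like the following sketch. The apiVersion is assumed to be the standard CFK API group and version; the link name, resource names, endpoint, and cluster ID are hypothetical placeholders.

```yaml
# Sketch only: names, endpoint, and cluster ID are hypothetical placeholders.
apiVersion: platform.confluent.io/v1beta1   # assumed CFK API group/version
kind: ClusterLink
metadata:
  name: clusterlink-source
  namespace: source
spec:
  name: hybrid-link                  # use the same link name in the Destination mode CR
  sourceInitiatedLink:
    linkMode: Source
  destinationKafkaCluster:
    bootstrapEndpoint: destination.example.com:9092
    clusterID: "<destination-cluster-id>"
  sourceKafkaCluster:
    kafkaRestClassRef:
      name: source-kafka-rest        # KafkaRestClass CR for the source cluster
      namespace: source
```

Because the source cluster initiates the connection, authentication and TLS settings for the destination go under destinationKafkaCluster in this CR.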
SASL/PLAIN authentication to Kafka cluster¶
For the source cluster to authenticate to the destination Kafka cluster using SASL/PLAIN, configure the following in the Source mode ClusterLink CR:
spec:
  destinationKafkaCluster:
    authentication:
      type: plain                  --- [1]
      jaasConfig:
        secretRef:                 --- [2]
      jaasConfigPassThrough:       --- [3]
        secretRef:                 --- [4]
[1] Required.
[2] Optional. Provide the user names and passwords in the referenced secret.
[3] Optional. If you have customizations, such as using custom login handlers, you can bypass the CFK automation and provide the configuration directly.
[4] Set to the Kubernetes Secret name used to authenticate to Kafka.
jaasConfigPassThrough does not support the directoryPathInContainer feature where required authentication credentials are injected by Vault.
See Client-side SASL/PLAIN authentication for Kafka for more information on jaasConfig and jaasConfigPassThrough.
mTLS authentication to Kafka cluster¶
For the source cluster to authenticate to the destination Kafka cluster using mTLS, configure the following in the Source mode ClusterLink CR:
spec:
  destinationKafkaCluster:
    authentication:
      type: mtls                   --- [1]
    tls:
      enabled: true                --- [2]
      secretRef:                   --- [3]
[1] Required.
[2] Required.
[3] Required. The name of the secret that contains the certificates.
See Provide TLS keys and certificates in PEM format for the expected keys in the TLS secret. Only the PEM format is supported for ClusterLink CRs.
TLS encryption for Kafka cluster¶
For the source cluster to encrypt data to the destination Kafka cluster using TLS, configure the following in the Source mode ClusterLink CR:
spec:
  destinationKafkaCluster:
    tls:
      enabled: true                --- [1]
      secretRef:                   --- [2]
[1] Required.
[2] Required. The name of the secret that contains the certificates.
See Provide TLS keys and certificates in PEM format for the expected keys in the TLS secret. Only the PEM format is supported for ClusterLink CRs.
Configure the source-initiated cluster link on the destination cluster¶
For a source-initiated cluster link, configure the cluster information in the Destination mode ClusterLink CR:
spec:
  sourceInitiatedLink:
    linkMode: Destination          --- [1]
  destinationKafkaCluster:
    kafkaRestClassRef:             --- [2]
      name:                        --- [3]
      namespace:                   --- [4]
  sourceKafkaCluster:
    clusterID:                     --- [5]
    kafkaRestClassRef:             --- [6]
      name:                        --- [7]
      namespace:                   --- [8]
[1] Required.
[2] The reference to the KafkaRestClass application resource which defines the Kafka REST Class connection information.
[3] Required under [2]. The name of the KafkaRestClass CR on the destination cluster.
[4] Optional. The namespace of the KafkaRestClass CR. If omitted, the same namespace as this CR is assumed.
[5] The cluster ID of the source Kafka cluster. If both clusterID and Kafka REST class name ([7]) are specified, this clusterID value takes precedence over the Kafka REST class name ([7]).
[6] The reference to the KafkaRestClass application custom resource (CR) which defines the Kafka REST Class connection information.
Optional when clusterID ([5]) is specified.
[7] Required under [6]. The name of the KafkaRestClass CR on the source Kafka cluster.
[8] Optional. The namespace of the KafkaRestClass CR. If omitted, the same namespace as this CR is assumed.
In a source-initiated cluster link, authentication and TLS information is not needed in the Destination mode ClusterLink CR.
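The matching Destination mode CR could be sketched as follows. As before, the apiVersion is an assumption and all names, the cluster ID, and the topic are hypothetical. Mirror topics are listed here because they must be defined in the CR on the destination cluster.

```yaml
# Sketch only: names, cluster ID, and topic are hypothetical placeholders.
apiVersion: platform.confluent.io/v1beta1   # assumed CFK API group/version
kind: ClusterLink
metadata:
  name: clusterlink-destination
  namespace: destination
spec:
  name: hybrid-link                  # same link name as in the Source mode CR
  sourceInitiatedLink:
    linkMode: Destination
  destinationKafkaCluster:
    kafkaRestClassRef:
      name: destination-kafka-rest   # KafkaRestClass CR for the destination cluster
      namespace: destination
  sourceKafkaCluster:
    clusterID: "<source-cluster-id>"
  mirrorTopics:
    - name: orders                   # a topic named orders must exist on the source
```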
Define consumer group filters¶
Consumer group filters specify a list of consumer groups to be migrated from the source cluster to the destination cluster.
To define consumer group filters, set the following properties in the ClusterLink CR:
spec:
  consumerGroupFilters:
    - name:                        --- [1]
      filterType:                  --- [2]
      patternType:                 --- [3]
- [1] Optional. The name of the consumer group associated with this filter. If not specified, the default is wildcard ("*").
- [2] Required. The type of this filter. Specify INCLUDE or EXCLUDE.
- [3] Optional. Defines whether the pattern of resource is PREFIXED or LITERAL. The default is LITERAL.
The following example specifies to migrate all consumer groups from the source cluster to the destination cluster, excluding one consumer group, “someGroup”.
spec:
  consumerGroupFilters:
    - name: "*"
      patternType: LITERAL
      filterType: INCLUDE
    - name: "someGroup"
      patternType: LITERAL
      filterType: EXCLUDE
Define ACL filters¶
ACL filters specify a list of ACLs to be migrated from the source cluster to the destination cluster.
To define ACL filters, set the following properties in the ClusterLink CR:
spec:
  aclFilters:
    - accessFilter:                --- [1]
        host:                      --- [2]
        operation:                 --- [3]
        permissionType:            --- [4]
        principal:                 --- [5]
      resourceFilter:              --- [6]
        name:                      --- [7]
        patternType:               --- [8]
        resourceType:              --- [9]
- [1] Specifies the access filter for ACLs.
- [2] The host from which operations can come. The default value is *, which matches all hosts.
- [3] Required. Specifies the operation type of the filter. It can be ANY or operations based on resource type defined in the Confluent ACL documentation.
- [4] Required. The permission type of the filter. Valid options are any, allow, and deny.
- [5] The name of the principal. The default value is *.
- [6] Specifies the resource for this ACL filter. A resource can be a cluster, group, Kafka topic, transactional ID, or Delegation token.
- [7] The name of the resource associated with this filter. The default value is *.
- [8] Required. The pattern of the resource. Valid options are prefixed, literal, any, and match.
- [9] Required. The type of the filter. Valid options are any, cluster, group, topic, transactionId, and delegationToken.
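As a rough illustration of the ACL filter fields, the following sketch would migrate allow ACLs, for any operation and any principal, on topics whose names start with orders. The resource name is hypothetical, and the value casing follows the option lists above; verify it against the CRD in your CFK version.

```yaml
# Sketch only: the resource name "orders" is hypothetical.
spec:
  aclFilters:
    - accessFilter:
        host: "*"                # match all hosts (the default)
        operation: ANY
        permissionType: allow
        principal: "*"           # match all principals (the default)
      resourceFilter:
        name: orders
        patternType: prefixed    # match topic names beginning with "orders"
        resourceType: topic
```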
Configure mirror topic options¶
Mirror topic options specify configuration for mirror topics.
To configure mirror topic options for a cluster link, set the following properties in the ClusterLink CR:
spec:
  mirrorTopicOptions:
    autoCreateTopics:              --- [1]
      enabled:                     --- [2]
      topicFilters:                --- [3]
        - filterType:              --- [4]
          name:                    --- [5]
          patternType:             --- [6]
    prefix:                        --- [7]
[1] Configurations for the cluster link to automatically create mirror topics on the destination cluster. For more info, see Auto-create Mirror Topics.
[2] Set to true to automatically create mirror topics on the destination cluster.
Setting this option to false disables mirror topic creation and clears existing filters.
[3] An array of filters to indicate which topics should be mirrored.
[4] Required when defining a topic filter. The topic filter type. Valid options are INCLUDE and EXCLUDE.
If set to INCLUDE, any topic names on the source cluster that match this filter will be created as mirror topics.
If set to EXCLUDE, any matching topic names will not be created as mirror topics. In other words, this setting prevents auto mirror topic creation for the specified topic names. EXCLUDE filters override any overlapping INCLUDE filters.
[5] Required when defining a topic filter. The resource name associated with this filter. Set name to the wildcard, *, to mirror all topics.
[6] Required when defining a topic filter. The pattern of the resource. Valid options are PREFIXED and LITERAL.
If name is set to foo, then setting patternType to LITERAL will only match a topic named foo. Setting patternType to PREFIXED will match any topic names that begin with foo, for example, foo, football, and foo.fighters.
[7] The prefix for mirror topics. When auto-create is enabled (mirrorTopicOptions.autoCreateTopics.enabled: true) and the prefix is configured, the topics created on the destination cluster will automatically contain the prefix. Otherwise, spec.mirrorTopic.name must be defined in the <prefix><sourceTopicName> format.
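The filter and prefix rules above can be combined as in the following sketch. The prefix dc1- is hypothetical, and the nesting of topicFilters under autoCreateTopics is an assumption based on the callouts. With these settings, source topics foo and football would be auto-created as dc1-foo and dc1-football, while foo.fighters would be skipped because the EXCLUDE filter overrides the overlapping INCLUDE filter.

```yaml
# Sketch only: the prefix is hypothetical; topic names reuse the foo examples above.
spec:
  mirrorTopicOptions:
    prefix: "dc1-"
    autoCreateTopics:
      enabled: true
      topicFilters:
        - filterType: INCLUDE
          name: foo
          patternType: PREFIXED    # matches foo, football, foo.fighters
        - filterType: EXCLUDE
          name: foo.fighters
          patternType: LITERAL     # matches only the topic named foo.fighters
```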
Configure destination Kafka security¶
When security is enabled, the destination or source Kafka brokers must be configured with a password encoder secret for encrypting sensitive link configurations, such as certificates and passwords for SASL/PLAIN, mTLS, or TLS:
- For destination-initiated Cluster Linking, configure a password encoder on the destination Kafka cluster.
- For source-initiated Cluster Linking, configure a password encoder on the source Kafka cluster.
For setting a password encoder secret, see Manage Password Encoder Secrets for Confluent Platform Using Confluent for Kubernetes.
See Updating Password Configurations Dynamically for more about the secret encoder and how to rotate the encoder secret.
Edit a cluster link¶
For cluster link configuration options, see Cluster Linking Config Options.
Update the ClusterLink CR and apply the changes by running:
kubectl apply -f <ClusterLink CR>
Delete a cluster link¶
When a cluster link is deleted, all the mirror topics managed by that cluster link are failed over, and KafkaTopic CRs are created for the failed-over topics.
When you delete a cluster link, the history of any STOPPED topics is also deleted. If you need the Last Source Fetch Offset or the Status Time of your promoted or failed-over mirror topics, ensure you save those before you delete the cluster link.
To delete a cluster link, run:
kubectl delete -f <Cluster Link CR>
Important
After you delete a cluster link, use a brand NEW topic CR to manage the promoted topic in the destination Kafka cluster. Never reuse the existing one referenced in the source cluster, as this could lead to data loss.
Describe a cluster link¶
You can get the following information from a cluster link status:
- Cluster link ID
- Cluster link name
- Destination Kafka cluster ID
- Source Kafka cluster ID
- Number of mirror topics
- Status of all mirror topics in the cluster
To get the status of a cluster link on a destination Kafka cluster, run:
kubectl get clusterlink <cluster link name> -oyaml
A cluster link maintains information about promoted topics even though they are no longer mirror topics.
To see the descriptions of the fields in the cluster link status, run:
kubectl explain clusterlink.status
Create a mirror topic¶
Mirror topics are created by and owned by a cluster link.
A mirror topic is created as a new topic with a unique name on the destination cluster. There must be a topic of this same name on the cluster link’s source cluster, which will be the mirror topic’s source topic.
You cannot convert an existing read/write topic into a mirror topic.
For details on mirror topic creation in Confluent Platform, see Mirror Topic Creation.
To create a mirror topic:
Set the following in the ClusterLink CR:
spec:
  mirrorTopics:
    - name:                        --- [1]
      state:                       --- [2]
      configs:                     --- [3]
      replicationFactor:           --- [4]
      sourceTopicName:             --- [5]
[1] Required. The mirror topic name. If the sourceTopicName is not configured, a topic with the exact same name must exist on the source cluster, and no topic with this name should exist on the destination cluster.
When spec.mirrorTopicOptions.prefix: <prefix> is configured for the cluster link, the name has to be of the format <prefix><sourceTopicName>.
[2] Optional. The state of this mirror topic when created. Possible values are PAUSE, PROMOTE, FAILOVER, and ACTIVE. If state is not defined, the default value is ACTIVE.
[3] Optional. For the list of optional configurations, see Cluster Linking Config Options.
[4] Optional. The replication factor for the mirror topic on the destination cluster. If omitted, this mirror topic will inherit the broker’s default replication factor.
[5] Optional. The topic name on the source cluster that will be mirrored to the destination cluster.
When spec.mirrorTopicOptions.prefix is not configured, do not configure this field.
When spec.mirrorTopicOptions.prefix is configured, a topic with the exact same name must exist on the source cluster.
Apply the changes by running:
kubectl apply -f <Cluster Link CR>
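To make the naming rules concrete, here is a sketch of a mirrorTopics list used together with a prefix. The prefix dc1- and the topic names are hypothetical. Because a prefix is configured, each name follows the <prefix><sourceTopicName> format and sourceTopicName names the topic on the source cluster.

```yaml
# Sketch only: the prefix and topic names are hypothetical.
spec:
  mirrorTopicOptions:
    prefix: "dc1-"
  mirrorTopics:
    - name: dc1-orders             # <prefix><sourceTopicName>
      sourceTopicName: orders      # must exist on the source cluster
    - name: dc1-payments
      sourceTopicName: payments
      state: ACTIVE                # the default when state is omitted
      replicationFactor: 3
```

Without a prefix, you would omit sourceTopicName and set name to the exact name of the source topic.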
Modify a mirror topic¶
You can promote, fail over, or pause a mirror topic:
- Promote a topic: Promote a topic when you want to stop mirroring and make the topic on the destination writable. Promotion verifies that the lag is zero and performs a final sync before the topic is promoted.
- Fail over a topic: This is similar to promote, but doesn’t wait for the lag to become zero.
To learn more about what happens to a Confluent Platform mirror topic when promoted or failed over, see Converting a Mirror Topic to a Normal Topic.
For the available states and statuses of a mirror topic in Confluent Platform, see Mirror Topic States and Statuses.
When you promote or fail over a mirror topic, CFK creates a KafkaTopic CR. The name of the new KafkaTopic CR follows the format below. Note that the topic name in the destination cluster does not change and remains the same as the mirror topic name.
clink-toLowerCase(<topic-name>)-<First section of the ClusterLink object UID>
For example, when a mirror topic, myMirrorTopic, is promoted, the new KafkaTopic CR name is:
clink-mymirrortopic-0154a475
If the new KafkaTopic CR name contains more than 63 characters, it is truncated to 63 characters.
To modify an active mirror topic to promote, fail over, or pause:
Modify the mirrorTopics section in the ClusterLink CR:
spec:
  mirrorTopics:
    - name:                        --- [1]
      state:                       --- [2]
- [1] Required.
- [2] Optional. The state of this mirror topic. Possible values are PAUSE, PROMOTE, FAILOVER, ACTIVE.
Apply the changes by running:
kubectl apply -f clusterlink.yaml
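For instance, promoting a mirror topic amounts to changing its state in the ClusterLink CR, as in this sketch. The topic name orders is hypothetical.

```yaml
# Sketch only: the topic name is hypothetical.
spec:
  mirrorTopics:
    - name: orders
      state: PROMOTE    # use FAILOVER to skip waiting for zero lag, or PAUSE to pause mirroring
```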
Delete a mirror topic¶
For the process of deleting a mirror topic in Confluent Platform, see Mirror Topic Deletion.
For the available states and statuses of a mirror topic in Confluent Platform, see Mirror Topic States and Statuses.
To delete a mirror topic:
Delete a mirror topic from the list in the ClusterLink CR by removing the name and the other properties for the mirror topic.
spec:
  mirrorTopics:
    - name:
      state:
      configs:
      replicationFactor:
      sourceTopicName:
Apply the changes by running:
kubectl apply -f clusterlink.yaml