Cluster Linking using Confluent for Kubernetes¶
Using the Cluster Linking feature, you can directly connect Kafka clusters together and mirror topics from one cluster (the source) to another (the destination). The feature makes it easier for you to build multi-datacenter, multi-cluster, and hybrid cloud deployments.
Confluent for Kubernetes (CFK) provides a declarative API, the ClusterLink custom resource definition (CRD), for defining, updating, and managing cluster links.
Cluster link configurations are done on the destination cluster. No changes are required on the source cluster.
Requirements¶
Before you create cluster links, make sure the following requirements are satisfied:
- The destination cluster must be a Confluent Platform or Confluent Cloud cluster.
- Cluster links are created and managed using Admin REST APIs, and Admin REST APIs must be running on the destination cluster. If your destination cluster is managed by CFK, instructions for setting up Admin REST APIs are described in Confluent Admin REST API.
- The destination Kafka is a consumer to the source Kafka. Therefore, the destination cluster needs to configure authentication/encryption to talk to the source cluster if the source Kafka has security settings enabled. For the required step, see Configure destination Kafka security.
Create a cluster link¶
Create a cluster link by creating a new ClusterLink custom resource (CR) on the destination cluster and applying it with the kubectl apply -f <ClusterLink CR> command:
apiVersion: platform.confluent.io/v1beta1
kind: ClusterLink
metadata:
  name: clusterlink         --- [1]
  namespace:                --- [2]
spec:
  destinationKafkaCluster:
    kafkaRestClassRef:
      name:                 --- [3]
      namespace:            --- [4]
  sourceKafkaCluster:       --- [5]
  consumerGroupFilters:     --- [6]
  aclFilters:               --- [7]
  configs:                  --- [8]
  mirrorTopics:             --- [9]
[1] Required. Defines the name of the ClusterLink CR.
[2] Defines the namespace of the ClusterLink CR.
[3] Required. The name of the Confluent Admin REST API custom resource (KafkaRestClass CR). KafkaRestClass CR is required on the destination cluster for cluster links.
[4] Optional. The namespace of the KafkaRestClass CR.
[5] Required. Specify the information about the source cluster. See Specify the source Kafka cluster.
[6] Optional. An array of consumer groups to be migrated from the source cluster to the destination cluster. See Define consumer group filters.
[7] Optional. An array of Access Control Lists (ACLs) to be migrated from the source cluster to the destination cluster. See Define ACL filters.
[8] Optional. A map of additional configurations for creating a cluster link.
For example:
spec:
  configs:
    connections.max.idle.ms: "620000"
    cluster.link.retry.timeout.ms: "200000"
For the list of optional configurations, see Cluster Linking config options.
[9] Optional. An array of mirror topics. See Create a mirror topic.
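Putting the annotated fields together, a minimal ClusterLink CR might look as follows. This is an illustrative sketch: the names (clusterlink-demo, the confluent namespace, the KafkaRestClass name, and the bootstrap endpoint) are placeholders, not values from your environment.

```yaml
apiVersion: platform.confluent.io/v1beta1
kind: ClusterLink
metadata:
  name: clusterlink-demo              # name of this ClusterLink CR
  namespace: confluent                # namespace where CFK runs
spec:
  destinationKafkaCluster:
    kafkaRestClassRef:
      name: destination-kafkarestclass  # KafkaRestClass CR on the destination cluster
      namespace: confluent
  sourceKafkaCluster:
    bootstrapEndpoint: source-kafka.source-ns.svc.cluster.local:9092
  configs:
    connections.max.idle.ms: "620000"   # optional cluster link configuration
```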
Specify the source Kafka cluster¶
Configure the connection information for the source Kafka cluster.
spec:
  sourceKafkaCluster:
    bootstrapEndpoint:      --- [1]
    kafkaRestClassRef:
      name:                 --- [2]
      namespace:            --- [3]
    clusterID:              --- [4]
[1] The endpoint where the source Kafka is running.
[2] The name of the KafkaRestClass CR on the source Kafka cluster.
[3] The namespace of the source Kafka cluster.
[4] The cluster ID of the source Kafka cluster. If both clusterID and the Kafka REST class name ([2]) are specified, the clusterID value takes precedence over the Kafka REST class name. You can get the cluster ID using the curl or kafka-cluster command with the proper flags. For example:
curl https://<cluster url>:8090/kafka/v3/clusters/ -kv
kafka-cluster cluster-id --bootstrap-server kafka.operator.svc.cluster.local:9092 --config /tmp/kafka.properties
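As an illustration, a sourceKafkaCluster section that identifies the source by bootstrap endpoint and an explicit cluster ID might look like this; both values are placeholders for your environment:

```yaml
spec:
  sourceKafkaCluster:
    bootstrapEndpoint: source-kafka.source-ns.svc.cluster.local:9092
    clusterID: AbCdEfGhIjKlMnOpQrStUv   # placeholder; takes precedence over kafkaRestClassRef
```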
SASL/PLAIN authentication to source Kafka cluster
spec:
  sourceKafkaCluster:
    authentication:
      type: plain                  --- [1]
      jaasConfig:
        secretRef:                 --- [2]
      jaasConfigPassThrough:       --- [3]
        secretRef:                 --- [4]
        directoryPathInContainer:  --- [5]
[1] Required.
[2] Optional. Provide the user names and passwords in a referenced secret.
[3] Optional. If you have customizations, such as using custom login handlers, you can bypass the CFK automation and provide the configuration directly.
[4] Set to the Kubernetes Secret name used to authenticate to Kafka.
[5] Set to the directory path in the container where required Kafka authentication credentials are injected by Vault.
See Provide secrets for Confluent Platform application CR for providing the credential and required annotations when using Vault.
See Client-side SASL/PLAIN authentication for Kafka for more information on jaasConfig and jaasConfigPassThrough.
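For example, a sketch of a SASL/PLAIN configuration that references a Kubernetes Secret; the secret name source-credential is a placeholder, and the expected key layout inside the secret is described in Provide secrets for Confluent Platform application CR:

```yaml
spec:
  sourceKafkaCluster:
    authentication:
      type: plain
      jaasConfig:
        secretRef: source-credential   # Secret holding the SASL/PLAIN username and password
```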
mTLS authentication to source Kafka cluster
spec:
  sourceKafkaCluster:
    authentication:
      type: mtls      --- [1]
    tls:
      enabled: true   --- [2]
      secretRef:      --- [3]
- [1] Required.
- [2] Required. See the next section for TLS configuration.
- [3] Required. The name of the secret that contains the certificates. See Configure Network Encryption with Confluent for Kubernetes for more information on certificates.
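For example, a sketch of an mTLS configuration, where source-tls-certs is a placeholder for the Secret that contains the certificates:

```yaml
spec:
  sourceKafkaCluster:
    authentication:
      type: mtls
    tls:
      enabled: true
      secretRef: source-tls-certs   # placeholder Secret with the CA, certificate, and key
```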
TLS encryption for source Kafka cluster
spec:
  sourceKafkaCluster:
    tls:
      enabled: true   --- [1]
      secretRef:      --- [2]
- [1] Required.
- [2] Required. The name of the secret that contains the certificates. See Configure Network Encryption with Confluent for Kubernetes for more information on certificates.
Define consumer group filters¶
Consumer group filters specify a list of consumer groups to be migrated from the source cluster to the destination cluster.
To define consumer group filters, set the following properties in the ClusterLink CR:
spec:
  consumerGroupFilters:
    - name:          --- [1]
      filterType:    --- [2]
      patternType:   --- [3]
- [1] Optional. The name of the consumer group associated with this filter. If not specified, the default is the wildcard ("*").
- [2] Required. The type of this filter. Specify INCLUDE or EXCLUDE.
- [3] Optional. Defines whether the pattern of the resource is PREFIXED or LITERAL. The default is LITERAL.
The following example specifies to migrate all consumer groups from the source cluster to the destination cluster, excluding one consumer group, “someGroup”.
spec:
  consumerGroupFilters:
    - name: "*"
      patternType: LITERAL
      filterType: INCLUDE
    - name: "someGroup"
      patternType: LITERAL
      filterType: EXCLUDE
Define ACL filters¶
ACL filters specify a list of ACLs to be migrated from the source cluster to the destination cluster.
To define ACL filters, set the following properties in the ClusterLink CR:
spec:
  aclFilters:
    - accessFilter:      --- [1]
        host:            --- [2]
        operation:       --- [3]
        permissionType:  --- [4]
        principal:       --- [5]
      resourceFilter:    --- [6]
        name:            --- [7]
        patternType:     --- [8]
        resourceType:    --- [9]
- [1] Specifies the access filter for ACLs.
- [2] The host from which operations can originate. The default value is *, which matches all hosts.
- [3] Required. Specifies the operation type of the filter. It can be ANY or an operation based on the resource type as defined in the Confluent ACL documentation.
- [4] Required. The permission type of the filter. Valid options are any, allow, and deny.
- [5] The name of the principal. The default value is *.
- [6] Specifies the resource for this ACL filter. A resource can be a cluster, group, Kafka topic, transactional ID, or delegation token.
- [7] The name of the resource associated with this filter. The default value is *.
- [8] Required. The pattern of the resource. Valid options are prefixed, literal, any, and match.
- [9] Required. The type of the filter. Valid options are any, cluster, group, topic, transactionId, and delegationToken.
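The following illustrative example migrates all topic ACLs that allow the READ operation for any principal from any host. The values are placeholders chosen to match the annotated fields above; the exact casing accepted for the operation value follows the Confluent ACL documentation.

```yaml
spec:
  aclFilters:
    - accessFilter:
        host: "*"              # match all hosts
        operation: READ        # operation to match (see the Confluent ACL documentation)
        permissionType: allow
        principal: "*"         # match all principals
      resourceFilter:
        name: "*"
        patternType: any
        resourceType: topic
```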
Configure destination Kafka security¶
When security is enabled, the destination Kafka brokers must be configured with a password encoder secret for encrypting sensitive link configurations, such as certificates and passwords used for SASL/PLAIN, mTLS, or TLS.
For setting a password encoder secret, see Manage Password Encoder Secret.
See Updating Password Configurations Dynamically for more about the secret encoder and how to rotate the encoder secret.
Edit a cluster link¶
For cluster link configuration options, see Cluster Linking Config Options.
Update the ClusterLink CR and apply the changes by running:
kubectl apply -f <ClusterLink CR>
Delete a cluster link¶
When a cluster link is deleted, all mirror topics managed by that cluster link are failed over, and KafkaTopic CRs are created for the failed-over topics.
When you delete a cluster link, the history of any STOPPED topics is also deleted. If you need the Last Source Fetch Offset or the Status Time of your promoted or failed-over mirror topics, save those values before you delete the cluster link.
To delete a cluster link, run:
kubectl delete -f <Cluster Link CR>
Describe a cluster link¶
You can get the following information from a cluster link status:
- Cluster link ID
- Cluster link name
- Destination Kafka cluster ID
- Source Kafka cluster ID
- Number of mirror topics
- Status of all mirror topics in the cluster
To list the cluster links on a destination Kafka cluster, run:
kubectl get clusterlink <cluster link name> -oyaml
A cluster link maintains information about promoted topics even after they are no longer mirror topics.
Alternatively, to see the documentation for the fields available in a cluster link status, run:
kubectl explain clusterlink.status
Create a mirror topic¶
Mirror topics are created by and owned by a cluster link.
A mirror topic is created as a new topic with a unique name on the destination cluster. There must be a topic of this same name on the cluster link’s source cluster, which will be the mirror topic’s source topic.
You cannot convert an existing read/write topic into a mirror topic.
For details on mirror topic creation in Confluent Platform, see Mirror Topic Creation.
To create a mirror topic:
Set the following in the ClusterLink CR:
spec:
  mirrorTopics:
    - name:     --- [1]
      state:    --- [2]
      configs:  --- [3]
- [1] Required. The name of the source topic. There must be a topic of this same name on the cluster link’s source cluster.
- [2] Optional. The state of this mirror topic when created. Possible values are PAUSE, PROMOTE, FAILOVER, and ACTIVE. If the state is not defined, the default value is ACTIVE.
- [3] Optional. For the list of optional configurations, see Cluster Linking Config Options.
Apply the changes by running:
kubectl apply -f <Cluster Link CR>
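For example, to mirror a hypothetical source topic named orders, the mirrorTopics section might look as follows; the topic name is a placeholder and must match an existing topic on the source cluster:

```yaml
spec:
  mirrorTopics:
    - name: orders    # must match a topic on the cluster link's source cluster
      state: ACTIVE   # optional; ACTIVE is the default
```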
Modify a mirror topic¶
You can promote, failover, or pause a mirror topic:
- Promote a topic: Promote a topic when you want to stop mirroring and make the destination topic writable. Promoting verifies that the lag is zero and performs a final sync before promoting the topic.
- Failover a topic: Similar to promote, but does not wait for the lag to reach zero.
To learn more about what happens to a Confluent Platform mirror topic when promoted or failed over, see Converting a Mirror Topic to a Normal Topic.
For the available states and statuses of a mirror topic in Confluent Platform, see Mirror Topic States and Statuses.
When you promote or fail over a mirror topic, CFK creates a KafkaTopic CR. The name of the new KafkaTopic CR is constructed as follows. Note that the topic name in the destination cluster does not change and remains the same as the mirror topic name.
clink-toLowerCase(<topic-name>)-<First section of the ClusterLink object UID>
For example, when a mirror topic, myMirrorTopic
, is promoted, the new
KafkaTopic CR name is:
clink-mymirrortopic-0154a475
If the new KafkaTopic CR name contains more than 63 characters, it is truncated to 63 characters.
To modify an active mirror topic to promote, fail over, or pause:
Modify the mirrorTopics section in the ClusterLink CR:
spec:
  mirrorTopics:
    - name:   --- [1]
      state:  --- [2]
- [1] Required.
- [2] Optional. The state of this mirror topic. Possible values are PAUSE, PROMOTE, FAILOVER, and ACTIVE.
Apply the changes by running:
kubectl apply -f clusterlink.yaml
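For example, to promote the hypothetical mirror topic orders, change its state and re-apply the CR:

```yaml
spec:
  mirrorTopics:
    - name: orders     # placeholder mirror topic name
      state: PROMOTE   # stop mirroring and make the destination topic writable
```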
Delete a mirror topic¶
For the process of deleting a mirror topic in Confluent Platform, see Mirror Topic Deletion.
For the available states and statuses of a mirror topic in Confluent Platform, see Mirror Topic States and Statuses.
To delete a mirror topic:
Remove the mirror topic from the list in the ClusterLink CR:
spec:
  mirrorTopics:
    - name:  --- [1]
- [1] Remove the name and the other properties for the mirror topic.
Apply the changes by running:
kubectl apply -f clusterlink.yaml