Manage Cluster Linking for Confluent Platform Using Confluent for Kubernetes Blueprints¶
The Cluster Linking feature allows you to directly connect Kafka clusters together and mirror topics from one cluster (the source) to another (the destination). You can use the feature in multi-datacenter, multi-cluster, and hybrid cloud deployments to seamlessly move data between Kafka clusters.
Confluent for Kubernetes (CFK) Blueprints provides a declarative API, the ClusterLink custom resource definition (CRD), for defining, updating, and managing cluster links.
The following types of cluster links are supported in CFK Blueprints. Select a type of cluster link based on where you want to move data from and to, and configure the ClusterLink custom resource (CR) accordingly:
- Confluent Platform to Confluent Platform cluster link
To move data from an on-prem Confluent Platform to another on-prem Confluent Platform or from Confluent Cloud to Confluent Cloud, create a cluster link on the destination with connection and security information for the destination to talk to the source. No changes are required on the source cluster.
The destination cluster initiates the connection to the source cluster.
You can also use this type of cluster links to migrate data from Apache Kafka® or from other Kafka, such as Amazon MSK, to an on-prem Confluent Platform.
In this document, this type of cluster link is referred to as destination-initiated. Note that this is not a term used in the other cluster linking documentation.
- Confluent Platform to Confluent Cloud cluster link
To move data from an on-prem Confluent Platform to Confluent Cloud in a hybrid environment, use the source-initiated type of cluster links.
For source-initiated cluster links, you configure connection and security information on the source cluster to talk to the destination.
The source cluster (on-prem Confluent Platform) initiates the connection to the destination cluster (Confluent Cloud).
For source-initiated cluster links, cluster links need to be configured both on the source and destination clusters.
The high-level workflow to run Cluster Linking is:
Have the source and destination Kafka clusters deployed. For deploying a Kafka using CFK Blueprints, see Deploy and Manage Confluent Platform Using Confluent for Kubernetes Blueprints.
- For a destination-initiated cluster linking, the destination cluster must be a Confluent Platform or a Confluent Cloud cluster.
- For a source-initiated cluster linking, the source cluster and the destination cluster must be a Confluent Platform or a Confluent Cloud cluster.
If the source or destination Kafka cluster is in Confluent Cloud, set up the connection to the Kafka cluster in Confluent Cloud.
Set up the password encoder for the source and destination Kafka clusters.
When security is enabled, the source and destination Kafka brokers must be configured with a password encoder secret for encrypting sensitive link configurations, such as certificates and passwords.
See Updating Password Configurations Dynamically for more about the secret encoder and how to rotate the encoder secret.
Create a cluster link¶
Review the following to ensure that you create the cluster links on the right clusters:
- To create a destination-initiated cluster link, create a cluster link on the destination cluster.
- To create a source-initiated cluster link, create two cluster links with the same cluster link names on the source cluster and the destination cluster.
- For both destination-initiated and source-initiated cluster links,
mirrorTopics
,mirrors.options
,aclFilter
,consumerGroupFilters
, and other cluster link configurations must be defined in the ClusterLink CR on the destination cluster.
Create a cluster link by using a new ClusterLink custom resource (CR):
apiVersion: apps.cpc.platform.confluent.io/v1beta1
kind: ClusterLink
spec:
name: --- [1]
sourceInitiatedLink: --- [2]
linkMode: --- [3]
destination: --- [4]
kafkaClusterRef:
name: --- [5]
namespace: --- [6]
source: --- [7]
kafkaClusterRef:
name: --- [8]
namespace: --- [9]
consumerGroupFilters: --- [10]
aclFilters: --- [11]
configs: --- [12]
mirrors: --- [13]
[1] The name of the cluster link. If not defined
metadata.name
is used.[2] Configure if this cluster link is a source-initiated cluster link.
[3] Required under
sourceInitiatedLink
. Specify whether this source-initiated cluster link CR is on the source cluster or on the destination cluster. Valid values areSource
andDestination
.[4] Required. The information about the destination cluster.
[5] Required. The name of the destination Kafka cluster.
[6] The namespace of the destination Kafka cluster. If omitted, the namespace of this ClusterLink CR is used.
[7] Required. The information about the source Kafka cluster.
[8] Required. The name of the source Kafka cluster.
[9] The namespace of the source Kafka cluster. If omitted, the namespace of this ClusterLink CR is used.
[10] An array of consumer groups to be migrated from the source cluster to the destination cluster. See Define consumer group filters.
[11] An array of Access Control Lists (ACLs) to be migrated from the source cluster to the destination cluster. See Define ACL filters.
[12] A map of additional configurations for creating a cluster link.
For example:
spec: configs: connections.max.idle.ms: "620000" cluster.link.retry.timeout.ms: "200000"
For the list of optional configurations, see Cluster Linking config options.
[13] Configure mirror topics. See Manage mirror topics.
Define consumer group filters¶
Consumer group filters specify a list of consumer groups to be migrated from the source cluster to the destination cluster.
To define consumer group filters, set the following properties in the ClusterLink CR:
apiVersion: apps.cpc.platform.confluent.io/v1beta1
kind: ClusterLink
spec:
consumerGroupFilters:
- name: --- [1]
filterType: --- [2]
patternType: --- [3]
- [1] The name of the consumer group associated with this filter. If
not specified, the default is a wildcard (
"*"
). - [2] Required. The type of this filter. Specify
INCLUDE
orEXCLUDE
. - [3] Defines whether the pattern of resource is
PREFIXED
orLITERAL
. The default isLITERAL
.
The following example specifies migrating all consumer groups from the source cluster to the destination cluster, excluding one consumer group, “someGroup”.
apiVersion: apps.cpc.platform.confluent.io/v1beta1
kind: ClusterLink
spec:
consumerGroupFilters:
- name: "*"
patternType: LITERAL
filterType: INCLUDE
- name: "someGroup"
patternType: LITERAL
filterType: EXCLUDE
Define ACL filters¶
Access Control List (ACL) filters specify a list of ACLs to be migrated from the source cluster to the destination cluster.
To define ACL filters, set the following properties in the ClusterLink CR:
apiVersion: apps.cpc.platform.confluent.io/v1beta1
kind: ClusterLink
spec:
aclFilters:
- accessFilter: --- [1]
host: --- [2]
operation: --- [3]
permissionType: --- [4]
principal: --- [5]
resourceFilter: --- [6]
name: --- [7]
patternType: --- [8]
resourceType: --- [9]
- [1] Specifies the access filter for ACLs.
- [2] The host for which operations can be coming from. The default value is
*
, which matches all hosts. - [3] Required. Specifies the operation type of the filter. It can be
ANY
or operations based on the resource type defined in the Confluent ACL documentation. - [4] Required. The permission type of the filter. Valid options are
any
,allow
, anddeny
. - [5] The name of the principal. The default value is
*
. - [6] Specifies the resource for this ACL filter. A resource can be a cluster, group, Kafka topic, transactional ID, or Delegation token.
- [7] The name of the resource associated with this filter. The default value is
*
. - [8] Required. The pattern of the resource. Valid options are
prefixed
,literal
,any
, andmatch
. - [9] Required. The type of the filter. Valid options are
any
,cluster
,group
,topic
,transactionId
, anddelegationToken
.
Edit a cluster link¶
For cluster link configuration options, see Cluster Linking Config Options.
Update the ClusterLink CR and apply the changes by running the kubectl apply
command.
Delete a cluster link¶
When a cluster link is deleted, all the mirror topics managed by that cluster link are failed over, and KafkaTopic CRs will be created for the failed over topics.
When you delete a cluster link, the history of any STOPPED
topics is also
deleted. If you need the Last Source Fetch Offset
or the Status Time
of
your promoted or failed-over mirror topics, ensure you save those before you
delete the cluster link.
To delete a cluster link, run:
kubectl delete -f <Cluster Link CR yaml file>
Describe a cluster link¶
You can get the following information from a cluster link status:
- Cluster link ID
- Cluster link name
- Destination Kafka cluster ID
- Source Kafka cluster ID
- Number of mirror topics
- Status of all mirror topics in the cluster
To list the cluster links on a destination Kafka cluster, run:
kubectl get clusterlink.app <Cluster Link name> -oyaml
A cluster link maintains information about promoted topics even though it is not a mirror topic anymore.
Alternatively, you can use the following command to get the status of a cluster link:
kubectl explain clusterlink.status
Manage mirror topics¶
Configure mirror topic options¶
Mirror topic options specify configuration for mirror topics.
To configure mirror topic options for a cluster link, set the following properties in the ClusterLink CR:
apiVersion: apps.cpc.platform.confluent.io/v1beta1
kind: ClusterLink
spec:
mirrors:
options:
autoCreateTopics: --- [1]
enabled: --- [2]
topicFilters: --- [3]
- filterType: --- [4]
name: --- [5]
patternType: --- [6]
prefix: --- [7]
[1] Configurations for the cluster link to automatically create mirror topics on the destination cluster. For more info, see Auto-create Mirror Topics.
[2] Set to
true
to automatically create mirror topics on the destination cluster.Setting this option to
false
disables mirror topic creation and clears existing filters.[3] An array of filters to indicate which topics should be mirrored.
[4] Required when defining a topic filter. The topic filter type. Valid options are
INCLUDE
andEXCLUDE
.If set to
INCLUDE
, any topic names on the source cluster that match this filter will be created as mirror topics.If set to
EXCLUDE
, any matching topic names will not be created as mirror topics. In other words, it prevents automatically creating mirror topics for the specified topic names.EXCLUDE
filters override any overlappingINCLUDE
filters.[5] Required when defining a topic filter. The resource name associated with this filter. Set the name to the wildcard,
*
, to mirror all topics.[6] Required when defining a topic filter. The pattern of the resource. Valid options are
PREFIXED
andLITERAL
.If the name is set to
foo
, then setting patternType toLITERAL
will only match a topic namedfoo
. Setting patternType toPREFIXED
will match any topic names that begin withfoo
, for example,foo
,football
, andfoo.fighters
.[7] The prefix for mirror topics. When auto-create is enabled (
spec.mirrors.options.autoCreateTopics.enabled: true
),and theprefix
is configured, the topics created on the destination cluster will automatically contain the prefix. Otherwise,spec.mirrorTopic.name
must be defined in the<prefix><sourceTopicName>
format.
Create a mirror topic¶
Mirror topics are created by and owned by a cluster link.
A mirror topic is created as a new topic with a unique name on the destination cluster. There must be a topic of this same name on the cluster link’s source cluster, which will be the mirror topic’s source topic.
You cannot convert an existing read/write topic into a mirror topic.
For details on mirror topic creation in Confluent Platform, see Mirror Topic Creation.
To create a mirror topic:
Set the following in the ClusterLink CR and apply the changes using the
kubectl apply command
:apiVersion: apps.cpc.platform.confluent.io/v1beta1 kind: ClusterLink spec: mirrors: topics: - name: --- [1] state: --- [2] configs: --- [3] replicationFactor: --- [4] sourceTopicName: --- [5]
[1] Required. The mirror topic name. If the
sourceTopicName
is not configured, a topic with the exact same name must exist on the source cluster, and no topic with this name should exist on the destination cluster.When
spec.mirrors.options.prefix: <prefix>
is configured for the cluster link, thename
has to be of the format<prefix><sourceTopicName>
.[2] Optional. The state of this mirror topic when created. Possible values are
PAUSE
,PROMOTE
,FAILOVER
,ACTVE
. If the state is not defined, the default value isACTIVE
.[3] Optional. For the list of optional configurations, see Cluster Linking Config Options.
[4] Optional. The replication factor for the mirror topic on the destination cluster. If omitted, this mirror topic will inherit the broker’s default replication factor.
[5] Optional. The topic name on the source cluster that will be mirrored to the destination cluster.
When
spec.mirrors.options.prefix
is not configured, do not configure this field.When
spec.mirrors.options.prefix
is configured, a topic with the exact same name must exist on the source cluster.
Modify a mirror topic¶
You can promote, failover, or pause a mirror topic:
- Promote a topic: Promote a topic when you want to stop mirroring and make the topic on destination writeable. Verifies if the lag is zero and does final sync before promoting the topic.
- Failover a topic: This is similar to promoting, but doesn’t wait for the lag to become zero.
To modify an active mirror topic to promote, failover, or pause:
Modify the
mirrorTopics
section in the ClusterLink CR and apply the changes with thekubectl
command:apiVersion: apps.cpc.platform.confluent.io/v1beta1 kind: ClusterLink spec: mirrors: topics: - name: --- [1] state: --- [2]
- [1] Required.
- [2] Optional. The state of this mirror topic. Possible values are
PAUSE
,PROMOTE
,FAILOVER
,ACTIVE
.
Delete a mirror topic¶
For the process of deleting a mirror topic in Confluent Platform, see Mirror Topic Deletion.
For the available states and status of a mirror topic in Confluent Platform, see Mirror Topic States and Statuses.
To delete a mirror topic:
Delete a mirror topic from the list in the ClusterLink CR by removing the name and the other properties for the mirror topic. And apply the changes with the
kubectl apply
command.apiVersion: apps.cpc.platform.confluent.io/v1beta1 kind: ClusterLink spec: mirrors: topics: - name: state: configs: replicationFactor: sourceTopicName: