Configuration Options for Cluster Linking on Confluent Platform¶
This page describes how to configure Cluster Linking with various Confluent tools, products, and security options.
Using Cluster Linking with Confluent for Kubernetes¶
You can use Cluster Linking with Confluent Platform deployed with Confluent for Kubernetes.
Confluent for Kubernetes 2.2 released built-in Cluster Linking support, as described in this section of the CFK documentation: Cluster Linking using Confluent for Kubernetes.
To configure Cluster Linking on earlier versions of CFK, use configOverrides
in the Kafka
custom resource. See Configuration Overrides.
in the CFK documentation for more information about using configOverrides
.
Also, pre Confluent Platform 7.0.0 releases required that you include a configOverrides
section on the server
to specify confluent.cluster.link.enable: "true"
. For Confluent Platform 7.0.0 and later releases,
Cluster Linking is enabled by default, so this element of the configuration is no longer needed,
regardless of the Confluent for Kubernetes version.
For example:
apiVersion: platform.confluent.io/v1beta1
kind: Kafka
metadata:
name: kafka
namespace: confluent
spec:
replicas: 3
image:
application: confluentinc/cp-server:7.3.10
init: confluentinc/confluent-init-container:2.0.1
configOverrides:
server:
- confluent.cluster.link.enable=true # Enable Cluster Linking
Using Cluster Linking with Ansible¶
You can use Cluster Linking with Confluent Platform deployed with Ansible.
Starting in Confluent Platform 7.0.0, Cluster Linking is enabled by default, so no changes are needed to the configuration file.
Tip
Pre Confluent Platform 7.0.0 releases required that you add a broker configuration
property to the kafka_broker_custom_properties
section in the inventory as described in
Configure Confluent Platform with Ansible,
to set confluent.cluster.link.enable: "true"
. If you are upgrading from an earlier release,
this configuration can be deleted, as it is no longer needed.
Link Properties¶
Several configurations are available for cluster links. The following sections describe how to set these using the CLI Commands, and then list the available properties.
Note that Cluster Linking configurations are isolated from Kafka broker configurations, as such there is no property inheritance from Kafka broker to Cluster Linking. Only properties passed during cluster links create and update will override Cluster Linking behavior.
Setting Properties on a Cluster Link¶
You can set configurations on each, individual cluster link. To do this, provide the configurations as “key=value” pairs in a properties file, and pass the file as an argument to the CLI Commands using either:
- the
--config-file
flag, when you first create the link, - Or, the
--add-config-file
flag to update configurations on an existing link.
Alternatively, you can specify or update properties for the cluster link by providing “key=value” pairs directly on the command line, using either:
- the
--config
flag, when you first create the link, - Or, the
--add-config
flag to update configurations on an existing link.
Tip
When updating the configuration for an existing cluster link, pass in only those configs that change. Be especially mindful when you are using a config file with
--add-config--file
(where it would be easy to pass in a full set of configs) that it contains only the configs you want to update. For example,my-update-configs.txt
might include:consumer.offset.sync.ms=25384 topic.config.sync.ms=38254
You can change several aspects of a cluster link configuration, but you cannot change its source cluster (source cluster ID), prefix, or the link name.
Examples and command syntax for specifying link properties in a file and at the command line are shown in in Creating a Cluster Link and in Altering a Cluster Link, and in the Tutorial: Use Cluster Linking to Share Data Across Topics.
Configuration Options¶
These properties are available to specify for the cluster link.
If you disable a feature that has filters (ACL sync, consumer offset sync, auto create mirror topics) after having it enabled initially, then any existing filters will be cleared (deleted) from the cluster link.
acl.filters
JSON string that lists the ACLs to migrate. Define the ACLs in a file,
acl.filters.json
, and pass the file name as an argument to--acl-filters-json-file
. See Migrating ACLs from Source to Destination Cluster for examples of how to define the ACLs in the JSON file.- Type: string
- Default: “”
Note
Populate
acl.filters
by passing a JSON file on the command line that specifies the ACLs as described in Migrating ACLs from Source to Destination Cluster.acl.sync.enable
Whether or not to migrate ACLs. To learn more, see Migrating ACLs from Source to Destination Cluster.
- Type: boolean
- Default: false
acl.sync.ms
How often to refresh the ACLs, in milliseconds (if ACL migration is enabled). The default is 5000 milliseconds (5 seconds).
- Type: int
- Default: 5000
auto.create.mirror.topics.enable
- Whether or not to auto-create mirror topics based on topics on the source cluster. When set to “true”, mirror topics will be auto-created. Setting this option to “false” disables mirror topic creation and clears any existing filters. For details on this option, see Auto-create Mirror Topics.
auto.create.mirror.topics.filters
- A JSON object with one property,
topicFilters
, that contains an array of filters to apply to indicate which topics should be mirrored. For details on this option, see Auto-create Mirror Topics.
cluster.link.prefix
A prefix that is applied to the names of the mirror topics. The same prefix is applied to consumer groups when consumer.group.prefix.enable is set to
true
. To learn more, see “Prefixing Mirror Topics and Consumer Group Names” in Mirror Topics.Note
The prefix cannot be changed after the cluster link is created.
- Type: string
- Default: null
cluster.link.paused
Whether or not the cluster link is running or paused. The default is false.
- Type: boolean
- Default: false
cluster.link.retry.timeout.ms
The number of milliseconds after which failures are no longer retried and partitions are marked as failed. If the source topic is deleted and re-created within this timeout, the link may contain records from the old as well as the new topic.
- Type: int
- Default: 300000 (5 minutes)
availability.check.ms
How often the cluster link checks to see if the source cluster is available. The frequency with which the cluster link checks is specified in milliseconds.
- Type: int
- Default: 60000 (1 minute)
A cluster link regularly checks whether the source cluster is still available for mirroring data by performing a
DescribeCluster
operation (bounded bydefault.api.timeout.ms
). If the source cluster becomes unavailable (for example, because of an outage or disaster), then the cluster link signals this by updating its status and the status of its mirror topics.availability.check.ms
works in tandem availability.check.consecutive.failure.threshold.
availability.check.consecutive.failure.threshold
The number of consecutive failed availability checks the source cluster is allowed before the cluster link status becomes
SOURCE_UNAVAILABLE
.- Type: int
- Default: 5
If, for example, the default (5) is used, the source cluster is determined to be unavailable after 5 failed checks in a row. If availability.check.ms and
default.api.timeout.ms
are also set to their defaults of 1 minute and there are 5 failed checks, then the cluster link will show asSOURCE_UNAVAILABLE
after 5 * (1+1) mins = 10 minutes. Note that this reflects that source unavailability is detected afteravailability.check.consecutive.failure.threshold
* (default.api.timeout.ms
+availability.check.ms
), taking into account theDescribeCluster
operation performed as a part of availability.check.ms.confluent.cluster.link.enable
- Enables or disables Cluster Linking. In Confluent Platform 7.0.0 and later versions, the default is
true
, Cluster Linking is enabled by default. To learn how to turn off Cluster Linking, see Disabling Cluster Linking. connections.max.idle.ms
Idle connections timeout. The server socket processor threads close any connections that idle longer than this.
- Type: int
- Default: 600000
connection.mode
Used only for source-initiated links. Set this to INBOUND on the destination cluster’s link (which you create first). Set this to OUTBOUND on the source cluster’s link (which you create second). You must use this in combination with
link.mode
. This property should only be set for source-initiated cluster links.- Type: string
- Default: OUTBOUND
consumer.offset.group.filters
JSON to denote the list of consumer groups to be migrated. To learn more, see Migrating Consumer Groups from Source to Destination Cluster.
- Type: string
- Default: “”
Note
Consumer group filters should only include groups that are not being used on the destination. This will help ensure that the system does not override offsets committed by other consumers on the destination. The system attempts to work around filters containing groups that are also used on the destination, but in these cases there are no guarantees; offsets may be overwritten. For mirror topic “promotion” to work, the system must be able to roll back offsets, which cannot be done if the group is being used by destination consumers.
consumer.offset.sync.enable
Whether or not to migrate consumer offsets from the source cluster.
If you set this up and run Cluster Linking, then later disable it, the filters will be cleared (deleted) from the cluster link.
- Type: boolean
- Default: false
consumer.offset.sync.ms
How often to sync consumer offsets, in milliseconds, if enabled.
- Type: int
- Default: 30000
consumer.group.prefix.enable
When set to
true
, the prefix specified for the cluster link prefix is also applied to the names of consumer groups. The cluster link prefix must be specified in order for the consumer group prefix to be applied. To learn more, see “Prefixing Mirror Topics and Consumer Group Names” in Mirror Topics.- Type: boolean
- Default: false
num.cluster.link.fetchers
Number of fetcher threads used to replicate messages from source brokers in cluster links.
- Type: int
- Default: 1
topic.config.sync.ms
How often to refresh the topic configs, in milliseconds.
- Type: int
- Default: 5000
local.listener.name
- For a source initiated link, an alternative listener to be used by the cluster link on the source cluster. For more, see Understanding Listeners in Cluster Linking
link.mode
Used only for source-initiated links. Set this to DESTINATION on the destination cluster’s link (which you create first). Set this to SOURCE on the source cluster’s link (which you create second). You must use this in combination with
connection.mode
. This property should only be set for source-initiated cluster links.- Type: string
- Default: DESTINATION
Common Configuration Options¶
The following subset of common properties, although not specific to Cluster Linking, may be particularly relevant to setting up and managing cluster links. These are common across Confluent Platform for clients, brokers, and security configurations, and are described in their respective sections per the links provided.
Client Configurations¶
For a full list of AdminClient
configurations, see Kafka AdminClient Configurations for Confluent Platform.
bootstrap.servers
client.dns.lookup
metadata.max.age.ms
retry.backoff.ms
request.timeout.ms
Cluster Link Replication Configurations¶
These configuration options are fully described in Kafka Broker Configurations for Confluent Platform.
replica.fetch.backoff.ms
replica.fetch.max.bytes
replica.fetch.min.bytes
replica.fetch.response.max.bytes
replica.fetch.wait.max.ms
replica.socket.receive.buffer.bytes
replica.socket.timeout.ms
Client SASL and SSL Configurations¶
sasl.client.callback.handler.class
sasl.jaas.config
sasl.kerberos.kinit.cmd
sasl.kerberos.min.time.before.relogin
sasl.kerberos.service.name
sasl.kerberos.ticket.renew.jitter
sasl.kerberos.ticket.renew.window.factor
sasl.login.callback.handler.class
sasl.login.class
sasl.login.refresh.buffer.seconds
sasl.login.refresh.min.period.seconds
sasl.login.refresh.window.factor
sasl.login.refresh.window.jitter
sasl.mechanism
security.protocol
ssl.cipher.suites
ssl.enabled.protocols
ssl.endpoint.identification.algorithm
ssl.engine.factory.class
ssl.key.password
ssl.keymanager.algorithm
ssl.keystore.location
ssl.keystore.password
ssl.keystore.type
ssl.protocol
ssl.provider
ssl.secure.random.implementation
ssl.trustmanager.algorithm
ssl.truststore.location
ssl.truststore.password
ssl.truststore.type
Configuring Reconnection Speed and Behavior¶
A cluster link has two sets of configuration options, both exponential times, which control connections. These are the same options that Apache Kafka® clients have.
reconnect.backoff.ms
andreconnect.backoff.max.ms
- These options determine how soon the cluster link retries after a connection failure. These are 50ms and 10s by default for cluster links.socket.connection.setup.timeout.ms
andsocket.connection.setup.timeout.max.ms
- These options determine how long the cluster link waits for a connection attempt to succeed before breaking and retrying after a “reconnect backoff”. These are 10s and 30s, respectively, by default.
On Confluent Platform clusters, reducing the values for these options may give faster reconnection speeds, at the expense of CPU and network usage.
These options cannot be updated by cluster links that have a Confluent Cloud destination cluster.
Required Configurations for Control Center¶
Cluster Linking requires embedded v3 Confluent REST Proxy to communicate with Confluent Control Center and properly display mirror topics on the Control Center UI. If the REST configurations are not implemented, mirror topics will display in Control Center as regular topics, showing inaccurate information. (To learn more, see Known Limitations and Best Practices.)
Configure REST Endpoints in the Control Center properties file¶
If you want to use Control Center with Cluster Linking, you must configure the Control Center cluster with REST endpoints to enable HTTP servers on the brokers. If this is not configured properly for all brokers, Cluster Linking will not be accessible from Confluent Control Center.
In the appropriate Control Center properties file (for example $CONFLUENT_HOME/etc/confluent-control-center/control-center-dev.properties
or control-center.properties
),
use confluent.controlcenter.streams.cprest.url
to define the REST endpoints for controlcenter.cluster
.
The default is http://localhost:8090
, as shown below.
# Kafka REST endpoint URL
confluent.controlcenter.streams.cprest.url="http://localhost:8090"
Identify the associated URL for each broker. If you have multiple brokers in the cluster, use a comma-separated list.
See also
confluent.controlcenter.streams.cprest.url
in the
Control Center Configuration Reference
Configure authentication for REST endpoints on Kafka brokers (Secure Setup)¶
Tip
- Cluster Linking does not require the Metadata Service (MDS) or security to run, but if you want to configure security, you can get started with the following example which shows an MDS client configuration for RBAC.
- You can use
confluent.metadata.server.listeners
(which will enable the Metadata Service) instead ofconfluent.http.server.listeners
to listen for API requests. Use eitherconfluent.metadata.server.listeners
orconfluent.http.server.listeners
, but not both. If a listener uses HTTPS, then appropriate SSL configuration parameters must also be set. To learn more, see Admin REST APIs Configuration Options.
To run Cluster Linking in a secure setup, you must configure authentication for REST endpoints in each of the Kafka broker server.properties
files.
If the Kafka broker files are missing these configs, Control Center will not be able to access Cluster Linking in a secure setup.
At a minimum, you will need the following configurations.
# EmbeddedKafkaRest: HTTP Auth Configuration
kafka.rest.kafka.rest.resource.extension.class=io.confluent.kafkarest.security.KafkaRestSecurityResourceExtension
kafka.rest.rest.servlet.initializor.classes=io.confluent.common.security.jetty.initializer.InstallBearerOrBasicSecurityHandler
Here is an example of an MDS client configuration for Kafka RBAC in a broker server.properties
file .
# EmbeddedKafkaRest: Kafka Client Configuration
kafka.rest.bootstrap.servers=<host:port>, <host:port>, <host:port>
kafka.rest.client.security.protocol=SASL_PLAINTEXT
# EmbeddedKafkaRest: HTTP Auth Configuration
kafka.rest.kafka.rest.resource.extension.class=io.confluent.kafkarest.security.KafkaRestSecurityResourceExtension
kafka.rest.rest.servlet.initializor.classes=io.confluent.common.security.jetty.initializer.InstallBearerOrBasicSecurityHandler
kafka.rest.public.key.path=<rbac_enabled_public_pem_path>
# EmbeddedKafkaRest: MDS Client configuration
kafka.rest.confluent.metadata.bootstrap.server.urls=<host:port>, <host:port>, <host:port>
kafka.rest.ssl.truststore.location=<truststore_location>
kafka.rest.ssl.truststore.password=<password>
kafka.rest.confluent.metadata.http.auth.credentials.provider=BASIC
kafka.rest.confluent.metadata.basic.auth.user.info=<user:password>
kafka.rest.confluent.metadata.server.urls.max.age.ms=60000
kafka.rest.client.confluent.metadata.server.urls.max.age.ms=60000
See also
- Admin REST APIs Security
- REST Proxy Security
- Scripted Confluent Platform Demo, On-Prem Tutorial, Security section provides examples of different types of configurations
Disabling Cluster Linking¶
To disable Cluster Linking on a cluster running Confluent Enterprise version 7.0.0 or later, add the following
line to the broker configuration on the destination cluster (for example $CONFLUENT_HOME/etc/server.properties
).
confluent.cluster.link.enable=false
This will disable creating cluster links with that cluster as the destination, or “source initiated cluster links” with that cluster as the source. Note: this will not disable creating a destination-initiated cluster link with this cluster as its source.
Cluster Linking is not available as a dynamic configuration. It must either be
enabled before starting the brokers (it is on by default starting with Confluent Platform 7.0.0),
or to enable it on a running cluster where it was previously turned off, set the
configuration confluent.cluster.link.enable=true
on the brokers and restart them to perform a rolling update.
Understanding Listeners in Cluster Linking¶
For a forward connection, the target server knows which listener the connection came in on and associates the listener with that connection. When a metadata request arrives on that connection, the server returns metadata corresponding to the listener.
For example, in Confluent Cloud, when a client on the external listener asks for
the leader of topicA
, it always gets the external endpoint of the leader and never
the internal one, because the system knows the listener name from the connection.
For reverse connections, the target server (that is, the source cluster) established the connection. When the connection is reversed, this target server needs to know which listener to associate the reverse connection with; that is, for example, which endpoint it should return to the destination for its leader requests.
By default, the listener is associated based on the source cluster where the link was created. In most cases this is sufficient because typically a single external listener is used. On Confluent Cloud, this default is used and you cannot override it.
On self-managed Confluent Platform, you have the option to override the default listener/connection association. This provides the flexibility to create the source link on an internal listener but associate the external listener with the reverse connection.
The configuration local.listener.name
refers to source cluster listener
name. By default, this is the listener that was used to create the source link.
If you want to use a different listener, you must explicitly configure it. If
Confluent Cloud is the source, then it would be the external listener (default) and cannot be overridden.
For the destination, the listener is determined by bootstrap.servers and cannot be overridden.