Configuration Options for Cluster Linking¶
Important
This feature is available as a preview feature. A preview feature is a component of Confluent Platform that is being introduced to gain early feedback from developers. This feature can be used for evaluation and non-production testing purposes or to provide feedback to Confluent.
Enabling Cluster Linking¶
To enable Cluster Linking, add the following line to the broker configuration on the destination cluster (for example, $CONFLUENT_HOME/etc/server.properties):
confluent.cluster.link.enable=true
Cluster Linking is not available as a dynamic configuration. Either enable it before starting the brokers, or, to enable it on a running cluster, set confluent.cluster.link.enable=true on the brokers and restart them in a rolling update.
Configuration options for the cluster links themselves are set through flags on the kafka-cluster-links command. These are documented below.
Using Cluster Linking with Confluent for Kubernetes¶
You can use Cluster Linking with Confluent Platform deployed with Confluent for Kubernetes.
To configure Cluster Linking on Confluent for Kubernetes, use configOverrides in the Kafka custom resource. See Configuration Overrides in the Confluent for Kubernetes documentation for more information about using configOverrides.
apiVersion: platform.confluent.io/v1beta1
kind: Kafka
metadata:
  name: kafka
  namespace: confluent
spec:
  replicas: 3
  image:
    application: confluentinc/cp-server-operator:6.1.0.0
    init: confluentinc/cp-init-container-operator:6.1.0.0
  dataVolumeCapacity: 10Gi
  configOverrides:
    server:
      - confluent.cluster.link.enable=true # Enable Cluster Linking
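Assuming the manifest above is saved as kafka.yaml (the file name is illustrative), you could apply it with kubectl and let Confluent for Kubernetes roll the change out to the brokers:
# Apply the updated Kafka custom resource in the confluent namespace
kubectl apply -f kafka.yaml --namespace confluent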
Using Cluster Linking with Ansible¶
You can use Cluster Linking with Confluent Platform deployed with Ansible.
To configure Cluster Linking with Ansible, add the property to the kafka_broker_custom_properties section in the inventory, as described in Configure Confluent Platform with Ansible. For example:
all:
  vars:
    kafka_broker_custom_properties:
      confluent.cluster.link.enable: "true"
Link Properties¶
Several configurations are available for cluster links. The following sections describe how to set these using the kafka-cluster-links command, and then list the available properties.
Setting Properties on a Cluster Link¶
You can set configurations on each individual cluster link. To do this, provide the configurations as “key=value” pairs in a properties file and pass the file as an argument to the kafka-cluster-links command using either:
- the --config-file flag, when you first create the link
- the --add-config-file flag, to update configurations on an existing link
Alternatively, you can specify or update properties for the cluster link by providing “key=value” pairs directly on the command line, using either:
- the --config flag, when you first create the link
- the --add-config flag, to update configurations on an existing link
Examples and command syntax for specifying link properties in a file and at the command line are shown in Creating a Cluster Link, in Altering a Cluster Link, and in the Tutorial: Using Cluster Linking for Topic Data Sharing.
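As a rough sketch of how these flags fit together: the link name, hostnames, and property values below are placeholders, and flag spellings other than the --config-file and --add-config flags named above (such as --create, --alter, and --link) are assumptions that can vary by Confluent Platform version, so defer to the linked topics for the authoritative syntax.
# example-link.properties -- minimal link configuration (illustrative values)
bootstrap.servers=src-broker1:9092

# Create the cluster link on the destination cluster, passing the file with --config-file
kafka-cluster-links --bootstrap-server dst-broker1:9092 --create --link example-link --config-file example-link.properties

# Update a property on the existing link with --add-config
kafka-cluster-links --bootstrap-server dst-broker1:9092 --alter --link example-link --add-config cluster.link.retry.timeout.ms=600000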
Configuration Options¶
These properties are available to specify for the cluster link.
cluster.link.paused
Whether the cluster link is paused (true) or running (false).
- Type: boolean
- Default: false
cluster.link.retry.timeout.ms
The number of milliseconds after which failures are no longer retried and partitions are marked as failed. If the source topic is deleted and re-created within this timeout, the link may contain records from the old as well as the new topic.
- Type: int
- Default: 300000 (5 minutes)
availability.check.ms
How often, in milliseconds, the cluster link checks whether the source cluster is available.
- Type: int
- Default: 60000 (1 minute)
A cluster link regularly checks whether the source cluster is still available for mirroring data. If the source cluster becomes unavailable (for example, because of an outage or disaster), then the cluster link signals this by updating its status and the status of its mirror topics.
availability.check.ms works in tandem with availability.check.consecutive.failure.threshold.
availability.check.consecutive.failure.threshold
The number of consecutive failed availability checks that the source cluster is allowed before the cluster link status becomes SOURCE_UNAVAILABLE.
- Type: int
- Default: 5
If, for example, the default (5) is used, the source cluster is determined to be unavailable after 5 failed checks in a row. If availability.check.ms is also set to its default of 1 minute, the cluster link shows as SOURCE_UNAVAILABLE after 5 minutes.
confluent.cluster.link.enable
To enable Cluster Linking, add the following configuration to the broker configuration on the destination cluster (for example, $CONFLUENT_HOME/etc/server-dst.properties):
confluent.cluster.link.enable=true
Note
Cluster Linking is not available as a dynamic configuration. To enable it, you must set this before starting the brokers.
connections.max.idle.ms
Idle connections timeout. The server socket processor threads close any connections that are idle for longer than this.
- Type: int
- Default: 600000
consumer.offset.group.filters
JSON that specifies the list of consumer groups to be migrated; an illustrative example appears in the sketch after this list of properties. To learn more, see Migrating Consumer Groups from Source to Destination Cluster.
- Type: string
- Default: “”
Note
Consumer group filters should only include groups that are not being used on the destination. This will help ensure that the system does not override offsets committed by other consumers on the destination. The system attempts to work around filters containing groups that are also used on the destination, but in these cases there are no guarantees; offsets may be overwritten. For mirror topic “promotion” to work, the system must be able to roll back offsets, which cannot be done if the group is being used by destination consumers.
consumer.offset.sync.enable
Whether or not to migrate consumer offsets from the source cluster.
- Type: boolean
- Default: false
consumer.offset.sync.ms
How often to sync consumer offsets, in milliseconds, if enabled.
- Type: int
- Default: 30000
num.cluster.link.fetchers
Number of fetcher threads used to replicate messages from source brokers in cluster links.
- Type: int
- Default: 1
topic.config.sync.ms
How often to refresh the topic configs, in milliseconds.
- Type: int
- Default: 5000
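To illustrate how these link-level properties fit together, here is a sketch of a link configuration file built from the options documented above. All values are examples rather than recommendations, and the JSON shape shown for consumer.offset.group.filters is an assumption based on common examples; confirm it against the consumer group migration topic before relying on it.
# Illustrative cluster link configuration (for example, passed with --config-file at link creation)
bootstrap.servers=src-broker1:9092

# Consumer offset migration
consumer.offset.sync.enable=true
consumer.offset.sync.ms=30000
# Assumed JSON shape for the group filter; migrates every group not consumed on the destination
consumer.offset.group.filters={"groupFilters": [{"name": "*", "patternType": "LITERAL", "filterType": "INCLUDE"}]}

# Availability checking: mark the source unavailable after 5 failed checks, one per minute
availability.check.ms=60000
availability.check.consecutive.failure.threshold=5

# Retry window, topic config refresh, and fetcher parallelism
cluster.link.retry.timeout.ms=300000
topic.config.sync.ms=5000
num.cluster.link.fetchers=4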
Common Configuration Options¶
The following subset of common properties, although not specific to Cluster Linking, may be particularly relevant to setting up and managing cluster links. These are common across Confluent Platform for clients, brokers, and security configurations, and are described in their respective sections per the links provided.
Client Configurations¶
For a full list of AdminClient configurations, see AdminClient Configurations.
bootstrap.servers
client.dns.lookup
metadata.max.age.ms
retry.backoff.ms
request.timeout.ms
Cluster Link Replication Configurations¶
These configuration options are fully described in Kafka Broker Configurations.
replica.fetch.backoff.ms
replica.fetch.max.bytes
replica.fetch.min.bytes
replica.fetch.response.max.bytes
replica.fetch.wait.max.ms
replica.socket.receive.buffer.bytes
replica.socket.timeout.ms
Client SASL and SSL Configurations¶
sasl.client.callback.handler.class
sasl.jaas.config
sasl.kerberos.kinit.cmd
sasl.kerberos.min.time.before.relogin
sasl.kerberos.service.name
sasl.kerberos.ticket.renew.jitter
sasl.kerberos.ticket.renew.window.factor
sasl.login.callback.handler.class
sasl.login.class
sasl.login.refresh.buffer.seconds
sasl.login.refresh.min.period.seconds
sasl.login.refresh.window.factor
sasl.login.refresh.window.jitter
sasl.mechanism
security.protocol
ssl.cipher.suites
ssl.enabled.protocols
ssl.endpoint.identification.algorithm
ssl.engine.factory.class
ssl.key.password
ssl.keymanager.algorithm
ssl.keystore.location
ssl.keystore.password
ssl.keystore.type
ssl.protocol
ssl.provider
ssl.secure.random.implementation
ssl.trustmanager.algorithm
ssl.truststore.location
ssl.truststore.password
ssl.truststore.type
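As a brief illustration of how a handful of these client security properties might appear in a cluster link configuration, assuming the source cluster accepts SASL_SSL with the PLAIN mechanism (the credentials, truststore path, and passwords are placeholders):
# Illustrative security settings for a cluster link connecting to a SASL_SSL source cluster
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="link-user" password="link-secret";
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=truststore-secret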