Important
You are viewing documentation for an older version of Confluent Platform. For the latest, click here.
Replicator and Cross-Cluster Failover¶
Configuring Replicator to Handle Failover Scenarios¶
You can configure replicator to manage failures in active-standby setups. Replicator can translate committed offsets from the active primary cluster to the standby secondary cluster. In case of failover from the primary cluster to the secondary cluster, the consumers will start consuming data from the last committed offset.
Configuring the Consumer for Failover¶
You must configure an additional interceptor for consumers that are reading topics that are replicated to the secondary cluster.
interceptor.classes=io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor
Important
You can only set the interceptor for consumers. Do not set for Replicator or Connect workers.
This interceptor is located in the Replicator JAR (kafka-connect-replicator-5.1.4.jar
). This JAR must be
available on the CLASSPATH
of the consumer.
For Maven projects, include the following dependency.
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-replicator</artifactId>
<version>5.1.4</version>
</dependency>
The ConsumerTimestampsInterceptor
will write to a new topic __consumer_timestamps
to preserve
the required timestamp information for the offset translation. The Replicator is the sole consumer
of this new topic and will use the timestamps to determine the correct offsets in the secondary cluster.
Replicator Configuration for Failover¶
You must set the following property to instruct Replicator to translate offsets and set them in the secondary cluster.
dest.kafka.bootstrap.servers=localhost:9092
Bringing the Primary Cluster Back Up After a Failover¶
After the original primary cluster is restarted, it can be brought up to date by running Replicator in the primary cluster. This will copy data that has been produced in the secondary cluster since the failover. However, before running Replicator in the primary cluster, you must address these prerequisites:
- There might be data in the primary cluster that was never replicated to the secondary before the failure occurred. Determine if the extra data needs to be copied to the secondary cluster, and whether its order matters. If order doesn’t matter, then you can briefly resume Replicator running in the secondary cluster to copy the data from the primary to the secondary. If order matters, then there is no standard answer on how to handle the extra data, since it will depend on your business needs.
- The Replicator must only replicate new data. It should not replicate data that has already been
replicated during the original active-standby configuration. Replicator uses the same offset
translation mechanism that it uses for other consumers, to keep track of where it must resume
in the secondary cluster. This allows it to switch from reading the primary to the secondary cluster.
The ID of the consumer group must be the same when switching from one cluster to the other.
For the consumer group ID, the Replicator will either use the value of
src.consumer.group.id
if specified, or thename
of the Replicator.
If provenance.header.enable=true
, Replicator will add a provenance header that contains the originating cluster ID,
originating topic, and current timestamp to all records that it replicates to a given cluster.
The Replicator can use this provenance header to avoid duplicates when switching
from one cluster to another (since records may have been replicated after the last committed offset).
The provenance header is also useful for avoiding “infinite” replication loops in active-active
setups involving bi-directional replication,
where Replicator is running in each active cluster. However, if your brokers do not support
the message format for 0.11.0 (message format version v2) or you have topics that do not support
the message format for 0.11.0, then you will need to configure Replicator with
provenance.header.enable=false
or else errors will result.
Advanced Configuration for Failover Scenarios¶
The following configuration properties are for advanced tuning of offset translation.
Replicator Configuration¶
offset.translator.tasks.max
The maximum number of Replicator tasks that will perform offset translation. If -1 (the default), all tasks will perform offset translation.
- Type: int
- Default: -1
- Importance: medium
offset.translator.tasks.separate
Whether to translate offsets in separate tasks from those performing topic replication.
- Type: boolean
- Default: false
- Importance: medium
offset.timestamps.commit
Whether to commit internal offset timestamps for Replicator, so that it can resume properly when switching to the secondary cluster.
- Type: boolean
- Default: true
- Importance: medium
provenance.header.enable
Whether to enable the use of provenance headers during replication.
- Type: boolean
- Default: false
- Importance: medium
fetch.offset.expiry.ms
The amount of time in milliseconds after which a fetch offset request for offset translation expires and is discarded.
- Type: long
- Default: 600000
- Importance: low
fetch.offset.retry.backoff.ms
The amount of time in milliseconds to wait before attempting to retry a failed fetch offset request during offset translation.
- Type: long
- Default: 100
- Importance: low
Consumer Configuration¶
timestamps.topic.num.partitions
The number of partitions for the consumer timestamps topic.
- Type: int
- Default: 50
- Importance: high
timestamps.topic.replication.factor
The replication factor for the consumer timestamps topic.
- Type: short
- Default: 3
- Importance: high
timestamps.topic.max.per.partition
The maximum number of timestamp records that will be held in memory for processing per partition. This is initially unbounded but can be configured lower to reduce memory consumption. If the number of timestamp records exceeds this value, then timestamps could be lost, which would result in losing the corresponding translated offsets.
- Type: int
- Default: Integer.MAX_VALUE
- Importance: low