Replicator and Cross-Cluster Failover

Configuring Replicator to Handle Failover Scenarios

You can configure Replicator to handle failures in active-standby setups. Replicator can translate committed offsets from the active primary cluster to the standby secondary cluster. If a failover from the primary cluster to the secondary cluster occurs, consumers then resume in the secondary cluster from the position that corresponds to their last committed offset.

Configuring the Consumer for Failover

You must configure an additional interceptor for every consumer that reads topics replicated to the secondary cluster:

interceptor.classes=io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor

This interceptor is located in the Replicator JAR (kafka-connect-replicator-5.0.0.jar). This JAR must be available on the CLASSPATH of the consumer.

The ConsumerTimestampsInterceptor will write to a new topic __consumer_timestamps to preserve the required timestamp information for the offset translation. The Replicator is the sole consumer of this new topic and will use the timestamps to determine the correct offsets in the secondary cluster.
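
As an illustration, a consumer properties file for an application reading from the primary cluster might look like the following sketch. The broker address and group ID are placeholders invented for this example; only the interceptor.classes value comes from the instructions above.

```properties
# Hypothetical consumer configuration for an application reading from the primary cluster.
# The broker address and group ID are example values, not defaults.
bootstrap.servers=primary-broker:9092
group.id=orders-app

# Writes consumer timestamps to __consumer_timestamps so that Replicator
# can translate this group's committed offsets for the secondary cluster.
interceptor.classes=io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor
```

The Replicator JAR still has to be on the consumer's CLASSPATH for this setting to load.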

Replicator Configuration for Failover

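You must set the following property to instruct Replicator to translate offsets and set them in the secondary cluster. The address shown is an example value; point it at the brokers of the secondary (destination) cluster.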

dest.kafka.bootstrap.servers=localhost:9092
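
To show where this property sits, here is a minimal sketch of a Replicator connector configuration for an active-standby setup. The connector name, host names, topic name, and group ID are placeholders invented for this example, and the remaining properties are assumed typical connector settings that should be checked against the configuration reference for your Replicator version.

```properties
# Hypothetical Replicator connector configuration (primary -> secondary).
# Names, hosts, and topics below are placeholders.
name=replicator-primary-to-secondary
connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector
key.converter=io.confluent.connect.replicator.util.ByteArrayConverter
value.converter=io.confluent.connect.replicator.util.ByteArrayConverter

# Source: the active primary cluster.
src.kafka.bootstrap.servers=primary-broker:9092
# Destination: the standby secondary cluster, where translated offsets are set.
dest.kafka.bootstrap.servers=secondary-broker:9092

# Topics to replicate.
topic.whitelist=orders

# Optional explicit consumer group ID; if omitted, the Replicator name is used.
src.consumer.group.id=replicator-orders
```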

Bringing the Primary Cluster Back Up After a Failover

After the original primary cluster is restarted, it can be brought up to date by running Replicator in the primary cluster. This will copy data that has been produced in the secondary cluster since the failover. However, before running Replicator in the primary cluster, you must address these prerequisites:

  • There might be data in the primary cluster that was never replicated to the secondary before the failure occurred. Determine if the extra data needs to be copied to the secondary cluster, and whether its order matters. If order doesn’t matter, then you can briefly resume Replicator running in the secondary cluster to copy the data from the primary to the secondary. If order matters, then there is no standard answer on how to handle the extra data, since it will depend on your business needs.
  • Replicator must replicate only new data, not data that was already replicated under the original active-standby configuration. To keep track of where it must resume in the secondary cluster, Replicator uses the same offset translation mechanism that it uses for other consumers. This allows it to switch from reading the primary cluster to reading the secondary cluster. The consumer group ID must be the same when switching from one cluster to the other. For the consumer group ID, Replicator uses the value of src.consumer.group.id if specified, or otherwise the name of the Replicator.

By default, Replicator adds a provenance header containing the Replicator name to every record that it replicates to a given cluster. Replicator can use this header to avoid duplicates when switching from one cluster to the other, since some records may have been replicated after the last committed offset. The provenance header is also useful for avoiding “infinite” replication loops in active-active setups with bi-directional replication, where a Replicator runs in each active cluster. However, if your brokers or any of your topics do not support the message format for 0.11.0 (message format version v2), you must configure Replicator with provenance.header.enable=false; otherwise errors will result.
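
The prerequisites above could translate into a configuration along the following lines for the Replicator that runs in the restored primary cluster. This is only a sketch: the connector name, host names, and topic name are placeholders, and it assumes the original Replicator used the consumer group ID replicator-orders (either through src.consumer.group.id or as its name).

```properties
# Hypothetical Replicator configuration for copying data back (secondary -> primary).
name=replicator-secondary-to-primary
connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector
key.converter=io.confluent.connect.replicator.util.ByteArrayConverter
value.converter=io.confluent.connect.replicator.util.ByteArrayConverter

# The direction is reversed: read from the secondary, write to the primary.
src.kafka.bootstrap.servers=secondary-broker:9092
dest.kafka.bootstrap.servers=primary-broker:9092
topic.whitelist=orders

# Must match the consumer group ID used before the failover (assumed value).
src.consumer.group.id=replicator-orders

# Uncomment only if brokers or topics do not support message format v2.
# provenance.header.enable=false
```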

Advanced Configuration for Failover Scenarios

The following configuration properties are for advanced tuning of offset translation.

Replicator Configuration

offset.translator.tasks.max

The maximum number of Replicator tasks that will perform offset translation. If -1 (the default), all tasks will perform offset translation.

  • Type: int
  • Default: -1
  • Importance: medium

offset.translator.tasks.separate

Whether to translate offsets in separate tasks from those performing topic replication.

  • Type: boolean
  • Default: false
  • Importance: medium

offset.timestamps.commit

Whether to commit internal offset timestamps for Replicator, so that it can resume properly when switching to the secondary cluster.

  • Type: boolean
  • Default: true
  • Importance: medium

provenance.header.enable

Whether to enable the use of provenance headers during replication.

  • Type: boolean
  • Default: true
  • Importance: medium

fetch.offset.expiry.ms

The amount of time in milliseconds after which a fetch offset request for offset translation expires and is discarded.

  • Type: long
  • Default: 600000
  • Importance: low

fetch.offset.retry.backoff.ms

The amount of time in milliseconds to wait before attempting to retry a failed fetch offset request during offset translation.

  • Type: long
  • Default: 100
  • Importance: low
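
For reference, these properties would be added to the Replicator configuration as in the sketch below. The values are arbitrary examples chosen to illustrate the syntax, not recommendations.

```properties
# Hypothetical tuning values for offset translation (illustrative only).

# Let at most two tasks perform offset translation (default -1: all tasks).
offset.translator.tasks.max=2
# Translate offsets in tasks separate from those replicating topics (default false).
offset.translator.tasks.separate=true

# Discard a fetch offset request after 5 minutes (default 600000 ms).
fetch.offset.expiry.ms=300000
# Wait 500 ms before retrying a failed fetch offset request (default 100 ms).
fetch.offset.retry.backoff.ms=500
```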

Consumer Configuration

timestamps.topic.num.partitions

The number of partitions for the consumer timestamps topic.

  • Type: int
  • Default: 50
  • Importance: high

timestamps.topic.replication.factor

The replication factor for the consumer timestamps topic.

  • Type: short
  • Default: 3
  • Importance: high

timestamps.topic.max.per.partition

The maximum number of timestamp records that will be held in memory for processing per partition. This is initially unbounded but can be configured lower to reduce memory consumption. If the number of timestamp records exceeds this value, then timestamps could be lost, which would result in losing the corresponding translated offsets.

  • Type: int
  • Default: Integer.MAX_VALUE
  • Importance: low