.. _replicator_failover: Replicator and Cross-Cluster Failover ===================================== Configuring Replicator to Handle Failover Scenarios --------------------------------------------------- You can configure replicator to manage failures in active-standby setups. Replicator can translate committed offsets from the active primary cluster to the standby secondary cluster. In case of failover from the primary cluster to the secondary cluster, the consumers will start consuming data from the last committed offset. Configuring the Consumer for Failover ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ You must configure an additional interceptor for consumers that are reading topics that are replicated to the secondary cluster. :: interceptor.classes=io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor .. important:: You can only set the interceptor for consumers. Do not set for Replicator or |kconnect| workers. This interceptor is located in the Replicator JAR (:litwithvars:`kafka-connect-replicator-|release|.jar`). This JAR must be available on the ``CLASSPATH`` of the consumer. For Maven projects, include the following dependency. .. codewithvars:: bash io.confluent kafka-connect-replicator |release| The ``ConsumerTimestampsInterceptor`` will write to a new topic ``__consumer_timestamps`` to preserve the required timestamp information for the offset translation. The Replicator is the sole consumer of this new topic and will use the timestamps to determine the correct offsets in the secondary cluster. Replicator Configuration for Failover ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ You must set the following property to instruct Replicator to translate offsets and set them in the secondary cluster. :: dest.kafka.bootstrap.servers=localhost:9092 Bringing the Primary Cluster Back Up After a Failover ----------------------------------------------------- After the original primary cluster is restarted, it can be brought up to date by running Replicator in the primary cluster. This will copy data that has been produced in the secondary cluster since the failover. However, before running Replicator in the primary cluster, you must address these prerequisites: * There might be data in the primary cluster that was never replicated to the secondary before the failure occurred. Determine if the extra data needs to be copied to the secondary cluster, and whether its order matters. If order doesn't matter, then you can briefly resume Replicator running in the secondary cluster to copy the data from the primary to the secondary. If order matters, then there is no standard answer on how to handle the extra data, since it will depend on your business needs. * The Replicator must only replicate new data. It should not replicate data that has already been replicated during the original active-standby configuration. Replicator uses the same offset translation mechanism that it uses for other consumers, to keep track of where it must resume in the secondary cluster. This allows it to switch from reading the primary to the secondary cluster. The ID of the consumer group must be the same when switching from one cluster to the other. For the consumer group ID, the Replicator will either use the value of ``src.consumer.group.id`` if specified, or the ``name`` of the Replicator. If ``provenance.header.enable=true``, Replicator will add a provenance header that contains the originating cluster ID, originating topic, and current timestamp to all records that it replicates to a given cluster. The Replicator can use this provenance header to avoid duplicates when switching from one cluster to another (since records may have been replicated after the last committed offset). The provenance header is also useful for avoiding "infinite" replication loops in active-active setups involving bi-directional replication, where Replicator is running in each active cluster. However, if your brokers do not support the message format for 0.11.0 (message format version v2) or you have topics that do not support the message format for 0.11.0, then you will need to configure Replicator with ``provenance.header.enable=false`` or else errors will result. Advanced Configuration for Failover Scenarios --------------------------------------------- The following configuration properties are for advanced tuning of offset translation. Replicator Configuration ~~~~~~~~~~~~~~~~~~~~~~~~ ``offset.translator.tasks.max`` The maximum number of Replicator tasks that will perform offset translation. If -1 (the default), all tasks will perform offset translation. * Type: int * Default: -1 * Importance: medium ``offset.translator.tasks.separate`` Whether to translate offsets in separate tasks from those performing topic replication. * Type: boolean * Default: false * Importance: medium ``offset.timestamps.commit`` Whether to commit internal offset timestamps for Replicator, so that it can resume properly when switching to the secondary cluster. * Type: boolean * Default: true * Importance: medium .. include:: includes/config.rst ``fetch.offset.expiry.ms`` The amount of time in milliseconds after which a fetch offset request for offset translation expires and is discarded. * Type: long * Default: 600000 * Importance: low ``fetch.offset.retry.backoff.ms`` The amount of time in milliseconds to wait before attempting to retry a failed fetch offset request during offset translation. * Type: long * Default: 100 * Importance: low Consumer Configuration ~~~~~~~~~~~~~~~~~~~~~~ ``timestamps.topic.num.partitions`` The number of partitions for the consumer timestamps topic. * Type: int * Default: 50 * Importance: high ``timestamps.topic.replication.factor`` The replication factor for the consumer timestamps topic. * Type: short * Default: 3 * Importance: high ``timestamps.topic.max.per.partition`` The maximum number of timestamp records that will be held in memory for processing per partition. This is initially unbounded but can be configured lower to reduce memory consumption. If the number of timestamp records exceeds this value, then timestamps could be lost, which would result in losing the corresponding translated offsets. * Type: int * Default: Integer.MAX_VALUE * Importance: low