.. _replicator_failover: Configure |crep| for Cross-Cluster Failover in |cp| =================================================== Multi-datacenter designs load balance the processing of data in multiple clusters, and safeguard against outages by replicating data across the clusters. If one datacenter goes down, the remaining datacenters have replicas of the data and can take over where the failed datacenter left off. Once the original datacenter recovers, it resumes message processing. |crep| knows where to re-start data synchronization across clusters based on these supporting features for failover and disaster recovery: * :ref:`Timestamp Preservation ` * :ref:`Provenance Headers to Prevent Cyclic Message Repetition ` * :ref:`Consumer Offset Translation ` This document describes parameters for the above features as a part of initial setup, and failover and recovery designs. You can configure |crep| to manage failures in active-standby or active-active setups. In an active-standby deployment, |crep| runs in one direction, copying |ak| messages and metadata from the active or primary cluster (Datacenter A in the example below) to the standby or secondary `passive` cluster (Datacenter B, below). .. figure:: replicator-multi-dc-active-passive.png :align: center Active-Passive Deployment In an active-active deployment, one |crep| copies |ak| messages and metadata from the origin Datacenter A to the destination Datacenter B, and another |crep| copies |ak| data and messages from origin Datacenter B to destination Datacenter A. .. figure:: replicator-multi-dc-active-active.png :align: center Active-Active Deployment Producers write data to both clusters, and depending on client application design, consumers can read data produced in their own cluster along with data produced in the other cluster and replicated to their local cluster. Configuring |crep| to handle failover scenarios ----------------------------------------------- |crep| can translate committed offsets from the active primary cluster to the standby secondary cluster. In case of failover from the primary cluster to the secondary cluster, the consumers will start consuming data from the last committed offset. .. _configuring-the-consumer-for-failover: Configuring the consumer for failover (timestamp preservation) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ You must configure an additional interceptor for consumers from initial deployment in the source cluster. During failover, the interceptor will allow consumers to resume when reading topics that are replicated to the secondary cluster. :: interceptor.classes=io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor .. include:: includes/timestamp-interceptor-details.rst |crep| configuration for failover ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ You must set the following property to instruct |crep| to translate offsets and set them in the secondary cluster. :: dest.kafka.bootstrap.servers=localhost:9092 Bringing the primary cluster back up after a failover ----------------------------------------------------- After the original primary cluster is restarted, it can be brought up to date by running |crep| in the primary cluster. This will copy data that has been produced in the secondary cluster since the failover. However, before running |crep| in the primary cluster, you must address these prerequisites: * There might be data in the primary cluster that was never replicated to the secondary before the failure occurred. Determine if the extra data needs to be copied to the secondary cluster, and whether its order matters. If order doesn't matter, then you can briefly resume |crep| running in the secondary cluster to copy the data from the primary to the secondary. If order matters, then there is no standard answer on how to handle the extra data, since it will depend on your business needs. * The |crep| must only replicate new data. It should not replicate data that has already been replicated during the original active-standby configuration. |crep| uses the same offset translation mechanism that it uses for other consumers, to keep track of where it must resume in the secondary cluster. This allows it to switch from reading the primary to the secondary cluster. The ID of the consumer group must be the same when switching from one cluster to the other. For the consumer group ID, the |crep| will either use the value of ``src.consumer.group.id`` if specified, or the ``name`` of the |crep|. .. _consumer_offset_translation_feature: Understanding consumer offset translation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Starting with |cp| version 5.0, |crep| automatically translates offsets using timestamps so that consumers can failover to a different datacenter and start consuming data in the destination cluster where they left off in the origin cluster. If one cluster goes down, consumers must restart to connect to a new cluster. Before the outage, they consumed messages from topics in one cluster, and afterwards, they consume messages from topics in the other cluster or datacenter. You can configure |crep| :ref:`parameters on Consumer Offset Translation ` to specify where in a topic consumers should start reading, after failover to a new cluster. By default, when a consumer is created in a failover cluster, you can set the configuration parameter ``auto.offset.reset`` to either ``latest`` or ``earliest``. For some applications and scenarios, these defaults are sufficient; for example, ``latest`` message for clickstream analytics or log analytics, and ``earliest`` message for idempotent systems or any other system that can handle duplicates. But for other applications, neither of these defaults may be appropriate. The desired behavior might be that the consumer starts reading at a specific message and consumes only the unread messages. To target a specific offset (rather than rely on ``latest`` or ``earliest`` defaults), the consumer must reset its consumer offsets to meaningful values in the new cluster. Consumers can't reset by relying only on offsets to determine where to start because the offsets may differ between clusters. One approach is to use timestamps. :ref:`Timestamp preservation` in messages adds context to offsets, so a consumer can start consuming messages at an offset that is derived from a timestamp. You can configure offset translation by using the parameters described in :ref:`tuning_offset_translation`, :ref:`enable_disable_offset_translation`, and :ref:`consumer_offset_translation` in :ref:`replicator_config_options`. To use this capability, configure Java consumer applications with an interceptor called :ref:`Consumer Timestamps Interceptor`, which preserves metadata of consumed messages including: - Consumer group ID - Topic name - Partition - Committed offset - Timestamp Consumer timestamp information is preserved in a |ak| topic called ``__consumer_timestamps`` located in the origin cluster. |crep| does not replicate this topic because it has only local cluster significance. .. tip:: You can view the data stored on the ``__consumer_timestamps`` topic using a console consumer. For this to work, the |crep| library must be added to the consumer classpath and special deserializers must be used. :: export CLASSPATH=/usr/share/java/kafka-connect-replicator/* kafka-console-consumer --topic __consumer_timestamps --bootstrap-server kafka1:9092 --property print.key=true --property key.deserializer=io.confluent.connect.replicator.offsets.GroupTopicPartitionDeserializer --property value.deserializer=io.confluent.connect.replicator.offsets.TimestampAndDeltaDeserializer As |crep| copies data from one datacenter to another, it concurrently does the following: - Reads the consumer offset and timestamp information from the ``__consumer_timestamps`` topic in the origin cluster to understand a consumer group’s progress. - Translates the committed offsets in the origin datacenter to the corresponding offsets in the destination datacenter. - Writes the translated offsets to the ``__consumer_offsets`` topic in the destination cluster, as long as no consumers in that group are connected to the destination cluster. When |crep| writes the translated offsets to the ``__consumer_offsets`` topic in the destination cluster, it applies the offsets to any topic renames configured with ``topic.rename.format``. If there are already consumers in that consumer group connected to the destination cluster, |crep| does not write offsets to the ``__consumer_offsets`` topic since those consumers will be committing their own offsets. Either way, when a consumer application fails over to the backup cluster, it looks for and will find previously committed offsets using the normal mechanism. .. _provenance_headers: Using provenance headers to prevent duplicates or cyclic message repetition ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ If ``provenance.header.enable=true``, |crep| adds a provenance header to all records (messages) that it replicates to a given cluster. The provenance header contains: * ID of the origin cluster where this message was first produced * Name of the topic to which this message was first produced * Timestamp when |crep| first copied the record |crep| can use this provenance header information to avoid duplicates when switching from one cluster to another (since records may have been replicated after the last committed offset). It checks to determine if the destination cluster is equal to the origin cluster from the provenance headers. By default, |crep| will not replicate a message to a destination cluster if the cluster ID of the destination cluster matches the origin cluster ID in the provenance header, `and` if the destination topic name matches the origin topic name in the provenance header. Since these are checked at the topic level, you can replicate back to the origin cluster as long as it is to a different topic. For A -> B -> C, the record in C will have 2 provenance headers, one for B and one for A. .. figure:: replicator-provenance-headers-blog.png :align: center :scale: 80% The provenance header is also useful for avoiding "infinite" replication loops in active-active setups involving bi-directional replication, where |crep| is running in each active cluster. The effect of this is that even applications in different datacenters can access topics with exactly the same name while |crep| automatically avoids cyclic repetition of messages. .. tip:: When using provenance headers, you must explicitly set ``header.converter=io.confluent.connect.replicator.util.ByteArrayConverter`` to get human readable header output, as described in :ref:`rep-dest-data-conversion` configuration options on |crep|. This enables the replicator to read the contents of the header. To learn more, see ``provenance.header.enable`` under :ref:`consumer_offset_translation` in the |crep| configuration reference. However, if your brokers do not support the message format for 0.11.0 (message format version v2) or you have topics that do not support the message format for 0.11.0, then you will need to configure |crep| with ``provenance.header.enable=false`` (which is the default) or else errors will result. To learn more, see the discussion on :ref:`replicator_topic_renaming`. Provenance headers are a |crep| configuration option for :ref:`consumer_offset_translation`. .. note:: If |ak| messages have custom headers and are being replicated with provenance headers (``provenance.header.enable=true``), the consumer applications that are processing headers should skip the header with key ``__replicator_id``. Depending on how the consumer is processing records, this may not require a code change. If the consumer is asking for a specific header, no change is required. If, however, the consumer is using ``toArray`` from the |ak| class :platform:`Interface Headers|clients/javadocs/javadoc/org/apache/kafka/common/header/Headers.html` to return all headers, the consumer must explicitly skip ``__replicator_id``, otherwise, the replication will produce errors such as "could not decode JSON types .." and "unrecognized tokens". Here is an example of Java code for the consumer that uses :platform:`ConsumerRecord|clients/javadocs/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html` to read the header keys and skips ``__replicator_id``: .. codewithvars:: bash ConsumerRecord record = ... Headers headers = record.headers(); if (headers != null) { for (Header header : headers) { if (!"__replicator_id".equals(header.key())) { ... process application header... } } } .. _tuning_offset_translation: Advanced configuration for failover scenarios (Tuning offset translation) ------------------------------------------------------------------------- The following configuration properties are for advanced tuning of offset translation. .. include:: ../../includes/offset-translation-warning.rst |crep| configuration ~~~~~~~~~~~~~~~~~~~~ ``offset.translator.tasks.max`` The maximum number of |crep| tasks that will perform offset translation. If -1 (the default), all tasks will perform offset translation. * Type: int * Default: -1 * Importance: medium ``offset.translator.tasks.separate`` Whether to translate offsets in separate tasks from those performing topic replication. * Type: boolean * Default: false * Importance: medium ``offset.timestamps.commit`` Whether to commit internal offset timestamps for |crep|, so that it can resume properly when switching to the secondary cluster. * Type: boolean * Default: true * Importance: medium .. include:: includes/config.rst ``fetch.offset.expiry.ms`` The amount of time in milliseconds after which a fetch offset request for offset translation expires and is discarded. * Type: long * Default: 600000 * Importance: low ``fetch.offset.retry.backoff.ms`` The amount of time in milliseconds to wait before attempting to retry a failed fetch offset request during offset translation. * Type: long * Default: 100 * Importance: low Consumer configuration ~~~~~~~~~~~~~~~~~~~~~~ ``timestamps.topic.num.partitions`` The number of partitions for the consumer timestamps topic. * Type: int * Default: 50 * Importance: high ``timestamps.topic.replication.factor`` The replication factor for the consumer timestamps topic. * Type: short * Default: 3 * Importance: high ``timestamps.topic.max.per.partition`` The maximum number of timestamp records that will be held in memory for processing per partition. This is initially unbounded but can be configured lower to reduce memory consumption. If the number of timestamp records exceeds this value, then timestamps could be lost, which would result in losing the corresponding translated offsets. * Type: int * Default: Integer.MAX_VALUE * Importance: low .. _enable_disable_offset_translation: Enabling or disabling offset translation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ :ref:`consumer_offset_translation` is enabled (on) by default (``offset.translator.tasks.max`` is set to ``-1``). There are some cases that may require disabling offset translation. For example: - To troubleshoot a |crep| issue - When upgrading from earlier releases of |crep| that did not have bootstrap servers configured (with offset translation enabled, you would get an error message ``ConfigException: No resolvable bootstrap urls given in bootstrap.servers``) To disable offset translation, specify the following parameters: - ``offset.timestamps.commit=false`` - Turn off committing internal :ref:`consumer ` offset timestamps to source. - ``offset.translator.tasks.max=0`` - Turn off offset translation for :ref:`consumer ` interceptors. To re-enable offset translation, set ``offset.timestamps.commit`` to ``true`` and ``offset.translator.tasks.max`` to ``-1``. Related content --------------- - Blog: `15 Things Every Apache Kafka Engineer Should Know About Confluent Replicator `__ - Whitepaper: `Disaster Recovery for Multi-Datacenter Apache Kafka Deployments `_ - Demo (active-active multi-datacenter): :ref:`replicator` - :ref:`schemaregistry_mirroring` - :ref:`replicator_config_options`