.. _replicator:

================================================================================
Example: Replicate Data in an Active-Active Multi-DataCenter Deployment on |cp|
================================================================================

This example uses pre-built Docker images to show how to deploy an active-active multi-datacenter design, with two instances of |crep-full| that copy data bidirectionally between the datacenters.

This Docker environment is for demo purposes only, not for production.

Setup
=====

.. include:: ../../includes/docker-prereqs.rst
   :start-line: 2
   :end-line: 17

Design
------

This is an active-active multi-datacenter design, with two instances of |crep-full| that copy data bidirectionally between datacenters `dc1` and `dc2`. |c3| is running to manage and monitor both clusters.

.. figure:: ../../images/mdc-level-1.png
   :alt: image

|cp| Component ports
--------------------

=============== ======================= =======================
\               dc1                     dc2
=============== ======================= =======================
Broker          9091                    9092
|zk|            2181                    2182
|sr|            8081 (primary)          8082 (secondary)
|kconnect|      8381 (copying dc2->dc1) 8382 (copying dc1->dc2)
|c3-short|      9021
=============== ======================= =======================

Data generation and topic names
-------------------------------

+--------------+--------+----------+----------------+-------------+----------------+
| Datagen      | Origin | Origin   | Replicator     | Destination | Destination    |
| Docker       | DC     | topics   | instance       | DC          | topics         |
| container    |        |          |                |             |                |
+==============+========+==========+================+=============+================+
| datagen-dc1- | dc1    | topic1,  | replicator-dc1 | dc2         | topic1,        |
| topic1       |        | _schemas | -to-dc2-topic1 |             | _schemas       |
+--------------+--------+----------+----------------+-------------+----------------+
| datagen-dc1- | dc1    | topic2   | replicator-dc1 | dc2         | topic2.replica |
| topic2       |        |          | -to-dc2-topic2 |             |                |
+--------------+--------+----------+----------------+-------------+----------------+
| datagen-dc2- | dc2    | topic1   | replicator-dc2 | dc1         | topic1         |
| topic1       |        |          | -to-dc1-topic1 |             |                |
+--------------+--------+----------+----------------+-------------+----------------+

|crep-full| (version 5.0.1 and higher) prevents cyclic repetition of data between the dc1 ``topic1`` and dc2 ``topic1`` by using provenance information in the message headers.

Run the demo
============

Environment
-----------

This demo has been validated with:

- Docker version 17.06.1-ce
- Docker Compose version 1.14.0 with Docker Compose file format 2.1

Start the services
------------------

#. Clone the `confluentinc/examples <https://github.com/confluentinc/examples>`__ GitHub repository and check out the :litwithvars:`|release|-post` branch.

   .. codewithvars:: bash

      git clone https://github.com/confluentinc/examples
      cd examples
      git checkout |release|-post

#. Navigate to the ``multi-datacenter`` examples directory.

   .. sourcecode:: bash

      cd multi-datacenter/

#. Start all services and sample messages from topics in each datacenter:

   .. code:: bash

      ./start.sh

#. View the full configurations for the |ak| brokers, |sr|, and connect workers in the :devx-examples:`docker-compose.yml|multi-datacenter/docker-compose.yml` file.

   .. code:: bash

      cat docker-compose.yml

Verify multi-datacenter setup
=============================

|crep|
------

In this multi-datacenter environment, there are two Apache Kafka® clusters, ``dc1`` and ``dc2``. |c3| manages both of these clusters.
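If you prefer the command line, you can run a quick sanity check against each cluster before using the |c3-short| UI. The following is an optional sketch that assumes the demo's default container names and internal listener ports (``broker-dc1:29091`` is used elsewhere in this example; ``broker-dc2:29092`` is assumed by analogy):

.. code:: bash

   # Optional: list the topics in each datacenter from inside the broker containers
   docker-compose exec broker-dc1 kafka-topics --bootstrap-server broker-dc1:29091 --list
   docker-compose exec broker-dc2 kafka-topics --bootstrap-server broker-dc2:29092 --list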
|crep-full| copies data bidirectionally between ``dc1`` and ``dc2``, but for simplicity in explaining how it works, the following sections consider replication only from ``dc1`` to ``dc2``.

#. After starting the demo (see `previous section <#start-the-services>`__), navigate your Chrome browser to the |c3-short| UI at http://localhost:9021 and verify that there are two |ak| clusters, ``dc1`` and ``dc2``:

   .. figure:: ../../images/c3-two-clusters.png
      :alt: image

#. This demo has two Kafka Connect clusters, ``connect-dc1`` and ``connect-dc2``. Recall that Replicator is a source connector that typically runs on the connect cluster at the destination, so ``connect-dc1`` runs Replicator copying from ``dc2`` to ``dc1``, and ``connect-dc2`` runs Replicator copying from ``dc1`` to ``dc2``. Since version 5.2, |c3-short| can manage multiple Kafka Connect clusters, but in this demo we focus only on the Replicator instances running in ``connect-dc2``, which copy from ``dc1`` to ``dc2``.

#. For Replicator copying from ``dc1`` to ``dc2``: navigate to the **Connect** menu to verify that Kafka Connect (configured to ``connect-dc2``) is running two instances of Replicator: :devx-examples:`replicator-dc1-to-dc2-topic1|multi-datacenter/submit_replicator_dc1_to_dc2.sh` and :devx-examples:`replicator-dc1-to-dc2-topic2|multi-datacenter/submit_replicator_dc1_to_dc2_topic2.sh`.

   .. figure:: ../../images/replicator-instances.png
      :alt: image

Inspect topics
--------------

#. For each datacenter, inspect the data in various topics, provenance information, timestamp information, and cluster ID.

   .. code:: bash

      ./read-topics.sh

#. Verify that the output resembles:

.. code-block:: text -----dc1----- list topics: __consumer_offsets __consumer_timestamps _confluent-command _confluent-license _confluent-telemetry-metrics _confluent_balancer_api_state _schemas connect-configs-dc1 connect-offsets-dc1 connect-status-dc1 topic1 topic2 topic1: {"userid":{"string":"User_7"},"dc":{"string":"dc1"}} {"userid":{"string":"User_7"},"dc":{"string":"dc2"}} {"userid":{"string":"User_9"},"dc":{"string":"dc2"}} {"userid":{"string":"User_2"},"dc":{"string":"dc1"}} {"userid":{"string":"User_5"},"dc":{"string":"dc2"}} {"userid":{"string":"User_1"},"dc":{"string":"dc1"}} {"userid":{"string":"User_3"},"dc":{"string":"dc2"}} {"userid":{"string":"User_7"},"dc":{"string":"dc1"}} {"userid":{"string":"User_1"},"dc":{"string":"dc2"}} {"userid":{"string":"User_8"},"dc":{"string":"dc1"}} Processed a total of 10 messages topic2: {"registertime":{"long":1513471082347},"userid":{"string":"User_2"},"regionid":{"string":"Region_7"},"gender":{"string":"OTHER"}} {"registertime":{"long":1496006007512},"userid":{"string":"User_5"},"regionid":{"string":"Region_6"},"gender":{"string":"OTHER"}} {"registertime":{"long":1494319368203},"userid":{"string":"User_7"},"regionid":{"string":"Region_2"},"gender":{"string":"FEMALE"}} {"registertime":{"long":1493150028737},"userid":{"string":"User_1"},"regionid":{"string":"Region_5"},"gender":{"string":"FEMALE"}} {"registertime":{"long":1517151907191},"userid":{"string":"User_5"},"regionid":{"string":"Region_3"},"gender":{"string":"OTHER"}} {"registertime":{"long":1489672305692},"userid":{"string":"User_2"},"regionid":{"string":"Region_6"},"gender":{"string":"OTHER"}} {"registertime":{"long":1511471447951},"userid":{"string":"User_2"},"regionid":{"string":"Region_5"},"gender":{"string":"MALE"}} {"registertime":{"long":1488018372941},"userid":{"string":"User_7"},"regionid":{"string":"Region_2"},"gender":{"string":"OTHER"}}
{"registertime":{"long":1500952152251},"userid":{"string":"User_2"},"regionid":{"string":"Region_1"},"gender":{"string":"MALE"}} {"registertime":{"long":1493556444692},"userid":{"string":"User_1"},"regionid":{"string":"Region_8"},"gender":{"string":"FEMALE"}} Processed a total of 10 messages _schemas: null null null {"subject":"topic1-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"userid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"dc\",\"type\":[\"null\",\"string\"],\"default\":null}]}","deleted":false} {"subject":"topic2-value","version":1,"id":2,"schema":"{\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"registertime\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"userid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"regionid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"gender\",\"type\":[\"null\",\"string\"],\"default\":null}]}","deleted":false} {"subject":"topic2.replica-value","version":1,"id":2,"schema":"{\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"registertime\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"userid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"regionid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"gender\",\"type\":[\"null\",\"string\"],\"default\":null}]}","deleted":false} [2021-01-04 19:16:09,579] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$) org.apache.kafka.common.errors.TimeoutException Processed a total of 6 messages provenance info (cluster, topic, timestamp): 2qHo2TsdTIaTjvkCyf3qdw,topic1,1609787778125 2qHo2TsdTIaTjvkCyf3qdw,topic1,1609787779123 2qHo2TsdTIaTjvkCyf3qdw,topic1,1609787780125 2qHo2TsdTIaTjvkCyf3qdw,topic1,1609787781246 2qHo2TsdTIaTjvkCyf3qdw,topic1,1609787782125 Processed a total of 10 messages timestamp info (group: topic-partition): replicator-dc1-to-dc2-topic2: topic2-0 1609787797164 replicator-dc1-to-dc2-topic1: topic1-0 1609787797117 Processed a total of 2 messages cluster id: ZagAAEfORQG-lxwq6OsV5Q -----dc2----- list topics: __consumer_offsets __consumer_timestamps _confluent-command _confluent-controlcenter-6-1-0-1-AlertHistoryStore-changelog _confluent-controlcenter-6-1-0-1-AlertHistoryStore-repartition _confluent-controlcenter-6-1-0-1-Group-ONE_MINUTE-changelog _confluent-controlcenter-6-1-0-1-Group-ONE_MINUTE-repartition _confluent-controlcenter-6-1-0-1-Group-THREE_HOURS-changelog _confluent-controlcenter-6-1-0-1-Group-THREE_HOURS-repartition _confluent-controlcenter-6-1-0-1-KSTREAM-OUTEROTHER-0000000106-store-changelog _confluent-controlcenter-6-1-0-1-KSTREAM-OUTEROTHER-0000000106-store-repartition _confluent-controlcenter-6-1-0-1-KSTREAM-OUTERTHIS-0000000105-store-changelog _confluent-controlcenter-6-1-0-1-KSTREAM-OUTERTHIS-0000000105-store-repartition _confluent-controlcenter-6-1-0-1-MetricsAggregateStore-changelog _confluent-controlcenter-6-1-0-1-MetricsAggregateStore-repartition _confluent-controlcenter-6-1-0-1-MonitoringMessageAggregatorWindows-ONE_MINUTE-changelog _confluent-controlcenter-6-1-0-1-MonitoringMessageAggregatorWindows-ONE_MINUTE-repartition _confluent-controlcenter-6-1-0-1-MonitoringMessageAggregatorWindows-THREE_HOURS-changelog 
_confluent-controlcenter-6-1-0-1-MonitoringMessageAggregatorWindows-THREE_HOURS-repartition _confluent-controlcenter-6-1-0-1-MonitoringStream-ONE_MINUTE-changelog _confluent-controlcenter-6-1-0-1-MonitoringStream-ONE_MINUTE-repartition _confluent-controlcenter-6-1-0-1-MonitoringStream-THREE_HOURS-changelog _confluent-controlcenter-6-1-0-1-MonitoringStream-THREE_HOURS-repartition _confluent-controlcenter-6-1-0-1-MonitoringTriggerStore-changelog _confluent-controlcenter-6-1-0-1-MonitoringTriggerStore-repartition _confluent-controlcenter-6-1-0-1-MonitoringVerifierStore-changelog _confluent-controlcenter-6-1-0-1-MonitoringVerifierStore-repartition _confluent-controlcenter-6-1-0-1-TriggerActionsStore-changelog _confluent-controlcenter-6-1-0-1-TriggerActionsStore-repartition _confluent-controlcenter-6-1-0-1-TriggerEventsStore-changelog _confluent-controlcenter-6-1-0-1-TriggerEventsStore-repartition _confluent-controlcenter-6-1-0-1-actual-group-consumption-rekey _confluent-controlcenter-6-1-0-1-aggregate-topic-partition-store-changelog _confluent-controlcenter-6-1-0-1-aggregate-topic-partition-store-repartition _confluent-controlcenter-6-1-0-1-aggregatedTopicPartitionTableWindows-ONE_MINUTE-changelog _confluent-controlcenter-6-1-0-1-aggregatedTopicPartitionTableWindows-ONE_MINUTE-repartition _confluent-controlcenter-6-1-0-1-aggregatedTopicPartitionTableWindows-THREE_HOURS-changelog _confluent-controlcenter-6-1-0-1-aggregatedTopicPartitionTableWindows-THREE_HOURS-repartition _confluent-controlcenter-6-1-0-1-cluster-rekey _confluent-controlcenter-6-1-0-1-expected-group-consumption-rekey _confluent-controlcenter-6-1-0-1-group-aggregate-store-ONE_MINUTE-changelog _confluent-controlcenter-6-1-0-1-group-aggregate-store-ONE_MINUTE-repartition _confluent-controlcenter-6-1-0-1-group-aggregate-store-THREE_HOURS-changelog _confluent-controlcenter-6-1-0-1-group-aggregate-store-THREE_HOURS-repartition _confluent-controlcenter-6-1-0-1-group-stream-extension-rekey _confluent-controlcenter-6-1-0-1-metrics-trigger-measurement-rekey _confluent-controlcenter-6-1-0-1-monitoring-aggregate-rekey-store-changelog _confluent-controlcenter-6-1-0-1-monitoring-aggregate-rekey-store-repartition _confluent-controlcenter-6-1-0-1-monitoring-message-rekey-store _confluent-controlcenter-6-1-0-1-monitoring-trigger-event-rekey _confluent-license _confluent-metrics _confluent-monitoring _confluent-telemetry-metrics _confluent_balancer_api_state _schemas connect-configs-dc2 connect-offsets-dc2 connect-status-dc2 topic1 topic2.replica topic1: {"userid":{"string":"User_2"},"dc":{"string":"dc2"}} {"userid":{"string":"User_1"},"dc":{"string":"dc1"}} {"userid":{"string":"User_6"},"dc":{"string":"dc2"}} {"userid":{"string":"User_9"},"dc":{"string":"dc1"}} {"userid":{"string":"User_9"},"dc":{"string":"dc2"}} {"userid":{"string":"User_9"},"dc":{"string":"dc1"}} {"userid":{"string":"User_9"},"dc":{"string":"dc2"}} {"userid":{"string":"User_9"},"dc":{"string":"dc1"}} {"userid":{"string":"User_9"},"dc":{"string":"dc2"}} {"userid":{"string":"User_9"},"dc":{"string":"dc1"}} Processed a total of 10 messages topic2.replica: {"registertime":{"long":1488571887136},"userid":{"string":"User_2"},"regionid":{"string":"Region_4"},"gender":{"string":"FEMALE"}} {"registertime":{"long":1496554479008},"userid":{"string":"User_3"},"regionid":{"string":"Region_9"},"gender":{"string":"OTHER"}} {"registertime":{"long":1515819037639},"userid":{"string":"User_1"},"regionid":{"string":"Region_7"},"gender":{"string":"FEMALE"}} 
{"registertime":{"long":1498630829454},"userid":{"string":"User_9"},"regionid":{"string":"Region_5"},"gender":{"string":"FEMALE"}} {"registertime":{"long":1491954362758},"userid":{"string":"User_6"},"regionid":{"string":"Region_6"},"gender":{"string":"FEMALE"}} {"registertime":{"long":1498308706008},"userid":{"string":"User_2"},"regionid":{"string":"Region_2"},"gender":{"string":"OTHER"}} {"registertime":{"long":1509409463384},"userid":{"string":"User_5"},"regionid":{"string":"Region_8"},"gender":{"string":"OTHER"}} {"registertime":{"long":1494736574275},"userid":{"string":"User_4"},"regionid":{"string":"Region_4"},"gender":{"string":"OTHER"}} {"registertime":{"long":1513254638109},"userid":{"string":"User_3"},"regionid":{"string":"Region_5"},"gender":{"string":"FEMALE"}} {"registertime":{"long":1499607488391},"userid":{"string":"User_4"},"regionid":{"string":"Region_2"},"gender":{"string":"OTHER"}} Processed a total of 10 messages _schemas: null null null {"subject":"topic1-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"userid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"dc\",\"type\":[\"null\",\"string\"],\"default\":null}]}","deleted":false} {"subject":"topic2-value","version":1,"id":2,"schema":"{\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"registertime\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"userid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"regionid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"gender\",\"type\":[\"null\",\"string\"],\"default\":null}]}","deleted":false} {"subject":"topic2.replica-value","version":1,"id":2,"schema":"{\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"registertime\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"userid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"regionid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"gender\",\"type\":[\"null\",\"string\"],\"default\":null}]}","deleted":false} [2021-01-04 19:17:26,336] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$) org.apache.kafka.common.errors.TimeoutException Processed a total of 6 messages provenance info (cluster, topic, timestamp): ZagAAEfORQG-lxwq6OsV5Q,topic1,1609787854055 ZagAAEfORQG-lxwq6OsV5Q,topic1,1609787854057 ZagAAEfORQG-lxwq6OsV5Q,topic1,1609787856052 ZagAAEfORQG-lxwq6OsV5Q,topic1,1609787857052 ZagAAEfORQG-lxwq6OsV5Q,topic1,1609787857054 Processed a total of 10 messages timestamp info (group: topic-partition): replicator-dc2-to-dc1-topic1: topic1-0 1609787867007 replicator-dc2-to-dc1-topic1: topic1-0 1609787877008 Processed a total of 2 messages cluster id: 2qHo2TsdTIaTjvkCyf3qdw .. note:: Starting with |cp| 6.2.1, the ``_confluent-command`` internal topic is available as the preferred alternative to the ``_confluent-license`` topic, and is the default for new clusters. Both will be supported going forward. To learn more, see :ref:`cp_configure_license`. Monitor |crep| performance ========================== Monitoring Replicator is important to: 1. Gauge how synchronized is the data between multiple datacenters 2. 
Monitor |crep| performance
==========================

Monitoring Replicator is important to:

1. Gauge how synchronized the data is between multiple datacenters
2. Optimize performance of the network and Confluent Platform

|crep| monitoring
-----------------

|c3-short| provides detailed monitoring for |crep| from the `Replicators` menu. Selecting this shows an overview of the |crep| instances currently running on the clusters. Important metrics are shown for each |crep|:

* Source Cluster - the cluster that |crep| is replicating from.
* Source Topics - the number of topics being replicated.
* Messages Replicated in/s - the number of messages replicated per second.
* Message Lag - the number of messages present on the source cluster that have not yet been replicated to the destination.
* Latency - the difference between the time the message was present on the destination and the time it was present on the source.

.. figure:: ../../images/c3-replicators.png
   :alt: image

Selecting any of the |crep| instances displayed here shows more detail for that instance. Under the **Status** tab you will see the metrics above broken down by |kconnect| task, and on the **Source Topics** tab you will see them broken down by topic.

.. figure:: ../../images/c3-replicator-detail.png
   :alt: image

Further selecting the `Throughput` card shows detailed message lag broken down by |kconnect| tasks, and selecting any of the listed tasks shows metrics for the partitions that task is replicating.

.. figure:: ../../images/c3-replicator-tasks.png
   :alt: image

Streams monitoring
------------------

|c3-short|’s monitoring capabilities include monitoring stream performance: verifying that all data is consumed, and at what throughput and latency. Monitoring can be displayed on a per-consumer group or per-topic basis. |crep-full| has an embedded consumer that reads data from the origin cluster, so you can monitor its performance in |c3-short|. To enable streams monitoring for Replicator, configure it with the :ref:`Monitoring Consumer Interceptor `, as shown in this :devx-examples:`example|multi-datacenter/submit_replicator_dc1_to_dc2.sh`.

.. figure:: ../../images/replicator-embedded-consumer.png
   :alt: image

#. Select the ``dc1`` Kafka cluster from the menu on the left and then select **Data Streams**. Verify that there are two consumer groups, one for each Replicator instance running from ``dc1`` to ``dc2``, called ``replicator-dc1-to-dc2-topic1`` and ``replicator-dc1-to-dc2-topic2``:

   .. figure:: ../../images/c3-streams-dc-1.png
      :alt: image

#. Select the ``dc2`` Kafka cluster from the menu on the left and then select **Data Streams**. Verify that there is one consumer group running from ``dc2`` to ``dc1`` called ``replicator-dc2-to-dc1-topic1``:

   .. figure:: ../../images/c3-streams-dc-2.png
      :alt: image

Consumer lag
------------

Replicator has an embedded consumer that reads data from the origin cluster, and it commits its offsets only after the connect worker’s producer has committed the data to the destination cluster (configure the frequency of commits with the parameter ``offset.flush.interval.ms``). You can monitor the consumer lag of Replicator’s embedded consumer in the origin cluster (for Replicator instances that copy data from ``dc1`` to ``dc2``, the origin cluster is ``dc1``). The ability to monitor Replicator’s consumer lag is enabled when it is configured with ``offset.topic.commit=true`` (``true`` by default), which allows Replicator to commit its own consumer offsets to the origin cluster ``dc1`` after the messages have been written to the destination cluster.
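To confirm how a running |crep| instance is configured (for example, to check ``offset.topic.commit`` or ``offset.flush.interval.ms``), you can query the |kconnect| REST API on the destination Connect worker. This is an optional sketch that uses the ``connect-dc2`` port from the component ports table; properties left at their defaults may not appear in the submitted configuration:

.. code:: bash

   # List the connectors running on connect-dc2
   curl -s http://localhost:8382/connectors

   # Show the configuration of the Replicator instance copying topic1 from dc1 to dc2
   curl -s http://localhost:8382/connectors/replicator-dc1-to-dc2-topic1/config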
#. For Replicator copying from ``dc1`` to ``dc2``: select ``dc1`` (the origin cluster) from the menu on the left and then select **Consumers**. Verify that there are two consumer groups, one for each Replicator instance running from ``dc1`` to ``dc2``: ``replicator-dc1-to-dc2-topic1`` and ``replicator-dc1-to-dc2-topic2``. Replicator’s consumer lag information is available in |c3-short| and ``kafka-consumer-groups``, but it is not available via JMX.

   a. Click on ``replicator-dc1-to-dc2-topic1`` to view Replicator’s consumer lag in reading ``topic1`` and ``_schemas``. This view is equivalent to:

      .. code-block:: text

         docker-compose exec broker-dc1 kafka-consumer-groups --bootstrap-server broker-dc1:29091 --describe --group replicator-dc1-to-dc2-topic1

      .. figure:: ../../images/c3-consumer-lag-dc1-topic1.png
         :alt: image

   b. Click on ``replicator-dc1-to-dc2-topic2`` to view Replicator’s consumer lag in reading ``topic2`` (equivalent to ``docker-compose exec broker-dc1 kafka-consumer-groups --bootstrap-server broker-dc1:29091 --describe --group replicator-dc1-to-dc2-topic2``).

      .. figure:: ../../images/c3-consumer-lag-dc1-topic2.png
         :alt: image

#. For Replicator copying from ``dc1`` to ``dc2``: do not mistakenly try to monitor Replicator consumer lag in the destination cluster ``dc2``. |c3-short| also shows the Replicator consumer lag for topics in ``dc2`` (i.e., ``topic1``, ``_schemas``, ``topic2.replica``), but this does not mean that Replicator is consuming from them. You see this consumer lag in ``dc2`` because, by default, Replicator is configured with ``offset.timestamps.commit=true``, which makes Replicator commit the offset timestamps of its consumer group to the ``__consumer_offsets`` topic in the destination cluster ``dc2``. In case of disaster recovery, this enables Replicator to resume where it left off when switching to the secondary cluster.

#. Do not confuse consumer lag with the MBean attribute ``records-lag`` associated with Replicator’s embedded consumer. That attribute reflects whether Replicator’s embedded consumer can keep up with the original data production rate, but it does not take into account the replication lag in producing messages to the destination cluster. ``records-lag`` is real time, and it is normal for this value to be ``0.0``.

   .. code-block:: text

      docker-compose exec connect-dc2 \
        kafka-run-class kafka.tools.JmxTool \
        --object-name "kafka.consumer:type=consumer-fetch-manager-metrics,partition=0,topic=topic1,client-id=replicator-dc1-to-dc2-topic1-0" \
        --attributes "records-lag" \
        --jmx-url service:jmx:rmi:///jndi/rmi://connect-dc2:9892/jmxrmi

Resume applications after failover
==================================

After a disaster event occurs, switch your Java consumer application to a different datacenter; it can then automatically resume consuming data in the destination cluster where it left off in the origin cluster. To use this capability, configure Java consumer applications with the :ref:`Consumer Timestamps Interceptor `, which is shown in this :devx-examples:`sample code|multi-datacenter/src/main/java/io/confluent/examples/clients/ConsumerMultiDatacenterExample.java`.

#. After starting the demo (see `previous section <#start-the-services>`__), run the consumer to connect to the ``dc1`` Kafka cluster. It automatically configures the consumer group ID as ``java-consumer-topic1`` and uses the Consumer Timestamps Interceptor and Monitoring Interceptor.

   ..
code:: bash mvn clean package mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.ConsumerMultiDatacenterExample -Dexec.args="topic1 localhost:9091 http://localhost:8081 localhost:9092" #. Verify in the consumer output that it is reading data originating from both dc1 and dc2: .. code:: bash ... key = User_1, value = {"userid": "User_1", "dc": "dc1"} key = User_9, value = {"userid": "User_9", "dc": "dc2"} key = User_6, value = {"userid": "User_6", "dc": "dc2"} ... #. Even though the consumer is consuming from dc1, there are dc2 consumer offsets committed for the consumer group ``java-consumer-topic1``. Run the following command to read from the ``__consumer_offsets`` topic in dc2. .. code:: bash docker-compose exec broker-dc2 \ kafka-console-consumer \ --topic __consumer_offsets \ --bootstrap-server localhost:9092 \ --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" | grep java-consumer #. Verify that there are committed offsets: .. code:: bash ... [java-consumer-topic1,topic1,0]::OffsetAndMetadata(offset=1142, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1547146285084, expireTimestamp=None) [java-consumer-topic1,topic1,0]::OffsetAndMetadata(offset=1146, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1547146286082, expireTimestamp=None) [java-consumer-topic1,topic1,0]::OffsetAndMetadata(offset=1150, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1547146287084, expireTimestamp=None) ... #. When :ref:`Confluent Monitoring Interceptors ` are configured on Kafka clients, they write metadata to a topic called ``_confluent-monitoring``. Kafka clients include any application that uses the Apache Kafka client API to connect to Kafka brokers, such as custom client code or any service that has embedded producers or consumers, such as Kafka Connect, |ksqldb|, or a Kafka Streams application. |c3-short| uses that topic to ensure that all messages are delivered and to provide statistics on throughput and latency performance. From that same topic, you can also derive which producers are writing to which topics and which consumers are reading from which topics, and an example :devx-examples:`script|multi-datacenter/map_topics_clients.py` is provided with the repo. .. code:: bash ./map_topics_clients.py .. note:: This script is for demo purposes only. It is not suitable for production. #. In steady state with the Java consumer running, you should see: .. code:: bash Reading topic _confluent-monitoring for 60 seconds...please wait __consumer_timestamps producers consumer-1 producer-10 producer-11 producer-6 producer-8 consumers replicator-dc1-to-dc2-topic1 replicator-dc1-to-dc2-topic2 replicator-dc2-to-dc1-topic1 _schemas producers connect-worker-producer-dc2 consumers replicator-dc1-to-dc2-topic1 topic1 producers connect-worker-producer-dc1 connect-worker-producer-dc2 datagen-dc1-topic1 datagen-dc2-topic1 consumers java-consumer-topic1 replicator-dc1-to-dc2-topic1 replicator-dc2-to-dc1-topic1 topic2 producers datagen-dc1-topic2 consumers replicator-dc1-to-dc2-topic2 topic2.replica producers connect-worker-producer-dc2 #. Shut down ``dc1``: .. code:: bash docker-compose stop connect-dc1 schema-registry-dc1 broker-dc1 zookeeper-dc1 #. Stop and restart the consumer to connect to the ``dc2`` Kafka cluster. It will still use the same consumer group ID ``java-consumer-topic1`` so it can resume where it left off: .. 
.. code:: bash

   mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.ConsumerMultiDatacenterExample -Dexec.args="topic1 localhost:9092 http://localhost:8082 localhost:9092"

#. Verify that you see data sourced only from ``dc2``:

   .. code:: bash

      ...
      key = User_8, value = {"userid": "User_8", "dc": "dc2"}
      key = User_9, value = {"userid": "User_9", "dc": "dc2"}
      key = User_5, value = {"userid": "User_5", "dc": "dc2"}
      ...

Stop the demo
=============

When you are done with the demo, stop it and remove the Docker containers and images.

#. Run this script to stop all services and remove the Docker containers and images:

   .. code:: bash

      ./stop.sh

#. Run the following command to verify that no containers are running:

   .. code:: bash

      docker container ls

Related content
===============

* For a practical guide to designing and configuring multiple Apache Kafka clusters to be resilient in case of a disaster scenario, see the `Disaster Recovery white paper `__. This white paper provides a plan for failover, failback, and ultimately successful recovery.
* For an overview of using |cp| for data replication, see :ref:`multi_dc`.
* For a quick start on how to configure |crep| and set up your own multi-cluster deployment, see :ref:`replicator_quickstart`.
* For an overview of |crep|, see :ref:`replicator_detail`.
* For an introduction to using |cp| to create stretch clusters with followers, observers, and replica placement, see :ref:`bmrr`.