.. _replicator_detail:

Replicator for Multi-Datacenter Replication
===========================================

|crep-full| can be deployed across clusters and in multiple datacenters. Multi-datacenter deployments enable use cases such as:

* Active-active geo-localized deployments: allow users to access a nearby datacenter to optimize their architecture for low latency and high performance
* Active-passive disaster recovery (DR) deployments: in the event of a partial or complete datacenter disaster, allow failing over applications to use |cp| in a different datacenter
* Centralized analytics: aggregate data from multiple |ak-tm| clusters into one location for organization-wide analytics
* Cloud migration: use |ak| to synchronize data between on-premises applications and cloud deployments

Replication of events in |ak| topics from one cluster to another is the foundation of Confluent's multi-datacenter architecture. Replication can be done with |crep-full| or with the open source `Kafka MirrorMaker `_. This documentation focuses on |crep-full|, including the :ref:`architecture `, a :ref:`quick start tutorial `, how to :ref:`configure and run ` Replicator in different contexts, :ref:`tuning and monitoring `, :ref:`cross-cluster failover `, and more. A section on how to :ref:`migrate from MirrorMaker to Replicator ` is also included. Some of the general thinking on deployment strategies also applies to |mmaker|, but if you are primarily interested in |mmaker|, see `Mirroring data between clusters `_ in the |ak| documentation.

.. _replicator_architecture:

|crep|
------

|crep| allows you to easily and reliably replicate topics from one |ak| cluster to another. In addition to copying the messages, |crep| creates topics as needed, preserving the topic configuration from the source cluster. This includes preserving the number of partitions, the replication factor, and any configuration overrides specified for individual topics.
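To make this concrete, the following is a minimal sketch of a |crep| connector properties file. The cluster addresses and topic names (``dc1-broker1:9092``, ``orders``, and so on) are hypothetical placeholders for illustration, not values from this documentation:

.. code:: properties

   # Hypothetical example: replicate two topics from DC1 to DC2
   name=replicator-dc1-to-dc2
   connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector
   # Replicator copies raw bytes, so byte-array converters are typically used
   key.converter=io.confluent.connect.replicator.util.ByteArrayConverter
   value.converter=io.confluent.connect.replicator.util.ByteArrayConverter
   # Source (origin) and destination cluster connections
   src.kafka.bootstrap.servers=dc1-broker1:9092
   dest.kafka.bootstrap.servers=dc2-broker1:9092
   # Topics to replicate
   topic.whitelist=orders,customers

See the :ref:`connect_replicator_config_options` reference for the full list of settings.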
------------
Architecture
------------

The diagram below shows the |crep| architecture. |crep| uses the |kconnect-long| APIs and Workers to provide high availability, load balancing, and centralized management.

.. figure:: replicator_components.png
   :align: center

   Replicator Architecture

.. tip:: You can deploy |crep| near the destination cluster or the origin cluster, and it will work either way. However, a best practice is to deploy |crep| closer to the destination cluster for reliability and performance over networks. Therefore, if the destination cluster is `Confluent Cloud `__, we recommend that you deploy |crep| on an instance in the same region as your Confluent Cloud cluster. However, if the origin cluster does not permit external connections, you may deploy |crep| in the origin cluster. (See also `Migrate Topics on Confluent Cloud Clusters `__.)

------------------
Example Deployment
------------------

In a typical multi-datacenter deployment, data from two geographically distributed |ak| clusters located in separate datacenters is aggregated in a separate cluster located in another datacenter. The origin of the copied data is referred to as the "source" cluster, while the target of the copied data is referred to as the "destination." Each source cluster requires a separate instance of |crep|. For convenience you can run them in the same Connect cluster, located in the aggregate datacenter.

.. figure:: replicator.png
   :align: center
   :scale: 75 %

   Replication to an Aggregate Cluster

------------------------------
Guidelines for Getting Started
------------------------------

Follow these guidelines to configure a multi-datacenter deployment using |crep|:

1. Use the :ref:`Replicator quick start` to set up replication between two |ak| clusters.
2. Learn how to :ref:`install and configure` |crep| and other |cp| components in multi-datacenter environments.
3. Before running |crep| in production, make sure you read the :ref:`monitoring and tuning guide`.
4. For a practical guide to designing and configuring multiple |ak| clusters to be resilient in case of a disaster scenario, see the `Disaster Recovery `_ white paper. This white paper provides a plan for failover, failback, and ultimately successful recovery.

.. _mdc_replicator_demos:

------------------
Demos and Examples
------------------

After completing the :ref:`Replicator quick start`, explore end-to-end demos that showcase |crep| in multi-datacenter deployments in a full event streaming platform. Check out these hands-on demos, which you can download from GitHub and run yourself:

1. :ref:`Multi-datacenter`: fully automated Docker-based demo of an active-active multi-datacenter design with two instances of |crep| copying data bidirectionally between the datacenters
2. :ref:`Hybrid on-prem to Confluent Cloud`: on-prem Kafka cluster and |ccloud| cluster, with data copied between them by |crep|
3. :ref:`Hybrid GKE to Confluent Cloud`: Kubernetes, Confluent Operator, and |crep| to replicate topics to |ccloud|
4. :ref:`Schema translation`: showcases the transfer of schemas stored in |sr| from one cluster to another using |crep|
5. :ref:`Confluent Platform demo`: shows users how to deploy a |ak| streaming ETL using KSQL for stream processing and |c3| for monitoring, along with |crep| to replicate data

----------------------------------------
Using |crep| to Migrate Data to |ccloud|
----------------------------------------

You can use |crep| to migrate topics and schemas to `Confluent Cloud `__. To learn about migrating topics, see `Migrate Topics on Confluent Cloud Clusters `__. To learn about migrating schemas, see :ref:`schemaregistry_migrate`.

.. include:: includes/kafka-replicator-ccloud-compat.rst

.. _replicator_topic_renaming:

--------------
Topic Renaming
--------------

By default, the replicator is configured to use the same topic name in both the source and destination clusters.
This works fine if you are only replicating from a single cluster. When copying data from multiple clusters to a single destination (that is, the aggregate use case), you should use a separate topic for each source cluster in case there are any configuration differences between the topics in the source clusters.

It is possible to use the same |ak| cluster as both the source and destination, as long as you ensure that the replicated topic name is different. This is not a recommended pattern, since you should generally prefer |ak|'s built-in replication within the same cluster, but it may be useful in some cases (for example, testing).

Starting with |cp| 5.0, |crep| protects against circular replication through the use of :ref:`provenance headers `. This guarantees that if two |crep| instances are configured to run, one replicating from DC1 to DC2 and the second configured to replicate from DC2 to DC1, |crep| ensures that messages replicated to DC2 are not replicated back to DC1, and vice versa. As a result, |crep| safely runs in each direction.

Although |crep| can enable applications in different datacenters to access topics with the same names, you should design client applications with a topic naming strategy that takes a number of factors into consideration. If you plan to have the same topic name span datacenters, be aware that in this configuration:

- Producers do not wait for commit acknowledgment from the remote cluster, and |crep| asynchronously copies the data between datacenters after it has been committed locally.
- If there are producers in each datacenter writing to topics of the same name, there is no "global ordering". This means there are no message ordering guarantees for data that originated from producers in different datacenters.
- If there are consumer groups in each datacenter with the same group ID reading from topics of the same name, in steady state, they will be reprocessing the same messages in each datacenter.
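For aggregate deployments where data from each source cluster should land in a distinctly named destination topic, |crep| supports renaming topics during replication via the ``topic.rename.format`` setting. A minimal sketch, assuming a hypothetical instance that replicates from DC1 into an aggregate cluster:

.. code:: properties

   # Hypothetical example: append a per-source suffix to every replicated
   # topic, so "orders" from DC1 becomes "orders.dc1" in the aggregate cluster.
   # ${topic} is substituted with the source topic name.
   topic.rename.format=${topic}.dc1

A second Replicator instance copying from DC2 could use ``${topic}.dc2``, keeping the aggregated streams separate even when the source topic names collide.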
In some cases, you may not want to use the same topic name in each datacenter. For example, in cases where:

- Replicator is running a version less than 5.0.1
- Kafka brokers are running a version prior to Kafka 0.11 that does not yet support message headers
- Kafka brokers are running Kafka version 0.11 or later but have less than the minimum required ``log.message.format.version=2.0`` for using headers
- Client applications are not designed to handle topics with the same name across datacenters

In these cases, refer to the appendix on "Topic Naming Strategies to Prevent Cyclic Repetition" in the `Disaster Recovery `_ white paper.

-------------------------
Periodic Metadata Updates
-------------------------

The replicator periodically checks topics in the source cluster to tell whether there are any new topics that need to be replicated, and whether there are any configuration changes (for example, increases in the number of partitions). The frequency of this checking is controlled with the ``metadata.max.age.ms`` setting in the connector configuration. The default is 2 minutes, which is intended to provide reasonable responsiveness to configuration changes while ensuring that the connector does not add unnecessary load on the source cluster. You can lower this setting to detect changes more quickly, but doing so is rarely worthwhile as long as topic creation and reconfiguration are relatively infrequent (as is most common).

.. _replicator_security_overview:

--------
Security
--------

.. important:: A |zk| quorum enabled with TLS is not supported with Replicator. To run Replicator with a TLS-enabled |zk|, remove any |zk|-related connection details for your Replicator.

|crep| supports communication with secure |ak| over SSL for both the source and destination clusters. |crep| also supports SSL or SASL for authentication. Differing security configurations can be used on the source and destination clusters. All properties documented here are additive (that is, you can apply both SSL encryption and SASL PLAIN authentication properties) except for ``security.protocol``. The following table can be used to determine the correct value for ``security.protocol``:

========== ============== =================
Encryption Authentication security.protocol
========== ============== =================
SSL        None           SSL
SSL        SSL            SSL
SSL        SASL           SASL_SSL
Plaintext  SASL           SASL_PLAINTEXT
========== ============== =================

You can configure |crep| connections to the source and destination |ak| with:

- :ref:`SSL Encryption `. You can use different SSL configurations on the source and destination clusters.
- :ref:`SSL Authentication `
- :ref:`SASL/SCRAM `
- :ref:`SASL/GSSAPI `
- :ref:`SASL/PLAIN `

You can configure |zk| by passing the name of its JAAS file as a JVM parameter when starting:

.. code:: bash

   export KAFKA_OPTS="-Djava.security.auth.login.config=etc/kafka/zookeeper_jaas.conf"
   bin/zookeeper-server-start etc/kafka/zookeeper.properties

.. important:: The source and destination |zk| must be secured with the same credentials.

To configure security on the source cluster, see the connector configurations for :ref:`source_security_config`. To configure security on the destination cluster, see the connector configurations :ref:`here ` and the general security configuration for Connect workers :ref:`here `.

.. seealso:: To see the required security configuration parameters for |crep| consolidated in one place, try out the docker-compose environments in :devx-examples:`GitHub confluentinc/examples|replicator-security/README.md`.
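As an illustration of how these properties combine, here is a sketch of connection settings for a Replicator whose source cluster uses SASL/PLAIN over TLS and whose destination cluster uses TLS encryption only. All hostnames, file paths, and credentials are hypothetical placeholders:

.. code:: properties

   # Hypothetical example: source cluster uses SASL_SSL
   # (TLS encryption plus SASL PLAIN authentication)
   src.kafka.security.protocol=SASL_SSL
   src.kafka.sasl.mechanism=PLAIN
   src.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="replicator" \
      password="replicator-secret";
   src.kafka.ssl.truststore.location=/etc/security/client.truststore.jks
   src.kafka.ssl.truststore.password=truststore-secret

   # Destination cluster uses SSL (TLS encryption only, no SASL)
   dest.kafka.security.protocol=SSL
   dest.kafka.ssl.truststore.location=/etc/security/client.truststore.jks
   dest.kafka.ssl.truststore.password=truststore-secret

Note how the ``src.kafka.`` and ``dest.kafka.`` prefixes allow the two clusters to use different security configurations within a single connector configuration.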
When using SASL or SSL authentication, and ACLs are enabled on the source cluster, the destination cluster, or both, |crep| requires the following ACLs.

For license management:

=================================================================================== ========================================= ================
Cluster                                                                             Resource                                  Operation
=================================================================================== ========================================= ================
Destination (or other cluster configured with `confluent.topic.bootstrap.servers`)  TOPIC - _confluent-command                All
=================================================================================== ========================================= ================

Commands to execute to configure the above ACLs:

.. code:: bash

   kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation ALL --topic _confluent-command

To read from the source cluster:

============ ========================================= ================
Cluster      Resource                                  Operation
============ ========================================= ================
Source       CLUSTER                                   Describe
Source       TOPIC - all topics |crep| will replicate  Describe
Source       TOPIC - all topics |crep| will replicate  Read
============ ========================================= ================

Commands to execute to configure the above ACLs:

.. code:: bash

   kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation DESCRIBE --cluster
   kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation DESCRIBE --topic
   kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation READ --topic

To write to the destination cluster:

============ ========================================= ================
Cluster      Resource                                  Operation
============ ========================================= ================
Destination  CLUSTER                                   Describe
Destination  TOPIC - all topics |crep| will replicate  Describe
Destination  TOPIC - all topics |crep| will replicate  Write
============ ========================================= ================

Commands to execute to configure the above ACLs:

.. code:: bash

   kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation DESCRIBE --cluster
   kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation DESCRIBE --topic
   kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation WRITE --topic

If using the topic creation and config sync features of |crep| (enabled by default):

============ ========================================= ================
Cluster      Resource                                  Operation
============ ========================================= ================
Source       TOPIC - all topics |crep| will replicate  DescribeConfigs
Destination  CLUSTER                                   Create
Destination  TOPIC - all topics |crep| will replicate  DescribeConfigs
============ ========================================= ================

For configuration options relating to topic creation and config sync, see :ref:`rep-destination-topics`.
Commands to execute to configure the above ACLs:

.. code:: bash

   kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation DESCRIBECONFIGS --topic
   kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation DESCRIBECONFIGS --topic
   kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation CREATE --cluster

If using the offset translation feature of |crep| (enabled by default):

============ =================================================== ================
Cluster      Resource                                            Operation
============ =================================================== ================
Source       TOPIC - __consumer_timestamps                       All
Destination  GROUP - All consumer groups that will be translated All
============ =================================================== ================

For configuration options relating to offset translation, see :ref:`consumer_offset_translation`.

Commands to execute to configure the above ACLs:

.. code:: bash

   kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation ALL --topic __consumer_timestamps

.. important:: Any clients instrumented with the |crep| timestamp interceptor must also have the following ACLs provided:

   ============ ============================== ================
   Cluster      Resource                       Operation
   ============ ============================== ================
   Source       TOPIC - __consumer_timestamps  Write
   Source       TOPIC - __consumer_timestamps  Describe
   ============ ============================== ================

   Commands to execute to configure the above ACLs:

   .. code:: bash

      kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation WRITE --topic __consumer_timestamps
      kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation DESCRIBE --topic __consumer_timestamps

If using the source offset management feature of |crep| (enabled by default):

============ ============================================================================================================== ================
Cluster      Resource                                                                                                       Operation
============ ============================================================================================================== ================
Source       GROUP - The consumer group name is determined by the |crep| name or by the ``src.consumer.group.id`` property  All
============ ============================================================================================================== ================

For configuration options relating to offset management, see :ref:`replicator_offset_management`.

Commands to execute to configure the above ACLs:

.. code:: bash

   kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation ALL --group

For more information on configuring ACLs, see :ref:`kafka_authorization`.

------------
Requirements
------------

From a high level, |crep| works like a consumer group, with the partitions of the replicated topics from the source cluster divided between the connector's tasks. |crep| periodically polls the source cluster for changes to the configuration of replicated topics and to the number of partitions, and updates the destination cluster accordingly by creating topics or updating configurations. For this to work correctly, the following is required:

* The origin and destination clusters must be |ak-tm| or |cp|.
  For version compatibility, see :ref:`connector interoperability`.
* The |crep| version must match the |kconnect-long| version it is deployed on. For instance, |crep| |version| should only be deployed to |kconnect-long| |version|.
* The |crep| principal must have permission to create and modify topics in the destination cluster. In version 4.0 or lower, this requires write access to the corresponding |zk|. In later versions, this requires the ACLs mentioned :ref:`here `.
* The default topic configurations in the source and destination clusters must match. In general, aside from any broker-specific settings (such as ``broker.id``), you should use the same broker configuration in both clusters.
* The destination |ak| cluster must have a similar capacity as the source cluster. In particular, because |crep| preserves the replication factor of topics in the source cluster, there must be at least as many brokers as the maximum replication factor used. If there are not, topic creation will fail until the destination cluster has the capacity to support the same replication factor. Note that in this case, topic creation will be retried automatically by the connector, so replication will begin as soon as the destination cluster has enough brokers.
* The ``dest.kafka.bootstrap.servers`` destination connection setting in the |crep| properties file must be configured to use a single destination cluster, even when using multiple source clusters. For example, the figure shown at the start of this section shows two source clusters in different datacenters targeting a single *aggregate* destination cluster. Note that the aggregate destination cluster must have a similar capacity as the total of all associated source clusters.
* On |cp| versions 5.3.0 and later, |crep-full| requires the enterprise edition of :ref:`kafka_connect`. Starting with |cp| 5.3.0, |crep| does not support the community edition of |kconnect|.
  You can install the enterprise edition of |kconnect| as part of the |cp| on-premises bundle, as described in :ref:`on-prem-production` and in the :ref:`ce-quickstart` (choose Self-managed |cp|). Demos of enterprise |kconnect| are available at :ref:`ce-docker-quickstart` and on Docker Hub at `confluentinc/cp-server-connect `__.

.. tip:: For best performance, run |crep| as close to the destination cluster as possible to provide a low-latency connection for |kconnect-long| operations within |crep|.

-------------
Compatibility
-------------

.. include:: includes/replicator-compatibility.rst

----------------
|crep| Connector
----------------

|crep| is implemented as a |ak| connector. For basic information on the connector and additional use cases beyond multi-datacenter, see :ref:`connect_replicator` in :ref:`connect_bundled_connectors`.

.. include:: ../../connect/includes/connector-native-install-cpe.rst

|mmaker|
--------

|mmaker| is a stand-alone tool for copying data between two |ak| clusters. To learn more, see `Mirroring data between clusters `_ in the |ak| documentation. |crep-full| is a more complete solution that handles topic configuration and data, and integrates with |kconnect-long| and |c3| to improve availability, scalability, and ease of use. To learn more, try out the :ref:`replicator_quickstart` and see :ref:`migrate-replicator`.

Suggested Reading
-----------------

- Blog post: `15 Things Every Apache Kafka Engineer Should Know About Confluent Replicator `__
- Blog post: `Enterprise Streaming Multi-Datacenter Replication using Apache Kafka `__
- Whitepaper: `Disaster Recovery for Multi-Datacenter Apache Kafka Deployments `__
- :ref:`connect_replicator_config_options` reference for |crep-full|
- `Migrate Topics on Confluent Cloud Clusters `__ describes how to use |crep| to migrate a self-managed |sr| to |ccloud|

.. toctree::
   :maxdepth: 3
   :titlesonly:
   :hidden:

   replicator-quickstart
   Download and Install
   Configure and Run
   Tuning Replicator
   Monitoring Replicator
   replicator-failover
   Configuration Options <../../connect/kafka-connect-replicator/configuration_options>
   migrate-replicator