.. _replicator_detail:

Replicate Multi-Datacenter Topics Across |ak| Clusters in |cp|
==============================================================

|crep-full| allows you to easily and reliably replicate topics from one |ak| cluster to another. In addition to copying the messages, |crep| creates topics as needed, preserving the topic configuration in the source cluster. This includes preserving the number of partitions, the replication factor, and any configuration overrides specified for individual topics. |crep| is implemented as a connector.

.. important:: For most use cases, :ref:`Cluster Linking ` and :ref:`Schema Linking ` are now recommended over |crep|.

Features
--------

|crep| supports the following features:

* Topic selection using whitelists, blacklists, and regular expressions.
* Dynamic topic creation in the destination cluster with matching partition counts, replication factors, and topic configuration overrides.
* Automatic resizing of topics when new partitions are added in the source cluster.
* Automatic reconfiguration of topics when topic configuration changes in the source cluster.
* :ref:`Timestamp Preservation `, :ref:`Using Provenance Headers to Prevent Duplicates or Cyclic Message Repetition `, and :ref:`Consumer Offset Translation ` (supported on |cp| 5.0.1 and later).
* You can :ref:`migrate from MirrorMaker to Replicator ` on existing datacenters (|cp| 5.0.0 and later). Migration from |mmaker| to |crep| is not supported in earlier versions of |cp| (pre 5.5.0).
* At least once delivery, meaning the |crep| connector guarantees that records are delivered at least once to the |ak| topic. If the connector restarts, there may be some duplicate records in the |ak| topic.

Multi-Datacenter Use Cases
--------------------------

|crep| can be deployed across clusters and in multiple datacenters. Multi-datacenter deployments enable use cases such as:

* Active-active geo-localized deployments: allow users to access a nearby datacenter to optimize their architecture for low latency and high performance.
* Active-passive disaster recovery (DR) deployments: in the event of a partial or complete datacenter disaster, allow failing over applications to use |cp| in a different datacenter.
* Centralized analytics: aggregate data from multiple |ak-tm| clusters into one location for organization-wide analytics.
* Cloud migration: use |ak| to synchronize data between on-prem applications and cloud deployments.

Replication of events in |ak| topics from one cluster to another is the foundation of Confluent's multi-datacenter architecture.

Replication can be done with |crep-full| or using the open source `Kafka MirrorMaker `_. |crep| can be used for replication of topic data as well as :ref:`migrating schemas ` in |sr|.

This documentation focuses on |crep|, including :ref:`architecture `, :ref:`quick start tutorial `, how to :ref:`configure and run ` Replicator in different contexts, :ref:`tuning and monitoring `, :ref:`cross-cluster failover `, and more. A section on how to :ref:`migrate from MirrorMaker to Replicator ` is also included.

Some of the general thinking on deployment strategies can also apply to |mmaker|, but if you are primarily interested in |mmaker|, see `Mirroring data between clusters `_ in the |ak| documentation.

.. _replicator_architecture:

Architecture
------------

The diagram below shows the |crep| architecture. |crep| uses the |kconnect-long| APIs and Workers to provide high availability, load-balancing and centralized management.

.. figure:: replicator_components.png
   :align: center

   Replicator Architecture

.. tip::

   You can deploy |crep| near the destination cluster or the origin cluster, and it will work either way. However, a best practice is to deploy |crep| closer to the destination cluster for reliability and performance over networks. Therefore, if the destination cluster is :cloud:`Confluent Cloud|index.html`, we recommend that you deploy |crep| on an instance in the same region as your Confluent Cloud cluster. However, if the origin cluster does not permit external connections, you may deploy |crep| in the origin cluster. (See also :cloud:`Migrate Topics on Confluent Cloud Clusters|migrate-topics-on-cloud-clusters.html`.)

Example Deployment
------------------

In a typical multi-datacenter deployment, data from two geographically distributed |ak| clusters located in separate datacenters is aggregated in a separate cluster located in another datacenter. The origin of the copied data is referred to as the "source" cluster while the target of the copied data is referred to as the "destination."

Each source cluster requires a separate instance of |crep|. For convenience you can run them in the same Connect cluster, located in the aggregate datacenter.

.. figure:: replicator.png
   :align: center
   :scale: 75 %

   Replication to an Aggregate Cluster

Guidelines for Getting Started
------------------------------

Follow these guidelines to configure a multi-datacenter deployment using |crep|:

1. Use the :ref:`Replicator quick start` to set up replication between two |ak| clusters.
2. Learn how to :ref:`install and configure` |crep| and other |cp| components in multi-datacenter environments.
3. Before running |crep| in production, make sure you read the :ref:`monitoring and tuning guide`.
4. For a practical guide to designing and configuring multiple |ak| clusters to be resilient in case of a disaster scenario, see the `Disaster Recovery `_ white paper. This white paper provides a plan for failover, failback, and ultimately successful recovery.

.. _mdc_replicator_demos:

Demos and Examples
------------------

After completing the :ref:`Replicator quick start`, explore these hands-on working examples of |crep| in multi-datacenter deployments, which you can download from GitHub and run yourself. Refer to the diagram below to determine the |crep| examples that correspond to your deployment scenario.

.. figure:: images/replicator-demos.png

1. |ak| on-premises to |ak| on-premises

   - :ref:`replicator`: fully-automated example of an active-active multi-datacenter design with two instances of |crep| copying data bidirectionally between the datacenters
   - :ref:`Schema translation`: showcases the transfer of schemas stored in |sr| from one cluster to another using |crep|
   - :ref:`Confluent Platform demo`: deploy a |ak| streaming ETL, along with |crep| to replicate data

2. |ak| on-premises to |ccloud|

   - :ref:`Hybrid On-premises and Confluent Cloud`: on-premises |ak| cluster and |ccloud| cluster, and data copied between them with |crep|
   - :cloud:`Connect Cluster Backed to Destination|get-started/examples/ccloud/docs/replicator-to-cloud-configuration-types.html`: |crep| configuration with |kconnect-long| backed to destination cluster
   - :cloud:`On-premises to Cloud with Connect Backed to Origin|get-started/examples/ccloud/docs/replicator-to-cloud-configuration-types.html#onprem-cloud-origin`: |crep| configuration with |kconnect-long| backed to origin cluster

3. |ccloud| to |ccloud|

   - :cloud:`Cloud to Cloud with Connect Backed to Destination|get-started/examples/ccloud/docs/replicator-to-cloud-configuration-types.html#cloud-cloud-destination`: |crep| configuration with |kconnect-long| backed to destination cluster
   - :cloud:`Cloud to Cloud with Connect Backed to Origin|get-started/examples/ccloud/docs/replicator-to-cloud-configuration-types.html#cloud-cloud-origin`: |crep| configuration with |kconnect-long| backed to origin cluster
   - :cloud:`Migrate Topics on Confluent Cloud Clusters|clusters/migrate-topics-on-cloud-clusters.html`: migrate topics from the origin |ccloud| cluster to the destination |ccloud| cluster

.. _replicator_topic_renaming:

Topic Renaming
--------------

By default, |crep| is configured to use the same topic name in both the source and destination clusters. This works fine if you are only replicating from a single cluster. When copying data from multiple clusters to a single destination (i.e. the aggregate use case), you should use a separate topic for each source cluster in case there are any configuration differences between the topics in the source clusters.

It is possible to use the same |ak| cluster as the source and destination as long as you ensure that the replicated topic name is different. This is not a recommended pattern since generally you should prefer |ak|'s built-in replication within the same cluster, but it may be useful in some cases (e.g. testing).

Starting with |cp| 5.0, |crep| protects against circular replication through the use of :ref:`provenance headers `. This guarantees that if two |crep| instances are configured to run, one replicating from DC1 to DC2 and the second replicating from DC2 to DC1, messages replicated to DC2 are not replicated back to DC1, and vice versa. As a result, |crep| safely runs in each direction.

Although |crep| can enable applications in different datacenters to access topics with the same names, you should design client applications with a topic naming strategy that takes into consideration a number of factors.

If you plan to have the same topic name span datacenters, be aware that in this configuration:

- Producers do not wait for commit acknowledgment from the remote cluster, and |crep| asynchronously copies the data between datacenters after it has been committed locally.
- If there are producers in each datacenter writing to topics of the same name, there is no "global ordering". This means there are no message ordering guarantees for data that originated from producers in different datacenters.
- If there are consumer groups in each datacenter with the same group ID reading from topics of the same name, in steady state, they will be reprocessing the same messages in each datacenter.

In some cases, you may not want to use the same topic name in each datacenter. For example, in cases where:

- Replicator is running a version less than 5.0.1
- Kafka brokers are running a version prior to Kafka 0.11 that does not yet support message headers
- Kafka brokers are running Kafka version 0.11 or later but have less than the minimum required ``log.message.format.version=2.0`` for using headers
- Client applications are not designed to handle topics with the same name across datacenters

In these cases, refer to the appendix on "Topic Naming Strategies to Prevent Cyclic Repetition" in the `Disaster Recovery `_ white paper.

Periodic Metadata Updates
-------------------------

|crep| periodically checks topics in the source cluster to tell whether there are any new topics which need to be replicated, and whether there are any configuration changes (e.g. increases in the number of partitions). The frequency of this checking is controlled with the ``metadata.max.age.ms`` setting in the connector configuration. The default is set to 2 minutes, which is intended to provide reasonable responsiveness to configuration changes while ensuring that the connector does not add any unnecessary load on the source cluster. You can lower this setting to detect changes more quickly, but doing so is usually unnecessary when topic creation and reconfiguration are rare, as is typical.

.. _replicator_security_overview:

Security and ACL Configurations
-------------------------------

.. _replicator_acls:

-------------
ACLs Overview
-------------

.. important::

   A |zk| quorum enabled with TLS is not supported with Replicator. To run Replicator with a TLS-enabled |zk|, remove any |zk|-related connection details for your Replicator.
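
As a rough sketch of that cleanup, the snippet below prints a |crep| properties file with its |zk| connection settings filtered out. The ``src.zookeeper.`` and ``dest.zookeeper.`` prefixes, the function name, and the file names are assumptions made for illustration; check your own configuration for the exact property names before relying on it.

.. code:: bash

   #!/usr/bin/env bash
   # Hypothetical helper: print a properties file without ZooKeeper settings.
   # Assumes the settings to drop start with "src.zookeeper." or
   # "dest.zookeeper." (an assumption, not confirmed by this page).
   strip_zookeeper_settings() {
     grep -v -E '^[[:space:]]*(src|dest)\.zookeeper\.' "$1"
   }

   # Example usage with hypothetical file names:
   # strip_zookeeper_settings replicator.properties > replicator-no-zk.properties

Filtering into a new file, rather than editing in place, keeps the original configuration available for comparison.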

|crep| supports communication with secure |ak| over TLS/SSL for both the source and destination clusters. |crep| also supports TLS/SSL or SASL for authentication. Differing security configurations can be used on the source and destination clusters.

All properties documented here are additive (i.e. you can apply both TLS/SSL Encryption and SASL Plain authentication properties) except for ``security.protocol``. The following table can be used to determine the correct value for this:

========== ============== =================
Encryption Authentication security.protocol
========== ============== =================
TLS/SSL    None           SSL
TLS/SSL    TLS/SSL        SSL
TLS/SSL    SASL           SASL_SSL
Plaintext  SASL           SASL_PLAINTEXT
========== ============== =================

You can configure |crep| connections to source and destination |ak| with:

- :ref:`TLS/SSL Encryption `. You can use different TLS/SSL configurations on the source and destination clusters.
- :ref:`SSL Authentication `
- :ref:`SASL/SCRAM `
- :ref:`SASL/GSSAPI `
- :ref:`SASL/PLAIN `

You can configure |zk| by passing the name of its JAAS file as a JVM parameter when starting:

.. code:: bash

   export KAFKA_OPTS="-Djava.security.auth.login.config=etc/kafka/zookeeper_jaas.conf"
   bin/zookeeper-server-start etc/kafka/zookeeper.properties

.. important:: The source and destination |zk| must be secured with the same credentials.

To configure security on the source cluster, see the connector configurations for :ref:`source_security_config`. To configure security on the destination cluster, see the connector configurations for :ref:`destination_security_config` and the general security configuration for Connect workers :ref:`here `.

.. seealso::

   To see the required security configuration parameters for |crep| consolidated in one place, try out the docker-compose environments in :devx-examples:`GitHub confluentinc/examples|replicator-security/README.md`.
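
The ``security.protocol`` table in this section can also be read as a simple lookup. The following is a minimal sketch of that lookup; the function name ``security_protocol`` is hypothetical and is not part of |crep| or any Confluent tooling. It prints the value for the ``src.kafka.`` and ``dest.kafka.`` prefixed properties.

.. code:: bash

   #!/usr/bin/env bash
   # Hypothetical helper: map an (encryption, authentication) pair to the
   # security.protocol value listed in the table in this section.
   security_protocol() {
     local encryption="$1" authentication="$2"
     case "${encryption}:${authentication}" in
       TLS/SSL:None|TLS/SSL:TLS/SSL) echo "SSL" ;;
       TLS/SSL:SASL)                 echo "SASL_SSL" ;;
       Plaintext:SASL)               echo "SASL_PLAINTEXT" ;;
       *)
         # Any other pair is not covered by the table.
         echo "unsupported combination: ${encryption} + ${authentication}" >&2
         return 1
         ;;
     esac
   }

   # Source uses TLS/SSL encryption with SASL; destination uses TLS/SSL only.
   echo "src.kafka.security.protocol=$(security_protocol TLS/SSL SASL)"
   echo "dest.kafka.security.protocol=$(security_protocol TLS/SSL None)"

Because the source and destination can use differing security configurations, the lookup is applied once per cluster.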

When using SASL or TLS/SSL authentication with ACLs enabled on the source cluster, the destination cluster, or both, |crep| requires the ACLs described in the following sections.

ACL commands are provided for both |cp| (:confluent-cli:`Confluent Platform CLI Command Reference|command-reference/kafka/acl/index.html`) and :cloud:`Confluent Cloud|overview.html`.

For more information on configuring ACLs, see :ref:`kafka_authorization`.

.. _crep-security-service-accounts:

------------------------------------------------------
Principal Users (|cp|) and Service Accounts (|ccloud|)
------------------------------------------------------

Commands to configure ACLs are given here for both |cp| and |ccloud|. On |cp|, associate ACLs with a :ref:`service principal `. On |ccloud|, associate ACLs with a :cloud:`Confluent Cloud service account|access-management/service-account.html#service-accounts`.

To create a service account for |ccloud|, run the following command:

.. code:: bash

   confluent iam service-account create --description ""

For example:

.. code:: bash

   confluent iam service-account create my-first-cluster-test-acls --description "test ACLs on Cloud"
   +-------------+----------------------------+
   | Id          | 123456                     |
   | Resource ID | ab-123abc                  |
   | Name        | my-first-cluster-test-acls |
   | Description | test ACLs on Cloud         |
   +-------------+----------------------------+

Save the service account ID to use in the following commands to create the ACLs.

.. tip:: You can retrieve service account IDs by listing them with ``confluent iam service-account list``.

.. _crep-acls-license-management:

---------------------------
ACLs for License Management
---------------------------

For license management, you need the following ACLs:

===================================================================================== ========================================= ================
Cluster                                                                               Resource                                  Operation
===================================================================================== ========================================= ================
Destination (or other cluster configured with ``confluent.topic.bootstrap.servers``)  TOPIC - _confluent-command                All
===================================================================================== ========================================= ================

.. tabs::

   .. group-tab:: |cp|

      Commands to configure the above ACLs on |cp|:

      .. code:: bash

         kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation ALL --topic _confluent-command

   .. group-tab:: |ccloud|

      Commands to configure ACLs for the ``_confluent-command`` license topic on |ccloud|:

      .. code:: bash

         confluent kafka acl create --allow --service-account --operations CREATE --topic _confluent-command
         confluent kafka acl create --allow --service-account --operations WRITE --topic _confluent-command
         confluent kafka acl create --allow --service-account --operations READ --topic _confluent-command
         confluent kafka acl create --allow --service-account --operations DESCRIBE --topic _confluent-command
         confluent kafka acl create --allow --service-account --operations DESCRIBE-CONFIGS --topic _confluent-command
         confluent kafka acl create --allow --service-account --operations ALTER-CONFIGS --topic _confluent-command

.. _crep-acls-read-from-source-cluster:

------------------------------------
ACLs to Read from the Source Cluster
------------------------------------

To read from the source cluster, you need the following ACLs:

============ ================================================== ================
Cluster      Resource                                           Operation
============ ================================================== ================
Source       CLUSTER                                            Describe
Source       TOPIC - all topics |crep| will replicate           Describe
Source       TOPIC - all topics |crep| will replicate           Read
Source       GROUP - The consumer group name is determined by   Read
             the |crep| name or by the
             ``src.consumer.group.id`` property
============ ================================================== ================

.. tabs::

   .. group-tab:: |cp|

      Commands to configure the above ACLs on |cp|:

      .. code:: bash

         kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation DESCRIBE --cluster
         kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation DESCRIBE --topic
         kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation READ --topic

   .. group-tab:: |ccloud|

      Commands to configure the above ACLs on |ccloud|:

      .. code:: bash

         confluent kafka cluster use
         confluent kafka acl create --allow --service-account --operations DESCRIBE --cluster-scope
         confluent kafka acl create --allow --service-account --operations DESCRIBE --cluster
         confluent kafka acl create --allow --service-account --operations DESCRIBE --topic
         confluent kafka acl create --allow --service-account --operations READ --topic

.. _crep-acls-write-to-destination-cluster:

----------------------------------------
ACLs to Write to the Destination Cluster
----------------------------------------

To write to the destination cluster, you need the following ACLs:

============ ========================================= ================
Cluster      Resource                                  Operation
============ ========================================= ================
Destination  CLUSTER                                   Describe
Destination  TOPIC - all topics |crep| will replicate  Describe
Destination  TOPIC - all topics |crep| will replicate  Write
Destination  TOPIC - all topics |crep| will replicate  Read
============ ========================================= ================

.. tabs::

   .. group-tab:: |cp|

      Commands to configure the above ACLs on |cp|:

      .. code:: bash

         kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation DESCRIBE --cluster
         kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation DESCRIBE --topic
         kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation WRITE --topic
         kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation READ --topic

   .. group-tab:: |ccloud|

      Commands to configure the above ACLs on |ccloud|:

      .. code:: bash

         confluent kafka cluster use
         confluent kafka acl create --allow --service-account --operations DESCRIBE --cluster-scope
         confluent kafka acl create --allow --service-account --operations DESCRIBE --cluster
         confluent kafka acl create --allow --service-account --operations DESCRIBE --topic
         confluent kafka acl create --allow --service-account --operations WRITE --topic
         confluent kafka acl create --allow --service-account --operations READ --topic

.. _crep-acls-topic-create-config-sync:

---------------------------------------
ACLs for Topic Creation and Config Sync
---------------------------------------

If using the topic creation and config sync features of |crep| (enabled by default), you need the following ACLs:

============ ========================================= ================
Cluster      Resource                                  Operation
============ ========================================= ================
Source       TOPIC - all topics |crep| will replicate  DescribeConfigs
Destination  TOPIC - all topics |crep| will replicate  Create
Destination  TOPIC - all topics |crep| will replicate  DescribeConfigs
Destination  TOPIC - all topics |crep| will replicate  AlterConfigs
============ ========================================= ================

For configuration options relating to topic creation and config sync, see :ref:`rep-destination-topics`.

.. tabs::

   .. group-tab:: |cp|

      Commands to configure the above ACLs on |cp|:

      .. code:: bash

         kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation DESCRIBECONFIGS --topic
         kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation DESCRIBECONFIGS --topic
         kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation CREATE --cluster
         kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation ALTERCONFIGS --cluster

   .. group-tab:: |ccloud|

      Commands to configure the above ACLs on |ccloud|:

      .. code:: bash

         confluent kafka acl create --allow --service-account --operations DESCRIBECONFIGS --topic
         confluent kafka acl create --allow --service-account --operations DESCRIBECONFIGS --topic
         confluent kafka acl create --allow --service-account --operations CREATE --cluster
         confluent kafka acl create --allow --service-account --operations ALTERCONFIGS --cluster
         confluent kafka acl create --allow --service-account --operations DESCRIBE --cluster-scope

.. _crep-acls-offset-translation:

---------------------------
ACLs for Offset Translation
---------------------------

If using the offset translation feature of |crep| (enabled by default), you need the following ACLs:

============ ==================================================== ================
Cluster      Resource                                             Operation
============ ==================================================== ================
Source       TOPIC - __consumer_timestamps                        All
Destination  GROUP - All consumer groups that will be translated  All
============ ==================================================== ================

For configuration options relating to offset translation, see :ref:`consumer_offset_translation `.

.. tabs::

   .. group-tab:: |cp|

      Commands to configure the above ACLs on |cp|:

      .. code:: bash

         kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation ALL --topic __consumer_timestamps
         kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation ALL --group {}

   .. group-tab:: |ccloud|

      Commands to configure the above ACLs on |ccloud|:

      .. code:: bash

         confluent kafka acl create --allow --service-account --operations CREATE --topic __consumer_timestamps
         confluent kafka acl create --allow --service-account --operations WRITE --topic __consumer_timestamps
         confluent kafka acl create --allow --service-account --operations READ --topic __consumer_timestamps
         confluent kafka acl create --allow --service-account --operations DESCRIBE --topic __consumer_timestamps
         confluent kafka acl create --allow --service-account --operations DESCRIBE-CONFIGS --topic __consumer_timestamps
         confluent kafka acl create --allow --service-account --operations ALTER-CONFIGS --topic __consumer_timestamps

      .. code:: bash

         confluent kafka acl create --allow --service-account --operations CREATE --consumer-group
         confluent kafka acl create --allow --service-account --operations WRITE --consumer-group
         confluent kafka acl create --allow --service-account --operations READ --consumer-group
         confluent kafka acl create --allow --service-account --operations DESCRIBE --consumer-group
         confluent kafka acl create --allow --service-account --operations DESCRIBE-CONFIGS --consumer-group
         confluent kafka acl create --allow --service-account --operations ALTER-CONFIGS --consumer-group

.. _crep-acls-timestamp-interceptor:

----------------------------------
ACLs for the Timestamp Interceptor
----------------------------------

Any clients instrumented with the |crep| timestamp interceptor must also have the following ACLs:

============ ========================================= ================
Cluster      Resource                                  Operation
============ ========================================= ================
Source       TOPIC - __consumer_timestamps             Write
Source       TOPIC - __consumer_timestamps             Describe
============ ========================================= ================

.. tabs::

   .. group-tab:: |cp|

      Commands to configure the above ACLs on |cp|:

      .. code:: bash

         kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation WRITE --topic __consumer_timestamps
         kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation DESCRIBE --topic __consumer_timestamps

   .. group-tab:: |ccloud|

      Commands to configure the above ACLs on |ccloud|:

      .. code:: bash

         confluent kafka acl create --allow --service-account --operations WRITE --topic __consumer_timestamps
         confluent kafka acl create --allow --service-account --operations DESCRIBE --topic __consumer_timestamps

.. _crep-acls-offset-management:

---------------------------------
ACLs for Source Offset Management
---------------------------------

If using the source offset management feature of |crep| (enabled by default), you need the following ACLs:

============ ================================================== ================
Cluster      Resource                                           Operation
============ ================================================== ================
Source       GROUP - The consumer group name is determined by   All
             the |crep| name or by the
             ``src.consumer.group.id`` property
============ ================================================== ================

For configuration options relating to offset management, see :ref:`replicator_offset_management` in the |crep| Configuration Reference.

.. tabs::

   .. group-tab:: |cp|

      Commands to configure the above ACLs on |cp|:

      .. code:: bash

         kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User: --operation ALL --group

   .. group-tab:: |ccloud|

      Commands to configure the above ACLs on |ccloud|:

      .. code:: bash

         confluent kafka acl create --allow --service-account --operations CREATE --consumer-group
         confluent kafka acl create --allow --service-account --operations WRITE --consumer-group
         confluent kafka acl create --allow --service-account --operations READ --consumer-group
         confluent kafka acl create --allow --service-account --operations DESCRIBE --consumer-group
         confluent kafka acl create --allow --service-account --operations DESCRIBE-CONFIGS --consumer-group
         confluent kafka acl create --allow --service-account --operations ALTER-CONFIGS --consumer-group

.. _replicator_rbac:

|crep| with |rbac|
------------------

When using |rbac|, |crep| clients should use token authentication as described in :ref:`security_sasl_rbac_oauthbearer_clientconfig`. These configurations should be prefixed with the usual |crep| prefixes of ``src.kafka.`` and ``dest.kafka.``. An example configuration for source and destination clusters that are |rbac|-enabled is shown below:

.. code:: bash

   src.kafka.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
     username="sourceUser" \
     password="xxx" \
     metadataServerUrls="http://sourceHost:8090";
   src.kafka.security.protocol=SASL_PLAINTEXT
   src.kafka.sasl.mechanism=OAUTHBEARER
   src.kafka.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler

   dest.kafka.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
     username="destUser" \
     password="xxx" \
     metadataServerUrls="http://destHost:8090";
   dest.kafka.security.protocol=SASL_PLAINTEXT
   dest.kafka.sasl.mechanism=OAUTHBEARER
   dest.kafka.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler

For the |crep| executable, these configurations should not be prefixed and should be placed in the files referred to by ``--consumer.config`` and ``--producer.config``, as shown below:

.. code:: bash

   # in --consumer.config
   sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
     username="sourceUser" \
     password="xxx" \
     metadataServerUrls="http://sourceHost:8090";
   security.protocol=SASL_PLAINTEXT
   sasl.mechanism=OAUTHBEARER
   sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler

   # in --producer.config
   sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
     username="destUser" \
     password="xxx" \
     metadataServerUrls="http://destHost:8090";
   security.protocol=SASL_PLAINTEXT
   sasl.mechanism=OAUTHBEARER
   sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler

The configuration above requires that the Metadata Service (MDS) is running. For more information, see :ref:`rbac-mds-config`.

.. important:: The rolebindings below are based on the following assumptions:

   - There are two “independent” |rbac| deployments (one in each DC)
   - Each |rbac| deployment supports all other services local to it (|kconnect|, |sr|, etc.)
   - |rbac| rolebindings will not be replicated (|crep| does not support this)

|crep| requires the rolebindings listed below.
For the backing |kconnect| cluster:

============ ===================================================================================== ================
Cluster      Resource                                                                              Role
============ ===================================================================================== ================
Destination  GROUP - name given by the ``group.id`` property                                       ResourceOwner
Destination  TOPIC - name given by the ``offset.storage.topic`` property (default connect-offsets) ResourceOwner
Destination  TOPIC - name given by the ``config.storage.topic`` property (default connect-configs) ResourceOwner
Destination  TOPIC - name given by the ``status.storage.topic`` property (default connect-status)  ResourceOwner
============ ===================================================================================== ================

For license management:

==================================================================================== ============================== ================
Cluster                                                                              Resource                       Role
==================================================================================== ============================== ================
Destination (or other cluster configured with ``confluent.topic.bootstrap.servers``) TOPIC - ``_confluent-command`` DeveloperRead
Destination (or other cluster configured with ``confluent.topic.bootstrap.servers``) TOPIC - ``_confluent-command`` DeveloperWrite
Destination (or other cluster configured with ``confluent.topic.bootstrap.servers``) TOPIC - ``_confluent-command`` DeveloperManage
==================================================================================== ============================== ================

To read from the source cluster:

============ ============================================= ================
Cluster      Resource                                      Role
============ ============================================= ================
Source       TOPIC - all topics that |crep| will replicate DeveloperRead
Source       TOPIC - all topics that |crep| will replicate DeveloperManage
============ ============================================= ================

To write to the destination cluster:

============ ============================================= ================
Cluster      Resource                                      Role
============ ============================================= ================
Destination  TOPIC - all topics that |crep| will replicate ResourceOwner
============ ============================================= ================

.. important::

   If not using the topic configuration sync feature of |crep| (enabled by default) then the following roles can be used in place of ``ResourceOwner``:

   ============ ============================================= ================
   Cluster      Resource                                      Role
   ============ ============================================= ================
   Destination  TOPIC - all topics that |crep| will replicate DeveloperRead
   Destination  TOPIC - all topics that |crep| will replicate DeveloperWrite
   Destination  TOPIC - all topics that |crep| will replicate DeveloperManage
   ============ ============================================= ================

If using the offset translation feature of |crep| (enabled by default):

============ ================================================================================================================= ================
Cluster      Resource                                                                                                          Role
============ ================================================================================================================= ================
Source       TOPIC - ``__consumer_timestamps``                                                                                 DeveloperRead
Source       TOPIC - ``__consumer_timestamps``                                                                                 DeveloperManage
Destination  GROUP - All consumer groups that will be translated (if you do not know these use the literal ``*`` to allow all) DeveloperRead
============ ================================================================================================================= ================

Also, any consumers on the source cluster using the |crep| timestamp interceptor will require:

============ ================================= ================
Cluster      Resource                          Role
============ ================================= ================
Source       TOPIC - ``__consumer_timestamps`` DeveloperWrite
Source       TOPIC - ``__consumer_timestamps`` DeveloperManage
============ ================================= ================

For configuration options relating to offset translation, see :ref:`Consumer Offset Translation `.

If using the source offset management feature of |crep| (enabled by default):

============ ============================================================================================================= ================
Cluster      Resource                                                                                                      Role
============ ============================================================================================================= ================
Source       GROUP - The consumer group name is determined by the |crep| name or by the ``src.consumer.group.id`` property ResourceOwner
============ ============================================================================================================= ================

If using the schema migration feature of |crep| (disabled by default):

============ ==================================================== ================
Cluster      Resource                                             Role
============ ==================================================== ================
Source       TOPIC - underlying |sr| topic (default ``_schemas``) DeveloperRead
Destination  CLUSTER - |sr| cluster                               ClusterAdmin
============ ==================================================== ================

For more information on configuring |rbac|, see :ref:`rbac-overview`.

Replicating messages with schemas
---------------------------------

|crep| does not support an "active-active" |sr| setup. It only supports migration (either one-time or continuous) from an active |sr| to a passive |sr|.

Starting with |cp| 7.0.0, Schema Linking is available in preview on |cp|, as described in :ref:`schema-linking-cp-overview`. **This is the recommended method of migrating schemas.** For migrating schemas from one |ccloud| cluster to another, use cloud-specific :cloud:`Schema Linking|sr/schema-linking.html`.

For releases before |cp| 7.0.0, use :ref:`Replicator ` with `Schema Translation `__ to migrate schemas from a self-managed cluster to a target cluster which is either self-managed or in :cloud:`Confluent Cloud|index.html`. (This was first available in |cp| 5.2.0.)

To learn more about schema migration, see :ref:`schemaregistry_migrate` and :ref:`schema-linking-cp-overview`.

.. _replicator-and-schema-validation:

|sv| and |crep|
---------------

By default, |crep| is configured with ``topic.config.sync=true``. If the source cluster has a topic with :ref:`schema_validation` enabled (``confluent.value.schema.validation=true``), then |crep| will copy this property to the destination cluster's replicated topic.

When using |crep| to replicate data from one cluster of brokers to another, you typically want to skip a second validation on the destination cluster to avoid its overhead. Therefore, you might want to either disable |sv| on the source cluster before replicating to the destination, or set ``topic.config.sync=false`` on |crep| and explicitly set the configurations you want in the destination cluster broker properties files.

Requirements
------------

From a high level, |crep| works like a consumer group with the partitions of the replicated topics from the source cluster divided between the connector's tasks.
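
As a rough illustration of that division (a sketch under stated assumptions, not a tested deployment): the standard |kconnect-long| ``tasks.max`` connector setting caps how many tasks share the source partitions. The value ``4`` below is a hypothetical choice. With, say, 12 replicated partitions, each task would handle roughly three of them, though the exact assignment is not specified here.

.. code:: bash

   # Hypothetical value: upper bound on the number of Replicator tasks
   # that divide the source partitions among themselves.
   tasks.max=4
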
|crep| periodically polls the source cluster for changes to the configuration of replicated topics and the number of partitions, and updates the destination cluster accordingly by creating topics or updating configuration. For this to work correctly, the following is required:

* The Origin and Destination clusters must be |ak-tm| or |cp|. For version compatibility, see :ref:`connector interoperability`.
* The |crep| version must match the |kconnect-long| version it is deployed on. For instance, |crep| |version| should only be deployed to |kconnect-long| |version|.
* The |crep| principal must have permission to create and modify topics in the destination cluster. In version 4.0 or lower this requires write access to the corresponding |zk|. In later versions this requires the ACLs mentioned in :ref:`here `.
* The default topic configurations in the source and destination clusters must match. In general, aside from any broker-specific settings (such as ``broker.id``), you should use the same broker configuration in both clusters.
* The destination |ak| cluster must have a similar capacity as the source cluster. In particular, because |crep| preserves the replication factor of topics in the source cluster, the destination cluster must have at least as many brokers as the maximum replication factor used. If not, topic creation will fail until the destination cluster has the capacity to support the same replication factor. Note that in this case the connector retries topic creation automatically, so replication begins as soon as the destination cluster has enough brokers.
* The ``dest.kafka.bootstrap.servers`` destination connection setting in the |crep| properties file must be configured to use a single destination cluster, even when using multiple source clusters. For example, the figure shown at the start of this section shows two source clusters in different datacenters targeting a single *aggregate* destination cluster. Note that the aggregate destination cluster must have a similar capacity as the total of all associated source clusters.
* On |cp| versions 5.3.0 and later, |crep-full| requires the enterprise edition of :ref:`kafka_connect`. Starting with |cp| 5.3.0, |crep| does not support the community edition of |kconnect|. You can install the enterprise edition of |kconnect| as part of the |cp| on-premises bundle, as described in :ref:`on-prem-production` and in the :ref:`quickstart` (choose Self-managed |cp|). Demos of enterprise |kconnect| are available at :ref:`quickstart` and on Docker Hub at `confluentinc/cp-server-connect `__.
* The ``timestamp-interceptor`` for consumers supports only Java clients, as described in :ref:`configuring-the-consumer-for-failover`.

.. tip:: For best performance, run |crep| as close to the destination cluster as possible to provide a low latency connection for |kconnect-long| operations within |crep|.

Compatibility
-------------

.. include:: includes/replicator-compatibility.rst

.. include:: includes/kafka-replicator-ccloud-compat.rst

Known Issues and Limitations
----------------------------

* |crep| does not support transforms. Use of the :ref:`ByteArrayConverter ` is required, as demonstrated in the :ref:`tutorial `.
* |crep| should not be used for serialization changes. In these cases, the recommended method is to use |ksqldb|. To learn more, see the documentation on :ref:`ksqlDB ` and the tutorial on `How to convert a stream's serialization format `__ on the Confluent Developer site.
* If you have any consumer instrumented with the ``ConsumerTimestampsInterceptor`` in versions 5.0.4, 5.1.4, 5.2.3, 5.3.1, or 5.4.0 and above, be sure that |crep| is also running one of those versions. There is a known issue where, if |crep| is at a version lower than those mentioned above, tasks can fail with a ``SerializationException`` with the error ``Size of data received by LongDeserializer is not 8``. To learn more about timestamp preservation, see :ref:`configuring-the-consumer-for-failover`.
* When running |crep| with version 5.3.0 or above, set ``connect.protocol=eager``. There is a known issue where using the default of ``connect.protocol=compatible`` or ``connect.protocol=sessioned`` can cause problems with task rebalancing and duplicate records.
* If you encounter ``RecordTooLargeException`` when you use compressed records, set the record batch size for the |crep| producer to the highest possible value. When |crep| decompresses records while consuming from the source cluster, it checks the size of the uncompressed batch on the producer before recompressing it and may throw ``RecordTooLargeException``. Setting the record batch size mitigates the exception, and compression proceeds as expected when the record is sent to the destination cluster.
* The |crep| latency metric is calculated by subtracting the time the record was produced to the source from the time it was replicated on the destination. This works in the real-time case, when records are actively being produced to the source cluster and replicated as they arrive. However, if you are replicating old data, you will see very large latency values because of the old record timestamps. In the historical data case, the latency does not indicate how long |crep| is taking to replicate data; it indicates how much time has passed between the original message and now for the message that |crep| is currently replicating. As |crep| proceeds over historical data, the latency metric should decrease quickly.
* The |crep| lag metric reports ``NaN`` if no lag sample was recorded in a given time window. This can happen if there is limited production in the source cluster, or if |crep| is not flushing data to the destination cluster fast enough to record enough samples in the window. The JMX metrics then report ``NaN`` for the |crep| metrics. ``NaN`` does not necessarily mean that the lag is 0; it means that there aren't enough samples in the given time window to report lag.

|crep| Connector
----------------

|crep| is implemented as a |ak| connector, and listed in :connect-common:`Supported Connectors|supported.html`. Some general information related to connectors may apply in some cases, but most of the information you'll need to work with |crep| is in this |crep| specific documentation.

.. include:: ../../connect/includes/connector-native-install-cpe.rst

|mmaker|
--------

|mmaker| is a stand-alone tool for copying data between two |ak| clusters. To learn more, see `Mirroring data between clusters `_ in the |ak| documentation. |mmaker| 2 is supported as a stand-alone executable, but is not supported as a connector.

|crep-full| is a more complete solution that handles topic configuration and data, and integrates with |kconnect-long| and |c3| to improve availability, scalability and ease of use. To learn more, try out the Quick Start :ref:`replicator_quickstart` and see :ref:`migrate-replicator`.
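
Because |crep| runs as a connector rather than as a stand-alone tool, it is configured through a connector properties file. The following is a minimal sketch that pulls together settings mentioned on this page. The connector name, host names, ports, and topic name are hypothetical placeholders, and security settings are omitted. Treat :ref:`replicator_config_options` as the authoritative list of properties.

.. code:: bash

   # Hypothetical connector name
   name=replicator-sketch
   connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector

   # Replicator copies raw bytes, so the ByteArrayConverter is required
   key.converter=io.confluent.connect.replicator.util.ByteArrayConverter
   value.converter=io.confluent.connect.replicator.util.ByteArrayConverter

   # Placeholder brokers; dest.kafka.bootstrap.servers must point at a single destination cluster
   src.kafka.bootstrap.servers=sourceHost:9092
   dest.kafka.bootstrap.servers=destHost:9092

   # Cluster that holds the license topic (_confluent-command); assumed here to be the destination
   confluent.topic.bootstrap.servers=destHost:9092

   # Hypothetical topic to replicate
   topic.whitelist=orders
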

Related content
---------------

--------------------------
Blog Posts and Whitepapers
--------------------------

- Blog post: `15 Things Every Apache Kafka Engineer Should Know About Confluent Replicator `__
- Blog post: `Enterprise Streaming Multi-Datacenter Replication using Apache Kafka `__
- Whitepaper: `Disaster Recovery for Multi-Datacenter Apache Kafka Deployments `__

-----------
|ak| Basics
-----------

- `Kafka 101: Producers `__
- `Kafka 101: Consumers `__
- `Kafka 101: Replication `__

----------------------
Getting Started Guides
----------------------

- :ref:`replicator_quickstart` shows you how to get started using |crep|
- :ref:`replicator` provides a Docker based quick start

----------
References
----------

- :ref:`replicator_config_options` provides a full reference for configuration
- :confluent-cli:`Confluent Platform and Cloud CLI Command Reference|command-reference/kafka/acl/index.html`

--------
Security
--------

- :ref:`kafka_authorization`
- :ref:`acl-operations`
- :cloud:`Manage Confluent Cloud Accounts and Access|access-management/index.html`
- :cloud:`Service Accounts for Confluent Cloud|access-management/service-account.html#service-accounts`

----------------
Migration Guides
----------------

- :cloud:`Migrate Topics on Confluent Cloud Clusters|migrate-topics-on-cloud-clusters.html` describes how to use |crep| to migrate topic data from one cloud cluster to another.
- :ref:`schemaregistry_migrate` describes how to use |crep| to migrate a self-managed |sr| to |ccloud|.