.. _replicator_run:

Tutorial: Configure and Run |crep| for |cp| as an Executable or Connector
=========================================================================

|crep-full| is available as a part of |cp|. This tutorial provides download and install instructions,
along with step-by-step guides on how to run |crep| as an executable or as a plugin on a |kconnect| cluster.

.. _replicator_install:

Download and install |crep|
###########################

To get started:

#. Download |cp| from :cp-download:`https://confluent.io/download/|`.
#. Extract the contents of the archive. For further install instructions, see :ref:`installation-overview`.
#. Start two clusters of |ak-tm| brokers. For more information, see :ref:`replicator_quickstart`.

Configure and Run |crep|
########################

.. include:: includes/replicator-run-methods.rst

To learn more about configuration options to troubleshoot and improve performance, see :ref:`replicator_tuning`
and :ref:`rep-tuning-set-compression`.

.. _replicator_executable:

Run |crep| as an executable
---------------------------

|crep| Executable is a quick and easy way to configure and start |crep| within a |kconnect| cluster. This is recommended for
deployments where a pre-existing |kconnect| cluster cannot be used. After |cp| is installed, find |crep| executable
as ``bin/replicator`` (ZIP and TAR) within your installation directory. Running ``bin/replicator`` without arguments
will print on your terminal a list of all the available command line arguments.

.. important:: The |crep| Executable uses a version of the |crep| connector that matches the installed version of |cp|.
   Although the |crep| connector is available from |c-hub|, do not change or upgrade the |crep| connector after
   installation unless instructed by Confluent.

----------------------------
Origin cluster configuration
----------------------------

|crep| Executable requires all configurations for the consumer that reads from the origin cluster to be placed in a properties file.
Use ``--consumer.config`` to point to the file that contains these properties. For example, this could be a file named
``consumer.properties`` with the following contents:

::

    bootstrap.servers=localhost:9082

All |ak| consumer properties are valid in this file. For a full list, see :ref:`cp-config-consumer`.

---------------------------------
Destination cluster configuration
---------------------------------

|crep| Executable requires all configurations for the producer that writes to the destination cluster to be placed in a properties file.
Use ``--producer.config`` to point to the file that contains these properties. For example, this could be a file named
``producer.properties`` containing::

    bootstrap.servers=localhost:9092

All |ak| producer properties are valid in this file. For a full list, see :ref:`cp-config-producer`.

.. note:: The property names defined in ``--consumer.config`` and ``--producer.config`` should not be prefixed and should
   exactly match the names in the consumer and producer configurations referenced above.

--------------------
|crep| configuration
--------------------

|crep| Executable allows any non-connection related |crep| properties to be overridden in a properties file referenced by
``--replication.config``. Configuration for the following sections of |crep| properties can be placed here:

- :ref:`rep-destination-topics`
- :ref:`consumer_offset_translation`
- :ref:`replicator-connector-license-config`
- :ref:`replicator_offset_management`

For example, this could be a file named ``replication.properties`` containing the following.

::

    confluent.topic.bootstrap.servers=localhost:9092
    offset.start=consumer

.. note:: The property ``group.id`` is a special case in |crep| Executable and should not be provided.
   For more information, see :ref:`replicator_cluster_or_group_id`.

------------------------------
Connect cluster configurations
------------------------------

|crep| Executable allows any |kconnect| related properties to be overridden in a properties file referenced by
``--replication.config``. For a full list of |kconnect| configurations, see :ref:`cp-config-connect`.

For example, this could be a file named ``replication.properties`` containing::

    offset.storage.topic=connect-offsets
    status.storage.topic=connect-status
    config.storage.topic=connect-configs

.. important:: Client configurations should not be included in the file referenced by ``--replication.config``.
   The following prefixes are not allowed. Provide these without prefix in a separate configuration file:

   =================================== =============================================================
   Prefix                              Executable configuration
   =================================== =============================================================
   ``src.kafka.``                      ``--consumer.config``
   ``src.consumer.``                   ``--consumer.config``
   ``dest.kafka.``                     ``--producer.config``
   =================================== =============================================================

-------------------------------------
Monitoring interceptor configurations
-------------------------------------

To enable monitoring interceptors, include their properties in the same files or in separate files, and pass them to
|crep| Executable using the parameters ``--consumer.monitoring.config`` and ``--producer.monitoring.config`` respectively.
These properties do not require a ``producer.`` or ``consumer.`` prefix. For example, you can use ``interceptor.classes``
as opposed to ``producer.interceptor.classes``.
An example configuration could be a file named ``interceptors.properties`` containing::

    interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
    confluent.monitoring.interceptor.bootstrap.servers=localhost:9092

.. _replicator_exec_cluster-id:

-----------------------------
Configure replication and run
-----------------------------

|crep| Executable requires the parameter ``--cluster.id``. This parameter defines a unique identifier for the |kconnect|
cluster created when the |crep| Executable is started. |crep| Executable instances with the same ``cluster.id`` form a
cluster and share |crep| workload.

.. note::

   - All instances of the |crep| Executable with the same ``--cluster.id`` should be started with the exact same
     overall configuration.
   - For non-executable deployments (using |kconnect| workers), the ``group.id`` of the |kconnect| worker(s) is used as
     the unique ID for the cluster, and serves the same purpose as ``--cluster.id``. To learn more about both parameters,
     see :ref:`replicator_cluster_or_group_id`.
   - The ``name`` of the replicator is the connector name submitted to the embedded |kconnect| cluster. The replicator
     ``name`` defaults to ``replicator`` and is used as the consumer ``group.id`` if ``group.id`` is not specified.
     When running |crep| as an executable, use replicator ``name`` (specified or default) to set consumer group names.

You can now specify the configuration properties related to data replication. There are multiple ways to do this:

* Store all the configuration in a file, and pass this file to |crep| Executable using the parameter
  ``--replication.config``. For example::

      replicator \
        --consumer.config ./consumer.properties \
        --producer.config ./producer.properties \
        --cluster.id replicator \
        --replication.config ./replication.properties

* Pass the replication properties from the command line using individual parameters, each corresponding to a property.
  For example, specify the original topics using ``--whitelist``. The Confluent license can be passed using
  ``--confluent.license``. For example::

      replicator \
        --consumer.config ./consumer.properties \
        --producer.config ./producer.properties \
        --cluster.id replicator \
        --whitelist test-topic \
        --confluent.license "XYZ"

* Use a mixture of some replication properties in a file and the rest using command line arguments. For example::

      replicator \
        --consumer.config ./consumer.properties \
        --producer.config ./producer.properties \
        --cluster.id replicator \
        --replication.config ./replication.properties \
        --whitelist test-topic

-----------------
Configure logging
-----------------

|crep| Executable reads logging settings from the file ``etc/kafka-connect-replicator/replicator-log4j.properties``.
By default, it writes to the console, but for production deployments you should log to a file. Before you start |crep|
executable, add these lines to the ``replicator-log4j.properties`` file::

    log4j.appender.file=org.apache.log4j.RollingFileAppender
    log4j.appender.file.File=logs/replicator.log
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
    log4j.appender.file.MaxFileSize=10MB
    log4j.appender.file.MaxBackupIndex=5
    log4j.appender.file.append=true

Add the ``file`` appender that you defined above to the ``log4j.rootLogger`` parameter::

    # By default
    log4j.rootLogger=INFO, stdout, file

.. _replicator_as_a_connector:

Run |crep| as a connector
-------------------------

To run |crep| as a Connector in the recommended distributed |kconnect| cluster, see :ref:`replicator_run_manual_config`.

If deploying |cp| on AWS VMs, be aware that VMs with burstable CPU types (T2, T3, T3a, and T4g) will not support high
throughput streaming workloads.
|crep| worker nodes running on these VMs experience throughput degradation when CPU credits expire, which makes these
VMs unsuitable for |cp| nodes that are expected to run at elevated CPU levels for sustained periods or to support
workloads above their baseline resource rates.

.. _testing_replicator:

----------------
Test your |crep|
----------------

Following is a generic |crep| testing scenario. A similar testing strategy is covered with more context as a part of
the |crep| tutorial in the section :ref:`config-and-run-replicator`.

#. Create a test topic.

   If you haven't already, create a topic named ``test-topic`` in the source cluster with the following command.

   ::

     ./bin/kafka-topics --create --topic test-topic --replication-factor 1 \
       --partitions 4 --bootstrap-server localhost:9082

     ./bin/kafka-topics --describe --topic test-topic.replica --bootstrap-server localhost:9092

   The ``kafka-topics --describe --topic`` step in the above command checks whether ``test-topic.replica`` exists.
   After verifying that the topic exists, confirm that four partitions were created. In general, |crep| makes sure
   that the destination topic has at least as many partitions as the source topic. It is fine if it has more, but
   because |crep| preserves the partition assignment of the source data, any additional partitions will not be
   utilized.

#. Send data to the source cluster.

   At any time after you've created the topic in the source cluster, you can begin sending data to it using a |ak|
   producer to write to ``test-topic`` in the source cluster. You can then confirm that the data has been replicated
   by consuming from ``test-topic.replica`` in the destination cluster. For example, to send a sequence of numbers
   using |ak|'s console producer, you can use the following command.

   ::

     seq 10000 | ./bin/kafka-console-producer --topic test-topic --broker-list localhost:9082

#. Run a consumer to confirm that the destination cluster got the data.

   You can then confirm delivery in the destination cluster using the console consumer.

   ::

     ./bin/kafka-console-consumer --from-beginning --topic test-topic.replica \
       --bootstrap-server localhost:9092

.. _replicator_executable_as_docker_image:

-----------------------------------------
Run |crep| executable from a Docker image
-----------------------------------------

The simplest way to run |crep| is as an executable from a script or :ref:`from a Docker image `.
You can find the ``docker run`` commands and configuration parameters for |crep| in that documentation.

.. _replicator_executable_command_line_parameters:

--------------------------------------------
Command line parameters of |crep| executable
--------------------------------------------

The available command line parameters are:

=================================== ================================ =========================================================
Command line parameter              Value                            Description
=================================== ================================ =========================================================
``--blacklist``                                                      A comma-separated list of topics that should not be replicated, even if they are included in the whitelist or matched by the regular expression.
``--cluster.id``                                                     Specifies the unique identifier for the |crep| cluster. (required)
``--cluster.threads``                                                The total number of threads across all workers in the |crep| cluster. If this command starts another |crep| worker in an existing cluster, this can be used to change the number of threads in the whole cluster.
``--confluent.license``                                              Your Confluent license key that enables you to use |crep|. Without the license key, you can use |crep| for a 30-day trial period. If you are a subscriber, contact Confluent Support for more information.
``--consumer.config``                                                Specifies the location of the file that contains the configuration settings for the consumer reading from the origin cluster. (required)
``--consumer.monitoring.config``                                     Specifies the location of the file that contains the producer settings for the |ak| cluster where monitoring information about the |crep| consumer is to be sent. This must be specified if monitoring is to be enabled, but may point to a different |ak| cluster than the origin or destination clusters. Use the same file as ``--producer.config`` to write metrics to the destination cluster.
``-h``, ``--help``                                                   Display help information.
``--producer.config``                                                Specifies the location of the file that contains the configuration settings for the producer writing to the destination cluster. (required)
``--producer.monitoring.config``                                     Specifies the location of the file that contains the producer settings for the |ak| cluster where monitoring information about the |crep| producer is to be sent. This must be specified if monitoring is to be enabled, but may point to a different |ak| cluster than the origin or destination clusters. Use the same file as ``--producer.config`` to write metrics to the destination cluster.
``--replication.config``                                             Specifies the location of the file that contains the configuration settings for replication. When used, any property in this file can be overridden via a command line parameter. When this is not supplied, all of the properties defining how topics are to be replicated should be specified on the command line.
``--topic.auto.create``             true/false                       Whether to automatically create topics in the destination cluster if required.
``--topic.config.sync``             true/false                       Whether to periodically sync topic configuration to the destination cluster.
``--topic.config.sync.interval.ms``                                  How often to check for configuration changes when ``--topic.config.sync`` is enabled.
``--topic.create.backoff.ms``                                        Time to wait before retrying auto topic creation or expansion.
``--topic.poll.interval.ms``                                         Specifies how frequently to poll the source cluster for new topics.
``--topic.preserve.partitions``     true/false                       Whether to automatically increase the number of partitions in the destination cluster to match the source cluster and ensure that messages replicated from the source cluster use the same partition in the destination cluster.
``--topic.regex``                                                    A regular expression that matches the names of the topics to be replicated.
``--topic.rename.format``                                            A format string for the topic name in the destination cluster, which may contain ``${topic}`` as a placeholder for the originating topic name. For example, ``${topic}_dc1`` for the topic ``orders`` will map to the destination topic name ``orders_dc1``. Can be placed inside the file specified by ``--replication.config``.
``--topic.timestamp.type``                                           The timestamp type for the topics in the destination cluster.
``--whitelist``                                                      A comma-separated list of the names of topics that should be replicated. Any topic that is in this list and not in the blacklist will be replicated.
=================================== ================================ =========================================================

.. _convert_replicator_from_connect_to_executable:

Convert |crep| connector configurations to |crep| executable configurations
---------------------------------------------------------------------------

A |crep| connector configuration can be converted to a |crep| executable configuration. One of the key differences
between the two is that the Connect configuration has two configuration files (a worker properties file and a
connector properties or JSON file) while |crep| executable has three configuration files (a consumer, a producer, and a
replication properties file). It's helpful to think about this in the following way:

* The consumer configuration file contains all the properties you need to configure the consumer embedded within
  |crep| that consumes from the source cluster. This would include any special configurations you want to use to tune
  the source consumer, in addition to the necessary security and connection details needed for the consumer to connect
  to the source cluster.
* The producer configuration file contains all the properties you need to configure the producer embedded within
  |crep| that produces to the destination cluster. This would include any special configurations you want to use to
  tune the destination producer, in addition to the necessary security and connection details needed for the producer
  to connect to the destination cluster.
* The replication configuration file contains all the properties you need to configure the actual |crep| that does the
  work of taking the data from the source consumer and passing it to the destination producer. This would include all
  Connect-specific configurations needed for |crep| as well as any necessary |crep| configurations.

If you have the following worker properties:

::

    config.storage.replication.factor=3
    offset.storage.replication.factor=3
    status.storage.replication.factor=3
    connect.protocol=eager
    connector.client.config.override.policy=All
    bootstrap.servers=destination-cluster:9092
    ssl.endpoint.identification.algorithm=https
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"destUser\" password=\"destPassword\";

And the following |crep| JSON:

.. codewithvars:: json

   {
     "connector.class":"io.confluent.connect.replicator.ReplicatorSourceConnector",
     "tasks.max":4,
     "topic.whitelist":"test-topic",
     "topic.rename.format":"${topic}.replica",
     "confluent.license":"XYZ",
     "name": "replicator",
     "header.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
     "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
     "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
     "src.consumer.max.poll.records":"10000",
     "producer.override.linger.ms":"10",
     "producer.override.compression.type":"lz4",
     "src.kafka.bootstrap.servers": "source-cluster:9092",
     "src.kafka.ssl.endpoint.identification.algorithm": "https",
     "src.kafka.security.protocol": "SASL_SSL",
     "src.kafka.sasl.mechanism": "PLAIN",
     "src.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"sourceUser\" password=\"sourcePassword\";",
     "dest.kafka.bootstrap.servers": "destination-cluster:9092",
     "dest.kafka.ssl.endpoint.identification.algorithm": "https",
     "dest.kafka.security.protocol": "SASL_SSL",
     "dest.kafka.sasl.mechanism": "PLAIN",
     "dest.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"destUser\" password=\"destPassword\";"
   }

You can convert the configuration shown above in these two configuration files to the following Consumer, Producer,
and Replication configurations needed to use |crep| executable:

**Consumer Configurations**::

    bootstrap.servers=source-cluster:9092
    ssl.endpoint.identification.algorithm=https
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"sourceUser\" password=\"sourcePassword\";
    max.poll.records=10000

For the consumer configurations, strip the ``src.kafka.`` and ``src.consumer.`` prefixes and simply list the actual
configuration you want for the source consumer.
Because these settings are placed in the consumer configuration file, the |crep| executable applies them to the source
consumer that polls the source cluster.

**Producer Configurations**::

    bootstrap.servers=destination-cluster:9092
    ssl.endpoint.identification.algorithm=https
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"destUser\" password=\"destPassword\";
    linger.ms=10
    compression.type=lz4

For the producer configurations, strip the ``dest.kafka.`` and ``producer.override.`` prefixes and simply list the
actual configuration you want for the destination producer. Because these settings are placed in the producer
configuration file, the |crep| executable applies them to the destination producer that writes to the destination
cluster.

**Replication Configurations**::

    config.storage.replication.factor=3
    offset.storage.replication.factor=3
    status.storage.replication.factor=3
    connect.protocol=eager
    tasks.max=4
    topic.whitelist=test-topic
    topic.rename.format=${topic}.replica
    confluent.license=XYZ
    name=replicator
    header.converter=io.confluent.connect.replicator.util.ByteArrayConverter
    key.converter=io.confluent.connect.replicator.util.ByteArrayConverter
    value.converter=io.confluent.connect.replicator.util.ByteArrayConverter

For the replication configurations, only include the configurations that are important for |crep| or Connect. Note
that you no longer need the ``connector.client.config.override.policy`` configuration here, because the |crep|
executable directly passes in the producer configurations specified in the configuration file. This makes it easier
to think about configuring the important consumers and producers for replication, rather than incorporating an extra
|kconnect| configuration.

.. _replicator_run_manual_config:

Manually configure and run |crep| on |kconnect| clusters
--------------------------------------------------------

After downloading and installing |cp|, per :ref:`replicator_install`, bring up two clusters of |ak| brokers. To learn
more, refer to :ref:`replicator_quickstart`.

|crep| is a |kconnect-long| Plugin. To run |crep|, you need to take the following steps:

* Install and Configure the |kconnect| Cluster
* Configure and run a |crep-full| on the |kconnect| Cluster

This section walks you through both these steps in detail, and reviews the available configuration options for |crep|.

-------------------------------------------
Configure the |kconnect| cluster for |crep|
-------------------------------------------

|crep| runs as a plugin (:connect-common:`Connector|supported.html`) in |kconnect|, so you'll need to run |kconnect|
Workers before you can run |crep|. The :ref:`quick start ` shows how to run |crep| in a single node |kconnect|
cluster. Refer to |kconnect| documentation to learn how to run |kconnect| in
:connect-common:`distributed mode|userguide.html#distributed-mode`.

Keep in mind the following recommendations and best practices when configuring distributed |kconnect| Workers for |crep|.

Configuring origin and destination brokers
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

|kconnect| clusters are associated with a cluster of |ak| brokers. The brokers of the |ak| cluster are specified in
the ``bootstrap.servers`` configuration parameter of the |kconnect| Workers. If you are configuring a new |kconnect|
Worker cluster for running |crep|, make sure this parameter contains the **destination** |ak| brokers cluster. If you
are planning to run |crep| on an existing |kconnect| cluster, make sure it is already associated with the
**destination** brokers.

.. note:: |crep| is responsible for reading events from the origin cluster. It then passes the events to the
   |kconnect| Worker responsible for writing the events to the destination cluster. Therefore, you configure |crep|
   with information about the origin and the Worker with information about the destination.

Where to install |kconnect| workers
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If you are replicating events between different datacenters (rather than between two |ak| clusters in the same
datacenter), best practice is to run the |kconnect| Workers in the **destination** datacenter. For example, if you are
sending data from New York to San Francisco, |crep| should run in SF and consume data across the US from NYC. The
reason is that long-distance networks tend to be less reliable than networks inside a datacenter. If there is a
network partition and you lose connectivity between the datacenters, having a consumer that is unable to connect to a
cluster is less disruptive than a producer that cannot connect. Remote consuming tends to be a better model than
remote producing.

That said, there is no inherent risk in running |crep| at the origin datacenter. |crep| will capture and forward all
events, including in cases of connectivity loss.

Running |crep| on existing |kconnect| cluster
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

You can run |crep| on the same |kconnect| cluster as other connectors, but in some cases it is not recommended:

* If you are replicating data between two datacenters that are far apart and thus have high latency, you’ll want to
  tune both the |kconnect| Worker and |crep| appropriately. Intra-DC tuning is different from inter-DC tuning for this
  very reason. By giving |crep| its own |kconnect| cluster, you can tune the |kconnect| Workers specifically for |crep|
  without worrying about other connectors being affected.
* Any changes to a connector will cause |crep| to pause while connectors are being re-assigned to |kconnect| Workers.
  If you frequently start and stop connectors, you may want to run |crep| on its own cluster and allow it to run
  without interruptions.

Configuring logging for |kconnect| cluster
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

|kconnect| logging is configured in the file ``etc/kafka/connect-log4j.properties``. By default, it writes to the
console, but for production deployments you should log to a file. Before you start |crep|, add these lines to the
``connect-log4j.properties`` file::

    log4j.appender.file=org.apache.log4j.RollingFileAppender
    log4j.appender.file.File=logs/replicator.log
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
    log4j.appender.file.MaxFileSize=10MB
    log4j.appender.file.MaxBackupIndex=5
    log4j.appender.file.append=true

Add the ``file`` appender that you just defined to the ``log4j.rootLogger`` parameter::

    # By default
    log4j.rootLogger=INFO, stdout, file

--------------------------------------------------
Configure and run |crep| on the |kconnect| cluster
--------------------------------------------------

You should have at least one distributed mode |kconnect| Worker already up and running. To learn more, review the
:connect-common:`distributed mode documentation |userguide.html#distributed-mode`.

You can check if the |kconnect| Worker is up and running by checking its REST API:

.. codewithvars:: bash

   curl http://localhost:8083/
   {"version":"|kafka_release|","commit":"078e7dc02a100018"}

If everything is fine, you will see a version number and commit hash for the version of the |kconnect| Worker you are
running.

Run |crep| by sending the |kconnect| REST API its configuration file in JSON format.
Here's an example configuration::

    {
      "name":"replicator",
      "config":{
        "connector.class":"io.confluent.connect.replicator.ReplicatorSourceConnector",
        "tasks.max":4,
        "key.converter":"io.confluent.connect.replicator.util.ByteArrayConverter",
        "value.converter":"io.confluent.connect.replicator.util.ByteArrayConverter",
        "src.kafka.bootstrap.servers":"localhost:9082",
        "topic.whitelist":"test-topic",
        "topic.rename.format":"${topic}.replica",
        "confluent.license":"XYZ"
      }
    }

You can send this to the |kconnect| REST API using ``curl``. This assumes the above JSON is in a file called
``example-replicator.json``::

    curl -X POST -d @example-replicator.json http://localhost:8083/connectors --header "Content-Type: application/json"

This example demonstrates use of some important configuration parameters. For an explanation of all configuration
parameters, see :ref:`replicator_config_options`.

* ``key.converter`` and ``value.converter`` - Classes used to convert |ak| records to |kconnect|'s internal format.
  The |kconnect| Worker configuration specifies global converters and those will be used if you don't specify anything
  in the |crep| configuration. For replication, however, no conversion is necessary. You just want to read bytes out
  of the origin cluster and write them to the destination with no changes. Therefore, you can override the global
  converters with the ``ByteArrayConverter``, which leaves the records as is.
* ``src.kafka.bootstrap.servers`` - A list of brokers from the **origin** cluster.
* ``topic.whitelist`` - An explicit list of the topics that you want replicated. The quick start replicates a topic
  named ``test-topic``.

  .. tip:: You can also tell |crep| which topics to replicate using a regular expression with the
     :ref:`topic.regex parameter `. You should use a regular expression if you want |crep| to automatically start
     replicating new topics if they match a certain pattern. For example, to replicate all production topics,
     including new ones, configure |crep| to replicate topics that match ``prod.*``. If you add new topics to the
     whitelist, you must restart |crep| for the change to take effect.

* ``topic.rename.format`` - A substitution string that is used to rename topics in the destination cluster. The
  snippet above uses ``${topic}.replica``, where ``${topic}`` will be substituted with the topic name from the origin
  cluster. That means that the ``test-topic`` being replicated from the origin cluster will be renamed to
  ``test-topic.replica`` in the destination cluster.
* ``confluent.license`` - Without the license, you can use |crep| for a 30-day trial period. If you are a Confluent
  customer, you can contact customer support and ask for a |crep| license. Then use it as shown in the example.

.. _run-crep-on-source-cluster:

Run |crep| on the source cluster
--------------------------------

|crep| should be run on the destination cluster if possible. If this is not practical, you can run |crep| on the
source cluster from |cp| 5.4.0 onwards. Make the following changes to run |crep| in this way:

* Set ``connector.client.config.override.policy`` to ``All`` in the |kconnect| worker configuration, or in
  ``--replication.config`` if using |crep| Executable.
* Point ``bootstrap.servers`` in the |kconnect| worker configuration to the source cluster (for |crep| Executable,
  specify this in ``--producer.config``).
* Provide any client configurations (security and so on) for the source cluster in the |kconnect| worker configuration
  (for |crep| Executable, specify these in ``--producer.config``).
* Point ``producer.override.bootstrap.servers`` in the connector configuration to the destination cluster (for |crep|
  Executable, specify this in ``--replication.config``).
* Provide any client configurations (security and so on) for the destination cluster in the connector configuration
  with the prefix ``producer.override.`` (for |crep| Executable, specify these in ``--replication.config``).
* Provide configurations with the prefixes ``src.kafka.`` and ``dest.kafka.`` as usual.

An example configuration for |crep| running as a connector on the source cluster can be seen below:

.. codewithvars:: json

   {
     "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
     "name": "replicator",
     "producer.override.ssl.endpoint.identification.algorithm": "https",
     "producer.override.sasl.mechanism": "PLAIN",
     "producer.override.request.timeout.ms": 20000,
     "producer.override.bootstrap.servers": "destination-cluster:9092",
     "producer.override.retry.backoff.ms": 500,
     "producer.override.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"someUser\" password=\"somePassword\";",
     "producer.override.security.protocol": "SASL_SSL",
     "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
     "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
     "topic.whitelist": "someTopic",
     "src.kafka.bootstrap.servers": "source-cluster:9092",
     "dest.kafka.bootstrap.servers": "destination-cluster:9092",
     "dest.kafka.ssl.endpoint.identification.algorithm": "https",
     "dest.kafka.security.protocol": "SASL_SSL",
     "dest.kafka.sasl.mechanism": "PLAIN",
     "dest.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"someUser\" password=\"somePassword\";"
   }

In this configuration, |crep| is producing between clusters rather than consuming, and the default producer
configurations are not optimal for this.
Consider adjusting the following configurations to increase the throughput of the producer flow:

* ``producer.override.linger.ms=500``
* ``producer.override.batch.size=600000``

These values are provided as a starting point only and should be further tuned to your environment and use case.

For more detail on running |crep| on the source cluster when the destination is |ccloud|, see
:cloud:`Confluent Replicator to Confluent Cloud Configurations|get-started/examples/ccloud/docs/replicator-to-cloud-configuration-types.html`.

License key
-----------

Without the license key, you can use |crep| Executable for a 30-day trial period. If you are a Confluent customer,
you can contact customer support and ask for a |crep| license key. Then, use the key you received from Confluent
support with the ``--confluent.license`` command line parameter, or add it to the ``confluent.license`` property
within the replication configuration file you pass to ``--replication.config``.

.. include:: includes/license-expiry.rst

Related content
---------------

- For a reference of all configuration options for the |crep| Connector, see :ref:`replicator_config_options`.
- For basic information on the connector, see :ref:`replicator_detail`.
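
The prefix-stripping rules described in :ref:`convert_replicator_from_connect_to_executable` can be sketched in a few
lines of Python. This is an illustrative helper only, not a Confluent tool: the function names
``split_connector_config`` and ``to_properties`` are hypothetical, and the sketch assumes a flat connector JSON like
the one in that section. Worker properties (for example, the ``*.storage.replication.factor`` settings) are not
handled here and would still need to be copied into the replication file by hand.

```python
import json

# Prefix-to-file rules taken from the conversion walkthrough in this document.
CONSUMER_PREFIXES = ("src.kafka.", "src.consumer.")
PRODUCER_PREFIXES = ("dest.kafka.", "producer.override.")
# Keys that the walkthrough leaves out of the replication file.
DROPPED_KEYS = {"connector.class", "connector.client.config.override.policy"}


def _as_property(value):
    """Render a JSON value the way a .properties file expects it."""
    if isinstance(value, bool):
        return "true" if value else "false"
    return str(value)


def split_connector_config(connector):
    """Split a flat connector config into consumer, producer, and replication maps."""
    parts = {"consumer": {}, "producer": {}, "replication": {}}
    for key, value in connector.items():
        if key in DROPPED_KEYS:
            continue
        # Unprefixed keys stay in the replication file.
        target, name = "replication", key
        for prefix in CONSUMER_PREFIXES:
            if key.startswith(prefix):
                target, name = "consumer", key[len(prefix):]
        for prefix in PRODUCER_PREFIXES:
            if key.startswith(prefix):
                target, name = "producer", key[len(prefix):]
        parts[target][name] = _as_property(value)
    return parts


def to_properties(settings):
    """Serialize a dict as key=value lines."""
    return "\n".join(f"{key}={value}" for key, value in settings.items())


# Small inline example; the keys mirror a subset of the connector JSON above.
example = json.loads("""
{
  "tasks.max": 4,
  "topic.whitelist": "test-topic",
  "src.kafka.bootstrap.servers": "source-cluster:9092",
  "dest.kafka.bootstrap.servers": "destination-cluster:9092",
  "producer.override.linger.ms": "10"
}
""")
for file_name, settings in split_connector_config(example).items():
    print(f"# {file_name}.properties")
    print(to_properties(settings))
```

If the same setting appears under two prefixes that map to the same file (for example, ``dest.kafka.`` and
``producer.override.``), the key that appears later in the JSON wins in this sketch. Review the generated files
before passing them to ``--consumer.config``, ``--producer.config``, and ``--replication.config``.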