.. _replicator_run:

Configure and Run |crep|
========================

.. include:: includes/replicator-run-methods.rst

.. _replicator_executable:

Run |crep| as an Executable
---------------------------

|crep| Executable is a quick and easy way to configure and start |crep| within a |kconnect| cluster. This is
recommended for deployments where a pre-existing |kconnect| cluster cannot be used. After |cp| is installed, find
|crep| Executable as ``bin/replicator`` (ZIP and TAR) within your installation directory. Running
``bin/replicator`` without arguments prints a list of all available command line arguments.

.. important::
   The Replicator Executable uses a version of the Replicator connector that matches the installed version of
   |cp|. Although the Replicator connector is available from Confluent Hub, do not change or upgrade the
   Replicator connector after installation unless instructed by Confluent.

----------------------------
Origin Cluster Configuration
----------------------------

|crep| Executable requires all configurations appropriate to the consumer from the origin cluster to be placed in
a properties file. Use ``--consumer.config`` to point to the file that contains these properties. For example,
this could be a file named ``consumer.properties`` with contents::

   zookeeper.connect=localhost:2171
   bootstrap.servers=localhost:9082

.. tip::
   The |zk| properties are optional. All Kafka consumer properties are valid in this file; for a full list of
   these, see :ref:`cp-config-consumer`.

---------------------------------
Destination Cluster Configuration
---------------------------------

|crep| Executable requires all configurations appropriate to the producer to the destination cluster to be placed
in a properties file. Use ``--producer.config`` to point to the file that contains these properties. For example,
this could be a file named ``producer.properties`` containing::

   zookeeper.connect=localhost:2181
   bootstrap.servers=localhost:9092

.. tip::
   The |zk| properties are optional. All Kafka producer properties are valid in this file; for a full list of
   these, see :ref:`cp-config-producer`.

.. note::
   The property names defined in ``--consumer.config`` and ``--producer.config`` should not be prefixed and should
   match exactly those in the consumer/producer configurations referenced.

--------------------
|crep| Configuration
--------------------

|crep| Executable allows any non-connection related |crep| properties to be overridden in a properties file
referenced by ``--replication.config``. Configuration for the following sections of |crep| properties can be
placed here:

- :ref:`rep-destination-topics`
- :ref:`consumer_offset_translation`
- :ref:`replicator-connector-license-config`
- :ref:`replicator_offset_management`

For example, this could be a file named ``replication.properties`` containing the following::

   confluent.topic.bootstrap.servers=localhost:9092
   offset.start=consumer

.. note::
   The property ``group.id`` is a special case in |crep| Executable and should not be provided. For more
   information see :ref:`replicator_cluster_or_group_id`.

------------------------------
Connect Cluster Configurations
------------------------------

|crep| Executable allows any |kconnect| related properties to be overridden in a properties file referenced by
``--replication.config``. For a full list of |kconnect| configurations, see :ref:`cp-config-connect`.

For example, this could be a file named ``replication.properties`` containing::

   offset.storage.topic=connect-offsets
   status.storage.topic=connect-status
   config.storage.topic=connect-configs

-------------------------------------
Monitoring Interceptor Configurations
-------------------------------------

To enable monitoring interceptors, you may include their properties in the same files, or in separate ones, that
you pass to |crep| Executable using the parameters ``--consumer.monitoring.config`` and
``--producer.monitoring.config`` respectively.
These properties do not require a ``producer.`` or ``consumer.`` prefix. For example, you can use
``interceptor.classes`` as opposed to ``producer.interceptor.classes``. An example configuration could be a file
named ``interceptors.properties`` containing::

   interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
   confluent.monitoring.interceptor.bootstrap.servers=localhost:9092

.. _replicator_exec_cluster-id:

-----------------------------
Configure Replication and Run
-----------------------------

|crep| Executable requires the parameter ``--cluster.id``. This parameter defines a unique identifier for the
|kconnect| cluster created when |crep| Executable is started. |crep| Executable instances with the same
``cluster.id`` form a cluster and share the |crep| workload.

.. note::
   - All instances of |crep| Executable with the same ``--cluster.id`` should be started with the exact same
     overall configuration.
   - For non-executable deployments (using |kconnect| workers), the ``group.id`` of the |kconnect| worker(s) is
     used as the unique ID for the cluster, and serves the same purpose as ``--cluster.id``. To learn more about
     both parameters, see :ref:`replicator_cluster_or_group_id`.

You can now specify the configuration properties related to data replication. There are multiple ways to do this:

* Store all the configuration in a file, and pass this file to |crep| Executable using the parameter
  ``--replication.config``. For example::

     replicator \
        --consumer.config ./consumer.properties \
        --producer.config ./producer.properties \
        --cluster.id replicator \
        --replication.config ./replication.properties

* Pass the replication properties from the command line using individual parameters, each corresponding to a
  property. For example, specify the origin topics using ``--topic.whitelist``. The Confluent license can be
  passed using ``--confluent.license``. For example::

     replicator \
        --consumer.config ./consumer.properties \
        --producer.config ./producer.properties \
        --cluster.id replicator \
        --topic.whitelist test-topic \
        --confluent.license "XYZ"

* Use a mixture of some replication properties in a file and the rest using command line arguments. For example::

     replicator \
        --consumer.config ./consumer.properties \
        --producer.config ./producer.properties \
        --cluster.id replicator \
        --replication.config ./replication.properties \
        --topic.whitelist test-topic

-----------------
Configure Logging
-----------------

|crep| Executable reads logging settings from the file
``etc/kafka-connect-replicator/replicator-log4j.properties``. By default, it writes to the console, but for
production deployments you should log to a file. Before you start |crep| Executable, add these lines to the
``replicator-log4j.properties`` file::

   log4j.appender.file=org.apache.log4j.RollingFileAppender
   log4j.appender.file.File=logs/replicator.log
   log4j.appender.file.layout=org.apache.log4j.PatternLayout
   log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
   log4j.appender.file.MaxFileSize=10MB
   log4j.appender.file.MaxBackupIndex=5
   log4j.appender.file.append=true

Add the appender file that you created above to the ``log4j.rootLogger`` parameter::

   # By default
   log4j.rootLogger=INFO, stdout, file

.. _replicator_as_a_connector:

Run |crep| as a Connector
-------------------------

.. important::
   This section covers running |crep| in a standalone |kconnect| environment. This is useful for testing but is
   not recommended for production deployments. To run |crep| as a connector in the recommended distributed
   |kconnect| cluster, see :ref:`replicator_run_manual_config`.

#. The script that runs the stand-alone Connect Worker takes two configuration files. The first is the
   configuration for the Connect Worker itself and the second is the configuration for the Replicator.
   The Worker configuration file is, by default, ``/etc/kafka/connect-standalone.properties``. Make sure that this
   configuration matches the settings in the destination cluster::

      # Connect Standalone Worker configuration
      bootstrap.servers=localhost:9092

   .. note::
      If the destination cluster is secured, you may need to specify security properties for the |kconnect|
      worker. For more information on this, see :ref:`replicator_security_overview`.

   The |crep| configuration file is, by default,
   ``/etc/kafka-connect-replicator/quickstart-replicator.properties``. This should contain details of the source
   and destination clusters along with the topics to replicate. The following example shows a minimal
   configuration; for a complete set of configuration options, see :ref:`connect_replicator_config_options`::

      name=replicator
      connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector
      tasks.max=4
      key.converter=io.confluent.connect.replicator.util.ByteArrayConverter
      value.converter=io.confluent.connect.replicator.util.ByteArrayConverter
      src.kafka.bootstrap.servers=localhost:9082
      dest.kafka.bootstrap.servers=localhost:9092
      topic.whitelist=test-topic
      topic.rename.format=${topic}.replica

   .. note::
      If the destination cluster is secured, you may need to specify security properties for |crep|. For more
      information on this, see :ref:`replicator_security_overview`.

#. Run the connector in a standalone |kconnect-long| worker::

      ./bin/connect-standalone ./etc/kafka/connect-standalone.properties \
         ./etc/kafka-connect-replicator/quickstart-replicator.properties

.. _testing_replicator:

--------------------
Test Your Replicator
--------------------

#. Create a test topic. If you haven't already, create a topic named ``test-topic`` in the source cluster with the
   following command.
   ::

      ./bin/kafka-topics --create --topic test-topic --replication-factor 1 \
         --partitions 4 --zookeeper localhost:2171

      ./bin/kafka-topics --describe --topic test-topic.replica --zookeeper localhost:2181

   The ``kafka-topics --describe --topic`` step in the above command checks whether ``test-topic.replica`` exists.
   After verifying that the topic exists, confirm that four partitions were created. In general, the replicator
   makes sure that the destination topic has at least as many partitions as the source topic. It is fine if it has
   more, but since the replicator preserves the partition assignment of the source data, any additional partitions
   will not be utilized.

#. Send data to the source cluster.

   At any time after you've created the topic in the source cluster, you can begin sending data to it using a |ak|
   producer to write to ``test-topic`` in the source cluster. You can then confirm that the data has been
   replicated by consuming from ``test-topic.replica`` in the destination cluster. For example, to send a sequence
   of numbers using |ak|'s console producer, you can use the following command::

      seq 10000 | ./bin/kafka-console-producer --topic test-topic --broker-list localhost:9082

#. Run a consumer to confirm that the destination cluster got the data.

   You can then confirm delivery in the destination cluster using the console consumer::

      ./bin/kafka-console-consumer --from-beginning --topic test-topic.replica \
         --bootstrap-server localhost:9092

.. _replicator_run_docker_image:

Run |crep| Executable from a Docker Image
-----------------------------------------

The simplest way to run |crep| is as an executable from a script or :ref:`from a Docker image `. You can find the
``docker run`` commands and configuration parameters for |crep| in that documentation.
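As a rough sketch of what such an invocation looks like, the following ``docker run`` command mounts a directory
containing the properties files described earlier and points |crep| Executable at them. The image name is the one
published by Confluent, but treat the mount path and the environment variable names as assumptions to verify
against the Docker image documentation referenced above::

   docker run -d \
      --name replicator \
      -v /path/to/replicator/config:/etc/replicator \
      -e CLUSTER_ID=replicator \
      -e CONSUMER_CONFIG=/etc/replicator/consumer.properties \
      -e PRODUCER_CONFIG=/etc/replicator/producer.properties \
      -e REPLICATION_CONFIG=/etc/replicator/replication.properties \
      confluentinc/cp-enterprise-replicator-executable

The mounted directory lets you manage the consumer, producer, and replication properties files on the host exactly
as you would for the non-Docker executable.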
--------------------------------------------
Command Line Parameters of |crep| Executable
--------------------------------------------

The available command line parameters are:

===================================  ==========  ============================================================
Command line parameter               Value       Description
===================================  ==========  ============================================================
``--blacklist``                                  A comma-separated list of topics that should not be replicated, even if they are included in the whitelist or matched by the regular expression.
``--cluster.id``                                 Specifies the unique identifier for the |crep| cluster. (required)
``--cluster.threads``                            The total number of threads across all workers in the |crep| cluster. If this command starts another |crep| worker in an existing cluster, this can be used to change the number of threads in the whole cluster.
``--confluent.license``                          Your Confluent license key that enables you to use |crep|. Without the license key, you can use |crep| for a 30-day trial period. If you are a subscriber, contact Confluent Support for more information.
``--consumer.config``                            Specifies the location of the file that contains the configuration settings for the consumer reading from the origin cluster. (required)
``--consumer.monitoring.config``                 Specifies the location of the file that contains the producer settings for the |ak-tm| cluster where monitoring information about the |crep| consumer is to be sent. This must be specified if monitoring is to be enabled, but may point to a different |ak| cluster than the origin or destination clusters. Use the same file as ``--producer.config`` to write metrics to the destination cluster.
``-h``, ``--help``                               Display help information.
``--producer.config``                            Specifies the location of the file that contains the configuration settings for the producer writing to the destination cluster. (required)
``--producer.monitoring.config``                 Specifies the location of the file that contains the producer settings for the |ak| cluster where monitoring information about the |crep| producer is to be sent. This must be specified if monitoring is to be enabled, but may point to a different |ak| cluster than the origin or destination clusters. Use the same file as ``--producer.config`` to write metrics to the destination cluster.
``--replication.config``                         Specifies the location of the file that contains the configuration settings for replication. When used, any property in this file can be overridden via a command line parameter. When this is not supplied, all of the properties defining how topics are to be replicated should be specified on the command line.
``--topic.auto.create``              true/false  Whether to automatically create topics in the destination cluster if required.
``--topic.config.sync``              true/false  Whether to periodically sync topic configuration to the destination cluster.
``--topic.config.sync.interval.ms``              How often to check for configuration changes when ``topic.config.sync`` is enabled.
``--topic.create.backoff.ms``                    Time to wait before retrying auto topic creation or expansion.
``--topic.poll.interval.ms``                     Specifies how frequently to poll the source cluster for new topics.
``--topic.preserve.partitions``      true/false  Whether to automatically increase the number of partitions in the destination cluster to match the source cluster and ensure that messages replicated from the source cluster use the same partition in the destination cluster.
``--topic.regex``                                A regular expression that matches the names of the topics to be replicated. Any topic that matches this expression and is not in the blacklist will be replicated.
``--topic.rename.format``                        A format string for the topic name in the destination cluster, which may contain ``${topic}`` as a placeholder for the originating topic name. For example, ``${topic}_dc1`` for the topic 'orders' will map to the destination topic name 'orders_dc1'. Can be placed inside the file specified by ``--replication.config``.
``--topic.timestamp.type``                       The timestamp type for the topics in the destination cluster.
``--whitelist``                                  A comma-separated list of the names of topics that should be replicated. Any topic that is in this list and not in the blacklist will be replicated.
===================================  ==========  ============================================================

.. _replicator_run_manual_config:

Manually Configure and Run |crep| on |ak| Clusters
--------------------------------------------------

After downloading and installing |cp|, per :ref:`replicator_install`, bring up two clusters of |ak-tm| brokers. If
you are not sure about how to do this, refer to :ref:`replicator_quickstart`.

|crep| is a |kconnect-long| plugin. To run |crep|, you need to take the following steps:

* Install and configure the |kconnect| cluster
* Configure and run a |crep-full| on the |kconnect| cluster

This section walks you through both of these steps in detail, and reviews the available configuration options for
|crep|.

-------------------------------------------
Configure the |kconnect| Cluster for |crep|
-------------------------------------------

|crep| runs as a plugin (:ref:`Connector`) in |kconnect|, so you'll need to run |kconnect| Workers before you can
run |crep|. The :ref:`quick start <replicator_quickstart>` shows how to run |crep| in |kconnect|'s *stand-alone*
mode. Stand-alone mode is recommended for proof-of-concepts, tests, and small deployments where the throughput
from a single |crep| node is sufficient. For larger-scale production deployments, you'll want to run multiple
|kconnect| Workers in *distributed mode*. Refer to the |kconnect| documentation to learn how to run |kconnect| in
:ref:`distributed mode`.

Keep in mind the following recommendations and best practices when configuring distributed |kconnect| Workers for
|crep|.

Configuring Origin and Destination Brokers
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

|kconnect| clusters are associated with a cluster of |ak| brokers.
The brokers of the |ak| cluster are specified in the ``bootstrap.servers`` configuration parameter of the
|kconnect| Workers. If you are configuring a new |kconnect| Worker cluster for running |crep|, make sure this
parameter points to the brokers of the **destination** |ak| cluster. If you are planning to run |crep| on an
existing |kconnect| cluster, make sure it is already associated with the **destination** brokers.

.. note::
   |crep| is responsible for reading events from the origin cluster. It then passes the events to the |kconnect|
   Worker responsible for writing the events to the destination cluster. Therefore, you configure |crep| with
   information about the origin and the Worker with information about the destination.

Where to Install |kconnect| Workers
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If you are replicating events between different datacenters (rather than between two |ak| clusters in the same
datacenter), best practice is to run the |kconnect| Workers in the **destination** datacenter. For example, if you
are sending data from New York to San Francisco, |crep| should run in SF and consume data across the US from NYC.

The reason for this is that long-distance networks can be a bit less reliable than networks inside a datacenter.
If there is a network partition and you lose connectivity between the datacenters, having a consumer that is
unable to connect to a cluster is less disruptive than a producer that cannot connect. Remote consuming tends to
be a better model than remote producing.

That said, there is no inherent risk in running |crep| in the origin datacenter. |crep| will capture and forward
all events, including in cases of connectivity loss.
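To tie the broker configuration above to a concrete file, here is a minimal sketch of the relevant distributed
Worker properties. The property names are standard |kconnect| Worker settings; the hostnames and the group name
are illustrative only::

   # The Connect Worker writes to these (destination) brokers
   bootstrap.servers=dc2-broker1:9092,dc2-broker2:9092

   # Workers sharing this group.id form one Connect cluster
   group.id=replicator-connect-cluster

   # Internal topics used by distributed mode
   offset.storage.topic=connect-offsets
   config.storage.topic=connect-configs
   status.storage.topic=connect-status

Note that only the Worker points at the destination; the origin brokers are supplied later in the |crep| connector
configuration itself.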
Running |crep| on an Existing |kconnect| Cluster
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

You can run |crep| on the same |kconnect| cluster as other connectors, but in some cases it is not recommended:

* If you are replicating data between two datacenters that are far apart and thus have high latency, you'll want
  to tune both the |kconnect| Worker and |crep| appropriately. Intra-DC tuning is different from inter-DC tuning
  for this very reason. By giving |crep| its own |kconnect| cluster, you can tune the |kconnect| Workers
  specifically for |crep| without worrying about other connectors being affected.

* Any changes to a connector will cause |crep| to pause while connectors are being re-assigned to |kconnect|
  Workers. If you frequently start and stop connectors, you may want to run |crep| on its own cluster and allow it
  to run without interruptions.

Configuring Logging for the |kconnect| Cluster
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

|kconnect| logging is configured in the file ``etc/kafka/connect-log4j.properties``. By default, it writes to the
console, but for production deployments you should log to a file. Before you start |crep|, add these lines to the
``connect-log4j.properties`` file::

   log4j.appender.file=org.apache.log4j.RollingFileAppender
   log4j.appender.file.File=logs/replicator.log
   log4j.appender.file.layout=org.apache.log4j.PatternLayout
   log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
   log4j.appender.file.MaxFileSize=10MB
   log4j.appender.file.MaxBackupIndex=5
   log4j.appender.file.append=true

Add the appender file that you just created to the ``log4j.rootLogger`` parameter::

   # By default
   log4j.rootLogger=INFO, stdout, file

--------------------------------------------------
Configure and Run |crep| on the |kconnect| Cluster
--------------------------------------------------

The :ref:`quick start <replicator_quickstart>` shows how to run |crep| in |kconnect|'s stand-alone mode. This
section shows how to run |crep| on a distributed |kconnect| cluster.
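If you do not yet have a distributed |kconnect| cluster, a Worker is typically started with the stock
``connect-distributed`` script and a Worker properties file such as the one sketched earlier (the paths below are
relative to the |cp| installation directory and are illustrative)::

   ./bin/connect-distributed ./etc/kafka/connect-distributed.properties

Start one such Worker per node; Workers that share the same ``group.id`` automatically form a single distributed
|kconnect| cluster.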
You should have at least one distributed mode |kconnect| Worker already up and running. If you are not sure how to
do that, review the :ref:`distributed mode documentation`. You can check whether the |kconnect| Worker is up and
running by checking its REST API:

.. codewithvars:: bash

   $ curl http://localhost:8083/
   {"version":"|kafka_release|","commit":"078e7dc02a100018"}

If everything is fine, you will see a version number and commit hash for the version of the |kconnect| Worker you
are running.

Run Replicator by sending the |kconnect| REST API its configuration file in JSON format (note that this is
different from stand-alone mode, which uses Java's property file format). Here's an example configuration::

   {
      "name":"replicator",
      "config":{
         "connector.class":"io.confluent.connect.replicator.ReplicatorSourceConnector",
         "tasks.max":4,
         "key.converter":"io.confluent.connect.replicator.util.ByteArrayConverter",
         "value.converter":"io.confluent.connect.replicator.util.ByteArrayConverter",
         "src.kafka.bootstrap.servers":"localhost:9082",
         "src.zookeeper.connect":"localhost:2171",
         "dest.zookeeper.connect":"localhost:2181",
         "topic.whitelist":"test-topic",
         "topic.rename.format":"${topic}.replica",
         "confluent.license":"XYZ"
      }
   }

You can send this to |crep| using ``curl``. This assumes the above JSON is in a file called
``example-replicator.json``::

   curl -X POST -d @example-replicator.json http://localhost:8083/connectors \
      --header "Content-Type: application/json"

This example demonstrates the use of some important configuration parameters. For an explanation of all
configuration parameters, see :ref:`connect_replicator_config_options`.

* ``key.converter`` and ``value.converter`` - Classes used to convert |ak| records to |kconnect|'s internal
  format. The |kconnect| Worker configuration specifies global converters, and those will be used if you don't
  specify anything in the |crep| configuration. For replication, however, no conversion is necessary.
  You just want to read bytes out of the origin cluster and write them to the destination with no changes.
  Therefore, you can override the global converters with the ``ByteArrayConverter``, which leaves the records as
  is.

* ``src.kafka.bootstrap.servers`` - A list of brokers from the **origin** cluster.

* ``src.zookeeper.connect`` and ``dest.zookeeper.connect`` - Connection strings for |zk| in the origin and
  destination clusters respectively. These are used to replicate topic configuration from origin to destination.

* ``topic.whitelist`` - An explicit list of the topics that you want replicated. This quick start replicates a
  topic named ``test-topic``.

  .. tip::
     You can also tell |crep| which topics to replicate using a regular expression with the ``topic.regex``
     parameter (see :ref:`connect_replicator_config_options`). You should use a regular expression if you want
     |crep| to automatically start replicating new topics if they match a certain pattern. For example, to
     replicate all production topics, including new ones, configure |crep| to replicate topics that match
     ``prod.*``. If you add new topics to the list, you must bounce |crep| for the change to take effect.

* ``topic.rename.format`` - A substitution string that is used to rename topics in the destination cluster. The
  snippet above uses ``${topic}.replica``, where ``${topic}`` will be substituted with the topic name from the
  origin cluster. That means that the ``test-topic`` being replicated from the origin cluster will be renamed to
  ``test-topic.replica`` in the destination cluster.

* ``confluent.license`` - Without the license, you can use |crep| for a 30-day trial period. If you are a
  Confluent customer, you can contact customer support and ask for a |crep| license. Then use it as shown in the
  example.

License Key
-----------

Without the license key, you can use |crep| Executable for a 30-day trial period. If you are a Confluent customer,
you can contact customer support and ask for a |crep| license key.
Then, use the key you received from Confluent Support with the ``--confluent.license`` command line parameter, or
add it to the ``confluent.license`` property within the replication configuration file that you pass to
``--replication.config``.

Suggested Reading
-----------------

- For a reference of all configuration options for the |crep| connector, see
  :ref:`connect_replicator_config_options`.
- For basic information on the connector, see :ref:`connect_replicator`.