.. _replicator_quickstart:

Tutorial: Replicate Data Across |ak| Clusters in |cp|
=====================================================

This guide describes how to start two |ak-tm| clusters and then a |crep| process to replicate data between them. Both the |kraft| and the deprecated |zk| configurations are shown.

Note that for tutorial purposes, you run both clusters on the same machine. To do that, you must perform some additional steps to make sure each cluster has its own ports and data directories. You do not need to make these changes to the |zk|, controller, or broker configuration if you are running in a typical environment where each cluster has its own servers.

.. tabs::

   .. group-tab:: |kraft| mode

      .. figure:: replicator-quickstart-configuration.png
         :align: center

         Replicator Quick Start Configuration

   .. group-tab:: |zk| mode

      .. figure:: replicator-quickstart-config-zoo.png
         :align: center

         Replicator Quick Start Configuration

Install prerequisites and command examples
------------------------------------------

These instructions assume you have a local installation of |cp|, the :confluent-cli:`Confluent CLI|installing.html`, and Java 8, 11, or 17 (recommended). For details on Java requirements, see `Java `__ in :ref:`system-requirements`.

If you are new to |cp|, you may want to work through the :ref:`quickstart` first, and then return to this tutorial.

----------------
|kraft| and |zk|
----------------

.. important:: As of |cp| 7.5, |zk| is deprecated for new deployments, and Confluent recommends |kraft| mode instead. To learn more about running |ak| in |kraft| mode, see :ref:`kraft-overview` and the |kraft| steps in the :ref:`Platform Quick Start <quickstart>`.

This tutorial provides examples for both |kraft| mode and |zk| mode. For |kraft|, the examples show a *combined mode* configuration, where for each cluster the broker and controller run on the same server. Currently, combined mode is not intended for production use but is shown here to simplify the tutorial. If you want to run controllers and brokers on separate servers, use |kraft| in isolated mode. To learn more, see :ref:`kraft-overview` and `Kraft mode `__ under :ref:`config-cp-for-production`.

----------------
Command examples
----------------

- The commands shown to start servers and |crep| assume you are running them from your |cp| home directory or have the |cp| ``bin`` directory on your PATH. The examples also assume that your properties files are in the default locations in your |cp| installation, except as otherwise noted. This should make it easier to copy and paste example commands directly into your terminal in most cases.
- The examples in this section assume you are running commands from ``$CONFLUENT_HOME`` and using ``my-examples/`` to store your properties files. If you follow this model, you can use the copy-paste options on the code blocks.
- ``$CONFLUENT_HOME`` represents the path to your |cp| installation directory. On Linux systems, you can use this notation to set a shell environment variable.

.. _replicator_tutorial_ports_map:

Ports for |ak| brokers and |cp| components
------------------------------------------

The examples in this tutorial define the following port configurations.
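Because both clusters run on one machine, you can optionally confirm that none of the ports listed below are already in use before you start any services. The following is a minimal sketch, not part of the original steps, and assumes the ``lsof`` utility is available on your system (no output for a port means it is free):

.. code:: bash

   # Check every port used in this tutorial (KRaft and ZooKeeper modes combined).
   # A listening process printed for a port means that port is already taken.
   for port in 9082 9092 9071 9093 8091 8090 8083 9021 2171 2181; do
     echo "port $port:"
     lsof -nP -iTCP:$port -sTCP:LISTEN
   done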
.. tabs::

   .. group-tab:: |kraft| mode

      ========================================  ======  ===============================================
      \                                         Origin  Destination
      ========================================  ======  ===============================================
      |ak| brokers                              9082    9092
      |kraft| controllers                       9071    9093
      Metadata server listeners (in brokers)    8091    8090
      |kconnect| |crep| worker                          8083 (copies topics from origin -> destination)
      |c3-short|                                        9021
      ========================================  ======  ===============================================

   .. group-tab:: |zk| mode

      ========================================  ======  ===============================================
      \                                         Origin  Destination
      ========================================  ======  ===============================================
      |ak| brokers                              9082    9092
      |zk|                                      2171    2181
      Metadata server listeners (in brokers)    8091    8090
      |kconnect| |crep| worker                          8083 (copies topics from origin -> destination)
      |c3-short|                                        9021
      ========================================  ======  ===============================================

Start the destination cluster
-----------------------------

The destination cluster configurations (ports, data directories, and so on) are based on the defaults for the template properties files.

.. tabs::

   .. group-tab:: |kraft| mode

      #. Change directories to the location where |cp| is installed:

         .. code:: bash

            cd $CONFLUENT_HOME

      #. Create a directory to be used for all of your example files:

         .. code:: bash

            mkdir my-examples

      #. Copy ``etc/kafka/kraft/server.properties`` into the examples directory and rename it to match its purpose:

         .. code:: bash

            cp etc/kafka/kraft/server.properties my-examples/server_destination.properties

      #. Generate a ``random-uuid`` using the ``kafka-storage`` tool:

         .. code:: bash

            KAFKA_CLUSTER_ID="$(bin/kafka-storage random-uuid)"

      #. Format the log directories for this server:

         .. code:: bash

            ./bin/kafka-storage format -t $KAFKA_CLUSTER_ID -c my-examples/server_destination.properties

         .. tip:: The ``kafka-storage`` command is run only once per broker/controller. You cannot use this command to update an existing cluster. If you make a mistake in the configurations at this point, you must recreate the directories from scratch and work through the steps again.

      #. Start the destination server:

         .. code:: bash

            ./bin/kafka-server-start my-examples/server_destination.properties

   .. group-tab:: |zk| mode

      #. Change directories to the location where |cp| is installed:

         .. code:: bash

            cd $CONFLUENT_HOME

      #. Create a directory to be used for all of your example files:

         .. code:: bash

            mkdir my-examples

      #. Copy ``etc/kafka/zookeeper.properties`` into the examples directory and rename it:

         .. code:: bash

            cp etc/kafka/zookeeper.properties my-examples/zookeeper_destination.properties

      #. Copy ``etc/kafka/server.properties`` into the examples directory and rename it:

         .. code:: bash

            cp etc/kafka/server.properties my-examples/server_destination.properties

      #. Start a |zk| server. This guide assumes that services run on ``localhost``.

         - Start |zk| by running this command in its own terminal.

           .. codewithvars:: bash

              ./bin/zookeeper-server-start my-examples/zookeeper_destination.properties

      #. Next, start a |ak| broker to serve as the single-node |ak| cluster for the destination.

         - Start |ak| by running this command in its own terminal.

           .. codewithvars:: bash

              ./bin/kafka-server-start my-examples/server_destination.properties

For complete details on getting these services up and running, see the :ref:`quick start <quickstart>` instructions for |cp|.
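Before moving on, you can optionally confirm that the destination broker is accepting connections on port ``9092``. This quick check is not part of the original steps; it assumes the broker has finished starting and uses the ``kafka-broker-api-versions`` tool that ships with |cp| (add the ``.sh`` extension if your installation requires it):

.. code:: bash

   # Lists the destination broker (localhost:9092) and the API versions it supports.
   ./bin/kafka-broker-api-versions --bootstrap-server localhost:9092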
.. note:: The destination cluster should be running the same (or higher) version of |cp| as the source cluster. |crep| runs within a |kconnect| cluster linked to the destination cluster, and reads messages from the source cluster. Therefore, |crep| will not be able to interpret the message format if the destination is running an older version.

Start the origin cluster
------------------------

Configure and start the origin cluster in a new terminal window.

.. tabs::

   .. group-tab:: |kraft| mode

      While you configured the destination node to run on default ports, you must run the origin node on different ports to avoid collisions. The |ak| broker on the origin node is configured on port 9082 and the controller on port 9071, as shown in the :ref:`ports mapping <replicator_tutorial_ports_map>`. Copy the configuration files to your examples directory and modify them as shown below to prevent conflicts with the destination cluster.

      #. Change directories to the location where |cp| is installed:

         .. code:: bash

            cd $CONFLUENT_HOME

      #. Copy ``etc/kafka/kraft/server.properties`` into the examples directory and rename it to match its purpose:

         .. code:: bash

            cp etc/kafka/kraft/server.properties my-examples/server_origin.properties

      #. Update the port numbers.

         .. code:: bash

            sed -i '' -e "s/9093/9071/g" my-examples/server_origin.properties

         .. code:: bash

            sed -i '' -e "s/9092/9082/g" my-examples/server_origin.properties

         .. code:: bash

            sed -i '' -e "s/8090/8091/g" my-examples/server_origin.properties

         .. code:: bash

            sed -i '' -e "s/#confluent.metadata.server.listeners/confluent.metadata.server.listeners/g" my-examples/server_origin.properties

         .. code:: bash

            sed -i '' -e "s/confluent.metrics.reporter.bootstrap.servers=localhost:9092/confluent.metrics.reporter.bootstrap.servers=localhost:9082/g" my-examples/server_origin.properties

      #. Update the data directories.

         .. code:: bash

            sed -i '' -e "s/kraft-combined-logs/kraft-combined-logs-origin/g" my-examples/server_origin.properties

      #. Generate a ``random-uuid`` using the ``kafka-storage`` tool:

         .. code:: bash

            KAFKA_CLUSTER_ID="$(bin/kafka-storage random-uuid)"

      #. Format the log directories for this server:

         .. code:: bash

            ./bin/kafka-storage format -t $KAFKA_CLUSTER_ID -c my-examples/server_origin.properties

      #. Next, start a |ak| broker to serve as the single-node |ak| cluster for the origin.

         - Start |ak| by running this command in its own terminal.

           .. code:: bash

              ./bin/kafka-server-start my-examples/server_origin.properties
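      Optionally, you can confirm that the origin node's |kraft| quorum is up before continuing. This quick check is not part of the original steps and assumes your |cp| version includes the ``kafka-metadata-quorum`` tool (available with |ak| 3.3 and later; add the ``.sh`` extension if your installation requires it):

      .. code:: bash

         # Prints the quorum status for the origin node, including the current leader and voters.
         ./bin/kafka-metadata-quorum --bootstrap-server localhost:9082 describe --status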
   .. group-tab:: |zk| mode

      While you configured the destination cluster to run on default ports, you must run the origin cluster on different ports to avoid collisions. The |ak| broker in the origin cluster is configured on port 9082 and |zk| is configured on 2171, as shown in the :ref:`ports mapping <replicator_tutorial_ports_map>`. Copy the configuration files to your examples directory and modify them as shown below to prevent conflicts.

      #. Copy the config files to ``my-examples`` (or a directory of your choice).

         .. code:: bash

            cp etc/kafka/zookeeper.properties my-examples/zookeeper_origin.properties

         .. code:: bash

            cp etc/kafka/server.properties my-examples/server_origin.properties

      #. Update the port numbers.

         .. code:: bash

            sed -i '' -e "s/2181/2171/g" my-examples/zookeeper_origin.properties

         .. code:: bash

            sed -i '' -e "s/9092/9082/g" my-examples/server_origin.properties

         .. code:: bash

            sed -i '' -e "s/2181/2171/g" my-examples/server_origin.properties

         .. code:: bash

            sed -i '' -e "s/#listeners/listeners/g" my-examples/server_origin.properties

         .. code:: bash

            sed -i '' -e "s/8090/8091/g" my-examples/server_origin.properties

         .. code:: bash

            sed -i '' -e "s/#confluent.metadata.server.listeners/confluent.metadata.server.listeners/g" my-examples/server_origin.properties

      #. Update the broker ID for the origin.

         .. code:: bash

            sed -i '' -e "s/broker.id=0/broker.id=1/g" my-examples/server_origin.properties

         .. tip:: This is not strictly necessary, because the brokers are in two different clusters, but unique broker IDs make it easier to manage the brokers with |ak| command line tools.

      #. Update the data directories.

         .. code:: bash

            sed -i '' -e "s/zookeeper/zookeeper_origin/g" my-examples/zookeeper_origin.properties

         .. code:: bash

            sed -i '' -e "s/kafka-logs/kafka-logs-origin/g" my-examples/server_origin.properties

      #. From here, you can start up the origin cluster.

         - Start |zk| by running this command in its own terminal.

           .. code:: bash

              ./bin/zookeeper-server-start my-examples/zookeeper_origin.properties

         - Start |ak| by running this command in its own terminal.

           .. code:: bash

              ./bin/kafka-server-start my-examples/server_origin.properties

Create a topic
--------------

Open a new command window to run |ak| commands. Create a topic named ``test-topic`` in the origin cluster with the following command:

::

   kafka-topics --create --topic test-topic --replication-factor 1 --partitions 1 --bootstrap-server localhost:9082

.. tip:: Depending on your environment, you may have to use the ``.sh`` extension (for example, ``kafka-topics.sh``) for |ak| commands.

You can verify that the topic was created as follows:

::

   kafka-topics --list --bootstrap-server localhost:9082

Your output should look similar to this (the ``_confluent`` topics are internal topics):

::

   __confluent.support.metrics
   _confluent-command
   test-topic

When you configure and run |crep|, this ``test-topic`` is replicated to the destination cluster (on port ``9092``) with the exact configuration you defined above. For the sake of this example, the test topic was created with just one partition. |crep| works with any number of topics and partitions.

.. _config-and-run-replicator:

Configure and run Replicator
----------------------------

|crep-full| can run as an executable or as a Connector in the |kconnect-long| framework. For this quick start, start |crep| as an executable.

--------------------------------------------------------------
Create consumer, producer, and replicator configuration files
--------------------------------------------------------------

The |crep| executable script expects three configuration files:

- Configuration for the origin cluster
- Configuration for the destination cluster
- |crep| configuration

.. tip:: You can place these config files in any directory you choose, but these steps assume a path of ``$CONFLUENT_HOME/my-examples/``, so as not to conflict with the default properties files that ship with |cp|. (The default configs for ``producer.properties`` and ``consumer.properties`` are in ``etc/kafka/``, and ``replicator.properties`` is in ``etc/kafka-connect-replicator/``.)

Create the following files in ``$CONFLUENT_HOME/my-examples/``:

#. Configure the origin cluster in a new file named ``consumer.properties``.

   ::

      cp etc/kafka/consumer.properties my-examples/.

   Edit the file and make sure it contains the addresses of brokers from the **origin** cluster. The default broker list will match the origin cluster you started earlier.

   .. codewithvars:: bash

      # Origin cluster connection configuration
      bootstrap.servers=localhost:9082
#. Configure the destination cluster in a new file named ``producer.properties``.

   ::

      cp etc/kafka/producer.properties my-examples/.

   Edit the file and make sure it contains the addresses of brokers from the **destination** cluster. The default broker list will match the destination cluster you started earlier.

   .. codewithvars:: bash

      # Destination cluster connection configuration
      bootstrap.servers=localhost:9092

#. Define the |crep| configuration in a new file named ``replication.properties`` for the |kconnect| worker. This quick start shows a configuration for ``topic.rename.format``, but any of the :ref:`replicator_config_options` that are not connection related can be supplied in this file.

   .. codewithvars:: bash

      # Replication configuration
      topic.rename.format=${topic}.replica
      replication.factor=1
      config.storage.replication.factor=1
      offset.storage.replication.factor=1
      status.storage.replication.factor=1
      confluent.topic.replication.factor=1

   .. tip::

      - If no port is defined in ``replication.properties``, this worker runs on its default port ``8083``, which is the desired config for this deployment.
      - The replication factor properties (all set to ``1``) are used because these test clusters are small. The recommended minimum cluster size in production is ``3``, which is also the default for these properties.

--------------------
Start the replicator
--------------------

After you have created the necessary configuration files, start the |crep| executable in its own terminal with the command below (assuming the properties files are in ``my-examples``).

::

   ./bin/replicator --cluster.id replicator --consumer.config my-examples/consumer.properties --producer.config my-examples/producer.properties --replication.config my-examples/replication.properties --whitelist 'test-topic'

Some |crep| executable parameters can be passed on the command line:

* ``--cluster.id`` - An identifier used to determine which |crep| cluster this executable should join. Multiple |crep| executable instances with the same ``cluster.id`` will work together.
* ``--consumer.config`` - The path to the origin cluster configuration.
* ``--producer.config`` - The path to the destination cluster configuration.
* ``--replication.config`` - The path to a file containing any non-connection-specific configuration. Command-line arguments override these configurations.
* ``--whitelist`` - A list of topics to replicate from the origin to the destination cluster.

For a full list of command line options, see :ref:`replicator_executable_command_line_parameters`.

Look for success messages related to starting the source task and creating the replicated topic; these indicate that |crep| is up and running and copying topics.
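Before verifying replication, you can also optionally confirm that the embedded |kconnect| worker started by the |crep| executable is reachable on its default REST port ``8083``. This quick check is not part of the original steps and assumes ``curl`` is available:

.. code:: bash

   # Returns a JSON array of connector names hosted by the embedded Connect worker.
   curl -s http://localhost:8083/connectors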
--------------------------------------------
Verify topic replication across the clusters
--------------------------------------------

When |crep| finishes initialization, it checks the origin cluster for topics that need to be replicated. In this case, it finds ``test-topic`` and creates the corresponding topic in the destination cluster. You can verify this with the following command.

::

   ./bin/kafka-topics --describe --topic test-topic.replica --bootstrap-server localhost:9092

Note that you are checking for the existence of ``test-topic.replica`` because ``test-topic`` was renamed when it was replicated to the destination cluster, according to your configuration.

Your output should look similar to this:

::

   ./bin/kafka-topics --describe --topic test-topic.replica --bootstrap-server localhost:9092
   Topic: test-topic.replica   PartitionCount: 1   ReplicationFactor: 1   Configs: message.timestamp.type=CreateTime,segment.bytes=1073741824
       Topic: test-topic.replica   Partition: 0   Leader: 0   Replicas: 0   Isr: 0   Offline: 0

You can also list and describe the topics on the destination cluster. Replicated topics, like ``test-topic.replica``, will be listed.

::

   ./bin/kafka-topics --list --bootstrap-server localhost:9092

.. tip::

   - To list topics on the origin cluster, run ``kafka-topics --list`` against ``localhost:9082``.
   - To view a description of the original topic, run ``kafka-topics --describe`` for ``test-topic`` against ``localhost:9082``.

At any time after you've created the topic in the origin cluster, you can begin sending data to it using a |ak| producer to write to ``test-topic`` in the origin cluster. You can then confirm that the data has been replicated by consuming from ``test-topic.replica`` in the destination cluster. For example, to send a sequence of numbers using |ak|'s console producer, run the following command in a new terminal window::

   seq 10000 | ./bin/kafka-console-producer --topic test-topic --broker-list localhost:9082

You can confirm delivery in the destination cluster using the console consumer in its own terminal window:

::

   ./bin/kafka-console-consumer --from-beginning --topic test-topic.replica --bootstrap-server localhost:9092

If the numbers 1 to 10,000 appear in the consumer output, you have successfully set up multi-cluster replication. Press ``Ctrl-C`` to end the consumer readout and return to the command prompt.

.. _rep-quickstart-monitoring:

Use |c3-short| to monitor replicators
-------------------------------------

You can use |c3-short| to monitor the replicators in your current deployment:

#. Stop |crep| and the brokers on both the origin and destination clusters, and then stop the |zk| instances (in that order). Press ``Ctrl-C`` in each command window to stop the processes, but keep the windows open to make it easy to restart each one.

#. Activate the monitoring extension for |crep| by doing the following, as fully described in :ref:`replicator_monitoring_extension`.

   - Add the full path to ``replicator-rest-extension-.jar`` to your CLASSPATH.
   - Add ``rest.extension.classes=io.confluent.connect.replicator.monitoring.ReplicatorMonitoringExtension`` to ``my-examples/replication.properties``.

#. Uncomment or add the following lines to the |ak| configuration files for `both` the destination and origin, ``my-examples/server_destination.properties`` and ``my-examples/server_origin.properties``, respectively. The configuration for ``confluent.metrics.reporter.bootstrap.servers`` must point to ``localhost`` on port ``9092`` in both files, so you may need to edit one or both of these port numbers. (Searching on ``confluent.metrics`` will take you to these lines in the files.)

   ::

      confluent.metrics.reporter.topic.replicas=1
      metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
      confluent.metrics.reporter.bootstrap.servers=localhost:9092

   - The first line indicates to |c3-short| that your deployment is in development mode, using a replication factor of ``1``.
   - The other two lines enable metrics reporting on |c3-short| and provide access to the Confluent internal topic that collects and stores the monitoring data.
   .. tip::

      - For this example, the metrics reporter must point to the cluster that |c3| bootstraps to, which is the destination cluster. If this is not set properly, metrics on source topics will not show up in |c3-short|. This is why ``my-examples/server_destination.properties`` and ``my-examples/server_origin.properties`` must have the same configuration for ``confluent.metrics.reporter.bootstrap.servers=localhost:9092``.
      - When adapting these steps to more complex, real-world environments, you may decide to use a different approach. For example, in a deployment with multiple instances of |c3-short| for source and destination, each monitoring its own respective cluster, ``confluent.metrics.reporter.bootstrap.servers`` should point to the source or destination cluster, as appropriate. To learn more, see the scenarios for :ref:`config-c3-multi-cluster` with |c3-short|, :ref:`replicator_monitoring`, and :ref:`bmrr`.

#. Edit ``my-examples/producer.properties`` to add the monitoring interceptor for the producer:

   .. codewithvars:: bash

      # Monitoring interceptor for producer
      interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor

#. Edit ``my-examples/consumer.properties`` to add the monitoring interceptor for the consumer:

   .. codewithvars:: bash

      # Monitoring interceptor for consumer
      interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

#. Edit ``etc/confluent-control-center/control-center-dev.properties`` to add the following two lines, which specify the origin and destination bootstrap servers for |c3-short|, as is required for monitoring multiple clusters. (A convenient place to add these lines is near the top of the file under "Control Center Settings", immediately after the line that specifies ``confluent.controlcenter.id``.)

   .. codewithvars:: bash

      # multi-cluster monitoring
      confluent.controlcenter.kafka.origin.bootstrap.servers=localhost:9082
      confluent.controlcenter.kafka.destination.bootstrap.servers=localhost:9092

   .. tip::

      - |c3-short| requires the host and port of the |kconnect| REST endpoint to know where to look for |crep| monitoring metrics. In the config file used for this example (``control-center-dev.properties``), this is configured for you on the default port, and so works out of the box:

        .. codewithvars:: bash

           # A comma separated list of Connect host names
           confluent.controlcenter.connect.cluster=http://localhost:8083

      - The production-ready config file (``control-center-production.properties``) has the default commented out. If you use this file instead, have multiple |kconnect| clusters, or want to configure |kconnect| clusters differently, you must specify the |kconnect| endpoint(s), either by uncommenting the default or specifying hosts for your own |kconnect| clusters. To learn more, see the :ref:`Control Center Configuration Reference ` descriptions for ``confluent.controlcenter.connect..cluster`` and ``confluent.controlcenter.connect.cluster`` (deprecated).
      - If you are running both |crep| and a |kconnect| cluster in your deployment, you must specify these separately:

        - |kconnect| cluster:

          .. code:: bash

             confluent.controlcenter.connect..cluster=http://connect-host-1:8083

        - |crep|:

          .. code:: bash

             confluent.controlcenter.connect..cluster=http://replicator-host:8083

#. Restart the |zk| instances on the destination and origin clusters with the same commands used above, for example:

   ::

      ./bin/zookeeper-server-start my-examples/zookeeper_destination.properties

   ::

      ./bin/zookeeper-server-start my-examples/zookeeper_origin.properties

   (If you are using |kraft| mode, there are no |zk| instances to restart; restart the |kraft| servers as shown in the next step.)
#. Restart the brokers on the destination and origin clusters with the same commands used above, for example:

   ::

      ./bin/kafka-server-start my-examples/server_destination.properties

   ::

      ./bin/kafka-server-start my-examples/server_origin.properties

#. Restart |crep| and the |kconnect| worker with the same command as above. For example:

   ::

      ./bin/replicator --cluster.id replicator --consumer.config my-examples/consumer.properties --producer.config my-examples/producer.properties --replication.config my-examples/replication.properties --whitelist 'test-topic'

#. Launch |c3-short| with the following command.

   ::

      ./bin/control-center-start etc/confluent-control-center/control-center-dev.properties

   If no port is defined in ``control-center-dev.properties``, |c3-short| runs by default on port ``9021``, as described in :ref:`control_center`. This is the desired config for this deployment.

#. Open |c3-short| at http://localhost:9021/ in your web browser. The clusters are rendered on |c3-short| with auto-generated names, based on your configuration.

   .. figure:: ../../images/c3-replicators-multi-cluster.png
      :scale: 65%

#. (Optional) On |c3-short|, edit the cluster names to suit your use case, as described in :ref:`rep-rename-C3-displayed-cluster-names` in "Replicators" in the |c3-short| User Guide.

#. On |c3-short|, select the destination cluster, click **Replicators** on the navigation panel, and use |c3-short| to monitor replication performance and drill down on source and replicated topics.

   .. figure:: ../../images/c3-replicators-all.png
      :scale: 75%

   To see messages produced to both the original and replicated topic on |c3-short|, try out ``kafka-producer-perf-test`` in its own command window to auto-generate test data to ``test-topic``.

   ::

      kafka-producer-perf-test \
         --producer-props bootstrap.servers=localhost:9082 \
         --topic test-topic \
         --record-size 1000 \
         --throughput 1000 \
         --num-records 3600000

   The command provides status output on messages sent, as shown:

   ::

      4999 records sent, 999.8 records/sec (0.95 MB/sec), 1.1 ms avg latency, 240.0 ms max latency.
      5003 records sent, 1000.2 records/sec (0.95 MB/sec), 0.5 ms avg latency, 4.0 ms max latency.
      5003 records sent, 1000.2 records/sec (0.95 MB/sec), 0.6 ms avg latency, 5.0 ms max latency.
      5001 records sent, 1000.2 records/sec (0.95 MB/sec), 0.3 ms avg latency, 3.0 ms max latency.
      5001 records sent, 1000.0 records/sec (0.95 MB/sec), 0.3 ms avg latency, 4.0 ms max latency.
      5000 records sent, 1000.0 records/sec (0.95 MB/sec), 0.8 ms avg latency, 24.0 ms max latency.
      5001 records sent, 1000.2 records/sec (0.95 MB/sec), 0.6 ms avg latency, 3.0 ms max latency.
      ...

   As before, you can consume these messages from the command line, using ``kafka-console-consumer`` to verify that the replica topic is receiving them:

   ::

      ./bin/kafka-console-consumer --from-beginning --topic test-topic.replica --bootstrap-server localhost:9092

   You can also verify this on |c3-short|. Navigate to ``test-topic`` on the origin cluster to view messages on the original topic, and to ``test-topic.replica`` on the destination cluster to view messages on the replicated topic.

   .. figure:: ../../images/c3-replicator-topic-drilldown-messages.png
      :scale: 70%

#. To learn more about monitoring Replicators in |c3-short|, see :ref:`"Replicators" in Control Center User Guide `.

#. When you have completed your experiments with the tutorial, be sure to perform cleanup as follows:

   - Stop any producers and consumers using ``Ctrl-C`` in each command window.
   - Use ``Ctrl-C`` in each command window to stop each service, in the reverse order in which you started them (stop |c3-short| first, then |crep|, then the |ak| brokers, and finally the |zk| instances).

.. _rep-tutorial-troubleshoot:

Troubleshooting
---------------

If you run into trouble at any point with getting things up and running or with |c3-short| monitoring of replicators, here are some things to check:

- Make sure the configurations in all properties files are correct, and that port numbers match the origin and destination ports as described in :ref:`replicator_tutorial_ports_map`.
- For monitoring with |c3-short|, make sure that your configurations match the monitoring requirements per the steps in :ref:`rep-quickstart-monitoring`. If you are using the production-ready |c3-short| configuration file instead of the "dev" version shown in this example, make sure you have specified the |kconnect| endpoint per the tip in :ref:`rep-quickstart-monitoring`.
- Verify that the monitoring extension is installed per :ref:`replicator_monitoring_extension` and is in your CLASSPATH, especially in the shells where you start the |ak| brokers. Check this by running ``echo $CLASSPATH`` in open command windows.
- If you are using the ``systemctl`` command to start the monitoring service, make sure you follow the steps in :ref:`replicator-monitoring-systemctl-command`. If you don't configure the environment variables properly, |kconnect| will fail to start.
- To retry the tutorial:

  1. Use ``Ctrl-C`` in each command window to stop each service, in the reverse order in which you started them (stop |c3-short| first, then |crep|, then the |ak| brokers, and finally the |zk| instances).
  2. Delete stale log and data files in ``/tmp`` that may conflict with a new run of the clusters and topics. For example, remove these directories: ``/tmp/confluent/control-center/``, ``/tmp/zookeeper``, ``/tmp/zookeeper_origin``, ``/tmp/kafka-logs``, ``/tmp/kafka-logs-origin``, ``/tmp/control-center-logs``
  3. From here, you can start again with your current install of |cp|, or even :ref:`reinstall ` |cp| and try the entire process from scratch.

Teardown
--------

Run shutdown and cleanup tasks.

#. Stop |crep| with ``Ctrl-C`` in its command window.

#. Stop all of the other components with ``Ctrl-C`` in their respective command windows, in the reverse order in which you started them.

   - For |kraft| mode, stop the origin server first, and then the destination server (each runs a combined broker and controller).
   - For |zk| mode, stop the |ak| brokers first, and then their respective |zk| instances.

#. Delete the log directories from ``/tmp``. This clears out the metadata from your system and enables you to configure and run new local deployments without collisions with stale metadata. For example, if you used |kraft|:

   - Delete the logs for the destination cluster:

     .. code:: bash

        rm -rf /tmp/kraft-combined-logs

   - Delete the logs for the origin cluster:

     .. code:: bash

        rm -rf /tmp/kraft-combined-logs-origin

Suggested Reading
-----------------

- :ref:`mdc_replicator_demos`
- :ref:`"Replicators" in Control Center User Guide `
- :ref:`Replicator Monitoring Setup and Reference `
- :ref:`controlcenter_clients`