.. _replicator_monitoring:

Monitor |crep| for |cp|
=======================

The recommended way to monitor Replicator is with |c3| and the |crep| monitoring extension. For more information
about the monitoring extension, see :ref:`replicator_monitoring_extension`.

Additionally, you can use the following tools to monitor |crep|:

* Replicator metrics
* Consumer metrics
* Producer metrics
* Replication lag

.. _replicator_monitoring_extension:

|crep| monitoring extension
---------------------------

The |crep-full| Monitoring Extension allows detailed metrics from |crep| tasks to be collected through an exposed
REST API. These endpoints provide the following information:

* Throughput - the number of messages replicated per second.
* Message Lag - the number of messages that have been produced to the origin cluster but have not yet been replicated to the destination.
* Latency - the average time between message production to the origin cluster and message production to the destination cluster.

The metrics are broken down by connector, task, and topic/partition.

These metrics have been designed to integrate with Control Center to provide a centralized view of replication within your clusters.

The |crep| latency metric is meaningful only when data is being replicated in real time; it does not support
reporting latency when |crep| is replicating historical data. When replicating very old data, you can disregard
the latency reported by |crep|.

Install
^^^^^^^

The Monitoring extension is installed with |crep|. For more information about installation, see :ref:`replicator_install`.

Activate the extension
^^^^^^^^^^^^^^^^^^^^^^

After the installation is complete, you must configure |kconnect-long| to recognize the installed extension.
First, add the extension library to the |kconnect| worker classpath by setting the ``CLASSPATH`` environment variable.

.. codewithvars:: bash

   export CLASSPATH=/replicator-rest-extension-.jar

.. tip::

   - The location of this REST extension JAR file depends on your platform and type of :ref:`Confluent install`.
     For example, on a macOS installation from a ZIP or TAR archive, ``replicator-rest-extension-.jar`` is located
     by default in the Confluent directory in ``/share/java/kafka-connect-replicator/``. If you install with the
     default `Ansible `__ settings, the JAR file is located in ``/usr/share/java/kafka-connect-replicator/``.
   - To make sure the ``CLASSPATH`` setting persists, you might want to add it to your shell configuration file
     (for example, ``.bash_profile``, ``.bashrc``, ``.zshrc``), source the profile in any open command shells,
     and check it with ``echo $CLASSPATH``.

Then, to activate the extension, add the following property to the list of extension classes in the configuration
file that you use to start |crep| (for example, ``my-examples/replication.properties``,
``etc/kafka/replication.properties``, or ``/etc/kafka/connect-distributed.properties``):

.. codewithvars:: bash

   rest.extension.classes=io.confluent.connect.replicator.monitoring.ReplicatorMonitoringExtension

With the extension activated, you can use |c3-short| to monitor replicators. See :ref:`rep-quickstart-monitoring`
in the Replicator Tutorial and :ref:`Replicators ` in the |c3-short| User Guide.

Additional configuration for secure endpoints
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

When the |kconnect| distributed cluster hosting |crep| has a REST endpoint with TLS/SSL authentication enabled (https),
you must configure security properties for the TLS/SSL keystore and truststore used by the |crep| monitoring extension
to communicate with other |kconnect| nodes in the cluster. Set these at the JVM level through the ``KAFKA_OPTS``
environment variable:

.. codewithvars:: bash

   export KAFKA_OPTS="-Djavax.net.ssl.trustStore=/etc/kafka/secrets/kafka.connect.truststore.jks \
                      -Djavax.net.ssl.trustStorePassword= \
                      -Djavax.net.ssl.keyStore=/etc/kafka/secrets/kafka.connect.keystore.jks \
                      -Djavax.net.ssl.keyStorePassword="

.. include:: ../../includes/cp-demo-tip.rst

You should also set ``listeners.https.`` properties, as specified at :ref:`here `, for example:

.. codewithvars:: bash

   "listeners.https.ssl.keystore.location": "/etc/kafka/connect.jks"
   "listeners.https.ssl.keystore.password": "xxxxx"

.. _replicator-monitoring-systemctl-command:

systemctl command configurations
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

**If you are not using ``systemctl``, skip these steps.**

If you are using the ``systemctl`` command to start the monitoring service, you must configure the environment
variables using ``sudo systemctl edit confluent-kafka-connect``, or by editing the file
``/etc/systemd/system/confluent-kafka-connect.service.d/``. If you use this method and don't configure the
environment variables properly for ``systemctl``, |kconnect| will fail to start.

The steps for this use case are as follows.

#. Activate the extension as previously described, by adding the following property to the configuration file
   that you use to start |crep| (for example, ``my-examples/replication.properties``,
   ``etc/kafka/replication.properties``, or ``/etc/kafka/connect-distributed.properties``):

   .. code:: bash

      rest.extension.classes=io.confluent.connect.replicator.monitoring.ReplicatorMonitoringExtension

#. Configure environment variables using either ``sudo systemctl edit confluent-kafka-connect``, or by editing
   the file ``/etc/systemd/system/confluent-kafka-connect.service.d/``:

   .. code:: properties

      [Service]
      Environment="CLASSPATH=/replicator-rest-extension-6.0.0.jar"

#. Reload the systemd configuration and restart the component.

   .. code:: bash

      sudo systemctl daemon-reload
      sudo systemctl restart confluent-kafka-connect

|crep| monitoring extension API reference
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. http:get:: /WorkerMetrics/(string: connector_name)

   Get metrics for tasks running on this |kconnect| worker for a given connector.

   :param string connector_name: the name of the connector

   **Example request**:

   .. sourcecode:: http

      GET /WorkerMetrics/replicate-topic HTTP/1.1
      Host: kafkaconnect.example.com

   **Example response**:

   .. sourcecode:: http

      HTTP/1.1 200 OK
      Content-Type: application/json

      {
        "connectorName": "replicate-topic",
        "srcClusterId": "qhS9FOVFQkmokuIvo1ZZMA",
        "destClusterId": "qhS9FOVFQkmokuIvo1ZZMA",
        "tasks": [{
          "id": "replicate-topic-0",
          "workerId": null,
          "state": null,
          "metrics": [{
            "timestamp": 1570742038165,
            "throughput": 0.1075268817204301,
            "messageLag": 0.0,
            "latency": 12.764705882352942,
            "srcTopic": "wikipedia.parsed",
            "srcPartition": 1,
            "destTopic": "wikipedia.parsed.replica"
          }, {
            "timestamp": 1570742038166,
            "throughput": 0.10976948408342481,
            "messageLag": 0.0,
            "latency": 16.433333333333334,
            "srcTopic": "wikipedia.parsed",
            "srcPartition": 0,
            "destTopic": "wikipedia.parsed.replica"
          }]
        }]
      }

.. http:get:: /ReplicatorMetrics

   Get metrics for all Replicators running in this |kconnect| cluster.

   **Example request**:

   .. sourcecode:: http

      GET /ReplicatorMetrics HTTP/1.1
      Host: kafkaconnect.example.com

   **Example response**:

   .. sourcecode:: http

      HTTP/1.1 200 OK
      Content-Type: application/json

      {
        "connectors": [{
          "name": "replicate-topic",
          "srcClusterBootstrapServers": "kafka1:9091",
          "destClusterBootstrapServers": "kafka1:9091",
          "srcClusterId": "qhS9FOVFQkmokuIvo1ZZMA",
          "destClusterId": "qhS9FOVFQkmokuIvo1ZZMA",
          "tasks": [{
            "id": "0",
            "workerId": "connect:8083",
            "state": "RUNNING",
            "metrics": [{
              "timestamp": 1570742222631,
              "throughput": 0.11024142872891633,
              "messageLag": 0.0,
              "latency": 34.86666666666667,
              "srcTopic": "wikipedia.parsed",
              "srcPartition": 1,
              "destTopic": "wikipedia.parsed.replica"
            }, {
              "timestamp": 1570742222631,
              "throughput": 0.10187449062754686,
              "messageLag": 0.0,
              "latency": 15.307692307692308,
              "srcTopic": "wikipedia.parsed",
              "srcPartition": 0,
              "destTopic": "wikipedia.parsed.replica"
            }]
          }]
        }]
      }

.. _replicator_metrics:

Monitoring JMX metrics
----------------------

Like |ak| brokers, |kconnect-long| reports metrics via JMX.
To monitor |kconnect| and |crep|, set the ``JMX_PORT`` environment variable before starting the Connect workers.
Then collect the reported metrics using your usual monitoring tools. JMXTrans, Graphite, and Grafana are a popular
combination for collecting and reporting JMX metrics from |ak|.

When you look at the metrics reported through JMX, you will see that Connect exposes Replicator's consumer metrics
and Connect's producer metrics. You can view the full list of metrics in :ref:`kafka_monitoring`. The following
sections describe some of the important metrics and their significance.

Important |crep| metrics
^^^^^^^^^^^^^^^^^^^^^^^^

**MBean:** ``confluent.replicator:type=confluent-replicator-task-metrics,confluent-replicator-task=([-.\w]+),confluent-replicator-task-topic-partition=([-.\w]+),confluent-replicator-name=([-.\w]+),confluent-replicator-topic-name=([-.\w]+)``

``confluent-replicator-source-cluster``
    The ID of the source cluster from which |crep| is replicating.

``confluent-replicator-destination-cluster``
    The ID of the destination cluster to which |crep| is replicating.

``confluent-replicator-destination-topic-name``
    The name of the destination topic to which |crep| is replicating.

``confluent-replicator-task-topic-partition-message-lag``
    The average number of messages that were produced to the origin cluster but have not yet arrived at the
    destination cluster. This metric reports NaN if no new records are replicated for the partition for some time.

``confluent-replicator-task-topic-partition-throughput``
    The number of messages replicated per second from the source to the destination cluster.

``confluent-replicator-task-topic-partition-byte-throughput``
    The number of bytes replicated per second from the source to the destination cluster.

``confluent-replicator-task-topic-partition-latency``
    The average time between message production to the source cluster and message production to the destination cluster.
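The lag, throughput, and latency values described above are reported per topic partition, and the monitoring extension's ``/ReplicatorMetrics`` response carries corresponding ``messageLag``, ``throughput``, and ``latency`` fields. As a minimal sketch of how those per-partition values might be rolled up per connector, the following Python snippet works on a trimmed copy of the example response from the API reference. The ``summarize`` helper and its output keys are hypothetical names chosen for illustration, and the choice of sum and maximum as aggregations is an assumption rather than something |crep| prescribes.

```python
import json

# Trimmed copy of the example /ReplicatorMetrics response shown in the API reference.
SAMPLE = """
{"connectors": [{"name": "replicate-topic", "tasks": [{"id": "0", "state": "RUNNING", "metrics": [
  {"throughput": 0.11024142872891633, "messageLag": 0.0, "latency": 34.86666666666667,
   "srcTopic": "wikipedia.parsed", "srcPartition": 1},
  {"throughput": 0.10187449062754686, "messageLag": 0.0, "latency": 15.307692307692308,
   "srcTopic": "wikipedia.parsed", "srcPartition": 0}
]}]}]}
"""


def summarize(payload):
    """Hypothetical helper: roll per-partition metrics up to one entry per connector."""
    summary = {}
    for connector in payload.get("connectors", []):
        # Flatten the metrics of every task into a single list of partition rows.
        rows = [m for task in connector.get("tasks", []) for m in task.get("metrics", [])]
        summary[connector["name"]] = {
            "partitions": len(rows),
            "total_throughput": sum(m["throughput"] for m in rows),
            "max_message_lag": max((m["messageLag"] for m in rows), default=0.0),
            "max_latency": max((m["latency"] for m in rows), default=0.0),
        }
    return summary


if __name__ == "__main__":
    for name, stats in summarize(json.loads(SAMPLE)).items():
        print(name, stats)
```

In practice the payload would come from an HTTP GET against the |kconnect| REST endpoint instead of the embedded sample; a steadily rising ``max_message_lag`` is the value to watch.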
Important producer metrics
^^^^^^^^^^^^^^^^^^^^^^^^^^

**MBean:** ``kafka.producer:type=producer-metrics,client-id=([-.\w]+)``

``io-ratio`` or ``io-wait-ratio``
    If the ``io-ratio`` is low or ``io-wait-ratio`` is high, this means the producer is not very busy and is
    unlikely to be a bottleneck.

``outgoing-byte-rate``
    Reports the producer throughput when writing to destination |ak|.

``batch-size-avg`` and ``batch-size-max``
    If they are consistently close to the configured ``batch.size``, you may be producing as fast as possible
    and you'll want to increase the batch size to get better batching.

``record-retry-rate`` and ``record-error-rate``
    The average per-second number of retried record sends and failed record sends for a topic. A high value
    for either can indicate issues writing to the destination cluster.

``produce-throttle-time-avg`` and ``produce-throttle-time-max``
    Produce requests may be throttled to meet quotas configured on the destination cluster. If these are non-zero,
    it indicates that the destination brokers are slowing the producer down and the quotas configuration should
    be reviewed. For more information on quotas, see :ref:`quotas`.

``waiting-threads`` and ``bufferpool-wait-time``
    Non-zero values here indicate memory pressure. Connect producers can't send events fast enough, resulting in
    full memory buffers that cause Replicator threads to block.

Important consumer metrics
^^^^^^^^^^^^^^^^^^^^^^^^^^

**MBean:** ``kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)``

``io-ratio`` or ``io-wait-ratio``
    If the ``io-ratio`` is low or ``io-wait-ratio`` is high, this means the consumer is not very busy and is
    unlikely to be a bottleneck.

``bytes-consumed-rate``
    Indicates the throughput of Replicator reading events from the origin cluster.

``fetch-size-avg`` and ``fetch-size-max``
    If they are consistently close to the configured maximum fetch size, it means that |crep| is reading at the
    maximum possible rate. Increase the maximum fetch size and check whether the throughput per task improves.

``records-lag-max``
    The maximum lag in terms of number of records for any partition. An increasing value over time indicates that
    |crep| is not keeping up with the rate at which events are written to the origin cluster.

``fetch-rate``, ``fetch-size-avg`` and ``fetch-size-max``
    If ``fetch-rate`` is high but ``fetch-size-avg`` and ``fetch-size-max`` are not close to the maximum configured
    fetch size, the consumer may be "churning". Try increasing the ``fetch.min.bytes`` and ``fetch.max.wait.ms``
    configuration. This can help the consumer batch more efficiently.

``fetch-throttle-time-max`` and ``fetch-throttle-time-avg``
    Fetch requests may be throttled to meet quotas configured on the origin cluster. If these are non-zero, it
    indicates that the origin brokers are slowing the consumer down and the quotas configuration should be
    reviewed. For more information on quotas, see :ref:`quotas`.

.. _monitor-replicator-lag:

Monitoring |crep| lag (Legacy versions only)
--------------------------------------------

.. important::

   - For current versions of |crep|, use the previously mentioned JMX metrics to monitor |crep| lag, because they
     are more accurate than the consumer group lag tool. The following methodology is recommended only if you
     are using a legacy version of |crep| below 5.4.0.
   - |crep| latency is calculated by subtracting the timestamp of the record consumed from the source from the
     time at which the message offset is flushed to the destination. If old records are processed, or if the time
     setting on the source records is not the same for producers and consumers, the metric will spike. Such a
     spike is misleading: it is a limitation of this type of metrics calculation and should not be construed
     as a latency issue.

You can monitor Replicator lag by using the `Consumer Group Command tool `__ (``kafka-consumer-groups``).
To use this functionality, you must set the Replicator ``offset.topic.commit`` config to ``true`` (the default value).

|crep| does not consume using a consumer group; instead, it manually assigns partitions. When ``offset.topic.commit``
is ``true``, |crep| commits consumer offsets (again manually), but these are for reference only and do not represent
an active consumer group. Because |crep| only commits offsets and does not actually form a consumer group, the
``kafka-consumer-groups`` command output correctly shows no active members in the group, only the committed offsets.
This is expected behavior for |crep|. To check membership information, use |kconnect| status endpoints rather than
``kafka-consumer-groups``.

Replication lag is the number of messages that were produced to the origin cluster but have not yet arrived at the
destination cluster. It can also be measured as the amount of time it currently takes for a message to get replicated
from origin to destination. Note that this can be higher than the latency between the two datacenters if Replicator
is behind for some reason and needs time to catch up. The main reasons to monitor replication lag are:

* If there is a need to fail over from origin to destination and *if the origin cannot be restored*, all events that
  were produced to origin and not replicated to the target will be lost. (If the origin *can* be restored, the events
  will not be lost.)
* Any event processing that happens at the destination will be delayed by the lag.

The lag is typically just a few hundred milliseconds (depending on the network latency between the two datacenters),
but it can grow larger if network partitions or configuration changes temporarily pause replication and the
replicator needs to catch up. If the replication lag keeps growing, it indicates that |crep| throughput is lower than
what gets produced to the origin cluster and that additional |crep| tasks or |kconnect| workers are necessary.
For example, producers might be writing 100 MBps to the origin cluster while |crep| replicates only 50 MBps.

To increase throughput, increase the TCP socket buffer sizes on the Replicator and the brokers. When |crep| is
running in the destination cluster (recommended), you must also increase the following:

- The TCP send socket buffer (``socket.send.buffer.bytes``) on the source cluster brokers.
- The TCP receive socket buffer (``socket.receive.buffer.bytes``) on the consumers.

A value of 512 KB is reasonable, but you may want to experiment with values up to 12 MB.

If you are using Linux, you might need to change the default socket buffer maximum for the |ak| settings to take effect.
For more information about tuning your buffers, `see this article `__.
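When experimenting with buffer sizes, one common networking rule of thumb (an assumption here, not something this guide prescribes) is to start from the bandwidth-delay product of the link between the two datacenters: the number of bytes that must be in flight to keep the link full. The following Python sketch computes that value; the function name ``bdp_bytes`` and the sample link figures are hypothetical, and the bandwidth is expressed in megabits per second.

```python
def bdp_bytes(bandwidth_mbps, rtt_ms):
    """Bandwidth-delay product in bytes for a link.

    bandwidth_mbps: link bandwidth in megabits per second (assumed input unit).
    rtt_ms: round-trip time between the two datacenters in milliseconds.
    """
    # bits/s * ms -> divide by 8 for bytes and by 1000 for seconds.
    return round(bandwidth_mbps * 1_000_000 * rtt_ms / 8 / 1000)


if __name__ == "__main__":
    # Hypothetical links; measure your own bandwidth and round-trip time.
    for mbps, rtt in [(100, 40), (1000, 80)]:
        size = bdp_bytes(mbps, rtt)
        print(f"{mbps} Mbit/s at {rtt} ms RTT -> {size} bytes ({size / 1024:.0f} KiB)")
```

The result is only a starting point to try for ``socket.send.buffer.bytes`` and ``socket.receive.buffer.bytes``; verify the effect by watching the throughput metrics, and remember that the operating system's socket buffer maximum can cap the value that is actually applied.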