Monitoring Replicator

The recommended way to monitor Replicator is with Confluent Control Center and the Replicator monitoring extension. For more information regarding the monitoring extension see Replicator Monitoring Extension.

Additionally the following tools can be used to monitor Replicator:

  • Replicator metrics
  • Consumer metrics
  • Producer metrics
  • Replication lag

Replicator Monitoring Extension

The Confluent Replicator Monitoring Extension allows for detailed metrics from Replicator tasks to be collected using an exposed REST API. These endpoints provide the following information:

  • Throughput - the number of messages replicated per second.
  • Message Lag - the number of messages that have been produced to the origin cluster that have not yet been replicated to the destination.
  • Latency - the average time period between message production to the origin cluster and message production to the destination cluster.

The metrics are broken down by connector, task and topic/partition.

These metrics have been designed to integrate with Control Center to provide a centralized view of replication within your clusters.

The Replicator latency metric does not support reporting latency for Replicator when replicating historical data. It only supports reporting latency when data is being replicated in real time. So when replicating very old data, you can disregard the latency reported by Replicator.

Install

The Monitoring extension is installed with Replicator. For more information regarding installation see Download and Install Replicator.

Activate the Extension

After the installation is complete, you must configure Kafka Connect to recognize the installed extension. First add the extension library to the Connect worker classpath by setting the CLASSPATH environment variable.

export CLASSPATH=<path to replicator install>/replicator-rest-extension-<version>.jar

Tip

  • The location of this REST extension JAR file is dependent on your platform and type of Confluent install. For example, on Mac OS install with zip.tar, by default, replicator-rest-extension-<version>.jar is in the Confluent directory in /share/java/kafka-connect-replicator/. If you install with the default Ansible settings, the JAR file is located in /usr/share/java/kafka-connect-replicator/.
  • To make sure the CLASSPATH sticks, you might want to add it to your shell configuration file (for example, .bash_profile, .bashrc, .zshrc), source the profile in any open command shells, and check it with echo $CLASSPATH.

Then, to activate the extension, add the following property to the list of extension classes in the configuration file that you use to start Replicator (for example, my-examples/replication.properties, etc/kafka/replication.properties, or /etc/kafka/connect-distributed.properties):

rest.extension.classes=io.confluent.connect.replicator.monitoring.ReplicatorMonitoringExtension

With the extension is activated, you can use Control Center to monitor replicators. See Use Control Center to monitor replicators in the Replicator Tutorial and Replicators in the Control Center User Guide.

Additional Configuration for Secure Endpoints

When the Connect distributed cluster hosting Replicator has a REST endpoint with SSL authentication enabled (https), you must configure security properties for the SSL keystore and truststore used by the Replicator monitoring extension to communicate with other Connect nodes in the cluster. These are set with the following environment variables at the JVM level:

export KAFKA_OPTS: -Djavax.net.ssl.trustStore=/etc/kafka/secrets/kafka.connect.truststore.jks
                   -Djavax.net.ssl.trustStorePassword=<password>
                   -Djavax.net.ssl.keyStore=/etc/kafka/secrets/kafka.connect.keystore.jks
                   -Djavax.net.ssl.keyStorePassword=<password>

See also

For an example that shows this in action, see the Confluent Platform demo. Refer to the demo’s docker-compose.yml file for a configuration reference.

You should also set listeners.https. properties, as specified at here, for example:

"listeners.https.ssl.keystore.location": "/etc/kafka/connect.jks"
"listeners.https.keystore.password": "xxxxx"

systemctl Command Configurations

Tip

If you are not using systemctl, skip these steps.

If you are using the systemctl command to start the monitoring service, you must configure the environment variables using sudo systemctl edit confluent-kafka-connect, or edit file /etc/systemd/system/confluent-kafka-connect.service.d/<service.conf>. If you use this method and don’t configure the environment variables properly for systemctl, Connect will fail to start.

The steps for this use case are as follows.

  1. As previously described, activate the extension by adding the following property in the configuration file that you use to start Replicator as previously described (for example, my-examples/replication.properties, etc/kafka/replication.properties, or /etc/kafka/connect-distributed.properties):

    rest.extension.classes=io.confluent.connect.replicator.monitoring.ReplicatorMonitoringExtension
    
  2. Configure environment variables using either sudo systemctl edit confluent-kafka-connect, or by editing the file /etc/systemd/system/confluent-kafka-connect.service.d/<service.conf>

    [Service]
    Environment="CLASSPATH=<path to replicator install>/replicator-rest-extension-6.0.0.jar"
    
  3. Reload systemd configuration and restart the component.

    sudo systemctl reload
    sudo systemctl restart confluent-kafka-connect
    

Replicator Monitoring Extension API Reference

GET /WorkerMetrics/(string: connector_name)

Get Metrics for tasks running on this Connect worker for a given connector.

Parameters:
  • connector_name (string) – the name of the connector

Example request:

GET /WorkerMetrics/replicate-topic HTTP/1.1
Host: kafkaconnect.example.com

Example response:

HTTP/1.1 200 OK
Content-Type: application/json

{
   "connectorName": "replicate-topic",
   "srcClusterId": "qhS9FOVFQkmokuIvo1ZZMA",
   "destClusterId": "qhS9FOVFQkmokuIvo1ZZMA",
   "tasks": [{
     "id": "replicate-topic-0",
     "workerId": null,
     "state": null,
     "metrics": [{
        "timestamp": 1570742038165,
        "throughput": 0.1075268817204301,
        "messageLag": 0.0,
        "latency": 12.764705882352942,
        "srcTopic": "wikipedia.parsed",
        "srcPartition": 1,
        "destTopic": "wikipedia.parsed.replica"
     }, {
        "timestamp": 1570742038166,
        "throughput": 0.10976948408342481,
        "messageLag": 0.0,
        "latency": 16.433333333333334,
        "srcTopic": "wikipedia.parsed",
        "srcPartition": 0,
        "destTopic": "wikipedia.parsed.replica"
     }]
   }]
}
GET /ReplicatorMetrics

Get Metrics for all Replicators running in this Connect cluster.

Example request:

GET /ReplicatorMetrics HTTP/1.1
Host: kafkaconnect.example.com

Example response:

HTTP/1.1 200 OK
Content-Type: application/json

{
  "connectors": [{
    "name": "replicate-topic",
    "srcClusterBootstrapServers": "kafka1:9091",
    "destClusterBootstrapServers": "kafka1:9091",
    "srcClusterId": "qhS9FOVFQkmokuIvo1ZZMA",
    "destClusterId": "qhS9FOVFQkmokuIvo1ZZMA",
    "tasks": [{
      "id": "0",
      "workerId": "connect:8083",
      "state": "RUNNING",
      "metrics": [{
        "timestamp": 1570742222631,
        "throughput": 0.11024142872891633,
        "messageLag": 0.0,
        "latency": 34.86666666666667,
        "srcTopic": "wikipedia.parsed",
        "srcPartition": 1,
        "destTopic": "wikipedia.parsed.replica"
      }, {
        "timestamp": 1570742222631,
        "throughput": 0.10187449062754686,
        "messageLag": 0.0,
        "latency": 15.307692307692308,
        "srcTopic": "wikipedia.parsed",
        "srcPartition": 0,
        "destTopic": "wikipedia.parsed.replica"
      }]
    }]
  }]
}

Monitoring JMX Metrics

Like Kafka brokers, Kafka Connect reports metrics via JMX. To monitor Connect and Replicator, set the JMX_PORT environment variable before starting the Connect Workers. Then collect the reported metrics using your usual monitoring tools. JMXTrans, Graphite and Grafana are a popular combination for collecting and reporting JMX metrics from Kafka.

When you look at the metrics reported using JMX, you will see that Connect exposes Replicator’s consumer metrics and Connect’s producer metrics. You can view the full list of metrics in Monitoring Kafka with JMX. Here are some of the important metrics and their significance.

Important Replicator Metrics

MBean: confluent.replicator:type=confluent-replicator-task-metrics,confluent-replicator-task=([-.w]+),confluent-replicator-task-topic-partition=([-.w]+),confluent-replicator-name=([-.w]+),confluent-replicator-topic-name=([-.w]+)

confluent-replicator-source-cluster
The id of the source cluster from which Replicator is replicating.
confluent-replicator-destination-cluster
The id of the destination cluster to which Replicator is replicating.
confluent-replicator-destination-topic-name
The name of the destination topic to which Replicator is replicating.
confluent-replicator-task-topic-partition-message-lag
The average number of messages that were produced to the origin cluster, but have not yet arrived to the destination cluster. This metric will report NaN if no new records are replicated for the partition for some time.
confluent-replicator-task-topic-partition-throughput
The number of messages replicated per second from the source to destination cluster.
confluent-replicator-task-topic-partition-byte-throughput
The number of bytes replicated per second from the source to destination cluster.
confluent-replicator-task-topic-partition-latency
The average time between message production to the source cluster and message production to the destination cluster.

Important Producer Metrics

MBean: kafka.producer:type=producer-metrics,client-id=([-.w]+)

io-ratio or io-wait-ratio
If the io-ratio is low or io-wait-ratio is high, this means the producer is not very busy and is unlikely to be a bottleneck.
outgoing-byte-rate
Reports the producer throughput when writing to destination Kafka.
batch-size-avg and batch-size-max
If they are consistently close to the configured batch.size, you may be producing as fast as possible and you’ll want to increase the batch size to get better batching.
record-retry-rate and record-error-rate
The average per-second number of retried record sends and failed record sends for a topic. High number of those can indicate issues writing to the destination cluster.
produce-throttle-time-avg and produce-throttle-time-max
Produce requests may be throttled to meet quotas configured on the destination cluster. If these are non-zero, it indicates that the destination brokers are slowing the producer down and the quotas configuration should be reviewed. For more information on quotas see Enforcing client quotas.
waiting-threads and bufferpool-wait-time
Non-zero values here indicate memory pressure. Connect producers can’t send events fast enough, resulting in full memory buffers that cause Replicator threads to block.

Important Consumer Metrics

MBean: kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.w]+)

io-ratio or io-wait-ratio
If the io-ratio is low or io-wait-ratio is high, this means the consumer is not very busy and is unlikely to be a bottleneck.
bytes-consumed-rate
Indicates throughput of Replicator reading events from origin cluster.
fetch-size-avg and fetch-size-max
If they are close to the configured maximum fetch size consistently, it means that Replicator is reading at the maximum possible rate. Increase the maximum fetch size and check if the throughput per task is improved.
records-lag-max
The maximum lag in terms of number of records for any partition. An increasing value over time indicates that Replicator is not keeping up with the rate at which events are written to the origin cluster.
fetch-rate, fetch-size-avg and fetch-size-max
If fetch-rate is high but fetch-size-avg and fetch-size-max are not close to the maximum configured fetch size, perhaps the consumer is “churning”. Try increasing the fetch.min.bytes and fetch.max.wait configuration. This can help the consumer batch more efficiently.
fetch-throttle-time-max and fetch-throttle-time-avg
Fetch requests may be throttled to meet quotas configured on the origin cluster. If these are non-zero, it indicates that the origin brokers are slowing the consumer down and the quotas configuration should be reviewed. For more information on quotas see Enforcing client quotas

Monitoring Replicator Lag

Important

For Replicator with version 5.4.0 and above, it is recommended that you use the previously mentioned JMX metrics to monitor Replicator lag as it is more accurate than using the consumer group lag tool. The following methodology to monitor Replicator lag is only recommended if you are using Replicator with a version below 5.4.0.

You can monitor Replicator lag by using the Consumer Group Command tool (kafka-consumer-groups). To use this functionality, you must set the Replicator offset.topic.commit config to true (the default value).

Tip

Replicator does not consume using a consumer group, instead it manually assigns partitions. When offset.topic.commit is true, Replicator commits consumer offsets (again manually), but these are for reference only and do not represent an active consumer group. Since Replicator only commits offsets and does not actually form a consumer group, the kafka-consumer-groups command output will show no active members in the group (correctly); only the committed offsets. This is expected behavior for Replicator. To check membership information, use Connect status endpoints rather than kafka-consumer-groups.

Replication lag is the number of messages that were produced to the origin cluster, but have not yet arrived to the destination cluster. It can also be measured as the amount of time it currently takes for a message to get replicated from origin to destination. Note that this can be higher than the latency between the two datacenters if Replicator is behind for some reason and needs time to catch up.

The main reasons to monitor replication lag are:

  • If there is a need to failover from origin to destination and if the origin cannot be restored, all events that were produced to origin and not replicated to the target will be lost. (If the origin can be restored, the events will not be lost.)
  • Any event processing that happens at the destination will be delayed by the lag.

The lag is typically just a few hundred milliseconds (depending on the network latency between the two datacenters), but it can grow larger if network partitions or configuration changes temporarily pause replication and the replicator needs to catch up. If the replication lag keeps growing, it indicates that Replicator throughput is lower than what gets produced to the origin cluster and that additional Replicator tasks or Connect Workers are necessary. For example, if producers are writing 100 MBps to the origin cluster, but the Replicator only replicates 50 MBps.

Tip

To increase the throughput, the TCP socket buffer should be increased on the Replicator and the brokers. When Replicator is running in the destination cluster (recommended), you must also increase the following:

  • The TCP send socket buffer (socket.send.buffer.bytes) on the source cluster brokers.
  • The receive TCP socket buffer (socket.receive.buffer.bytes) on the consumers. A value of 512 KB is reasonable but you may want to experiment with values up to 12 MB.

If you are using Linux, you might need to change the default socket buffer maximum for the Kafka settings to take effect. For more information about tuning your buffers, see this article.