Monitor Replicator for Confluent Platform¶
The recommended way to monitor Replicator is with Confluent Control Center and the Replicator monitoring extension. For more information about the monitoring extension, see Replicator monitoring extension.
Additionally, the following tools can be used to monitor Replicator:
- Replicator metrics
- Consumer metrics
- Producer metrics
- Replication lag
Replicator monitoring extension¶
The Confluent Replicator Monitoring Extension allows for detailed metrics from Replicator tasks to be collected using an exposed REST API. These endpoints provide the following information:
- Throughput - the number of messages replicated per second.
- Message Lag - the number of messages that have been produced to the origin cluster that have not yet been replicated to the destination.
- Latency - the average time period between message production to the origin cluster and message production to the destination cluster.
The metrics are broken down by connector, task and topic/partition.
These metrics have been designed to integrate with Control Center to provide a centralized view of replication within your clusters.
The Replicator latency metric does not support reporting latency when Replicator is replicating historical data; it is only meaningful when data is replicated in real time. When replicating very old data, disregard the latency reported by Replicator.
Install¶
The monitoring extension is installed with Replicator. For more information about installation, see Download and install Replicator.
Activate the extension¶
After the installation is complete, you must configure Kafka Connect to recognize the installed extension. First, add the extension library to the Connect worker classpath by setting the CLASSPATH environment variable:
export CLASSPATH=<path to replicator install>/replicator-rest-extension-<version>.jar
Tip
- The location of this REST extension JAR file depends on your platform and the type of Confluent install. For example, on macOS installed with the zip.tar archive, replicator-rest-extension-<version>.jar is by default in the Confluent directory in /share/java/kafka-connect-replicator/. If you install with the default Ansible settings, the JAR file is located in /usr/share/java/kafka-connect-replicator/.
- To make sure the CLASSPATH sticks, you might want to add it to your shell configuration file (for example, .bash_profile, .bashrc, or .zshrc), source the profile in any open command shells, and check it with echo $CLASSPATH.
Then, to activate the extension, add the following property to the list of extension classes in the configuration file that you use to start Replicator (for example, my-examples/replication.properties, etc/kafka/replication.properties, or /etc/kafka/connect-distributed.properties):
rest.extension.classes=io.confluent.connect.replicator.monitoring.ReplicatorMonitoringExtension
With the extension activated, you can use Control Center to monitor replicators. See Use Control Center to monitor replicators in the Replicator Tutorial and Replicators in the Control Center User Guide.
Additional configuration for secure endpoints¶
When the Connect distributed cluster hosting Replicator has a REST endpoint with TLS/SSL authentication enabled (https), you must configure security properties for the TLS/SSL keystore and truststore used by the Replicator monitoring extension to communicate with other Connect nodes in the cluster. These are set with the following environment variables at the JVM level:
export KAFKA_OPTS="-Djavax.net.ssl.trustStore=/etc/kafka/secrets/kafka.connect.truststore.jks \
  -Djavax.net.ssl.trustStorePassword=<password> \
  -Djavax.net.ssl.keyStore=/etc/kafka/secrets/kafka.connect.keystore.jks \
  -Djavax.net.ssl.keyStorePassword=<password>"
See also
For an example that shows how to set Docker environment variables for Confluent Platform running in ZooKeeper mode, see the Confluent Platform demo. Refer to the demo’s docker-compose.yml file for a configuration reference.
You should also set the listeners.https. prefixed properties, as specified here, for example:
"listeners.https.ssl.keystore.location": "/etc/kafka/connect.jks"
"listeners.https.ssl.keystore.password": "xxxxx"
systemctl command configurations¶
If you are not using systemctl, skip these steps.
If you are using the systemctl command to start the monitoring service, you must configure the environment variables using sudo systemctl edit confluent-kafka-connect, or by editing the file /etc/systemd/system/confluent-kafka-connect.service.d/<service.conf>. If you use this method and don't configure the environment variables properly for systemctl, Connect will fail to start.
The steps for this use case are as follows.
1. Activate the extension by adding the following property to the configuration file that you use to start Replicator (for example, my-examples/replication.properties, etc/kafka/replication.properties, or /etc/kafka/connect-distributed.properties):
rest.extension.classes=io.confluent.connect.replicator.monitoring.ReplicatorMonitoringExtension
2. Configure environment variables using either sudo systemctl edit confluent-kafka-connect, or by editing the file /etc/systemd/system/confluent-kafka-connect.service.d/<service.conf>:
[Service]
Environment="CLASSPATH=<path to replicator install>/replicator-rest-extension-6.0.0.jar"
3. Reload the systemd configuration and restart the component:
sudo systemctl daemon-reload
sudo systemctl restart confluent-kafka-connect
Replicator monitoring extension API reference¶
GET /WorkerMetrics/(string: connector_name)¶
Get metrics for tasks running on this Connect worker for a given connector.
Parameters:
- connector_name (string) – the name of the connector
Example request:
GET /WorkerMetrics/replicate-topic HTTP/1.1
Host: kafkaconnect.example.com
Example response:
HTTP/1.1 200 OK
Content-Type: application/json

{
  "connectorName": "replicate-topic",
  "srcClusterId": "qhS9FOVFQkmokuIvo1ZZMA",
  "destClusterId": "qhS9FOVFQkmokuIvo1ZZMA",
  "tasks": [{
    "id": "replicate-topic-0",
    "workerId": null,
    "state": null,
    "metrics": [{
      "timestamp": 1570742038165,
      "throughput": 0.1075268817204301,
      "messageLag": 0.0,
      "latency": 12.764705882352942,
      "srcTopic": "wikipedia.parsed",
      "srcPartition": 1,
      "destTopic": "wikipedia.parsed.replica"
    }, {
      "timestamp": 1570742038166,
      "throughput": 0.10976948408342481,
      "messageLag": 0.0,
      "latency": 16.433333333333334,
      "srcTopic": "wikipedia.parsed",
      "srcPartition": 0,
      "destTopic": "wikipedia.parsed.replica"
    }]
  }]
}
GET /ReplicatorMetrics¶
Get metrics for all Replicators running in this Connect cluster.
Example request:
GET /ReplicatorMetrics HTTP/1.1
Host: kafkaconnect.example.com
Example response:
HTTP/1.1 200 OK
Content-Type: application/json

{
  "connectors": [{
    "name": "replicate-topic",
    "srcClusterBootstrapServers": "kafka1:9091",
    "destClusterBootstrapServers": "kafka1:9091",
    "srcClusterId": "qhS9FOVFQkmokuIvo1ZZMA",
    "destClusterId": "qhS9FOVFQkmokuIvo1ZZMA",
    "tasks": [{
      "id": "0",
      "workerId": "connect:8083",
      "state": "RUNNING",
      "metrics": [{
        "timestamp": 1570742222631,
        "throughput": 0.11024142872891633,
        "messageLag": 0.0,
        "latency": 34.86666666666667,
        "srcTopic": "wikipedia.parsed",
        "srcPartition": 1,
        "destTopic": "wikipedia.parsed.replica"
      }, {
        "timestamp": 1570742222631,
        "throughput": 0.10187449062754686,
        "messageLag": 0.0,
        "latency": 15.307692307692308,
        "srcTopic": "wikipedia.parsed",
        "srcPartition": 0,
        "destTopic": "wikipedia.parsed.replica"
      }]
    }]
  }]
}
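These endpoints return plain JSON over HTTP, so the responses are easy to post-process. As a sketch (applied here to an abbreviated copy of the example response rather than a live worker), the following Python snippet aggregates the per-partition metrics into per-connector totals:

```python
import json

def summarize_replicator_metrics(payload):
    """Aggregate per-partition Replicator metrics into per-connector totals."""
    summary = {}
    for connector in payload["connectors"]:
        metrics = [m for task in connector["tasks"] for m in task["metrics"]]
        summary[connector["name"]] = {
            "total_throughput": sum(m["throughput"] for m in metrics),
            "max_latency": max(m["latency"] for m in metrics),
            "total_message_lag": sum(m["messageLag"] for m in metrics),
        }
    return summary

# Abbreviated sample modeled on the example response above.
response = json.loads("""
{"connectors": [{"name": "replicate-topic", "tasks": [{"id": "0",
  "metrics": [
    {"throughput": 0.11, "messageLag": 0.0, "latency": 34.9, "srcPartition": 1},
    {"throughput": 0.10, "messageLag": 0.0, "latency": 15.3, "srcPartition": 0}]}]}]}
""")
print(summarize_replicator_metrics(response))
```

In practice you would fetch the payload from the worker's REST endpoint (for example with urllib or curl) before passing it to the function.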
Monitoring JMX metrics¶
Like Kafka brokers, Kafka Connect reports metrics via JMX. To monitor Connect and Replicator, set the JMX_PORT environment variable before starting the Connect workers, then collect the reported metrics using your usual monitoring tools. JMXTrans, Graphite, and Grafana are a popular combination for collecting and reporting JMX metrics from Kafka.
When you look at the metrics reported using JMX, you will see that Connect exposes Replicator’s consumer metrics and Connect’s producer metrics. You can view the full list of metrics in Monitoring Kafka with JMX in Confluent Platform. Here are some of the important metrics and their significance.
Important Replicator metrics¶
MBean: confluent.replicator:type=confluent-replicator-task-metrics,confluent-replicator-task=([-.\w]+),confluent-replicator-task-topic-partition=([-.\w]+),confluent-replicator-name=([-.\w]+),confluent-replicator-topic-name=([-.\w]+)
confluent-replicator-source-cluster
- The id of the source cluster from which Replicator is replicating.
confluent-replicator-destination-cluster
- The id of the destination cluster to which Replicator is replicating.
confluent-replicator-destination-topic-name
- The name of the destination topic to which Replicator is replicating.
confluent-replicator-task-topic-partition-message-lag
- The average number of messages that were produced to the origin cluster but have not yet arrived at the destination cluster. This metric reports NaN if no new records have been replicated for the partition for some time.
confluent-replicator-task-topic-partition-throughput
- The number of messages replicated per second from the source to destination cluster.
confluent-replicator-task-topic-partition-byte-throughput
- The number of bytes replicated per second from the source to destination cluster.
confluent-replicator-task-topic-partition-latency
- The average time between message production to the source cluster and message production to the destination cluster.
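A common use of the message-lag metric is trend alerting: lag that keeps rising indicates Replicator is falling behind. Below is a minimal sketch of such a check, assuming you sample the MBean periodically with your own collector (the sampling mechanism itself is not shown, and the lag_is_growing helper is illustrative, not part of Replicator):

```python
def lag_is_growing(samples, min_samples=5):
    """Return True if periodic message-lag samples are strictly trending up.

    `samples` is a list of (timestamp, lag) pairs collected from the
    confluent-replicator-task-topic-partition-message-lag metric.
    NaN samples (reported when no recent records were replicated) are ignored.
    """
    lags = [lag for _, lag in samples if lag == lag]  # NaN != NaN, so this drops NaN
    if len(lags) < min_samples:
        return False
    return all(a < b for a, b in zip(lags, lags[1:]))

steady = [(t, 10.0) for t in range(6)]
growing = [(t, float(t * 100)) for t in range(1, 7)]
print(lag_is_growing(steady), lag_is_growing(growing))
```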
Important producer metrics¶
MBean: kafka.producer:type=producer-metrics,client-id=([-.\w]+)
io-ratio or io-wait-ratio
- If the io-ratio is low or io-wait-ratio is high, the producer is not very busy and is unlikely to be a bottleneck.
outgoing-byte-rate
- Reports the producer throughput when writing to the destination Kafka cluster.
batch-size-avg and batch-size-max
- If these are consistently close to the configured batch.size, you may be producing as fast as possible and will want to increase the batch size to get better batching.
record-retry-rate and record-error-rate
- The average per-second number of retried record sends and failed record sends for a topic. A high number of these can indicate issues writing to the destination cluster.
produce-throttle-time-avg and produce-throttle-time-max
- Produce requests may be throttled to meet quotas configured on the destination cluster. If these are non-zero, the destination brokers are slowing the producer down and the quota configuration should be reviewed. For more information on quotas, see Enforcing client quotas.
waiting-threads and bufferpool-wait-time
- Non-zero values here indicate memory pressure. Connect producers can't send events fast enough, resulting in full memory buffers that cause Replicator threads to block.
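To illustrate how this guidance could be applied, here is a hedged sketch that turns a dictionary of sampled producer metrics into tuning hints. The producer_health_hints name, the thresholds, and the sample values are illustrative, not part of Replicator or Kafka:

```python
def producer_health_hints(metrics, batch_size_config=16384):
    """Map sampled producer metric values to rough tuning hints.

    Thresholds are illustrative; batch_size_config mirrors the producer's
    configured batch.size (Kafka's default is 16384 bytes).
    """
    hints = []
    if metrics.get("batch-size-avg", 0) > 0.9 * batch_size_config:
        hints.append("batches nearly full: consider raising batch.size")
    if metrics.get("record-error-rate", 0) > 0:
        hints.append("failed sends: check destination cluster health")
    if metrics.get("produce-throttle-time-avg", 0) > 0:
        hints.append("throttled: review destination quotas")
    if metrics.get("bufferpool-wait-time", 0) > 0:
        hints.append("memory pressure: producer buffers are full")
    return hints

sample = {"batch-size-avg": 16000.0, "record-error-rate": 0.0,
          "produce-throttle-time-avg": 3.2, "bufferpool-wait-time": 0.0}
print(producer_health_hints(sample))
```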
Important consumer metrics¶
MBean: kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)
io-ratio or io-wait-ratio
- If the io-ratio is low or io-wait-ratio is high, the consumer is not very busy and is unlikely to be a bottleneck.
bytes-consumed-rate
- Indicates the throughput of Replicator reading events from the origin cluster.
fetch-size-avg and fetch-size-max
- If these are consistently close to the configured maximum fetch size, Replicator is reading at the maximum possible rate. Increase the maximum fetch size and check whether the throughput per task improves.
records-lag-max
- The maximum lag in terms of number of records for any partition. A value that increases over time indicates that Replicator is not keeping up with the rate at which events are written to the origin cluster.
fetch-rate, fetch-size-avg and fetch-size-max
- If fetch-rate is high but fetch-size-avg and fetch-size-max are not close to the maximum configured fetch size, the consumer may be "churning". Try increasing the fetch.min.bytes and fetch.max.wait.ms configurations. This can help the consumer batch more efficiently.
fetch-throttle-time-max and fetch-throttle-time-avg
- Fetch requests may be throttled to meet quotas configured on the origin cluster. If these are non-zero, the origin brokers are slowing the consumer down and the quota configuration should be reviewed. For more information on quotas, see Enforcing client quotas.
Monitoring Replicator lag (Legacy versions only)¶
Important
- For current versions of Replicator, it is recommended that you use the previously mentioned JMX metrics to monitor Replicator lag, as they are more accurate than the consumer group lag tool. The following methodology for monitoring Replicator lag is only recommended if you are using a legacy version of Replicator below 5.4.0.
- Replicator latency is calculated by taking the timestamp of the record consumed from the source and subtracting it from the time at which the message offset is flushed to the destination. If old records are processed, or if the clocks used to timestamp source records differ between producers and consumers, the metric will spike. This is misleading and should not be construed as a latency issue; rather, it is a limitation of this type of metrics calculation.
You can monitor Replicator lag by using the Consumer Group Command tool (kafka-consumer-groups). To use this functionality, you must set the Replicator offset.topic.commit configuration to true (the default value).
Replicator does not consume using a consumer group; instead, it manually assigns partitions. When offset.topic.commit is true, Replicator commits consumer offsets (again, manually), but these are for reference only and do not represent an active consumer group. Because Replicator only commits offsets and does not actually form a consumer group, the kafka-consumer-groups command output will (correctly) show no active members in the group; only the committed offsets. This is expected behavior for Replicator. To check membership information, use the Connect status endpoints rather than kafka-consumer-groups.
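If you do use this legacy approach, the describe output can be post-processed. Here is a sketch that sums the LAG column, assuming the column layout of recent kafka-consumer-groups versions (older releases may print different columns, and the sample output below is abbreviated and illustrative):

```python
def total_lag(describe_output):
    """Sum the LAG column from `kafka-consumer-groups --describe` output.

    Rows with '-' in the LAG column (no committed offset) are skipped.
    """
    lines = [l for l in describe_output.strip().splitlines() if l.strip()]
    header = lines[0].split()
    lag_col = header.index("LAG")  # locate the column by name, not position
    total = 0
    for line in lines[1:]:
        fields = line.split()
        if fields[lag_col] != "-":
            total += int(fields[lag_col])
    return total

sample = """
GROUP            TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
replicator-group wikipedia.parsed  0          1042            1042            0
replicator-group wikipedia.parsed  1          980             1005            25
"""
print(total_lag(sample))
```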
Replication lag is the number of messages that were produced to the origin cluster but have not yet arrived at the destination cluster. It can also be measured as the amount of time it currently takes for a message to be replicated from origin to destination. Note that this can be higher than the latency between the two datacenters if Replicator has fallen behind for some reason and needs time to catch up.
The main reasons to monitor replication lag are:
- If there is a need to failover from origin to destination and if the origin cannot be restored, all events that were produced to origin and not replicated to the target will be lost. (If the origin can be restored, the events will not be lost.)
- Any event processing that happens at the destination will be delayed by the lag.
The lag is typically just a few hundred milliseconds (depending on the network latency between the two datacenters), but it can grow larger if network partitions or configuration changes temporarily pause replication and Replicator needs time to catch up. If the replication lag keeps growing, it indicates that Replicator throughput is lower than what is produced to the origin cluster and that additional Replicator tasks or Connect workers are necessary; for example, producers may be writing 100 MBps to the origin cluster while Replicator replicates only 50 MBps.
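The arithmetic behind that example can be made explicit with a hypothetical helper: when Replicator throughput exceeds the origin produce rate, the backlog drains at the difference between the two rates; otherwise the lag grows without bound. The catch_up_seconds name and the figures are illustrative:

```python
def catch_up_seconds(backlog_mb, produce_mbps, replicate_mbps):
    """Estimated time to drain a backlog, or None if the lag grows forever."""
    if replicate_mbps <= produce_mbps:
        return None  # throughput deficit: add Replicator tasks or Connect workers
    return backlog_mb / (replicate_mbps - produce_mbps)

# Producers write 100 MBps but Replicator moves only 50 MBps: never catches up.
print(catch_up_seconds(1000, 100, 50))
# With enough tasks to move 150 MBps, a 1000 MB backlog drains in 20 seconds.
print(catch_up_seconds(1000, 100, 150))
```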
To increase the throughput, increase the TCP socket buffers on Replicator and the brokers. When Replicator is running in the destination cluster (recommended), you must also increase the following:
- The TCP send socket buffer (socket.send.buffer.bytes) on the source cluster brokers.
- The TCP receive socket buffer (socket.receive.buffer.bytes) on the consumers. A value of 512 KB is reasonable, but you may want to experiment with values up to 12 MB.
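As a minimal sketch of where these settings might live, assuming Replicator runs in the destination cluster: the broker-side setting belongs in the source brokers' server.properties, and the consumer-side buffer can be supplied through Replicator's src.consumer. prefixed overrides. The 512 KB value and the use of the src.consumer. prefix here are illustrative:

```properties
# Source cluster brokers (server.properties): larger TCP send buffer
socket.send.buffer.bytes=524288

# Replicator configuration: larger TCP receive buffer for the embedded consumer
src.consumer.receive.buffer.bytes=524288
```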
If you are using Linux, you might need to change the default socket buffer maximum for the Kafka settings to take effect. For more information about tuning your buffers, see this article.