Tuning and Monitoring Replicator

This section of the document contains advice on getting maximum replication throughput with a minimal number of Connect Workers and machines.

We assume that you already have a Connect cluster with Replicator running. For this part of the documentation we also assume that the Connect cluster is dedicated to Replicator: no other connector runs on the same cluster, and all of the cluster’s resources are available to Replicator.

Sizing Replicator Cluster

Sizing a Replicator cluster comes down to two questions:

  • How many nodes do we need in the cluster, and how large should these nodes be?
  • How many Replicator tasks do we need to run?

We’ll describe a method to determine the number of tasks and then use this information to determine the number of nodes you’ll need.

The first step is to find out the throughput per task. You can do this by running just one task for a short time and checking the throughput. One way to do it is to fill a topic at the origin cluster with a large amount of data, start Replicator with a single task (configuration parameter tasks.max=1) and measure the rate at which events are written to the destination cluster. This will show you what you can achieve with a single task.

Then take the desired throughput (how many MB/s you need to replicate between the clusters), divide it by the throughput per task, and round up. The result is the number of tasks you need.

For example, suppose that I ran Replicator with a single task and saw that it can replicate 30 MB/s. I know that between all my topics, I need to replicate 100 MB/s. Since 100 / 30 is about 3.3, I need at least 4 Replicator tasks.
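The calculation above can be sketched in a few lines of Python. The function name tasks_needed is hypothetical and used only for illustration; the numbers are the ones from the example.

```python
import math


def tasks_needed(target_mb_per_s: float, per_task_mb_per_s: float) -> int:
    """Minimum number of tasks needed to reach the target throughput."""
    if per_task_mb_per_s <= 0:
        raise ValueError("per-task throughput must be positive")
    # Round up: a fractional task still requires a whole task.
    return math.ceil(target_mb_per_s / per_task_mb_per_s)


# Numbers from the example above: 30 MB/s per task, 100 MB/s required.
print(tasks_needed(100, 30))  # prints 4
```

Treat the result as a lower bound, since the caveats that follow can push the real number of tasks or nodes higher.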

There are two caveats to this formula:

  • You can’t have more tasks than partitions. If you find that your throughput requires more tasks than you have partitions, you’ll need to add partitions. Having more partitions than tasks is fine, because one task can consume from multiple partitions.
  • Having too many tasks on a single machine will saturate the machine’s resources, either the network or the CPU. If you are adding more tasks and the throughput doesn’t increase, you’ve probably saturated one of these resources. The easy solution is to add nodes, so you’ll have more resources to use.

Getting More Throughput From Replicator Tasks

One way to increase throughput for Replicator is to run more tasks across more Connect Worker nodes. The other is to try to squeeze more throughput from each Replicator task. In reality, your tuning efforts are likely to be a combination of both: start by tuning a single task and then achieve the desired throughput with multiple tasks running in parallel.

When tuning the performance of Connect tasks, you can try to make each task use less CPU or use the network more efficiently. We recommend first checking which of these is the bottleneck and tuning for the right resource. One way to find the bottleneck is to add tasks to Replicator running on a single node until adding tasks no longer increases the throughput. If at this point you are seeing high CPU utilization, you want to improve the CPU utilization of Connect tasks. If CPU utilization is low, but adding tasks does not improve throughput, you’ll want to tune the network utilization instead.

Improving CPU Utilization of a Connect Task

  • Make sure you are not seeing excessive garbage collection pauses by enabling and checking Java’s garbage collection log. Use G1GC and a reasonable heap size (4G is a good default to start with).
  • Make sure Replicator is configured with key.converter and value.converter set to io.confluent.connect.replicator.util.ByteArrayConverter. If you have src.key.converter and src.value.converter configured, they should also be set to io.confluent.connect.replicator.util.ByteArrayConverter (the default value). This will eliminate costly conversion of events to Connect’s internal data format and back to bytes.
  • Disable CRC checks. CRC checks verify data integrity but use CPU, and you can disable them for improved performance by setting src.consumer.check.crcs=false in the Replicator configuration. This isn’t recommended, because without the checks corrupted data can go undetected.

Improving Network Utilization of a Connect Task

Replicator typically reads or writes events between two data-centers, which means latency is usually very high. High network latency can lead to reduced throughput. The suggestions below all aim at configuring the TCP stack to improve throughput in this environment. Note that this is different from configuring applications that are running within the same data-center as the Kafka cluster - inside a data-center, you are usually trading off latency vs throughput and you can decide which of these to tune for. When replicating events between two data-centers, high latency is typically a fact we need to deal with.

Use a tool like https://www.switch.ch/network/tools/tcp_throughput/?do+new+calculation=do+new+calculation to compute TCP buffer sizes based on the network link properties. Increase TCP buffer sizes to handle networks with high bandwidth and high latency (this includes most inter-DC links). This needs to be done in two places:

  • Increase the application-level send.buffer.bytes and receive.buffer.bytes settings on the producer and consumer, and the corresponding socket.send.buffer.bytes and socket.receive.buffer.bytes settings on the brokers. This is the size requested by the application, and the OS may still silently override it if you don’t follow the second step.
  • Increase the OS-level TCP buffer sizes (net.core.rmem_default, net.core.rmem_max, net.core.wmem_default, net.core.wmem_max, net.core.optmem_max). See, for example, https://wwwx.cs.unc.edu/~sparkst/howto/network_tuning.php for some examples. You can use sysctl to test values in the current session, but you need to add them to /etc/sysctl.conf to make them permanent.
  • Important: Enable logging to double-check that the settings actually took effect, because there are cases where the OS silently overrides or ignores them. To do so, set log4j.logger.org.apache.kafka.common.network.Selector=DEBUG in the log4j configuration.
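A common rule of thumb for sizing TCP buffers on such links is the bandwidth-delay product: the link bandwidth multiplied by the round-trip time. The sketch below is a minimal illustration of that rule, not a replacement for the calculator linked above. The function name bdp_bytes and the link figures (1 Gbit/s, 80 ms) are assumptions chosen for the example.

```python
def bdp_bytes(bandwidth_bits_per_s: int, rtt_ms: int) -> int:
    """Bandwidth-delay product of a link, in bytes."""
    # bits/s * ms / 1000 gives the bits in flight; dividing by 8 gives bytes.
    return bandwidth_bits_per_s * rtt_ms // 8000


# Hypothetical inter-DC link: 1 Gbit/s with an 80 ms round-trip time.
buffer_bytes = bdp_bytes(1_000_000_000, 80)
print(buffer_bytes)  # prints 10000000
```

A value like this can serve as a starting point for both the application-level buffer settings and the OS-level maximums. Measure the actual throughput after each change rather than relying on the estimate alone.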

In addition, you can try the following optimizations:

  • Enable automatic window scaling (sysctl -w net.ipv4.tcp_window_scaling=1 or add net.ipv4.tcp_window_scaling=1 to /etc/sysctl.conf). This allows the TCP window to grow beyond its usual maximum of 64K if the latency justifies it.
  • Reduce the TCP slow start time (set /proc/sys/net/ipv4/tcp_slow_start_after_idle to 0) so that the network connection reaches its maximum capacity sooner.
  • Increase producer batch.size and linger.ms, and consumer fetch.max.bytes, fetch.min.bytes and fetch.max.wait.ms, to improve throughput by reading and writing bigger batches.

Note that consumer settings are configured in the Replicator configuration file, while producer settings are configured in the Connect Worker configuration. For example, you tune the consumer fetch.max.bytes by setting src.consumer.fetch.max.bytes in the Replicator configuration, and you tune the producer batch.size by setting producer.batch.size in the Connect Worker configuration. This is because Replicator consumes from the origin cluster and passes the records to the Connect Worker, and the Worker produces to the destination cluster.
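The prefix rule can be sketched as a small helper that takes plain consumer and producer settings and returns the keys as they would appear in each configuration file. The function name prefixed_settings and the values shown are hypothetical; only the src.consumer. and producer. prefixes come from the text above.

```python
def prefixed_settings(consumer_settings: dict, producer_settings: dict):
    """Return (replicator_config, worker_config) with the prefixes applied."""
    # Consumer settings go into the Replicator configuration.
    replicator_config = {f"src.consumer.{k}": v for k, v in consumer_settings.items()}
    # Producer settings go into the Connect Worker configuration.
    worker_config = {f"producer.{k}": v for k, v in producer_settings.items()}
    return replicator_config, worker_config


# Illustrative values only; tune them for your own links and workloads.
replicator_config, worker_config = prefixed_settings(
    {"fetch.max.bytes": 10485760},
    {"batch.size": 524288},
)
for name, value in {**replicator_config, **worker_config}.items():
    print(f"{name}={value}")
```

The first printed line belongs in the Replicator configuration and the second in the Worker configuration.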

Monitoring Replicator

Currently the recommended way to monitor Replicator is by monitoring:

  • Replication lag
  • Consumer metrics
  • Producer metrics

We’ll discuss how to monitor Replicator lag using Confluent Control Center and then we’ll discuss the relevant producer and consumer metrics.

Monitoring Replicator Lag

Replication lag is the number of messages that were produced to the origin cluster but have not yet arrived at the destination cluster. It can also be measured as the amount of time it currently takes for a message to get replicated from origin to destination. Note that this can be higher than the latency between the two data-centers if Replicator is behind for some reason and needs time to catch up.

There are two important reasons to monitor replication lag:

  1. If there is a need to fail over from origin to destination, all events that were produced to the origin and haven’t yet arrived at the destination will be lost.
  2. Any event processing that happens at the destination will be delayed by the lag.

The lag is typically just a few hundred milliseconds (depending on the network latency between the two data-centers), but it can grow larger if network partitions or configuration changes temporarily pause replication and Replicator needs to catch up. If the replication lag keeps growing, it indicates that Replicator throughput is lower than what gets produced to the origin cluster (for example, producers are writing 100 MB/s to the origin cluster, but Replicator only replicates 50 MB/s) and that additional Replicator tasks or Connect Workers are necessary.
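To make the relationship between throughput and lag concrete, the sketch below estimates how long Replicator needs to work off a backlog. The function name catch_up_seconds and the 6000 MB backlog are hypothetical. The first call uses the rates from the example above, and the second assumes a higher replication rate of 120 MB/s.

```python
from typing import Optional


def catch_up_seconds(backlog_mb: float, produce_mb_per_s: float,
                     replicate_mb_per_s: float) -> Optional[float]:
    """Seconds needed to clear the backlog, or None if lag cannot shrink."""
    # Only throughput beyond the incoming produce rate reduces the backlog.
    spare = replicate_mb_per_s - produce_mb_per_s
    if spare <= 0:
        return None
    return backlog_mb / spare


print(catch_up_seconds(6000, 100, 50))   # prints None
print(catch_up_seconds(6000, 100, 120))  # prints 300.0
```

A result of None corresponds to the case described above, where the lag keeps growing until more tasks or Workers are added.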

We recommend using Confluent Control Center to monitor replication lag, although other tools that can monitor consumer lag (such as Kafka’s consumer group tool) can be used as well.

To monitor Replicator using Control Center, you need to configure interceptors that will report timing information from producers and consumers to Control Center. We’ll assume that you have a single Control Center cluster, working with data reported to the destination Kafka cluster. To get end-to-end measurements, we want to measure 2 intervals:

  • From the time the events were produced to the origin Kafka to the time Replicator consumed them
  • From the time Replicator produced the events to the destination Kafka to the time destination consumers processed them

The combination of both intervals allows you to both find the replication lag and to track it down to the component responsible for the delay. In the usual setting where Replicator is installed in the same data center as the destination cluster and the origin cluster is in a different data center, the first interval is usually the more interesting one. All this timing information needs to get reported to the destination Kafka cluster so Control Center can analyze and display the information.

To report the first interval, you’ll need to:

  1. Configure interceptors on all producers writing events to the topics that you are planning to replicate. You can read how to configure interceptors for your producers in the Control Center client documentation.
  2. Make sure monitoring-interceptors-4.1.0.jar is in the Connect classpath (if you are running Confluent Platform, this is the case by default).
  3. Configure the interceptor for the Replicator consumer. You do this by setting src.consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor in the Replicator configuration.
  4. The previous steps report the interceptor events to the origin Kafka cluster. Since we need the events in the destination, where Control Center is running, configure Replicator to replicate the events in topic _confluent-monitoring from origin to destination.

To report the second interval, you’ll need to:

  1. Make sure monitoring-interceptors-4.1.0.jar is in the Connect classpath (if you are running Confluent Platform, this is the case by default).
  2. Configure the Connect cluster to report the time it produces events to destination cluster. You do this by setting producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor in the Worker configuration.
  3. Configure consumers on the destination cluster to run interceptors, so they’ll report the time they’ve read the replicated events. You can read how to configure interceptors for your consumers in the Control Center client documentation.

In either case, if you want the consumer group for Replicator to be named, you have to set src.consumer.group.id explicitly in the Replicator configuration.

Note that if you don’t complete all steps required to monitor an interval, Control Center will not be able to display any information regarding that time lag.

Monitoring Producer and Consumer Metrics

Like Kafka brokers, Kafka Connect reports metrics via JMX. To monitor Kafka Connect and Replicator, you’ll need to set the JMX_PORT environment variable before you start the Connect Workers. Then collect the reported metrics using your usual monitoring tools. JMXTrans, Graphite and Grafana are a popular combination for collecting and reporting JMX metrics from Kafka.

When you look at the metrics reported via JMX, you’ll notice that Connect exposes Replicator’s consumer metrics and Connect’s producer metrics. You can see the full list of metrics in Monitoring Kafka. Here are some of the important metrics and their significance.

Important Producer Metrics

  • io-ratio or io-wait-ratio: If the io-ratio is low or io-wait-ratio is high, the producer is not very busy and is unlikely to be a bottleneck.
  • outgoing-byte-rate: Reports the producer throughput when writing to the destination Kafka cluster.
  • batch-size-avg and batch-size-max: If they are consistently close to the configured batch.size, you may be producing as fast as possible and you’ll want to increase the batch size to get better batching.
  • record-retry-rate and record-error-rate: The average per-second number of retried record sends and failed record sends for a topic. A high number of either can indicate issues writing to the destination cluster.
  • produce-throttle-time-avg and produce-throttle-time-max: If they are non-zero, the destination brokers are configured to slow producers down.
  • waiting-threads and bufferpool-wait-time: Non-zero values here indicate memory pressure. Connect producers can’t send events fast enough, resulting in full memory buffers that cause Replicator threads to block.

Important Consumer Metrics

  • io-ratio or io-wait-ratio: If the io-ratio is low or io-wait-ratio is high, the consumer is not very busy and is unlikely to be a bottleneck.
  • bytes-consumed-rate: Indicates the throughput of Replicator reading events from the origin cluster.
  • fetch-size-avg and fetch-size-max: If they are consistently close to the configured maximum fetch size, Replicator is reading as fast as it can. Try increasing the maximum fetch size and check whether the throughput per task improves.
  • records-lag-max: The maximum lag in terms of number of records for any partition. A value that increases over time is your best indication that Replicator is not keeping up with the rate at which events are written to the origin cluster.
  • fetch-rate, fetch-size-avg and fetch-size-max: If fetch-rate is high but fetch-size-avg and fetch-size-max are not close to the maximum configured fetch size, the consumer may be “churning”. Try increasing the fetch.min.bytes and fetch.max.wait.ms configuration. This can help the consumer batch more efficiently.
  • fetch-throttle-time-max and fetch-throttle-time-avg: If these are above zero, the origin brokers are configured to intentionally reduce Replicator throughput by throttling.
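Some of the consumer checks above can be automated once the metrics are collected. The sketch below is one possible way to encode three of them. The function name diagnose_consumer, its parameters and the 90% threshold for "close to the maximum" are assumptions made for illustration, not values prescribed by Replicator or Kafka.

```python
def diagnose_consumer(lag_samples, fetch_size_avg, max_fetch_bytes,
                      fetch_throttle_time_max, near_max=0.9):
    """Return a list of findings based on collected consumer metrics."""
    findings = []
    # Lag that rises from every sample to the next means Replicator is falling behind.
    rising = all(b > a for a, b in zip(lag_samples, lag_samples[1:]))
    if len(lag_samples) >= 2 and rising:
        findings.append("records-lag-max is increasing: Replicator is not keeping up")
    # near_max is an assumed threshold for "close to the configured maximum".
    if fetch_size_avg >= near_max * max_fetch_bytes:
        findings.append("fetch-size-avg is near the maximum: try a larger maximum fetch size")
    if fetch_throttle_time_max > 0:
        findings.append("fetch-throttle-time-max is above zero: origin brokers are throttling")
    return findings


# Hypothetical samples: growing lag and fetches close to a 1,000,000-byte maximum.
for finding in diagnose_consumer([100, 250, 900], 950_000, 1_000_000, 0):
    print(finding)
```

The metric values would come from whatever tool you use to collect JMX metrics; the function only interprets numbers that are passed to it.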