Tuning and Monitoring Replicator

This section of the document contains advice on getting maximum replication throughput with a minimal number of Connect Workers and machines.

We will assume that you already have a Connect cluster with Replicator running. For the purpose of this part of the documentation we’ll also assume that you are running a dedicated Connect cluster for Replicator, that is - no other Connector is running on the same cluster and all resources of the cluster are available for Replicator.

Sizing Replicator Cluster

Sizing a Replicator cluster comes down to two questions:

  • How many nodes do we need in the cluster, and how large should these nodes be?
  • How many Replicator tasks do we need to run?

We’ll describe a method to determine the number of tasks and then use this information to determine the number of nodes you’ll need.

The first step is to find out the throughput per task. You can do this by running a single task for a while and measuring its throughput. One way to do it is to fill a topic at the origin cluster with a large amount of data, start Replicator with a single task (configuration parameter tasks.max=1) and measure the rate at which events are written to the destination cluster. This will show you what you can achieve with a single task.

Then take the desired throughput (how many MBps you need to replicate between the clusters), divide it by the throughput per task, and round up. The result is the number of tasks you need.

For example, suppose you ran Replicator with a single task and saw that it can replicate 30 MBps, and you know that between all your topics you need to replicate 100 MBps. Dividing 100 by 30 gives 3.33, which means you need at least 4 Replicator tasks.

There are two caveats to this formula:

  • You can’t have more tasks than partitions. If you find out that your throughput requires more tasks than you have partitions - you’ll need to add partitions. Having more partitions than tasks is fine, one task can consume from multiple partitions.
  • Having too many tasks on a single machine will cause you to saturate the machine resources. Those will be either the network or the CPU. If you are adding more tasks and the throughput doesn’t increase, you’ve probably saturated one of these resources. The easy solution is to add additional nodes, so you’ll have more resources to use.
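The sizing formula and the partition caveat above can be sketched as a small calculation. This is only an illustration: the function name and the example partition count are assumptions, not part of Replicator.

```python
import math


def tasks_needed(target_mbps: float, per_task_mbps: float, partitions: int) -> int:
    """Return the minimum number of tasks for the target throughput.

    Hypothetical helper: divides the target by the measured per-task
    throughput, rounds up, and enforces the "no more tasks than
    partitions" caveat.
    """
    if per_task_mbps <= 0:
        raise ValueError("per-task throughput must be positive")
    tasks = math.ceil(target_mbps / per_task_mbps)
    if tasks > partitions:
        raise ValueError(
            f"need {tasks} tasks but only {partitions} partitions; add partitions"
        )
    return tasks


# Example from the text: 30 MBps per task, 100 MBps target.
# The partition count of 12 is an assumed value.
print(tasks_needed(100, 30, partitions=12))  # 4
```

The result is a lower bound. Whether one machine can actually host that many tasks still depends on its CPU and network capacity.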

Getting More Throughput From Replicator Tasks

One way to increase throughput for Replicator is to run more tasks across more Connect Worker nodes. The other is to try to squeeze more throughput from each Replicator task. In reality, your tuning efforts are likely to be a combination of both - start by tuning a single task and then achieve the desired throughput with multiple tasks running in parallel.

When tuning performance of Connect tasks, you can try to make each task use less CPU or use the network more efficiently. We recommend first checking which of these is the bottleneck and tuning for the right resource. One way to find the bottleneck is to add tasks to Replicator running on a single node until adding tasks no longer increases the throughput. If at this point you are seeing high CPU utilization, you want to improve CPU utilization of Connect tasks. If CPU utilization is low, but adding tasks does not improve throughput - you’ll want to tune the network utilization instead.

Improving CPU Utilization of a Connect Task

  • Make sure you are not seeing excessive garbage collection pauses by enabling and checking Java’s garbage collection log. Use G1GC and a reasonable heap size (4G is a good default to start with).
  • Make sure Replicator is configured with key.converter and value.converter set to io.confluent.connect.replicator.util.ByteArrayConverter. If you have src.key.converter and src.value.converter configured, they should also be set to io.confluent.connect.replicator.util.ByteArrayConverter (the default value). This will eliminate costly conversion of events to Connect’s internal data format and back to bytes.
  • Disable CRC checks. CRC checks for data integrity use CPU, and they can be disabled for improved performance by setting src.consumer.check.crcs=false in the Replicator configuration. This isn’t recommended, since corrupted records would then go undetected.
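The converter settings from the list above can be written out as a configuration fragment. The sketch below only renders the property lines named in this section. The to_properties helper is a hypothetical convenience, and the CRC setting is left commented out because disabling it is not recommended.

```python
# Converter class named in the text above.
BYTE_ARRAY = "io.confluent.connect.replicator.util.ByteArrayConverter"

# Replicator settings that avoid conversion to and from Connect's
# internal data format.
cpu_settings = {
    "key.converter": BYTE_ARRAY,
    "value.converter": BYTE_ARRAY,
    "src.key.converter": BYTE_ARRAY,
    "src.value.converter": BYTE_ARRAY,
    # Not recommended; uncomment only if you accept the integrity risk.
    # "src.consumer.check.crcs": "false",
}


def to_properties(settings: dict) -> str:
    """Render a dict as key=value lines (hypothetical helper)."""
    return "\n".join(f"{key}={value}" for key, value in settings.items())


print(to_properties(cpu_settings))
```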

Improving Network Utilization of a Connect Task

Replicator typically reads or writes events between two datacenters, which means latency is usually very high. High network latency can lead to reduced throughput. The suggestions below all aim at configuring the TCP stack to improve throughput in this environment. Note that this is different from configuring applications that are running within the same datacenter as the Kafka cluster - inside a datacenter, you are usually trading off latency vs throughput and you can decide which of these to tune for. When replicating events between two datacenters, high latency is typically a fact we need to deal with.

Use a tool like https://www.switch.ch/network/tools/tcp_throughput/?do+new+calculation=do+new+calculation to compute TCP buffer sizes based on the network link properties. Increase TCP buffer sizes to handle networks with high bandwidth and high latency (this includes most inter-DC links). This needs to be done in two places, and then verified:

  • Increase the application-level send.buffer.bytes and receive.buffer.bytes on the producer and consumer, and the corresponding socket.send.buffer.bytes and socket.receive.buffer.bytes on the brokers. This is the size requested by the application, and it may still be silently overridden by the OS if you don’t follow the second step.
  • Increase the OS-level TCP buffer sizes (net.core.rmem_default, net.core.rmem_max, net.core.wmem_default, net.core.wmem_max, net.core.optmem_max). See, e.g. https://wwwx.cs.unc.edu/~sparkst/howto/network_tuning.php for some examples. You can use sysctl for testing in the current session, but you will need to add the values to /etc/sysctl.conf to make them permanent.
  • Important: Enable logging to double check that the change actually took effect, because there are instances where the OS silently overrides or ignores the settings. To do so, set log4j.logger.org.apache.kafka.common.network.Selector=DEBUG.
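Calculators such as the one linked above are typically built around the bandwidth-delay product: the amount of data that must be in flight to keep a link full. As a rough sketch, the same estimate can be computed by hand. The function name and the example link values (1 Gbit/s, 80 ms round trip) are assumptions for illustration only.

```python
def bdp_bytes(bandwidth_mbit_per_s: float, rtt_ms: float) -> int:
    """Bandwidth-delay product in bytes.

    bandwidth (bits/s) * round-trip time (s) gives the number of bits in
    flight on a fully used link; dividing by 8 converts to bytes.
    """
    bits_in_flight = bandwidth_mbit_per_s * 1_000_000 * rtt_ms / 1000
    return round(bits_in_flight / 8)


# Assumed example link: 1 Gbit/s with an 80 ms round-trip time.
buffer_size = bdp_bytes(1000, 80)
print(buffer_size)  # 10000000
```

A buffer of roughly this size is a starting point for both the application-level and the OS-level settings. Measure the resulting throughput before settling on a value.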

In addition, you can try three further network optimizations:

  • Enable automatic window scaling (sysctl -w net.ipv4.tcp_window_scaling=1 or add net.ipv4.tcp_window_scaling=1 to /etc/sysctl.conf). This will allow the TCP window to grow beyond its usual maximum of 64K if the latency justifies it.
  • Disable TCP slow start after idle (set /proc/sys/net/ipv4/tcp_slow_start_after_idle to 0). This will make the network connection reach its maximum capacity sooner.
  • Increase producer batch.size and linger.ms, and consumer fetch.max.bytes, fetch.min.bytes and fetch.max.wait.ms, in order to improve throughput by reading and writing bigger batches.

Note that configuring Consumer settings is done via the Replicator configuration file, while configuring Producer settings is done via the Connect Worker configuration. For example, tuning consumer fetch.max.bytes will be done by setting src.consumer.fetch.max.bytes in the replicator configuration while tuning producer batch.size will be done by setting producer.batch.size in the Connect Worker configuration. This is because Replicator is consuming from the origin cluster, passing the records to the Connect Worker and the Worker is producing to the destination cluster.
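The prefix convention described above can be sketched as follows. The helper name and all numeric values are illustrative assumptions, not recommendations. Only the src.consumer. and producer. prefixes come from the text.

```python
def split_client_settings(consumer: dict, producer: dict):
    """Prefix client settings for the file in which each one belongs.

    Consumer settings go into the Replicator configuration with the
    src.consumer. prefix; producer settings go into the Connect Worker
    configuration with the producer. prefix.
    """
    replicator_config = {f"src.consumer.{k}": v for k, v in consumer.items()}
    worker_config = {f"producer.{k}": v for k, v in producer.items()}
    return replicator_config, worker_config


# Illustrative values only.
replicator_config, worker_config = split_client_settings(
    consumer={"fetch.max.bytes": 104857600, "fetch.min.bytes": 1048576},
    producer={"batch.size": 524288, "linger.ms": 50},
)

for name, value in replicator_config.items():
    print(f"replicator: {name}={value}")
for name, value in worker_config.items():
    print(f"worker:     {name}={value}")
```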

Monitoring Replicator

Currently the recommended way to monitor Replicator is by monitoring:

  • Consumer metrics
  • Producer metrics
  • Replication lag

Monitoring Replicator Lag

You can monitor Replicator lag by using the Consumer Group Command tool (kafka-consumer-groups). To use this functionality, you must set the Replicator offset.topic.commit config to true (the default value).

Replication lag is the number of messages that were produced to the origin cluster but have not yet arrived at the destination cluster. It can also be measured as the amount of time it currently takes for a message to get replicated from origin to destination. Note that this can be higher than the latency between the two datacenters if Replicator is behind for some reason and needs time to catch up.
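In message terms, the lag of a partition is the difference between its log end offset and the offset committed by the consumer group. This is the quantity that consumer group tooling reports as lag. The sketch below computes it from two offset maps. The function name, the topic name and the offsets are made-up examples, and treating a missing committed offset as 0 is an assumption.

```python
def replication_lag(log_end_offsets: dict, committed_offsets: dict) -> dict:
    """Per-partition lag: log end offset minus committed offset.

    Keys are (topic, partition) tuples. A partition with no committed
    offset is treated as fully lagging (assumption for this sketch).
    """
    lag = {}
    for topic_partition, end_offset in log_end_offsets.items():
        committed = committed_offsets.get(topic_partition, 0)
        lag[topic_partition] = max(end_offset - committed, 0)
    return lag


# Made-up offsets for a two-partition topic.
end = {("orders", 0): 1500, ("orders", 1): 900}
committed = {("orders", 0): 1200, ("orders", 1): 900}

lag = replication_lag(end, committed)
print(lag)                # {('orders', 0): 300, ('orders', 1): 0}
print(sum(lag.values()))  # 300
```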

The main reasons to monitor replication lag are:

  • If there is a need to fail over from origin to destination, all events that were produced to origin and have not yet arrived at the destination will be lost.
  • Any event processing that happens at the destination will be delayed by the lag.

The lag is typically just a few hundred milliseconds (depending on the network latency between the two datacenters), but it can grow larger if network partitions or configuration changes temporarily pause replication and Replicator needs to catch up. If the replication lag keeps growing, Replicator throughput is lower than the rate at which data is produced to the origin cluster, and additional Replicator tasks or Connect Workers are necessary. This is the case, for example, if producers are writing 100 MBps to the origin cluster but Replicator only replicates 50 MBps.
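The arithmetic behind the 100 MBps versus 50 MBps example can be made explicit. The following is a minimal sketch with hypothetical function names. The 3000 MB backlog and the 150 MBps replication rate in the second call are assumed values.

```python
import math


def lag_growth_mbps(produce_mbps: float, replicate_mbps: float) -> float:
    """Rate at which the backlog grows (positive) or shrinks (negative)."""
    return produce_mbps - replicate_mbps


def catch_up_seconds(backlog_mb: float, produce_mbps: float, replicate_mbps: float) -> float:
    """Time needed to drain a backlog, or infinity if Replicator is too slow."""
    spare_mbps = replicate_mbps - produce_mbps
    if spare_mbps <= 0:
        return math.inf
    return backlog_mb / spare_mbps


# Example from the text: the backlog grows by 50 MB every second.
print(lag_growth_mbps(100, 50))         # 50
print(catch_up_seconds(3000, 100, 50))  # inf

# After adding tasks (assumed new rate of 150 MBps), a 3000 MB backlog drains.
print(catch_up_seconds(3000, 100, 150))  # 60.0
```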

Tip

To increase the throughput, the TCP socket buffer should be increased on the Replicator and the brokers. When Replicator is running in the destination cluster (recommended), you must also increase the following:

  • The TCP send socket buffer (socket.send.buffer.bytes) on the source cluster brokers.
  • The TCP receive socket buffer (receive.buffer.bytes) on the Replicator consumers. A value of 512 KB is reasonable but you may want to experiment with values up to 12 MB.

If you are using Linux, you might need to change the default socket buffer maximum for the Kafka settings to take effect. For more information about tuning your buffers, see this article

Monitoring Producer and Consumer Metrics

Like Kafka brokers, Kafka Connect reports metrics via JMX. To monitor Kafka Connect and Replicator, you’ll need to set the JMX_PORT environment variable before you start the Connect Workers. Then collect the reported metrics using your usual monitoring tools. JMXTrans, Graphite and Grafana are a popular combination for collecting and reporting JMX metrics from Kafka.

When you look at the metrics reported via JMX, you’ll notice that Connect exposes Replicator’s consumer metrics and Connect’s producer metrics. You can see the full list of metrics in Monitoring Kafka. Here are some of the important metrics and their significance.

Important Producer Metrics

io-ratio or io-wait-ratio
If the io-ratio is low or io-wait-ratio is high, this means the producer is not very busy and is unlikely to be a bottleneck.
outgoing-byte-rate
Reports the producer throughput when writing to destination Kafka.
batch-size-avg and batch-size-max
If they are consistently close to the configured batch.size, you may be producing as fast as possible and you’ll want to increase the batch size to get better batching.
record-retry-rate and record-error-rate
The average per-second number of retried record sends and failed record sends for a topic. A high number of these can indicate issues writing to the destination cluster.
produce-throttle-time-avg and produce-throttle-time-max
If they are non-zero, it indicates that the destination brokers are configured to slow producers down.
waiting-threads and bufferpool-wait-time
Non-zero values here indicate memory pressure. Connect producers can’t send events fast enough, resulting in full memory buffers that cause Replicator threads to block.

Important Consumer Metrics

io-ratio or io-wait-ratio
If the io-ratio is low or io-wait-ratio is high, this means the consumer is not very busy and is unlikely to be a bottleneck.
bytes-consumed-rate
Indicates throughput of Replicator reading events from origin cluster.
fetch-size-avg and fetch-size-max
If they are close to the configured maximum fetch size consistently, it means that Replicator is reading as fast as it can. Try to increase the maximum fetch size and check if the throughput per task is improved.
records-lag-max
The maximum lag in terms of number of records for any partition. An increasing value over time is your best indication that Replicator is not keeping up with the rate at which events are written to the origin cluster.
fetch-rate, fetch-size-avg and fetch-size-max
If fetch-rate is high but fetch-size-avg and fetch-size-max are not close to the maximum configured fetch size, perhaps the consumer is “churning”. Try increasing the fetch.min.bytes and fetch.max.wait.ms configuration. This can help the consumer batch more efficiently.
fetch-throttle-time-max and fetch-throttle-time-avg
If those are above zero, it means that the origin brokers are configured to intentionally reduce Replicator throughput by throttling.
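The consumer-side rules of thumb above can be turned into a simple check over sampled metric values. This is a sketch only: the function, the 0.9 "close to maximum" ratio and the fetch-rate threshold are arbitrary assumptions, and collecting the metric values from JMX is left out.

```python
def diagnose_consumer(metrics, max_fetch_bytes, lag_history=(),
                      near=0.9, high_fetch_rate=50.0):
    """Return human-readable hints based on consumer metric samples.

    metrics maps metric names to their latest values; lag_history is a
    sequence of records-lag-max samples, oldest first. Thresholds are
    placeholders to be tuned for your environment.
    """
    hints = []
    if metrics.get("fetch-size-avg", 0.0) >= near * max_fetch_bytes:
        hints.append("fetch size near maximum: try a larger maximum fetch size")
    elif metrics.get("fetch-rate", 0.0) >= high_fetch_rate:
        hints.append("many small fetches: try raising fetch.min.bytes "
                     "and fetch.max.wait.ms")
    if metrics.get("fetch-throttle-time-max", 0.0) > 0:
        hints.append("origin brokers are throttling fetches")
    growing = len(lag_history) >= 2 and all(
        later > earlier for earlier, later in zip(lag_history, lag_history[1:])
    )
    if growing:
        hints.append("records-lag-max keeps growing: Replicator is not keeping up")
    return hints


# Made-up sample values.
sample = {"fetch-rate": 120.0, "fetch-size-avg": 40_000.0,
          "fetch-throttle-time-max": 0.0}
for hint in diagnose_consumer(sample, max_fetch_bytes=52_428_800,
                              lag_history=[100, 250, 900]):
    print(hint)
```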