Tune Replicator for Confluent Platform

This section provides advice on achieving maximum replication throughput with a minimal number of Connect Workers and machines.

We assume that you already have a Connect cluster with Replicator running. For this section, we also assume that you are running a dedicated Connect cluster for Replicator; that is, no other connector is running on the same cluster and all of the cluster’s resources are available to Replicator.

Sizing Replicator cluster

Sizing a Replicator cluster comes down to two questions:

  • How many nodes do we need in the cluster, and how large should these nodes be?
  • How many Replicator tasks do we need to run?

We’ll describe a method to determine the number of tasks and then use this information to determine the number of nodes you’ll need.

The first step is to find out the throughput per task. You can do this by running a single task for a while and measuring its throughput. One way to do this is to fill a topic at the origin cluster with a large amount of data, start Replicator with a single task (configuration parameter tasks.max=1), and measure the rate at which events are written to the destination cluster. This shows you what you can achieve with a single task.

Then take the desired throughput (how many MBps you need to replicate between the clusters) and divide it by the throughput per task, rounding up. The result is the number of tasks you need.

For example, suppose that you run Replicator with a single task and see that it replicates 30 MBps. Across all of your topics, you need to replicate 100 MBps. Because 100 / 30 is about 3.3, you need at least 4 Replicator tasks.
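The arithmetic above can be sketched in a few lines of Python. The function name required_tasks is hypothetical and only illustrates the division and rounding; the 30 MBps and 100 MBps figures are the ones from the example.

```python
import math


def required_tasks(target_mbps: float, per_task_mbps: float) -> int:
    """Smallest whole number of tasks whose combined throughput meets the target."""
    if per_task_mbps <= 0:
        raise ValueError("per-task throughput must be positive")
    # Round up: a fractional task count means one more full task is needed.
    return math.ceil(target_mbps / per_task_mbps)


# Figures from the example: 30 MBps per task, 100 MBps required.
print(required_tasks(100, 30))  # prints 4
```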

There are two caveats to this formula:

  • You can’t have more tasks than partitions. If your throughput requires more tasks than you have partitions, you’ll need to add partitions. Having more partitions than tasks is fine, because one task can consume from multiple partitions.
  • Having too many tasks on a single machine will cause you to saturate the machine resources. Those will be either the network or the CPU. If you are adding more tasks and the throughput doesn’t increase, you’ve probably saturated one of these resources. The easy solution is to add additional nodes, so you’ll have more resources to use.

Recommended Guidelines:

  • No more than 150 partitions per task
  • At most 20 tasks per worker

Getting more throughput from Replicator tasks

One way to increase throughput for Replicator is to run more tasks across more Connect Worker nodes. The other is to squeeze more throughput out of each Replicator task. In practice, your tuning efforts are likely to combine both: start by tuning a single task, and then achieve the desired throughput with multiple tasks running in parallel.

When tuning the performance of Connect tasks, you can try to make each task use less CPU or use the network more efficiently. We recommend first checking which of these is the bottleneck and then tuning for that resource. One way to find the bottleneck is to add tasks to Replicator running on a single node until adding tasks no longer increases the throughput. If at this point you see high CPU utilization, you want to improve the CPU utilization of Connect tasks. If CPU utilization is low but adding tasks does not improve throughput, you’ll want to tune the network utilization instead.
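The decision procedure above could be sketched as follows. The function name find_bottleneck, the 5% minimum gain, and the 80% CPU threshold are all assumptions chosen for illustration; the text itself does not define what counts as "high" CPU utilization or a meaningful throughput increase.

```python
def find_bottleneck(measurements, cpu_utilization, min_gain=0.05, high_cpu=0.8):
    """Classify the bottleneck from single-node measurements.

    measurements: list of (task_count, throughput_mbps), sorted by task_count.
    cpu_utilization: CPU utilization (0.0 to 1.0) observed once throughput plateaus.
    min_gain, high_cpu: hypothetical thresholds, not values from the documentation.
    """
    for (_, prev_tp), (_, tp) in zip(measurements, measurements[1:]):
        # Adding tasks no longer increases throughput by a meaningful amount.
        if tp < prev_tp * (1 + min_gain):
            return "cpu" if cpu_utilization >= high_cpu else "network"
    return "not saturated"


# Throughput stops growing between 2 and 3 tasks.
runs = [(1, 30), (2, 58), (3, 60)]
print(find_bottleneck(runs, cpu_utilization=0.9))  # prints cpu
print(find_bottleneck(runs, cpu_utilization=0.3))  # prints network
```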

Improving CPU utilization of a Connect task

  • Make sure you are not seeing excessive garbage collection pauses by enabling and checking Java’s garbage collection log. Use G1GC and a reasonable heap size (4G is a good default to start with).
  • Make sure Replicator is configured with key.converter and value.converter set to io.confluent.connect.replicator.util.ByteArrayConverter. If you have src.key.converter and src.value.converter configured, they should also be set to io.confluent.connect.replicator.util.ByteArrayConverter (the default value). This will eliminate costly conversion of events to Connect’s internal data format and back to bytes.
  • Disable CRC checks. This isn’t recommended, because corrupted records can then go undetected, but CRC checks for data integrity use CPU and can be disabled for improved performance by setting src.consumer.check.crcs=false in the Replicator configuration.

Improving network utilization of a Connect Task

Replicator typically reads or writes events between two datacenters, which means latency is usually very high. High network latency can lead to reduced throughput. The suggestions below all aim at configuring the TCP stack to improve throughput in this environment. Note that this is different from configuring applications that are running within the same datacenter as the Apache Kafka® cluster. Inside a datacenter, you are usually trading off latency vs throughput and you can decide which of these to tune for. When replicating events between two datacenters, high latency is typically a fact you must deal with.

Use a tool like https://www.switch.ch/network/tools/tcp_throughput/?do+new+calculation=do+new+calculation to compute TCP buffer sizes based on the network link properties. Increase TCP buffer sizes to handle networks with high bandwidth and high latency (this includes most inter data center links). This needs to be done in two places:

  • Increase the application-level buffer size used to send and receive requests. On producers and consumers, use send.buffer.bytes and receive.buffer.bytes. On brokers, use socket.send.buffer.bytes and socket.receive.buffer.bytes. This is the size requested by the application, and it may still be silently overridden by the operating system if you don’t follow the second step.
  • Increase operating system level TCP buffer size (net.core.rmem_default, net.core.rmem_max, net.core.wmem_default, net.core.wmem_max, net.core.optmem_max). You can use sysctl for testing in the current session, but will need the values in /etc/sysctl.conf to make them permanent.
  • Important: Enable logging to double-check that the settings actually took effect. There are cases where the operating system silently overrides or ignores them. To enable this logging, set log4j.logger.org.apache.kafka.common.network.Selector=DEBUG.
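Buffer-size calculators for high-latency links are generally based on the bandwidth-delay product: the number of bytes that can be in flight on the link at once. A minimal sketch of that calculation follows; the function name bdp_bytes and the 1 Gbit/s, 80 ms link are hypothetical, and the result is only a starting point for the buffer settings listed above, not a recommended value.

```python
def bdp_bytes(bandwidth_mbit_per_s: float, rtt_ms: float) -> int:
    """Bandwidth-delay product in bytes for a link.

    bandwidth_mbit_per_s: link bandwidth in megabits per second.
    rtt_ms: round-trip time in milliseconds.
    """
    bits_in_flight = bandwidth_mbit_per_s * 1_000_000 * rtt_ms / 1000
    return round(bits_in_flight / 8)


# Hypothetical inter-datacenter link: 1 Gbit/s with an 80 ms round trip.
print(bdp_bytes(1000, 80))  # prints 10000000 (about 10 MB)
```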

In addition, you can try the following optimizations:

  • Enable automatic window scaling (sysctl -w net.ipv4.tcp_window_scaling=1 or add net.ipv4.tcp_window_scaling=1 to /etc/sysctl.conf). This will allow the TCP buffer to grow beyond its usual maximum of 64K if the latency justifies it.
  • Reducing the TCP slow start time (set /proc/sys/net/ipv4/tcp_slow_start_after_idle to 0) will make the network connection reach its maximum capacity sooner.
  • Increase the producer settings batch.size and linger.ms and the consumer settings fetch.max.bytes, fetch.min.bytes, and fetch.max.wait.ms to improve throughput by reading and writing bigger batches.

Important

Depending upon the method you use to run Replicator, where and how you configure these settings will vary:

  • When running Replicator as a connector, configure consumer settings in the Replicator configuration file and producer settings in the Connect worker configuration. For example, in this type of deployment, to tune consumer fetch.max.bytes, set src.consumer.fetch.max.bytes in the Replicator configuration. To tune producer batch.size, set producer.batch.size in the Connect worker configuration. In this scenario, these configuration files are used because Replicator is consuming from the origin cluster, and passing the records to the Connect worker. The worker is producing to the destination cluster.
  • When running Replicator as an executable, configure consumer settings in consumer.properties and producer settings in producer.properties. In this scenario, these configuration files are used because the consumer.properties file controls the configurations for the embedded source consumer within Replicator, while producer.properties controls the settings for the producer within the Connect framework.
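The placement rules above can be summarized as a small lookup. This is an illustrative sketch only: the function name placement and the mode labels "connector" and "executable" are hypothetical, and the assumption that keys are written without a prefix in consumer.properties and producer.properties is not stated in the text.

```python
CONSUMER_SETTINGS = {"fetch.max.bytes", "fetch.min.bytes"}
PRODUCER_SETTINGS = {"batch.size", "linger.ms"}


def placement(setting: str, mode: str) -> tuple:
    """Return (where to configure the setting, key to use there)."""
    if setting in CONSUMER_SETTINGS:
        role = "consumer"
    elif setting in PRODUCER_SETTINGS:
        role = "producer"
    else:
        raise ValueError(f"unknown setting: {setting}")

    if mode == "connector":
        if role == "consumer":
            # Replicator consumes from the origin cluster.
            return ("Replicator configuration", "src.consumer." + setting)
        # The Connect worker produces to the destination cluster.
        return ("Connect worker configuration", "producer." + setting)
    if mode == "executable":
        # Assumption: keys are unprefixed in the properties files.
        return (role + ".properties", setting)
    raise ValueError(f"unknown mode: {mode}")


print(placement("fetch.max.bytes", "connector"))
# prints ('Replicator configuration', 'src.consumer.fetch.max.bytes')
print(placement("batch.size", "connector"))
# prints ('Connect worker configuration', 'producer.batch.size')
print(placement("batch.size", "executable"))
# prints ('producer.properties', 'batch.size')
```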

Setting compression to improve performance with increased data loads

Increased data loads can slow Replicator processing. You can address this by using data compression on both the source topic and the destination topic. Setting compression on the source topic alone is typically not sufficient; you must also explicitly set compression in the destination Replicator connector or worker configuration as follows: