Optimizing for Throughput

To optimize for throughput, producers and consumers must move as much data as possible within a given amount of time. In other words, the goal is to maximize the rate at which data moves.

The values for some of the configuration parameters on this page depend on other factors, such as the average message size and the number of partitions. These factors can differ greatly from environment to environment.

Some configuration parameters have a range of values, so benchmarking helps to validate the configuration for your application and environment.

Number of Partitions

A topic partition is the unit of parallelism in Kafka. Producers can send messages to different partitions in parallel, those partitions can live on different brokers, and different consumers can read them in parallel. In general, a higher number of topic partitions results in higher throughput. To maximize throughput, you need enough partitions to distribute across the brokers in your Confluent Cloud cluster.

There are trade-offs to increasing the number of partitions. Review the guidelines on how to choose the number of partitions. Be sure to choose the partition count based on producer throughput and consumer throughput, and benchmark performance in your environment. Also, consider the design of your data patterns and key assignments so messages are distributed as evenly as possible across topic partitions. This prevents overloading certain topic partitions relative to others.
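
A rule of thumb that is often used when sizing a topic works like this. It is an assumption here, not a Confluent Cloud requirement. First benchmark the throughput a single partition sustains for your producer (p) and for your consumer (c). Then, for a target topic throughput t, choose max(t/p, t/c) partitions, rounded up. The sketch below shows only that arithmetic. The class name and the example numbers are hypothetical.

```java
/** Rough partition-count estimate: max(t / p, t / c), rounded up (a heuristic, not a rule). */
final class PartitionEstimate {

    /**
     * @param targetMBps               desired topic throughput (t), in MB/s
     * @param producerMBpsPerPartition benchmarked producer throughput per partition (p)
     * @param consumerMBpsPerPartition benchmarked consumer throughput per partition (c)
     */
    static int estimatePartitions(double targetMBps,
                                  double producerMBpsPerPartition,
                                  double consumerMBpsPerPartition) {
        if (targetMBps <= 0 || producerMBpsPerPartition <= 0 || consumerMBpsPerPartition <= 0) {
            throw new IllegalArgumentException("throughput values must be positive");
        }
        double forProducers = targetMBps / producerMBpsPerPartition;
        double forConsumers = targetMBps / consumerMBpsPerPartition;
        // Whichever side is slower per partition dictates the partition count.
        return (int) Math.ceil(Math.max(forProducers, forConsumers));
    }

    public static void main(String[] args) {
        // Hypothetical benchmark: 100 MB/s target, 10 MB/s per partition when
        // producing, 20 MB/s per partition when consuming.
        System.out.println(estimatePartitions(100, 10, 20)); // prints 10
    }
}
```

The result is only a starting point. Uneven key distribution can still overload individual partitions, so validate the count with your own benchmarks.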

Batching Messages

Kafka producers can batch messages going to the same partition: they collect multiple messages and send them together in a single request. The most important step you can take to optimize throughput is to tune producer batching. Increase both the batch size and the time the producer waits for a batch to fill with messages. Larger batches mean fewer requests to Confluent Cloud, which reduces the load on producers and the broker CPU overhead of processing each request. With the Java client, configure the batch.size parameter to increase the maximum size, in bytes, of each message batch. To give batches more time to fill, configure the linger.ms parameter so the producer waits longer before sending. The delay lets a batch grow toward the configured batch.size. The trade-off is higher latency, because messages are not sent as soon as they are ready.
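
The sketch below sets these two Java client properties. The values are example picks from the ranges in the summary at the end of this page and should be benchmarked before use. Only java.util.Properties is used, so the snippet runs without the Kafka library. The class and method names are hypothetical.

```java
import java.util.Properties;

/** Producer batching settings aimed at throughput (Java client property names). */
final class BatchingConfig {

    static Properties batchingProperties() {
        Properties props = new Properties();
        // Maximum size, in bytes, of a batch for a single partition (default 16384).
        props.put("batch.size", "200000");
        // Wait up to 50 ms for a batch to fill before sending it.
        props.put("linger.ms", "50");
        return props;
    }

    public static void main(String[] args) {
        Properties props = batchingProperties();
        // In a real application, merge these with bootstrap.servers, security
        // settings, and serializers, then pass them to the KafkaProducer constructor.
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```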

Compression

To optimize for throughput, you can also enable compression, which lets the producer send the same data in fewer bytes. Enable compression by configuring the compression.type parameter, which can be set to one of the following standard compression codecs:

  • lz4 (recommended for performance)
  • snappy
  • zstd
  • gzip

Prefer lz4 over gzip for performance. gzip is more compute-intensive and can slow down your application. Compression is applied to full batches of data, so better batching results in better compression ratios. When Confluent Cloud receives a compressed batch of messages from a producer, it always decompresses the data to validate it. It then checks the compression codec configured for the destination topic.
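
The link between batching and compression ratio can be sketched with the JDK's built-in DEFLATE implementation, java.util.zip.Deflater. DEFLATE is only a stand-in here: lz4, snappy, and zstd are not part of the JDK. The record format and class name are made up for illustration. Compressing 500 similar records as one batch should give a much higher ratio than compressing a single record, because the compressor can exploit the structure the records share.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;

/** Shows that compressing many similar records together beats compressing one. */
final class BatchCompressionDemo {

    /** Returns the DEFLATE-compressed size of the input, in bytes. */
    static int compressedSize(byte[] input) {
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(input);
            deflater.finish();
            byte[] buffer = new byte[4096];
            int total = 0;
            while (!deflater.finished()) {
                total += deflater.deflate(buffer); // bytes produced in this round
            }
            return total;
        } finally {
            deflater.end(); // release native resources
        }
    }

    /** Builds a "batch" of n JSON-like records that share most of their structure. */
    static byte[] batchOf(int n) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < n; i++) {
            sb.append("{\"orderId\":").append(i)
              .append(",\"status\":\"SHIPPED\",\"region\":\"us-east\"}\n");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    /** Compression ratio = uncompressed bytes / compressed bytes. */
    static double ratio(int records) {
        byte[] batch = batchOf(records);
        return (double) batch.length / compressedSize(batch);
    }

    public static void main(String[] args) {
        System.out.printf("1 record:    ratio %.2f%n", ratio(1));
        System.out.printf("500 records: ratio %.2f%n", ratio(500));
    }
}
```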

Suppose the destination topic's compression codec is left at its default setting, producer, which keeps whatever codec the producer used. Or suppose the batch and the destination topic use the same codec. In either case, Confluent Cloud writes the compressed batch from the client directly to the topic's log file without spending cycles to recompress the data. Otherwise, Confluent Cloud must recompress the data to match the destination topic's codec, which can hurt performance. Where possible, keep the producer and topic compression codecs the same.
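
The decision described in the preceding paragraph can be modeled as a small predicate. This is a simplified sketch, not broker code. It assumes codec names as they appear in Kafka configuration: producers call "no compression" none, while topic-level compression.type calls it uncompressed, so the model treats those two as equal. The class name is hypothetical.

```java
/** Simplified model of whether a produced batch must be recompressed for its topic. */
final class RecompressionCheck {

    // Producers call "no compression" none; topic configs call it uncompressed.
    private static String normalize(String codec) {
        return "none".equals(codec) ? "uncompressed" : codec;
    }

    static boolean needsRecompression(String batchCodec, String topicCodec) {
        if ("producer".equals(topicCodec)) {
            return false; // topic keeps whatever codec the producer used
        }
        // Same codec on both sides: the batch can be written as-is.
        return !normalize(topicCodec).equals(normalize(batchCodec));
    }

    public static void main(String[] args) {
        System.out.println(needsRecompression("lz4", "producer")); // false
        System.out.println(needsRecompression("lz4", "lz4"));      // false
        System.out.println(needsRecompression("lz4", "zstd"));     // true
    }
}
```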

Producer acks

When a producer sends a message to Confluent Cloud, the message goes to the leader broker for the target partition. The producer then waits for a response from the leader broker confirming that its message has been committed before it sends the next messages. The exception is acks=0, in which case the producer does not wait for any acknowledgment from the broker at all. Automatic checks ensure that consumers cannot read messages that haven't been committed yet. How quickly the leader broker sends these responses affects producer throughput. The sooner a producer receives a response, the sooner it can send the next message, which generally results in higher throughput.

Producers can therefore set the acks configuration parameter. It specifies the number of acknowledgments the leader broker must have received before it responds to the producer with an acknowledgment. With acks=1, the leader broker writes the record to its local log and then acknowledges the request without waiting for acknowledgment from all followers. The trade-off is lower durability, because the producer does not wait until the message is replicated to other brokers.
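
The sketch below sets acks=1 with Java client property names. The idempotent producer requires acks=all, so the sketch also sets enable.idempotence=false explicitly to keep the two settings from conflicting. The class and method names are hypothetical, and only java.util.Properties is used.

```java
import java.util.Properties;

/** Producer acknowledgment setting that trades durability for throughput. */
final class AcksConfig {

    static Properties acksProperties() {
        Properties props = new Properties();
        // Leader acknowledges after writing to its own log, without waiting for followers.
        props.put("acks", "1");
        // Idempotence requires acks=all, so disable it explicitly alongside acks=1.
        props.put("enable.idempotence", "false");
        return props;
    }

    public static void main(String[] args) {
        acksProperties().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```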

Memory Allocation

The Java producer client automatically allocates memory to store unsent messages. If that memory limit is reached, the producer blocks additional sends until memory frees up or until max.block.ms passes. You can adjust how much memory is allocated with the buffer.memory configuration parameter. If you don't have many partitions, you may not need to adjust it at all. If you do have many partitions, you can tune buffer.memory, taking into account message size, linger time, and partition count, to keep batches flowing to more partitions at once. This in turn makes better use of the bandwidth across more brokers.
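
One rough way to reason about buffer.memory is to leave room for at least one full batch per partition the producer writes to, plus some headroom. This is an assumption made for illustration, not a formula from the Kafka documentation. Never go below the client default of 33554432 bytes. The class name, method name, and headroom factor are hypothetical.

```java
/** Rough buffer.memory sizing heuristic (illustrative assumption, not an official formula). */
final class BufferMemoryEstimate {

    static final long DEFAULT_BUFFER_MEMORY = 33_554_432L; // client default, 32 MiB

    /** One full batch per partition, scaled by headroom, never below the default. */
    static long suggestedBufferMemory(int batchSizeBytes, int partitions, double headroom) {
        long needed = (long) Math.ceil((long) batchSizeBytes * partitions * headroom);
        return Math.max(needed, DEFAULT_BUFFER_MEMORY);
    }

    public static void main(String[] args) {
        // 200000-byte batches across 500 partitions with 50% headroom.
        System.out.println(suggestedBufferMemory(200_000, 500, 1.5)); // prints 150000000
    }
}
```

With few partitions the estimate falls back to the default, which matches the point that small deployments may not need to change this setting.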

Consumer Fetching

Another way to optimize for throughput is to adjust how much data consumers receive in each fetch from the leader broker in Confluent Cloud. To increase how much data consumers get from the leader per fetch request, increase the fetch.min.bytes configuration parameter. This parameter sets the minimum amount of data, in bytes, that the broker should return in response to a consumer's fetch request. Increasing it also reduces the number of fetch requests made to Confluent Cloud, which lowers the broker CPU overhead of processing each fetch and further improves throughput. As with increased batching on the producer, the trade-off may be higher latency. The broker does not send the consumer new messages until it has enough data to satisfy fetch.min.bytes, or until the wait time set by the fetch.max.wait.ms configuration parameter expires.
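
The following sketch shows the two consumer fetch settings together, using Java client property names. fetch.max.wait.ms is shown at its default of 500 ms to make the latency bound explicit. The class and method names are hypothetical, and the snippet needs only java.util.Properties.

```java
import java.util.Properties;

/** Consumer fetch settings aimed at throughput (Java client property names). */
final class FetchConfig {

    static Properties fetchProperties() {
        Properties props = new Properties();
        // Broker holds the fetch until at least ~100 KB is available...
        props.put("fetch.min.bytes", "100000");
        // ...or until this many milliseconds pass, whichever comes first.
        props.put("fetch.max.wait.ms", "500");
        return props;
    }

    public static void main(String[] args) {
        fetchProperties().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```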

Assuming the application allows it, use consumer groups with multiple consumers to parallelize consumption. Parallelizing consumption may improve throughput because multiple consumers can balance the load, processing multiple partitions simultaneously. The upper limit on this parallelization is the number of partitions in the topic.
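
The partition-count limit can be illustrated with a toy round-robin assignment. This is a simplified model for illustration, not Kafka's actual group assignment, which uses configurable assignors. The class and method names are hypothetical. With more consumers than partitions, the extra consumers receive no partitions and sit idle.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy round-robin assignment showing why consumers beyond the partition count are idle. */
final class ConsumerParallelism {

    /** Returns, for each consumer, the partitions it would read. */
    static List<List<Integer>> assign(int partitions, int consumers) {
        if (consumers <= 0) {
            throw new IllegalArgumentException("need at least one consumer");
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            assignment.get(p % consumers).add(p); // deal partitions out in turn
        }
        return assignment;
    }

    /** Number of consumers that actually received at least one partition. */
    static long activeConsumers(int partitions, int consumers) {
        return assign(partitions, consumers).stream().filter(list -> !list.isEmpty()).count();
    }

    public static void main(String[] args) {
        System.out.println(assign(6, 4));          // [[0, 4], [1, 5], [2], [3]]
        System.out.println(activeConsumers(6, 8)); // 6: two of the eight consumers are idle
    }
}
```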

Summary of Configurations for Optimizing Throughput

Producer

  • batch.size: increase to 100000–200000 (default 16384)
  • linger.ms: increase to 10–100 (default 0)
  • compression.type=lz4 (default none, that is, no compression)
  • acks=1 (default all since Apache Kafka 3.0; 1 in earlier clients)
  • buffer.memory: increase if there are a lot of partitions (default 33554432)

Consumer

  • fetch.min.bytes: increase to ~100000 (default 1)
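
The settings listed above can be collected into two Properties objects, one for the producer and one for the consumer. This is a sketch with example values chosen inside the suggested ranges, not a drop-in configuration. A real application still needs bootstrap.servers, security settings, and serializers or deserializers, plus group.id for the consumer. enable.idempotence=false is added because idempotence requires acks=all. The class and method names are hypothetical.

```java
import java.util.Properties;

/** Throughput-oriented producer and consumer settings (example values, Java client names). */
final class ThroughputConfig {

    static Properties producer() {
        Properties props = new Properties();
        props.put("batch.size", "200000");        // default 16384
        props.put("linger.ms", "50");             // example value within 10-100
        props.put("compression.type", "lz4");     // default none
        props.put("acks", "1");
        props.put("enable.idempotence", "false"); // idempotence requires acks=all
        // buffer.memory is left at its default; raise it only for many partitions.
        return props;
    }

    static Properties consumer() {
        Properties props = new Properties();
        props.put("fetch.min.bytes", "100000");   // default 1
        return props;
    }

    public static void main(String[] args) {
        System.out.println("producer: " + producer());
        System.out.println("consumer: " + consumer());
    }
}
```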