Optimizing for Throughput¶
To optimize for throughput, producers and consumers must move as much data as possible within a given amount of time, so the goal is to maximize the rate at which data moves through your applications and Confluent Cloud.
The optimal values for some of the configuration parameters on this page depend on other factors, such as the average message size and the number of partitions, which can differ greatly from environment to environment. Because many of these parameters accept a range of reasonable values, benchmarking is the best way to validate the configuration for your application and environment.
Number of Partitions¶
A topic partition is the unit of parallelism in Kafka. Producers can send messages to different partitions in parallel and to different brokers in parallel, and those partitions can be read by different consumers in parallel. In general, a higher number of topic partitions results in higher throughput, and to maximize throughput you need enough partitions to distribute load across all of the brokers in your Confluent Cloud cluster.
There are trade-offs to increasing the number of partitions. Review the guidelines on how to choose the number of partitions. Be sure to choose the partition count based on producer throughput and consumer throughput, and benchmark performance in your environment. Also, consider the design of your data patterns and key assignments so messages are distributed as evenly as possible across topic partitions. This prevents overloading certain topic partitions relative to others.
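For illustration, the following sketch uses the Java AdminClient to create a topic with 12 partitions. The topic name, partition count, and endpoint placeholder are assumptions for the example, not recommendations, and the security settings a Confluent Cloud connection requires are omitted for brevity.

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    public class CreateTopicExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Substitute your cluster's bootstrap endpoint and add the
            // SASL/SSL settings your cluster requires.
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "<bootstrap-endpoint>:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                // 12 partitions as a starting point to benchmark;
                // replication factor 3 is the Confluent Cloud standard.
                NewTopic topic = new NewTopic("orders", 12, (short) 3);
                admin.createTopics(Collections.singleton(topic)).all().get();
            }
        }
    }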
Producer Batching¶
Kafka producers can batch messages going to the same partition, which means they collect multiple messages to send together in a single request. The most important step you can take to optimize throughput is to tune producer batching: increase the batch size and the time the producer spends waiting for the batch to fill with messages. Larger batch sizes result in fewer requests to Confluent Cloud, which reduces the load on producers and the broker CPU overhead to process each request. With the Java client, you can configure the batch.size parameter to increase the maximum size in bytes of each message batch. To give batches more time to fill, you can configure the linger.ms parameter to have the producer wait longer before sending; the delay allows the batch to reach the configured batch.size. The trade-off is higher latency, because messages aren't sent as soon as they are ready.
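As a minimal sketch with the Java producer, assuming an illustrative endpoint and values meant to be benchmarked rather than prescribed:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<bootstrap-endpoint>:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // Allow batches of up to ~200 KB per partition (default 16384 bytes).
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 200000);
    // Wait up to 100 ms for a batch to fill before sending (default 0).
    props.put(ProducerConfig.LINGER_MS_CONFIG, 100);

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);

A larger batch.size pays off only if the producer actually has enough concurrent messages per partition to fill batches, which is why benchmarking with your real traffic matters.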
Compression¶
To optimize for throughput, you can also enable compression, which means many bits can be sent as fewer bits. Enable compression by configuring the compression.type parameter, which can be set to one of the following standard codecs:
lz4 (recommended for performance)
zstd
snappy
gzip
Use lz4 for performance instead of gzip, which is more compute intensive and may cause your application not to perform as well. Compression is applied on full batches of data, so better batching results in better compression ratios.
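Building on the producer properties from the earlier sketch:

    // Compress full batches with lz4 (default is none).
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");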
When Confluent Cloud receives a compressed batch of messages from a producer, it always
decompresses the data to validate it. Afterwards, it considers the compression
codec of the destination topic.
If the compression codec of the destination topic is left at its default value, producer, or if the codecs of the batch and the destination topic are the same, Confluent Cloud takes the compressed batch from the client and writes it directly to the topic's log file without taking cycles to recompress the data. Otherwise, Confluent Cloud must recompress the data to match the codec of the destination topic, which can result in a performance impact. You should keep the compression codecs the same if possible.
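As an illustrative fragment, reusing the AdminClient from the topic-creation sketch, you could pin the topic-level codec to producer so batches are written exactly as they arrive:

    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    // "producer" keeps whatever codec the producer used for the batch.
    ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "orders");
    AlterConfigOp keepProducerCodec = new AlterConfigOp(
            new ConfigEntry("compression.type", "producer"),
            AlterConfigOp.OpType.SET);
    admin.incrementalAlterConfigs(Map.of(topic, List.of(keepProducerCodec)))
            .all().get();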
Producer acks¶
When a producer sends a message to Confluent Cloud, the message is sent to the leader broker for the target partition. The producer then awaits a response from the leader broker (assuming acks isn't set to 0, in which case the producer does not wait for any acknowledgment from the broker at all) to know that its message has been committed before it sends the next messages. Automatic checks are in place to ensure consumers cannot read messages that haven't been committed yet. How quickly leader brokers send those responses can affect producer throughput: the sooner a producer receives a response, the sooner it can send the next message, which generally results in higher throughput. Producers can therefore set the configuration parameter acks to specify the number of acknowledgments the leader broker must have received before responding to the producer with an acknowledgment. Setting acks=1 makes the leader broker write the record to its local log and then acknowledge the request without awaiting acknowledgment from all followers. The trade-off is lower durability, because the producer doesn't wait until the message is replicated to other brokers.
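Continuing the producer properties sketch:

    // Acknowledge once the leader has written the record to its local log,
    // without waiting for follower replicas: faster, but less durable.
    props.put(ProducerConfig.ACKS_CONFIG, "1");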
Producer Memory Allocation¶
Kafka producers automatically allocate memory for the Java client to store unsent messages. If that memory limit is reached, the producer blocks additional sends until memory frees up or until max.block.ms time passes. You can adjust how much memory is allocated with the configuration parameter buffer.memory. If you don't have a lot of partitions, you may not need to adjust this at all. However, if you have a lot of partitions, you can tune buffer.memory, while also taking into account the message size, linger time, and partition count, to maintain pipelines across more partitions. This in turn enables better use of the bandwidth across more brokers.
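A further fragment for the same properties object; the 64 MB figure is an assumption chosen to illustrate sizing above the 32 MB default:

    // Roughly double the default 32 MB buffer for many-partition workloads.
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864L);
    // Block sends for at most 5 seconds when the buffer is full
    // (default 60000 ms) before raising an error.
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000L);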
Consumer Fetching¶
Another way to optimize for throughput is to adjust how much data consumers receive from each fetch from the leader broker in Confluent Cloud. You can increase how much data the consumers get from the leader for each fetch request by increasing fetch.min.bytes. This parameter sets the minimum number of bytes expected for a fetch response from a consumer. Increasing it also reduces the number of fetch requests made to Confluent Cloud, reducing the broker CPU overhead to process each fetch and thereby improving throughput. Similar to the consequence of increasing batching on the producer, the trade-off of increasing this parameter on the consumer may be higher latency. This is because the broker won't send the consumer new messages until the fetch request has enough messages to fulfill the configured size (configuration parameter fetch.min.bytes), or until the wait time expires (configuration parameter fetch.max.wait.ms).
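A sketch of a consumer configured this way; the group ID and endpoint are illustrative assumptions:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<bootstrap-endpoint>:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "throughput-consumers");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // Wait for at least ~100 KB per fetch response (default 1 byte) ...
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100000);
    // ... but never longer than the 500 ms default wait time.
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);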
Consumer Groups¶
Assuming the application allows it, use consumer groups with multiple consumers to parallelize consumption. Parallelizing consumption may improve throughput because multiple consumers can balance the load, processing multiple partitions simultaneously. The upper limit on this parallelization is the number of partitions in the topic.
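Extending the consumer sketch above: every application instance that subscribes with the same group.id joins the same group, and Kafka assigns each instance a disjoint subset of the topic's partitions.

    import java.time.Duration;
    import java.util.Collections;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;

    consumer.subscribe(Collections.singleton("orders"));
    while (true) {
        // poll returns records only from this instance's assigned partitions.
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // Process the record. Running more instances, up to the
            // partition count, raises aggregate throughput.
        }
    }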
Summary of Configurations for Optimizing Throughput¶
batch.size: increase to 100000–200000 (default 16384)
linger.ms: increase to 10–100 (default 0)
compression.type: set to lz4 (default none, meaning no compression)
acks: set to 1 (default all)
buffer.memory: increase if there are a lot of partitions (default 33554432)
fetch.min.bytes: increase to ~100000 (default 1)
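Pulled together, a starting configuration reflecting this summary might look like the following sketch; treat these as values to benchmark, not prescriptions:

    // Producer settings from the summary above.
    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 200000);       // default 16384
    producerProps.put(ProducerConfig.LINGER_MS_CONFIG, 100);           // default 0
    producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");  // default none
    producerProps.put(ProducerConfig.ACKS_CONFIG, "1");
    producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864L); // default 33554432

    // Consumer setting from the summary above.
    Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100000);  // default 1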