Kafka Producer

Confluent Platform includes the Java producer shipped with Apache Kafka®.

This section gives a high-level overview of how the producer works and an introduction to the configuration settings for tuning.

To see examples of producers and consumers written in various languages, refer to the specific language sections. For additional examples, including usage of Confluent Cloud, refer to GitHub examples.

Concepts

The Kafka producer is conceptually much simpler than the consumer since it has no need for group coordination. Its main function is to map each message to a topic partition and send a produce request to the leader of that partition. It does the first of these with a partitioner, which typically selects a partition using a hash function. The partitioners shipped with Kafka guarantee that all messages with the same non-empty key will be sent to the same partition. If no key is provided, then the partition is selected in a round-robin fashion to ensure an even distribution across the topic partitions.

Each partition in the Kafka cluster has a leader and a set of replicas among the brokers. All writes to the partition must go through the partition leader. The replicas are kept in sync by fetching from the leader. When the leader shuts down or fails, the next leader is chosen from among the in-sync replicas. Depending on how the producer is configured, each produce request to the partition leader can be held until the replicas have successfully acknowledged the write. This gives the producer some control over message durability at some cost to overall throughput.

Messages written to the partition leader are not immediately readable by consumers regardless of the producer’s acknowledgement settings. When all in-sync replicas have acknowledged the write, then the message is considered committed, which makes it available for reading. This ensures that messages cannot be lost by a broker failure after they have already been read. Note that this implies that messages which were acknowledged by the leader only (i.e. acks=1) can be lost if the partition leader fails before the replicas have copied the message. Nevertheless, this is often a reasonable compromise in practice to ensure durability in most cases while not impacting throughput too significantly.

Most of the subtlety around producers is tied to achieving high throughput with batching/compression and ensuring message delivery guarantees as mentioned above. In the next section, the most common settings to tune producer behavior are discussed.

Kafka Producer Configuration

The full list of configuration settings are available in Producer Configurations. The key configuration settings and how they affect the producer’s behavior are highlighted below.

Core Configuration: You are required to set the bootstrap.servers property so that the producer can find the Kafka cluster. Although not required, you should always set a client.id since this allows you to easily correlate requests on the broker with the client instance which made it. These settings are the same for Java, C/C++, Python, Go and .NET clients.

Message Durability: You can control the durability of messages written to Kafka through the acks setting. The default value of “1” requires an explicit acknowledgement from the partition leader that the write succeeded. The strongest guarantee that Kafka provides is with “acks=all”, which guarantees that not only did the partition leader accept the write, but it was successfully replicated to all of the in-sync replicas. You can also use a value of “0” to maximize throughput, but you will have no guarantee that the message was successfully written to the broker’s log since the broker does not even send a response in this case. This also means that you will not be able to determine the offset of the message. Note that for the C/C++, Python, Go and .NET clients, this is a per-topic configuration, but can be applied globally using the default_topic_conf sub-configuration in C/C++ and default.topic.config sub-configuration in Python, Go and .NET.

Message Ordering: In general, messages are written to the broker in the same order that they are received by the producer client. However, if you enable message retries by setting retries to a value larger than 0 (which is the default), then message reordering may occur since the retry may occur after a following write succeeded. To enable retries without reordering, you can set max.in.flight.requests.per.connection to 1 to ensure that only one request can be sent to the broker at a time. Without retries enabled, the broker will preserve the order of writes it receives, but there could be gaps due to individual send failures.

Batching and Compression: Kafka producers attempt to collect sent messages into batches to improve throughput. With the Java client, you can use batch.size to control the maximum size in bytes of each message batch. To give more time for batches to fill, you can use linger.ms to have the producer delay sending. Compression can be enabled with the compression.type setting. Compression covers full message batches, so larger batches will typically mean a higher compression ratio. With the C/C++, Python, Go and .NET clients, you can use batch.num.messages to set a limit on the number of messages contained in each batch. To enable compression, use compression.codec.

Queuing Limits: Use buffer.memory to limit the total memory that is available to the Java client for collecting unsent messages. When this limit is hit, the producer will block on additional sends for as long as max.block.ms before raising an exception. Additionally, to avoid keeping records queued indefinitely, you can set a timeout using request.timeout.ms. If this timeout expires before a message can be successfully sent, then it will be removed from the queue and an exception will be thrown. The C/C++, Python, Go and .NET clients have similar settings. Use queue.buffering.max.messages to limit the total number of messages that can be queued (for transmission, retries, or delivery reports) at any given time. queue.buffering.max.ms limits the amount of time the client waits to fill up a batch before sending it to the broker.