Important

You are viewing documentation for an older version of Confluent Platform.

Kafka Producer

Confluent Platform includes the Java producer shipped with Apache Kafka®.

This section gives a high-level overview of how the producer works and an introduction to the configuration settings for tuning.

To see examples of producers written in various languages, refer to the specific language sections. For additional examples, including usage of Confluent Cloud, refer to Code Examples.

Concepts

The Kafka producer is conceptually much simpler than the consumer since it has no need for group coordination. A producer partitioner maps each message to a topic partition, and the producer sends a produce request to the leader of that partition. The partitioners shipped with Kafka guarantee that all messages with the same non-empty key will be sent to the same partition.

Important

If you explicitly set the partition field when creating a ProducerRecord, the default behavior described in this section will be overridden.

If a key is provided, the partitioner hashes the key with the murmur2 algorithm and takes the result modulo the number of partitions. As a result, the same key is always assigned to the same partition, provided the number of partitions does not change. If a key is not provided, the behavior depends on the Confluent Platform version:

  • In Confluent Platform versions 5.4.x and later, the partition is assigned with awareness of batching. If a batch of records is not full and has not yet been sent to the broker, the next record is assigned to the same partition as the prior record. Partitions for newly created batches are assigned randomly. For more information, see KIP-480: Sticky Partitioner and the related Confluent blog post.
  • In Confluent Platform versions prior to 5.4.x, partitions are assigned in round-robin order, starting at a random partition.
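The two cases above can be sketched in plain Java. This is a toy model rather than the partitioner shipped with Kafka: the class and method names are hypothetical, and `Arrays.hashCode` is used as a stand-in for murmur2, so the partition numbers it produces will not match a real cluster.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

/** Toy partitioner: keyed records hash to a partition, unkeyed records stick to one partition per batch. */
final class PartitionSketch {
    private final int numPartitions;
    private int stickyPartition;

    PartitionSketch(int numPartitions) {
        this.numPartitions = numPartitions;
        // The first batch starts on a random partition.
        this.stickyPartition = ThreadLocalRandom.current().nextInt(numPartitions);
    }

    /** Keyed record: hash the key bytes, clear the sign bit, take the remainder. */
    static int partitionForKey(String key, int numPartitions) {
        // Stand-in hash (assumption); the real partitioner uses murmur2 here.
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return (hash & 0x7fffffff) % numPartitions;
    }

    /** Unkeyed record: reuse the current partition until the batch is sent. */
    int partitionForUnkeyed() {
        return stickyPartition;
    }

    /** Called when the current batch is sent: pick a random partition for the next batch. */
    void onBatchSent() {
        stickyPartition = ThreadLocalRandom.current().nextInt(numPartitions);
    }

    public static void main(String[] args) {
        // The same key maps to the same partition as long as the partition count is unchanged.
        System.out.println(partitionForKey("order-42", 6) == partitionForKey("order-42", 6)); // prints "true"
    }
}
```

Because the partition is a remainder of the partition count, adding partitions to a topic can move existing keys to a different partition.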

Each partition in the Kafka cluster has a leader and a set of replicas among the brokers. All writes to the partition must go through the partition leader. The replicas are kept in sync by fetching from the leader. When the leader shuts down or fails, the next leader is chosen from among the in-sync replicas. Depending on how the producer is configured, each produce request to the partition leader can be held until the replicas have successfully acknowledged the write. This gives the producer some control over message durability at some cost to overall throughput.

Messages written to the partition leader are not immediately readable by consumers regardless of the producer’s acknowledgement settings. When all in-sync replicas have acknowledged the write, then the message is considered committed, which makes it available for reading. This ensures that messages cannot be lost by a broker failure after they have already been read. Note that this implies that messages which were acknowledged by the leader only (that is, acks=1) can be lost if the partition leader fails before the replicas have copied the message. Nevertheless, this is often a reasonable compromise in practice to ensure durability in most cases while not impacting throughput too significantly.
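The commit rule in the paragraph above can be illustrated with a small model. It assumes each in-sync replica reports a log end offset (the offset of the next record it would write); the smallest of those values marks the point below which records are committed. Kafka usually calls this point the high watermark. The class and method names here are hypothetical.

```java
import java.util.Arrays;

/** Toy model of when a record becomes committed and therefore readable by consumers. */
final class CommitSketch {
    /** Records with offsets below the returned value exist on every in-sync replica. */
    static long committedUpTo(long[] inSyncLogEndOffsets) {
        return Arrays.stream(inSyncLogEndOffsets)
                .min()
                .orElseThrow(() -> new IllegalArgumentException("at least the leader must be in sync"));
    }

    /** A consumer can read an offset only once it is committed. */
    static boolean readable(long offset, long[] inSyncLogEndOffsets) {
        return offset < committedUpTo(inSyncLogEndOffsets);
    }

    public static void main(String[] args) {
        // Leader holds offsets 0..9; the two in-sync followers hold 0..7 and 0..8.
        long[] logEnds = {10, 8, 9};
        System.out.println(committedUpTo(logEnds)); // prints 8
        System.out.println(readable(7, logEnds));   // prints true
        System.out.println(readable(8, logEnds));   // prints false
    }
}
```

In this model, offsets 8 and 9 are on the leader but not yet on every in-sync replica, so they are the records that could be lost if the leader failed at this moment.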

Most of the subtlety around producers is tied to achieving high throughput with batching/compression and ensuring message delivery guarantees as mentioned above. In the next section, the most common settings to tune producer behavior are discussed.

Kafka Producer Configuration

The full list of configuration settings is available in Producer Configurations. The key configuration settings and how they affect the producer’s behavior are highlighted below.

Core Configuration: You are required to set the bootstrap.servers property so that the producer can find the Kafka cluster. Although not required, you should always set a client.id, since this allows you to easily correlate requests on the broker with the client instance that made them. These settings are the same for Java, C/C++, Python, Go and .NET clients.
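As a minimal sketch, the core settings can be collected in a `java.util.Properties` object. The broker addresses and client ID below are placeholders, and the class name is hypothetical. To construct an actual Java producer from these properties you would also need to supply key and value serializers, which are omitted here.

```java
import java.util.Properties;

/** Builds the core producer settings described above. */
final class CoreConfigSketch {
    static Properties coreConfig(String bootstrapServers, String clientId) {
        Properties props = new Properties();
        // Required: where the producer first contacts the cluster.
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Optional but recommended: lets you correlate broker-side requests with this client.
        props.setProperty("client.id", clientId);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder host names and client ID (assumptions for illustration).
        Properties props = coreConfig("broker1:9092,broker2:9092", "orders-service-producer");
        System.out.println(props.getProperty("client.id")); // prints orders-service-producer
    }
}
```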

Message Durability: You can control the durability of messages written to Kafka through the acks setting. The default value of 1 requires an explicit acknowledgement from the partition leader that the write succeeded. The strongest guarantee that Kafka provides is with acks=all, which guarantees that not only did the partition leader accept the write, but it was successfully replicated to all of the in-sync replicas. You can also use a value of 0 to maximize throughput, but you will have no guarantee that the message was successfully written to the broker’s log since the broker does not even send a response in this case. This also means that you will not be able to determine the offset of the message. Note that for the C/C++, Python, Go and .NET clients, this is a per-topic configuration, but can be applied globally using the default_topic_conf sub-configuration in C/C++ and default.topic.config sub-configuration in Python, Go and .NET.
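The three acks values discussed above can be summarized as a small decision function. This is a toy model with hypothetical names, not client or broker code; it only covers the values mentioned in this section.

```java
/** Toy model of when a send is treated as successful for each acks value. */
final class AcksSketch {
    /**
     * @param acks                   "0", "1", or "all"
     * @param leaderWrote            whether the partition leader appended the record to its log
     * @param inSyncFollowersPending number of in-sync followers that have not yet copied the record
     */
    static boolean sendConsideredSuccessful(String acks, boolean leaderWrote, int inSyncFollowersPending) {
        switch (acks) {
            case "0":
                // The producer does not wait for a response, so it cannot observe a failure.
                return true;
            case "1":
                // The leader's own write is enough.
                return leaderWrote;
            case "all":
                // The leader's write plus replication to every in-sync replica.
                return leaderWrote && inSyncFollowersPending == 0;
            default:
                throw new IllegalArgumentException("unsupported acks value: " + acks);
        }
    }

    public static void main(String[] args) {
        // Leader has the record, one in-sync follower is still catching up.
        System.out.println(sendConsideredSuccessful("1", true, 1));   // prints true
        System.out.println(sendConsideredSuccessful("all", true, 1)); // prints false
    }
}
```

With the Java client, the chosen value would be set as the `acks` property, for example `props.setProperty("acks", "all")`.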

Message Ordering: In general, messages are written to the broker in the same order that they are received by the producer client. However, if you enable message retries by setting retries to a value larger than 0 (which is the default), then message reordering may occur since the retry may occur after a following write succeeded. To enable retries without reordering, you can set max.in.flight.requests.per.connection to 1 to ensure that only one request can be sent to the broker at a time. Without retries enabled, the broker will preserve the order of writes it receives, but there could be gaps due to individual send failures.
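The reordering described above can be reproduced with a small simulation. It is a deliberately simplified model with hypothetical names: batches are numbered integers, the producer sends up to `maxInFlight` batches per round, and a batch listed in `failOnce` fails on its first attempt and is retried in the next round.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy simulation of how retries interact with the number of in-flight requests. */
final class RetryOrderingSketch {
    /** Returns the order in which batches 0..batches-1 end up in the broker log. */
    static List<Integer> deliver(int batches, int maxInFlight, Set<Integer> failOnce) {
        Deque<Integer> pending = new ArrayDeque<>();
        for (int i = 0; i < batches; i++) {
            pending.addLast(i);
        }
        Set<Integer> alreadyFailed = new HashSet<>();
        List<Integer> log = new ArrayList<>();
        while (!pending.isEmpty()) {
            // Send up to maxInFlight batches without waiting for responses.
            List<Integer> inFlight = new ArrayList<>();
            while (inFlight.size() < maxInFlight && !pending.isEmpty()) {
                inFlight.add(pending.pollFirst());
            }
            // The broker handles them in order; a batch that fails its first attempt is retried.
            List<Integer> retry = new ArrayList<>();
            for (int batch : inFlight) {
                if (failOnce.contains(batch) && alreadyFailed.add(batch)) {
                    retry.add(batch);
                } else {
                    log.add(batch);
                }
            }
            // Retries go back to the front of the queue, keeping their relative order.
            for (int i = retry.size() - 1; i >= 0; i--) {
                pending.addFirst(retry.get(i));
            }
        }
        return log;
    }

    public static void main(String[] args) {
        Set<Integer> firstBatchFailsOnce = Collections.singleton(0);
        System.out.println(deliver(3, 2, firstBatchFailsOnce)); // prints [1, 0, 2]
        System.out.println(deliver(3, 1, firstBatchFailsOnce)); // prints [0, 1, 2]
    }
}
```

With two requests in flight, batch 1 is written while batch 0 is still waiting for its retry. With a single request in flight, the retry of batch 0 completes before batch 1 is sent, which corresponds to setting max.in.flight.requests.per.connection to 1.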

Batching and Compression: Kafka producers attempt to collect sent messages into batches to improve throughput. With the Java client, you can use batch.size to control the maximum size in bytes of each message batch. To give more time for batches to fill, you can use linger.ms to have the producer delay sending. Compression can be enabled with the compression.type setting. Compression covers full message batches, so larger batches will typically mean a higher compression ratio. With the C/C++, Python, Go and .NET clients, you can use batch.num.messages to set a limit on the number of messages contained in each batch.
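To illustrate why larger batches tend to compress better, the sketch below compresses one message on its own and then a batch of similar messages, using the JDK's gzip stream as a stand-in for the producer's compression codecs. The message format and class name are hypothetical, and the numbers depend on the data, so no specific output is claimed.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

/** Compares compressed size to raw size for batches of different lengths. */
final class BatchCompressionSketch {
    /** Size in bytes of the gzip-compressed form of the input. */
    static int gzipSize(byte[] raw) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.size();
    }

    /** Compressed bytes divided by raw bytes for a batch of similar messages; smaller is better. */
    static double compressedFraction(int messagesInBatch) {
        StringBuilder batch = new StringBuilder();
        for (int i = 0; i < messagesInBatch; i++) {
            // Hypothetical message payload; real messages will compress differently.
            batch.append("{\"user\":\"u").append(i).append("\",\"action\":\"click\"}");
        }
        byte[] raw = batch.toString().getBytes(StandardCharsets.UTF_8);
        return (double) gzipSize(raw) / raw.length;
    }

    public static void main(String[] args) {
        System.out.println("1 message:    " + compressedFraction(1));
        System.out.println("100 messages: " + compressedFraction(100));
    }
}
```

In the Java client, batch.size and linger.ms influence how many messages end up in each batch, and compression.type selects the codec applied to the whole batch.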

When using snappy compression, you need write access to the /tmp directory. If you can’t use the /tmp directory, for example because it is mounted noexec, pass in a directory path for snappy that you have write access to:

-Dorg.xerial.snappy.tempdir=/path/to/newtmp

Queuing Limits: Use buffer.memory to limit the total memory that is available to the Java client for collecting unsent messages. When this limit is hit, the producer will block on additional sends for as long as max.block.ms before raising an exception. Additionally, to avoid keeping records queued indefinitely, you can set a timeout using request.timeout.ms. If this timeout expires before a message can be successfully sent, then it will be removed from the queue and an exception will be thrown. The C/C++, Python, Go and .NET clients have similar settings. Use queue.buffering.max.messages to limit the total number of messages that can be queued (for transmission, retries, or delivery reports) at any given time. queue.buffering.max.ms limits the amount of time the client waits to fill up a batch before sending it to the broker.
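The blocking behavior tied to buffer.memory and max.block.ms can be modeled with a counting semaphore. This is a toy sketch with hypothetical names, not the client's internal buffer: permits stand for free bytes, and a failed reservation returns false where the real producer would raise an exception.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Toy model of a producer buffer with a fixed memory budget. */
final class BufferSketch {
    private final Semaphore freeBytes;

    BufferSketch(int bufferMemoryBytes) {
        this.freeBytes = new Semaphore(bufferMemoryBytes);
    }

    /** Reserves space for a record, waiting up to maxBlockMs; false means the wait timed out. */
    boolean tryAppend(int recordBytes, long maxBlockMs) {
        try {
            return freeBytes.tryAcquire(recordBytes, maxBlockMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    /** Frees space once a record has been sent or has been dropped from the queue. */
    void release(int recordBytes) {
        freeBytes.release(recordBytes);
    }

    public static void main(String[] args) {
        BufferSketch buffer = new BufferSketch(100);
        System.out.println(buffer.tryAppend(60, 10)); // prints true: 60 of 100 bytes used
        System.out.println(buffer.tryAppend(60, 10)); // prints false: only 40 bytes free, wait times out
        buffer.release(60);                           // first record sent
        System.out.println(buffer.tryAppend(60, 10)); // prints true
    }
}
```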

Example code

For Hello World examples of Kafka clients in various programming languages including Java, see Code Examples. All examples include a producer and consumer that can connect to any Kafka cluster running on-premises or in Confluent Cloud. They also include examples of how to produce and consume Avro data with Schema Registry.