Confluent Platform includes the Java producer shipped with Apache Kafka®.
This section gives a high-level overview of how the producer works and an introduction to the configuration settings for tuning.
To see examples of producers and consumers written in various languages, refer to the specific language sections. For additional examples, including usage of Confluent Cloud, refer to GitHub examples.
The Kafka producer is conceptually much simpler than the consumer since it has no need for group coordination. A producer partitioner maps each message to a topic partition, and the producer sends a produce request to the leader of that partition. The partitioners shipped with Kafka guarantee that all messages with the same non-empty key will be sent to the same partition.
If you explicitly set the partition field when creating a ProducerRecord, the default behavior described in this section will be overridden.
If the key is provided, the partitioner hashes the key with the murmur2 algorithm and takes the result modulo the number of partitions. The result is that the same key is always assigned to the same partition, as long as the number of partitions does not change. If a key is not provided, the behavior depends on the Confluent Platform version:
- In Confluent Platform versions 5.4.x and later, the partition is assigned with awareness of batching. If a batch of records is not full and has not yet been sent to the broker, the partitioner selects the same partition as for the prior record. Partitions for newly created batches are assigned randomly. For more information, see KIP-480: Sticky Partitioner and the related Confluent blog post.
- In Confluent Platform versions prior to 5.4.x, the partition is assigned in round-robin fashion, starting at a random partition.
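The keyed case described above can be sketched in plain Java. This is an illustration only, not the client's own partitioner class: the class and method names are hypothetical, the hash is written to follow the 32-bit murmur2 variant used by the Java client, and the key is assumed to be serialized as UTF-8 bytes. Treat exact hash values as unverified; the point is that the mapping is deterministic and always lands in the range of valid partitions.

```java
import java.nio.charset.StandardCharsets;

/** Illustrative sketch of key-based partition selection (hypothetical class name). */
final class KeyPartitionSketch {

    /** 32-bit murmur2 hash over the serialized key bytes. */
    static int murmur2(byte[] data) {
        final int m = 0x5bd1e995;
        final int r = 24;
        int length = data.length;
        int h = 0x9747b28c ^ length; // seed XOR length

        // Mix four bytes at a time, read as a little-endian int.
        for (int i = 0; i + 4 <= length; i += 4) {
            int k = (data[i] & 0xff)
                    | ((data[i + 1] & 0xff) << 8)
                    | ((data[i + 2] & 0xff) << 16)
                    | ((data[i + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Mix the remaining 1 to 3 bytes, if any.
        int tail = length & ~3;
        switch (length % 4) {
            case 3:
                h ^= (data[tail + 2] & 0xff) << 16; // fall through
            case 2:
                h ^= (data[tail + 1] & 0xff) << 8;  // fall through
            case 1:
                h ^= data[tail] & 0xff;
                h *= m;
                break;
            default:
                break;
        }

        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    /** Maps a non-empty key to a partition: positive hash modulo partition count. */
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); // assumed serialization
        int positive = murmur2(keyBytes) & 0x7fffffff;          // clear the sign bit
        return positive % numPartitions;
    }

    public static void main(String[] args) {
        // Hypothetical key and partition count; the same key prints the same partition twice.
        System.out.println(partitionFor("user-42", 6));
        System.out.println(partitionFor("user-42", 6));
    }
}
```

Because the partition is computed modulo the partition count, adding partitions to a topic can move existing keys to different partitions.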
Each partition in the Kafka cluster has a leader and a set of replicas among the brokers. All writes to the partition must go through the partition leader. The replicas are kept in sync by fetching from the leader. When the leader shuts down or fails, the next leader is chosen from among the in-sync replicas. Depending on how the producer is configured, each produce request to the partition leader can be held until the replicas have successfully acknowledged the write. This gives the producer some control over message durability at some cost to overall throughput.
Messages written to the partition leader are not immediately readable
by consumers regardless of the producer’s acknowledgement settings.
When all in-sync replicas have acknowledged the write, then the
message is considered committed, which makes it available for
reading. This ensures that messages cannot be lost by a broker failure
after they have already been read. Note that this implies that
messages which were acknowledged by the leader only (that is, acks=1)
can be lost if the partition leader fails before the replicas have
copied the message. Nevertheless, this is often a reasonable
compromise in practice to ensure durability in most cases while not
impacting throughput too significantly.
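The commit rule above can be modeled in a few lines of Java. This is a simplified sketch, not broker code: it assumes each in-sync replica (leader included) reports a log end offset, meaning the next offset it would write, and treats a message as readable only when every in-sync replica has copied it. The class and method names are hypothetical.

```java
import java.util.Arrays;

/** Simplified model of when a message becomes committed (hypothetical class name). */
final class HighWatermarkSketch {

    /**
     * Returns the first offset that is NOT yet committed: the smallest
     * log end offset among the in-sync replicas, leader included.
     */
    static long highWatermark(long[] isrLogEndOffsets) {
        if (isrLogEndOffsets.length == 0) {
            throw new IllegalArgumentException("the in-sync set must contain at least the leader");
        }
        return Arrays.stream(isrLogEndOffsets).min().getAsLong();
    }

    /** A message is readable by consumers only if every in-sync replica has it. */
    static boolean isReadable(long offset, long[] isrLogEndOffsets) {
        return offset < highWatermark(isrLogEndOffsets);
    }

    public static void main(String[] args) {
        // Leader has written offsets 0..9; one follower has only copied 0..7.
        long[] isr = {10L, 8L, 10L};
        System.out.println(isReadable(7L, isr)); // copied by every in-sync replica
        System.out.println(isReadable(9L, isr)); // still missing on one follower
    }
}
```

In this model, a message at offset 9 that the leader acknowledged with acks=1 would be lost if the leader failed and the lagging follower took over, which is the trade-off the paragraph describes.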
Most of the subtlety around producers is tied to achieving high throughput with batching/compression and ensuring message delivery guarantees as mentioned above. In the next section, the most common settings to tune producer behavior are discussed.
Kafka Producer Configuration
The full list of configuration settings is available in Producer Configurations. The key configuration settings and how they affect the producer’s behavior are highlighted below.
Core Configuration: You are required to set the
bootstrap.servers property so that the producer can find the Kafka
cluster. Although not required, you should always set a
client.id since this allows you to easily correlate requests on
the broker with the client instance which made them. These settings are
the same for Java, C/C++, Python, Go and .NET clients.
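As a minimal sketch of the core settings, the following builds a java.util.Properties object using only the JDK. The broker addresses and client name are placeholders, and the class and method names are hypothetical. With the Kafka client library on the classpath, a Properties object like this (plus key and value serializer settings) is what would typically be handed to the producer constructor.

```java
import java.util.Properties;

/** Builds the two core producer settings (hypothetical helper class). */
final class CoreConfigSketch {

    static Properties coreConfig(String bootstrapServers, String clientId) {
        Properties props = new Properties();
        // Required: where the producer first connects to discover the cluster.
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Optional but recommended: lets you correlate broker-side requests with this client.
        props.setProperty("client.id", clientId);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder host names and client name, for illustration only.
        Properties props = coreConfig("broker1:9092,broker2:9092", "orders-service-producer");
        props.list(System.out);
    }
}
```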
Message Durability: You can control the durability of messages
written to Kafka through the
acks setting. The default value of
“1” requires an explicit acknowledgement from the partition leader
that the write succeeded. The strongest guarantee that Kafka provides
is with “acks=all”, which guarantees that not only did the partition
leader accept the write, but it was successfully replicated to all of
the in-sync replicas. You can also use a value of “0” to maximize
throughput, but you will have no guarantee that the message was
successfully written to the broker’s log since the broker does not
even send a response in this case. This also means that you will not
be able to determine the offset of the message. Note that for the C/C++,
Python, Go and .NET clients, this is a per-topic configuration, but it
can be applied globally: C/C++ has its own setting for this, and Python
and Go use the default.topic.config sub-configuration.
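A small sketch of choosing a durability level for the Java client follows. It only manipulates a java.util.Properties object; the helper name is hypothetical, and the validation covers just the three values discussed above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** Sets the acks level on a copy of an existing configuration (hypothetical helper). */
final class DurabilityConfigSketch {

    // The three values discussed in the text.
    private static final List<String> KNOWN_ACKS = Arrays.asList("0", "1", "all");

    static Properties withAcks(Properties base, String acks) {
        if (!KNOWN_ACKS.contains(acks)) {
            throw new IllegalArgumentException("unexpected acks value: " + acks);
        }
        Properties props = new Properties();
        props.putAll(base);               // keep the caller's settings untouched
        props.setProperty("acks", acks);  // "0" = no response, "1" = leader only, "all" = all in-sync replicas
        return props;
    }

    public static void main(String[] args) {
        Properties durable = withAcks(new Properties(), "all");
        System.out.println(durable.getProperty("acks"));
    }
}
```

Setting the value explicitly, rather than relying on a default, keeps the durability choice visible in the configuration.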
Message Ordering: In general, messages are written to the broker
in the same order that they are received by the producer client.
However, if you enable message retries by setting
retries to a
value larger than 0 (which is the default), then message reordering
may occur since the retry may occur after a following write
succeeded. To enable retries without reordering, you can set
max.in.flight.requests.per.connection to 1 to ensure that only one
request can be sent to the broker at a time. Without retries enabled,
the broker will preserve the order of writes it receives, but there
could be gaps due to individual send failures.
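The combination described above, retries enabled with a single in-flight request, might be configured as in this sketch. The retry count is an arbitrary illustrative value and the helper name is hypothetical.

```java
import java.util.Properties;

/** Enables retries while limiting the producer to one in-flight request (hypothetical helper). */
final class OrderingConfigSketch {

    static Properties orderedRetries(int retries) {
        if (retries < 0) {
            throw new IllegalArgumentException("retries must be non-negative");
        }
        Properties props = new Properties();
        props.setProperty("retries", Integer.toString(retries));
        // Only one unacknowledged request per broker connection, so a retried
        // request cannot be overtaken by a later one.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        orderedRetries(3).list(System.out); // 3 is an illustrative value
    }
}
```

Limiting in-flight requests to one trades some throughput for ordering, since the producer waits for each response before sending the next request on that connection.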
Batching and Compression: Kafka producers attempt to collect sent
messages into batches to improve throughput. With the Java client, you
can use batch.size to control the maximum size in bytes of each
message batch. To give more time for batches to fill, you can use
linger.ms to have the producer delay sending. Compression can be
enabled with the
compression.type setting. Compression covers full
message batches, so larger batches will typically mean a higher
compression ratio. With the C/C++, Python, Go and .NET clients, you can use
batch.num.messages to set a limit on the number of messages contained
in each batch.
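For the Java client, the three batching and compression settings named above could be set together as in the following sketch. The numeric values are illustrative assumptions rather than recommendations, and the helper name is hypothetical.

```java
import java.util.Properties;

/** Groups the Java client's batching and compression settings (hypothetical helper). */
final class BatchingConfigSketch {

    static Properties batching(int batchSizeBytes, long lingerMs, String compressionType) {
        Properties props = new Properties();
        // Upper bound, in bytes, on each per-partition batch.
        props.setProperty("batch.size", Integer.toString(batchSizeBytes));
        // How long the producer may wait for a batch to fill before sending it.
        props.setProperty("linger.ms", Long.toString(lingerMs));
        // Compression is applied to whole batches, so fuller batches tend to compress better.
        props.setProperty("compression.type", compressionType);
        return props;
    }

    public static void main(String[] args) {
        // Illustrative values: 32 KiB batches, up to 20 ms of lingering, snappy compression.
        batching(32 * 1024, 20L, "snappy").list(System.out);
    }
}
```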
When using snappy compression, you need write access to the
/tmp directory. If you don’t have write access to the
/tmp directory because it’s set to
noexec, pass in a directory path for snappy that you have write access to.
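One way to pass such a path, assuming the snappy-java library that backs snappy compression in the Java client, is its org.xerial.snappy.tempdir system property. The sketch below sets it programmatically; the directory is a placeholder and the helper name is hypothetical. The same property can be supplied on the JVM command line as -Dorg.xerial.snappy.tempdir=/path/to/newtmp.

```java
/** Points snappy-java at a writable temp directory (hypothetical helper). */
final class SnappyTempDirSketch {

    // System property read by snappy-java when it extracts its native library.
    static final String SNAPPY_TEMPDIR_PROPERTY = "org.xerial.snappy.tempdir";

    /** Call this before the producer first compresses anything, so the property is seen in time. */
    static void useSnappyTempDir(String writableDir) {
        System.setProperty(SNAPPY_TEMPDIR_PROPERTY, writableDir);
    }

    public static void main(String[] args) {
        useSnappyTempDir("/path/to/newtmp"); // placeholder path
        System.out.println(System.getProperty(SNAPPY_TEMPDIR_PROPERTY));
    }
}
```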
Queuing Limits: Use
buffer.memory to limit the total memory
that is available to the Java client for collecting unsent
messages. When this limit is hit, the producer will block on
additional sends for as long as
max.block.ms before raising an
exception. Additionally, to avoid keeping records queued indefinitely,
you can set a timeout using
request.timeout.ms. If this timeout
expires before a message can be successfully sent, then it will be removed
from the queue and an exception will be thrown. The C/C++, Python, Go
and .NET clients have similar settings, including one that limits
the total number of messages that can be queued (for
transmission, retries, or delivery reports) at any given time.
queue.buffering.max.ms limits the amount of time the client waits
to fill up a batch before sending it to the broker.
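The Java client's queuing limits named above can be grouped in one place, as in this sketch. The values are illustrative assumptions and the helper name is hypothetical; only the three setting names come from the text.

```java
import java.util.Properties;

/** Groups the Java client's queuing limits (hypothetical helper). */
final class QueuingConfigSketch {

    static Properties queuingLimits(long bufferMemoryBytes, long maxBlockMs, int requestTimeoutMs) {
        Properties props = new Properties();
        // Total memory available for records that have not been sent yet.
        props.setProperty("buffer.memory", Long.toString(bufferMemoryBytes));
        // How long a send may block once the buffer is full before an exception is raised.
        props.setProperty("max.block.ms", Long.toString(maxBlockMs));
        // Timeout described in the text for records that cannot be sent successfully.
        props.setProperty("request.timeout.ms", Integer.toString(requestTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        // Illustrative values: 64 MiB buffer, 5 s maximum block, 30 s request timeout.
        queuingLimits(64L * 1024 * 1024, 5_000L, 30_000).list(System.out);
    }
}
```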