Optimize Confluent Cloud Clients for Durability¶
Durability is all about reducing the chance for a message to get lost. Confluent Cloud
enforces a replication factor of 3
to ensure data durability.
Some configuration parameters in this page have a range of values. How you set them depends on your requirements and other factors, such as the average message size, number of partitions, and other differences in the environment. So, benchmarking helps to validate the configuration for your application and environment.
Producer acks¶
Producers can control the durability of messages written to Kafka through the
acks
configuration parameter. Although you can use the acks
parameter in
throughput and latency optimization, it is primarily used in the context of
durability. To optimize for high durability, set the parameter to acks=all
(equivalent to acks=-1
), which means the leader waits for the full set of
in-sync replicas (ISRs) to acknowledge the message and consider it committed.
This provides the strongest available guarantees that the record won’t be lost
as long as at least one in-sync replica remains alive. The trade-off is
tolerating a higher latency because the leader broker waits for acknowledgments
from replicas before responding to the producer.
Duplication and Ordering¶
Producers can also increase durability by trying to resend messages if any sends
fail to ensure that data isn’t lost. The producer automatically tries to resend
messages up to the number of times specified by the configuration parameter
retries
(default MAX_INT
) and up to the time duration specified by the
configuration parameter delivery.timeout.ms
(default 120000), the latter of
which was introduced in KIP-91.
You can tune delivery.timeout.ms
to the desired upper bound for the total
time between sending a message and receiving an acknowledgment from the broker,
which should reflect business requirements of how long a message is valid for.
There are two things to consider with automatic producer retries:
- Duplication: If there are transient failures in Confluent Cloud that cause a producer retry, the producer may send duplicate messages to Confluent Cloud.
- Ordering: Multiple send attempts may be “in flight” at the same time, and a retry of a previously failed message send may occur after a newer message send succeeded.
To address both of these, configure the producer for idempotency
(enable.idempotence=true
) in which the brokers in Confluent Cloud track messages
using incrementing sequence numbers, similar to TCP. Idempotent producers can
handle duplicate messages and preserve message order even with request
pipelining—there is no message duplication because the broker ignores duplicate
sequence numbers, and message ordering is preserved because when there are
failures, the producer temporarily constrains to a single message in flight
until sequencing is restored. In the case where idempotence guarantees can’t be
satisfied, the producer raises a fatal error and reject any further sends– so
when configuring the producer for idempotency, the application developer needs
to catch the fatal error and handle it appropriately.
If you don’t configure the producer for idempotency but your business
requirements call for it, you must address the potential for message duplication
and ordering issues in other ways. To handle possible message duplication if
there are transient failures in Confluent Cloud, be sure to build your consumer
application logic to process duplicate messages. To preserve message order while
allowing the resending of failed messages, set the configuration parameter
max.in.flight.requests.per.connection=1
to ensure only one request can be
sent to the broker at a time. To preserve message order while allowing request
pipelining, set the configuration parameter retries=0
if the application can
tolerate some message loss.
Instead of letting the producer retry in sending failed messages on it own, you
can also code the actions for exceptions returned to the producer client (for
example, the onCompletion()
method in the Callback
interface in the Java
client). If you want manual retry handling, disable automatic retries by setting
retries=0
. Producer idempotency tracks message sequence numbers, so it makes
sense only when automatic retries are enabled. Otherwise, if you set retries=0
and the application manually tries to resend a failed message, then it just
generates a new sequence number so the duplication detection won’t work.
Disabling automatic retries can result in message gaps due to individual send
failures, but the broker preserves the order of writes it receives.
Minimum Insync Replicas¶
Confluent Cloud provides durability by replicating data across multiple brokers. Each
partition has a list of assigned replicas (or brokers) that should have copies
of the data. The list of replicas that are “in-sync” with the leader are called
in-sync replicas (ISRs). For each partition, leader brokers automatically
replicate messages to other brokers that are in their ISR list. When a producer
sets acks=all
or acks=-1
, then the configuration parameter
min.insync.replicas
specifies the minimum threshold for the replica count in
the ISR list. If this minimum count can’t be met, then the producer raises an
exception. When used together, min.insync.replicas
and acks
allow you to
enforce greater durability guarantees. A typical scenario would be to create a
topic with replication.factor=3
, topic configuration override
min.insync.replicas=2
, and producer acks=all
, thereby ensuring that the
producer raises an exception if a majority of the replicas don’t receive a
write.
Consumer Offsets and Auto Commit¶
You should also consider what happens to messages if there is an unexpected consumer failure to ensure messages don’t get lost during the processing phase. Consumer offsets track which messages have already been consumed, so how and when consumers commit message offsets is crucial for durability. Try avoiding situations where a consumer commits the offset of a message, starts processing that message, and then unexpectedly fails—this is because the subsequent consumer that starts reading from the same partition won’t reprocess messages with offsets that have already been committed.
By default, offsets are configured to be committed during the consumer’s
poll()
call at a periodic interval, and this is typically good enough for
most use cases. But if the consumer is part of a transactional chain and you
need strong message delivery guarantees, you may want the offsets to be
committed only after the consumer finishes completely processing the messages.
You can configure whether these consumer commits happen automatically or
manually with the configuration parameter enable.auto.commit
. For extra
durability, you may disable the automatic commit by setting
enable.auto.commit=false
and explicitly call one of the commit methods in
the consumer code (for example, commitSync()
or commitAsync()
).
Exactly Once Semantics (EOS)¶
For even stronger guarantees, you can configure your applications for EOS
transactions, which enable atomic writes to many Kafka topics and partitions.
Since some messages in the log may be in various states of a transaction,
consumers can set the configuration parameter isolation.level
to define the
types of messages they should receive. By setting
isolation.level=read_committed
, consumers receive only non-transactional
messages or committed transactional messages, and they won’t receive messages
from open or aborted transactions. To use transactional semantics in a
consume-process-produce
pattern and ensure each message is processed exactly
once, a client application should set enable.auto.commit=false
and commit
offsets manually using the sendOffsetsToTransaction()
method in the
KafkaProducer
interface. You can also enable exactly once for your
event streaming applications by setting the configuration parameter
processing.guarantee
.
Summary of Configurations for Optimizing Durability¶
Producer¶
replication.factor=3
acks=all
(default:all
- default prior to Kafka 3.0:1
)enable.idempotence=true
(default:true
- default prior to Kafka 3.0:false
), to prevent duplicate messages and out-of-order messagesmax.in.flight.requests.per.connection=1
(default5
), to prevent out of order messages when not using an idempotent producer
Consumer¶
enable.auto.commit=false
(defaulttrue
)isolation.level=read_committed
(when using EOS transactions)
Streams¶
StreamsConfig.REPLICATION_FACTOR_CONFIG
:3
(default1
)StreamsConfig.PROCESSING_GUARANTEE_CONFIG
:StreamsConfig.EXACTLY_ONCE
(defaultStreamsConfig.AT_LEAST_ONCE
)- Streams applications have embedded producers and consumers, so also check those configuration recommendations