Optimize Confluent Cloud Clients for Durability

Durability is all about reducing the chance for a message to get lost. Confluent Cloud enforces a replication factor of 3 to ensure data durability.

Some configuration parameters in this page have a range of values. How you set them depends on your requirements and other factors, such as the average message size, number of partitions, and other differences in the environment. So, benchmarking helps to validate the configuration for your application and environment.

Producer acks

Producers can control the durability of messages written to Kafka through the acks configuration parameter. Although you can use the acks parameter in throughput and latency optimization, it is primarily used in the context of durability. To optimize for high durability, set the parameter to acks=all (equivalent to acks=-1), which means the leader waits for the full set of in-sync replicas (ISRs) to acknowledge the message and consider it committed. This provides the strongest available guarantees that the record won’t be lost as long as at least one in-sync replica remains alive. The trade-off is tolerating a higher latency because the leader broker waits for acknowledgments from replicas before responding to the producer.

Duplication and Ordering

Producers can also increase durability by trying to resend messages if any sends fail to ensure that data isn’t lost. The producer automatically tries to resend messages up to the number of times specified by the configuration parameter retries (default MAX_INT) and up to the time duration specified by the configuration parameter delivery.timeout.ms (default 120000), the latter of which was introduced in KIP-91. You can tune delivery.timeout.ms to the desired upper bound for the total time between sending a message and receiving an acknowledgment from the broker, which should reflect business requirements of how long a message is valid for.

There are two things to consider with automatic producer retries:

  1. Duplication: If there are transient failures in Confluent Cloud that cause a producer retry, the producer may send duplicate messages to Confluent Cloud.
  2. Ordering: Multiple send attempts may be “in flight” at the same time, and a retry of a previously failed message send may occur after a newer message send succeeded.

To address both of these, configure the producer for idempotency (enable.idempotence=true) in which the brokers in Confluent Cloud track messages using incrementing sequence numbers, similar to TCP. Idempotent producers can handle duplicate messages and preserve message order even with request pipelining—there is no message duplication because the broker ignores duplicate sequence numbers, and message ordering is preserved because when there are failures, the producer temporarily constrains to a single message in flight until sequencing is restored. In the case where idempotence guarantees can’t be satisfied, the producer raises a fatal error and reject any further sends– so when configuring the producer for idempotency, the application developer needs to catch the fatal error and handle it appropriately.

If you don’t configure the producer for idempotency but your business requirements call for it, you must address the potential for message duplication and ordering issues in other ways. To handle possible message duplication if there are transient failures in Confluent Cloud, be sure to build your consumer application logic to process duplicate messages. To preserve message order while allowing the resending of failed messages, set the configuration parameter max.in.flight.requests.per.connection=1 to ensure only one request can be sent to the broker at a time. To preserve message order while allowing request pipelining, set the configuration parameter retries=0 if the application can tolerate some message loss.

Instead of letting the producer retry in sending failed messages on it own, you can also code the actions for exceptions returned to the producer client (for example, the onCompletion() method in the Callback interface in the Java client). If you want manual retry handling, disable automatic retries by setting retries=0. Producer idempotency tracks message sequence numbers, so it makes sense only when automatic retries are enabled. Otherwise, if you set retries=0 and the application manually tries to resend a failed message, then it just generates a new sequence number so the duplication detection won’t work. Disabling automatic retries can result in message gaps due to individual send failures, but the broker preserves the order of writes it receives.

Minimum Insync Replicas

Confluent Cloud provides durability by replicating data across multiple brokers. Each partition has a list of assigned replicas (or brokers) that should have copies of the data. The list of replicas that are “in-sync” with the leader are called in-sync replicas (ISRs). For each partition, leader brokers automatically replicate messages to other brokers that are in their ISR list. When a producer sets acks=all or acks=-1, then the configuration parameter min.insync.replicas specifies the minimum threshold for the replica count in the ISR list. If this minimum count can’t be met, then the producer raises an exception. When used together, min.insync.replicas and acks allow you to enforce greater durability guarantees. A typical scenario would be to create a topic with replication.factor=3, topic configuration override min.insync.replicas=2, and producer acks=all, thereby ensuring that the producer raises an exception if a majority of the replicas don’t receive a write.

Consumer Offsets and Auto Commit

You should also consider what happens to messages if there is an unexpected consumer failure to ensure messages don’t get lost during the processing phase. Consumer offsets track which messages have already been consumed, so how and when consumers commit message offsets is crucial for durability. Try avoiding situations where a consumer commits the offset of a message, starts processing that message, and then unexpectedly fails—this is because the subsequent consumer that starts reading from the same partition won’t reprocess messages with offsets that have already been committed.

By default, offsets are configured to be committed during the consumer’s poll() call at a periodic interval, and this is typically good enough for most use cases. But if the consumer is part of a transactional chain and you need strong message delivery guarantees, you may want the offsets to be committed only after the consumer finishes completely processing the messages. You can configure whether these consumer commits happen automatically or manually with the configuration parameter enable.auto.commit. For extra durability, you may disable the automatic commit by setting enable.auto.commit=false and explicitly call one of the commit methods in the consumer code (for example, commitSync() or commitAsync()).

Exactly Once Semantics (EOS)

For even stronger guarantees, you can configure your applications for EOS transactions, which enable atomic writes to many Kafka topics and partitions. Since some messages in the log may be in various states of a transaction, consumers can set the configuration parameter isolation.level to define the types of messages they should receive. By setting isolation.level=read_committed, consumers receive only non-transactional messages or committed transactional messages, and they won’t receive messages from open or aborted transactions. To use transactional semantics in a consume-process-produce pattern and ensure each message is processed exactly once, a client application should set enable.auto.commit=false and commit offsets manually using the sendOffsetsToTransaction() method in the KafkaProducer interface. You can also enable exactly once for your event streaming applications by setting the configuration parameter processing.guarantee.

Summary of Configurations for Optimizing Durability

Producer

  • replication.factor=3
  • acks=all (default: all - default prior to Kafka 3.0: 1)
  • enable.idempotence=true (default: true - default prior to Kafka 3.0: false), to prevent duplicate messages and out-of-order messages
  • max.in.flight.requests.per.connection=1 (default 5), to prevent out of order messages when not using an idempotent producer

Consumer

  • enable.auto.commit=false (default true)
  • isolation.level=read_committed (when using EOS transactions)

Streams

  • StreamsConfig.REPLICATION_FACTOR_CONFIG: 3 (default 1)
  • StreamsConfig.PROCESSING_GUARANTEE_CONFIG: StreamsConfig.EXACTLY_ONCE (default StreamsConfig.AT_LEAST_ONCE)
  • Streams applications have embedded producers and consumers, so also check those configuration recommendations