Client Configuration Settings for Confluent Cloud¶
The following sections provide expert recommendations for configuring Apache Kafka® producers and consumers for Java and librdkafka clients. These best practices are designed to optimize the performance and reliability of your client applications and help you use Kafka’s advanced features and capabilities to their fullest potential. Whether you’re a seasoned developer or just getting started with Kafka, the following information provides valuable insights to help you configure robust, resilient, and scalable producers and consumers for streaming applications.
To learn more about producers and consumers, see Kafka Producer for Confluent Cloud and Kafka Consumer for Confluent Cloud.
Recommendations¶
Consider the following client configuration recommendations:
- Always use current, supported clients. Current clients contain bug fixes and default settings tuned to allow clients to gracefully handle warnings without disrupting your streaming applications. For more information, see Client versions and support.
- Rely on the existing retry logic to resolve retriable errors and warnings.
- Trigger alerts on actual errors, not retriable errors and warnings. The warnings listed in Cluster upgrades and error handling occur regularly as part of normal cluster operations and may not have any impact on your workload.
For additional recommendations on how to architect, monitor, and optimize your Kafka applications on Confluent Cloud, refer to Build Kafka Client Applications on Confluent Cloud.
JVM settings for Java clients¶
There are two recommended JVM settings for Java clients when interacting with Confluent Cloud:
JVM Security configuration
```java
java.security.Security.setProperty("networkaddress.cache.ttl", "30");
java.security.Security.setProperty("networkaddress.cache.negative.ttl", "0");
```
Kafka Producer and Consumer configuration
```
consumer.client.dns.lookup="use_all_dns_ips"
producer.client.dns.lookup="use_all_dns_ips"
```
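How you wire these settings into an application varies; the following is a minimal sketch for a plain Java producer. The class name, the bootstrap server placeholder, and the unprefixed `client.dns.lookup` property are illustrative assumptions, not Confluent-prescribed values:

```java
import java.security.Security;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ClientSetupExample {
    public static void main(String[] args) {
        // Shorten JVM DNS caching so the client re-resolves broker hostnames
        // promptly; set these before constructing any Kafka client.
        Security.setProperty("networkaddress.cache.ttl", "30");
        Security.setProperty("networkaddress.cache.negative.ttl", "0");

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<your-bootstrap-server>:9092"); // placeholder
        props.put(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG, "use_all_dns_ips");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Produce records here.
        }
    }
}
```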
Cluster upgrades and error handling¶
Confluent Cloud regularly updates clusters to perform upgrades and maintenance. During this process, Confluent performs rolling restarts of all the brokers in a cluster. The Kafka protocol and architecture are designed for this type of highly-available, fault-tolerant operation. To ensure seamless client handling of cluster updates, you must configure your clients using current client libraries.
Confluent recommends you use the strategies for error handling outlined below. During normal cluster operations that use a rolling restart, clients may encounter the following warning exceptions:
- `UNKNOWN_TOPIC_OR_PARTITION`: "This server does not host this topic-partition."
- `LEADER_NOT_AVAILABLE`: "There is no leader for this topic-partition as we are in the middle of a leadership election."
- `NOT_COORDINATOR`: "This is not the correct coordinator."
- `NOT_ENOUGH_REPLICAS`: "Messages are rejected since there are fewer in-sync replicas than required."
- `NOT_ENOUGH_REPLICAS_AFTER_APPEND`: "Messages are written to the log, but to fewer in-sync replicas than required."
- `NOT_LEADER_OR_FOLLOWER`: "This server is not the leader for the given partition."
The following message is what a client logs at WARN level if it attempts to connect to a broker that has just been restarted (for example, during maintenance):
"Connection to node {} ({}) terminated during authentication. This may happen
due to any of the following reasons: (1) Authentication failed due to invalid
credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS
traffic (eg it may only allow HTTPS traffic), (3) Transient network issue."
Configure clients with a sufficient number of retries or retry time to prevent these warning exceptions from getting logged as errors.
- By default, Kafka producer clients retry for two minutes, print these warnings to logs, and recover without any intervention.
- By default, Kafka consumer and admin clients retry for one minute.
Timeout exceptions occur if clients exhaust their buffer space while retrying, or if they run out of time while waiting for buffer memory to become available.
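For example, a Java producer can be given extra retry headroom before a send is reported as failed. The property names below are standard Java client settings, but the helper name and the specific values are illustrative assumptions, not recommendations for every workload:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public final class ProducerRetryTuning {
    static Properties retryTolerantProps() {
        Properties props = new Properties();
        // Total time a record may spend being sent, including retries of
        // retriable warnings such as NOT_LEADER_OR_FOLLOWER (default: 2 min).
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 180_000);
        // How long send() may block while waiting for metadata or buffer memory.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60_000);
        // Leave retries effectively unbounded; delivery.timeout.ms is the real limit.
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        return props;
    }
}
```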
In general, planning for volatility is a basic tenet of building cloud-native client applications. In addition to normal cluster operations, brokers may disappear for a variety of reasons, such as issues with the underlying infrastructure at the cloud-provider layer. For more information, see Cloud-native applications.
Client configuration properties¶
Client configuration properties for an Apache Kafka® Producer or Consumer determine how the client interacts with a Kafka cluster. You can tweak several default configuration property settings to achieve better performance based on the workload. This document will help you understand how to configure your Kafka Producer and Consumer clients to optimize client performance based on your workload.
Why tuning client configurations is important¶
Kafka client configurations provide flexibility and control over various aspects of the client’s behavior, performance, security, and reliability. Properly tuning these configurations helps optimize the client’s interactions with the Kafka cluster and ensures efficient message processing. The following are two specific areas where ensuring correct settings positively impacts the workload:
- Performance: Client configurations can be adjusted to optimize performance. Adjusting properties that control batching, compression, linger, and prefetch can significantly impact client throughput, latency, and resource utilization.
- Error handling: Kafka clients need to handle errors with retries, or fail gracefully until a solution can be implemented to resolve the error. Ensuring the configuration is correct can enhance workload resilience and ensure reliability for mission-critical applications.
Configuration categories¶
Client configuration properties are grouped into the following configuration categories:
- Connection and network properties: A Kafka client must establish a connection with Confluent clusters to produce and consume messages. This category includes settings for bootstrap servers, connection timeout, and network buffer sizes. Optimizing these settings can ensure reliable and efficient communication between the client and the Kafka cluster.
- Security and authentication properties: Kafka supports various security mechanisms, such as SSL/TLS encryption, SASL authentication, and authorization using Access Control Lists (ACLs). This category includes security-related settings, such as SSL certificates, authentication protocols, and user credentials. Properly configuring security settings ensures the confidentiality, integrity, and authenticity of the communication between clients and the Kafka cluster.
- Message processing properties: Kafka clients can process messages in various ways, such as consuming messages from specific topics, committing message offsets, or specifying how to handle message errors. This category includes `max.poll.records`, `auto.commit.interval.ms`, `acks`, and several others. Fine-tuning these property settings may improve client throughput, fault tolerance, and processing guarantees, as shown in the sketch after this list.
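As a concrete illustration of the message processing category, the following minimal Java sketch sets a few of these properties on a consumer; the class name and the chosen values are illustrative assumptions:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public final class MessageProcessingExample {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);           // records returned per poll() call
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);        // commit offsets automatically
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10_000); // commit every 10 seconds
        return props;
    }
}
```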
Configuration properties¶
The following tables provide several important configuration properties for Java and librdkafka clients. For a complete listing of configuration properties, see the Apache Kafka and librdkafka configuration reference documentation.
Before you modify properties¶
Before you start modifying client configuration properties to find out if you can tweak client performance, be sure to complete the following steps.
- Verify your client is using default configuration properties. Someone may have changed configuration properties from their default settings.
- Update your client to the latest supported version available. Default configuration property settings are optimized in later clients. For more information, see Client versions and support.
Important
When modifying configuration properties, monitor the impact on your system and ensure it behaves as expected. Always test any changes in a staging or pre-production environment before rolling them out to production.
Common properties¶
The following table provides several common configuration properties for Producers and Consumers that you can review for potential modification.
Configuration property | Java default | librdkafka default | Notes |
---|---|---|---|
`client.id` | empty string | `rdkafka` | Set `client.id` to something meaningful in your application, especially if you run multiple clients or want to easily trace logs or activities to specific client instances. |
`connections.max.idle.ms` | 540000 ms (9 min) | See librdkafka `socket.timeout.ms` | Change this when an intermediate load balancer disconnects idle connections after inactivity. For example: AWS 350 seconds, Azure 4 minutes, Google Cloud 10 minutes. |
`sasl.kerberos.service.name` | null | kafka | Changing the default service name causes issues for clients that don’t have it configured. |
`socket.connection.setup.timeout.max.ms` | 30000 ms (30 sec) | not available | librdkafka doesn’t have exponential backoff for this timeout. |
`socket.connection.setup.timeout.ms` | 10000 ms (10 sec) | 30000 ms (30 sec) | librdkafka doesn’t have exponential backoff for this timeout. |
`metadata.max.age.ms` | 300000 ms (5 min) | 900000 ms (15 min) | librdkafka has the `topic.metadata.refresh.interval.ms` property, which defaults to 300000 ms (5 min). |
`reconnect.backoff.max.ms` | 1000 ms (1 sec) | 10000 ms (10 sec) | |
`reconnect.backoff.ms` | 50 ms | 100 ms | |
`max.in.flight.requests.per.connection` | 5 | 1000000 | librdkafka produces to a single partition per batch, so setting this to 5 limits producing to 5 partitions per broker. |
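For example, the following minimal Java sketch applies two of the common properties above: a descriptive `client.id` and a shorter idle timeout sized for an intermediate load balancer. The helper name, service name, and values are illustrative assumptions:

```java
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;

public final class CommonClientProps {
    static Properties commonProps(String instanceId) {
        Properties props = new Properties();
        // A descriptive client.id ties broker-side logs and metrics back to this instance.
        props.put(CommonClientConfigs.CLIENT_ID_CONFIG, "orders-service-" + instanceId);
        // Close idle connections before an intermediate load balancer does
        // (for example, below Azure's 4-minute idle timeout).
        props.put(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG, 180_000);
        return props;
    }
}
```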
Producer properties¶
The following table provides a few configuration properties for Producers that you can review for potential modification.
Configuration property | Java default | librdkafka default | Notes |
---|---|---|---|
`batch.size` | 16384 | 1000000 | |
`delivery.timeout.ms` | 120000 ms (2 min) | 300000 ms (5 min) | |
`linger.ms` | 0 ms | 5 ms | librdkafka `linger.ms` reduces the number of in-flight Produce requests and increases batching (see `max.in.flight.requests.per.connection`). |
`enable.idempotence` | true | false | Enabling idempotence sets `max.in.flight.requests.per.connection` to 5 (see `max.in.flight.requests.per.connection`). |
`partitioner` | murmur2_random (default Kafka partitioner) | consistent_random | Changing the default partitioner causes the client to send keyed messages to different partitions. If both a librdkafka-based client and a Java client produce to the same topic, set this property to `murmur2_random` for the librdkafka client so that messages with the same key are sent to the same partition. |
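The following minimal Java sketch shows a throughput-oriented use of some of these producer properties. The helper name and the specific values (64 KB batches, 5 ms linger, lz4 compression) are illustrative assumptions, not universal recommendations:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public final class ProducerThroughputTuning {
    static Properties throughputProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // avoid duplicates and reordering on retries
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);             // wait briefly so batches fill up
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65_536);       // 64 KB batches instead of the 16 KB default
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");  // trade a little CPU for smaller requests
        return props;
    }
}
```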
Consumer properties¶
The following table provides a few configuration properties for Consumers that you can review for potential modification.
Configuration property | Java default | librdkafka default | Notes |
---|---|---|---|
`allow.auto.create.topics` | true | false | |
`isolation.level` | read_uncommitted | read_committed | |
`partition.assignment.strategy` | RangeAssignor, CooperativeStickyAssignor | range, roundrobin | Online upgrade from an eager to a cooperative assignor is not supported in librdkafka. |
`check.crcs` | true | false | Record checksum validation comes at slightly increased CPU usage. Checksums are also present at the IPv4 and TCP layers. Other checks may be available at the disk sector (ECC) or file system level (not in ext4 by default). |
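The following minimal Java sketch applies some of these consumer properties; the helper name and the chosen values are illustrative assumptions:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;

public final class ConsumerTuning {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");   // skip aborted transactional records
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  CooperativeStickyAssignor.class.getName());                 // incremental (cooperative) rebalances
        props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);     // avoid accidentally creating topics
        return props;
    }
}
```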
OpenId Connect (OIDC) and token retry behavior¶
The OIDC retry behavior handles operations such as obtaining new authentication tokens or refreshing them when failures happen. Before modifying configuration properties related to this behavior, read this section to understand how retry behavior works for your client.
Java Client¶
The token refresh process begins when a credential’s lifetime has exceeded a specified percentage. This percentage is 80% by default, but you can configure a different value by implementing the org.apache.kafka.common.security.oauthbearer.OAuthBearerToken interface in a custom class and specifying the token lifetime.
If an authentication error occurs during the token refresh process, the client waits 10 seconds before retrying the token refresh. You cannot configure the wait time before the refresh retry.
The refresh process for the Java Client retries indefinitely: it continues until it succeeds or the application closes. The underlying HTTP request to fetch the token from the identity provider (IdP) uses an exponential retry delay that doubles after each failed attempt. By default, the retry delay starts at 100 ms and increases up to a maximum of 10000 ms. You can configure alternative values with `sasl.oauthbearer.jwks.endpoint.retry.backoff.ms` and `sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms`.
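A hedged sketch of setting those two backoff properties on a Java client that already uses SASL/OAUTHBEARER; the helper name and the 200 ms and 15000 ms values are arbitrary examples, not recommended defaults:

```java
import java.util.Properties;

public final class OAuthRetryBackoff {
    static Properties oauthRetryProps() {
        Properties props = new Properties();
        // Initial delay before the first retry of the HTTP request to the IdP/JWKS endpoint.
        props.put("sasl.oauthbearer.jwks.endpoint.retry.backoff.ms", "200");
        // Upper bound for the exponentially increasing retry delay.
        props.put("sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms", "15000");
        return props;
    }
}
```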
Schema Registry Java Client¶
The retry behavior for this client is identical to the Java Client. By default, the retry delay starts at 100 ms and increases up to a maximum of 10000 ms. You can configure alternative values with `sasl.oauthbearer.jwks.endpoint.retry.backoff.ms` and `sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms`.
Node.js Client for Kafka¶
You cannot alter the default token refresh behavior, and you cannot configure retries, for the Node.js Client for Kafka. Token refresh begins when the token reaches 80% of its lifetime. If a token refresh fails for any reason, the refresh process logs an error via the logger and retries the refresh after 10 seconds. If all attempts to refresh a token fail and the token expires, produce and fetch operations also begin to fail.
If you would like to add custom behavior, for example jitter or different timeouts, you can write an application-side token refresh callback. The refresh process invokes this callback at 80% of a token’s lifetime. A token refresh can fail with one of the following error codes:
Code | Description |
---|---|
408 | Request timeout |
425 | Too early |
500 | Internal server error |
502 | Bad gateway |
503 | Service unavailable |
504 | Gateway timeout |
If any of these errors occurs, the retry process attempts four retries with backoff (5 s, 10 s, 15 s, 20 s), without logging errors.
If the process returns a different error code, or if all of the retries fail, the process logs an error and repeats the same sequence (the initial attempt plus four retries) after a 10-second linear backoff. This continues until a retry succeeds or the token expires. If all attempts to refresh a token fail and the token expires, produce and fetch operations begin to fail.
If you write an application-side token refresh callback (for example, to add jitter or different timeouts), it replaces the built-in credential fetch and its one-plus-four retry sequence: the library triggers the callback at 80% of the token’s lifetime and, if the callback signals failure, triggers it again after 10 seconds.
Schema Registry Node.js Client¶
The token refresh process begins 30 minutes before a token expires.
librdkafka derived (non-Java) clients¶
For these clients, the token refresh process is identical to the Node.js Client for Kafka.
For additional information about optimizing and tuning clients, see Optimize and Tune Confluent Cloud Clients.