Kafka Client Quick Start for Confluent Cloud¶
You can write Kafka client applications to connect to Confluent Cloud in any supported language. You just need to configure the clients using the Confluent Cloud cluster credentials.
Confluent’s official clients are available for:
- Java
- librdkafka and derived clients, including Golang, .NET, and Python
This document covers how to properly configure a client. If you have already configured a client and are looking for examples on writing a client application, please see our “Hello, World!” code examples that produce to and consume from any Kafka cluster, including Confluent Cloud clusters.
For more information about Kafka clients in general, see Kafka Clients.
Note
All clients that connect to Confluent Cloud must support SASL/PLAIN authentication and TLS 1.2 encryption.
Warning
Confluent recommends that you avoid certificate pinning because it can introduce connection failures: Confluent may change certificates, or properties of those certificates, at any time, and if the certificate you pinned is replaced, your applications may fail to connect.
If your organization absolutely requires pinning, pin to the Let’s Encrypt ISRG Root X1 certificate instead of any subordinate certificate. However, be aware that if Confluent switches CA providers, connections from your applications may be impacted.
Configure clients¶
Clients must be configured properly to connect to a Confluent Cloud cluster. For a subset of client languages, you can also configure clients to connect to a Confluent Cloud Schema Registry cluster.
You can obtain a configuration file prefilled with necessary credentials either from the Confluent Cloud Console or on the Confluent CLI.
Configure clients from the Confluent Cloud Console¶
The easiest way to get started connecting your client apps to Confluent Cloud is to copy-paste the configuration file from the Confluent Cloud Console.
Log in to Confluent Cloud.
Select an environment.
Select a cluster.
Select Clients from the navigation menu.
(Optional) Click the + New client button.
Select the language you are using for your client application.
Once you have selected a language, create a Kafka cluster API key and a Schema Registry API key, or use existing ones, as needed. Then, copy and paste the displayed configuration into your client application source code.
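The copied configuration is a set of key=value pairs. As a sketch of how you might use it from a librdkafka-based client such as confluent-kafka-python (the broker address and credentials below are placeholders, not real values), you can parse the pairs into a dictionary:

```python
# Parse a librdkafka-style client configuration (key=value lines) into a
# dict that could be passed to confluent_kafka.Producer or Consumer.
# All broker and credential values below are placeholders.
def read_ccloud_config(text):
    conf = {}
    for line in text.splitlines():
        line = line.strip()
        if line and not line.startswith("#"):
            key, _, value = line.partition("=")
            conf[key.strip()] = value.strip()
    return conf

EXAMPLE = """
# Kafka cluster connection (placeholder values)
bootstrap.servers=pkc-12345.us-east-2.gcp.confluent.cloud:9092
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=<api-key>
sasl.password=<api-secret>
"""

conf = read_ccloud_config(EXAMPLE)
# conf could now be passed to confluent_kafka.Producer(conf)
print(conf["security.protocol"])  # SASL_SSL
```

In practice you would read the pasted configuration from a file rather than an inline string; the parsing is the same.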
Tip
If you are a first-time user, you can click Get started with our example project and tutorial directly below the displayed configuration. It walks through not only client configuration but also a “Hello, World!” client code example that produces to and consumes from a Confluent Cloud cluster.
Configure clients on the Confluent CLI¶
If you use the Confluent CLI frequently, then once you have set up a context in the CLI, you can use the one-line command confluent kafka client-config create to create a configuration file for connecting your client apps to Confluent Cloud.
The following table lists supported client languages, corresponding language ID, and whether the language supports Confluent Cloud Schema Registry configuration. For languages that support Confluent Cloud Schema Registry configuration, you can optionally configure it for your client apps by passing Schema Registry information via the flags to the command.
| Language | Language ID | Support for Confluent Cloud Schema Registry | Examples |
|---|---|---|---|
| Clojure | clojure | No | |
| C/C++ | cpp | No | C/C++ examples (librdkafka) |
| C# | csharp | No | |
| Go | go | Yes | confluent-kafka-go/examples |
| Groovy | groovy | No | |
| Java | java | Yes | |
| Kotlin | kotlin | No | |
| Ktor | ktor | Yes | |
| Node.js | nodejs | No | |
| Python | python | Yes | confluent-kafka-python/examples |
| REST API | restapi | Yes | |
| Ruby | ruby | No | |
| Rust | rust | No | |
| Scala | scala | No | |
| Spring Boot | springboot | Yes | |
Tip
If you already have a CLI context set up, you can skip directly to the create client configuration file step.
- Prerequisites:
- Access to Confluent Cloud with an active cluster.
- Install the Confluent CLI.
Log in to Confluent Cloud using the confluent login command.
confluent login
Enter your Confluent Cloud credentials when prompted:
```
Enter your Confluent Cloud credentials:
Email: susan@myemail.com
Password:
```
Set the Confluent Cloud environment.
Get the environment ID.
confluent environment list
Your output should resemble:
```
       Id     |        Name
  ------------+--------------------
  * t2703     | default
    env-m2561 | demo-env-102893
    env-vnywz | ccloud-demo
    env-qzrg2 | data-lineage-demo
    env-250o2 | my-new-environment
```
Set the environment using the ID (<env-id>).
confluent environment use <env-id>
Your output should resemble:
Now using "env-vnywz" as the default (active) environment.
Set the cluster to use.
Get the cluster ID.
confluent kafka cluster list
Your output should resemble:
```
      Id      |   Name    | Type  | Provider |  Region  | Availability | Status
  ------------+-----------+-------+----------+----------+--------------+--------
    lkc-oymmj | cluster_1 | BASIC | gcp      | us-east4 | single-zone  | UP
  * lkc-7k6kj | cluster_0 | BASIC | gcp      | us-east1 | single-zone  | UP
```
Set the cluster using the ID (<cluster-id>). This is the cluster where the commands are run.
confluent kafka cluster use <cluster-id>
To verify the selected cluster after setting it, run confluent kafka cluster list again. The selected cluster has an asterisk (*) next to it.
Create an API key and secret, and save them.
You can generate the API key on the Confluent CLI or from the Confluent Cloud Console. Be sure to save the API key and secret.
Run the following command to create the API key and secret, using the cluster ID (<cluster-id>).
confluent api-key create --resource <cluster-id>
Your output should resemble:
```
It may take a couple of minutes for the API key to be ready.
Save the API key and secret. The secret is not retrievable later.
+---------+------------------------------------------------------------------+
| API Key | ABC123xyz                                                        |
| Secret  | 123xyzABC123xyzABC123xyzABC123xyzABC123xyzABC123xyzABC123xyzABCx |
+---------+------------------------------------------------------------------+
```
For more information, see Resource-specific API keys.
In the console, click the Kafka API keys tab and click Create key. Save the key and secret, then click the checkbox next to I have saved my API key and secret and am ready to continue.
Add the API secret with confluent api-key store <key> <secret>. When you create an API key with the CLI, it is automatically stored locally. However, when you create an API key using the console, the API, or the CLI on another machine, the secret is not available for CLI use until you store it. This is required because secrets cannot be retrieved after creation.
confluent api-key store <api-key> <api-secret> --resource <cluster-id>
For more information, see Resource-specific API keys.
Set the API key to use for Confluent CLI commands, using the cluster ID (<cluster-id>).
confluent api-key use <api-key> --resource <cluster-id>
(Optional) Enable Confluent Cloud Schema Registry and create an API key and secret for Schema Registry.
Important
If you are creating a client configuration file for a language that does NOT support Schema Registry configuration, you do not have to enable Schema Registry. Otherwise, it is recommended that you enable Schema Registry and configure it, although this is not required.
To check whether the language of your choice supports Schema Registry configuration, see Client Language Table.
You can enable Confluent Cloud Schema Registry and create an API key and secret on the Confluent CLI or from the Confluent Cloud Console. Be sure to save the API key and secret.
Enable Schema Registry with the cloud provider and geography of your choice.
confluent schema-registry cluster enable --cloud <cloud-provider> --geo <geography>
Your output should resemble:
```
+--------------+--------------------------------------------------+
| Id           | lsrc-zxvj3                                       |
| Endpoint URL | https://psrc-vrpp5.us-east-2.gcp.confluent.cloud |
+--------------+--------------------------------------------------+
```
Create the API key and secret for Schema Registry, using the Schema Registry cluster ID (<sr-id>).
confluent api-key create --resource <sr-id>
Your output should resemble:
```
It may take a couple of minutes for the API key to be ready.
Save the API key and secret. The secret is not retrievable later.
+---------+------------------------------------------------------------------+
| API Key | ABC123xyz                                                        |
| Secret  | 123xyzABC123xyzABC123xyzABC123xyzABC123xyzABC123xyzABC123xyzABCx |
+---------+------------------------------------------------------------------+
```
For more information on enabling Schema Registry, see confluent schema-registry cluster enable. For more information on creating an API key and secret, see Resource-specific API keys.
On the web UI, select an environment. Then click the Schema Registry tab and click Set up on my own.
Choose a cloud provider and geography. Review your settings and click Continue to enable Schema Registry for the environment.
Within the Schema Registry tab, click Edit on the API credentials card. Then click Create key.
Save the key and secret, then click the checkbox next to I have saved my API key and secret and am ready to continue.
For more information, see Quick Start for Schema Management on Confluent Cloud.
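Once Schema Registry is enabled, the endpoint URL and API key/secret from the steps above are what your client authenticates with. As a sketch using only the Python standard library (the endpoint and credentials below are placeholders), the Schema Registry REST API accepts the key and secret as HTTP Basic auth:

```python
import base64
import urllib.request

# Placeholders: substitute the Endpoint URL and API key/secret created above.
SR_URL = "https://psrc-vrpp5.us-east-2.gcp.confluent.cloud"
SR_API_KEY = "<sr-apikey>"
SR_API_SECRET = "<sr-apisecret>"

def sr_request_headers(key, secret):
    # Schema Registry uses HTTP Basic auth: base64("key:secret")
    token = base64.b64encode(f"{key}:{secret}".encode()).decode()
    return {"Authorization": "Basic " + token}

# For example, listing registered subjects:
req = urllib.request.Request(SR_URL + "/subjects",
                             headers=sr_request_headers(SR_API_KEY, SR_API_SECRET))
# urllib.request.urlopen(req) would return a JSON array of subject names
```

Client libraries with Schema Registry support build this authentication for you; this only illustrates what the generated credentials are used for.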
Create a client configuration file for the language of your choice, using the language ID (<language-id>). Then, copy and paste the displayed configuration into your client application source code.
See Client Language Table for a list of language IDs and whether the language supports Schema Registry configuration.
Tip
For the output of the command confluent kafka client-config create <language-id>, the client configuration file is printed to stdout and errors or warnings are printed to stderr. You can redirect the command output to separate files:
confluent kafka client-config create <language-id> 1> config-file.config 2> errors-warnings-file.err
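The same stdout/stderr split can be captured programmatically. This sketch uses a stand-in command instead of the Confluent CLI (which may not be installed where it runs); confluent kafka client-config create behaves the same way, with configuration on stdout and warnings or errors on stderr:

```python
import subprocess
import sys

# Stand-in for `confluent kafka client-config create <language-id>`:
# a small script that writes config to stdout and a warning to stderr.
result = subprocess.run(
    [sys.executable, "-c",
     "import sys; print('bootstrap.servers=...'); "
     "print('warning: ...', file=sys.stderr)"],
    capture_output=True, text=True)

config_text = result.stdout    # what 1> config-file.config would capture
warnings_text = result.stderr  # what 2> errors-warnings-file.err would capture
```

Keeping the streams separate means the saved configuration file never contains warning text.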
Tip
If you do not want to use the CLI context, you can alternatively pass all the necessary information via flags:
```
confluent kafka client-config create <language-id> \
  --environment <env-id> \
  --cluster <cluster-id> \
  --api-key <api-key> --api-secret <api-secret> \
  --sr-apikey <sr-apikey> --sr-apisecret <sr-apisecret>  # only for languages that support Schema Registry configuration
```
For languages that do NOT support Schema Registry configuration, run the following command:
confluent kafka client-config create <language-id>
For languages that support Schema Registry configuration, run the following command:
```
confluent kafka client-config create <language-id> \
  --sr-apikey <sr-apikey> \
  --sr-apisecret <sr-apisecret>
```
For more information, see confluent kafka client-config create.
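For a language with Schema Registry support, the generated file contains both Kafka and Schema Registry properties. In confluent-kafka-python, for example, the Schema Registry settings are passed to a separate client object, so it is common to split them apart. A sketch, with placeholder values and a minimal (assumed) set of property names:

```python
# Placeholder configuration resembling generated output for an SR-capable
# language. The property names for the split are an assumption based on
# common librdkafka-style configurations; exact names vary by client.
GENERATED = {
    "bootstrap.servers": "pkc-12345.us-east-2.gcp.confluent.cloud:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<api-key>",
    "sasl.password": "<api-secret>",
    "schema.registry.url": "https://psrc-vrpp5.us-east-2.gcp.confluent.cloud",
    "basic.auth.user.info": "<sr-apikey>:<sr-apisecret>",
}

def split_config(conf):
    """Separate Schema Registry properties from Kafka client properties."""
    sr_keys = ("schema.registry.url", "basic.auth.user.info")
    kafka = {k: v for k, v in conf.items() if k not in sr_keys}
    sr = {k: v for k, v in conf.items() if k in sr_keys}
    return kafka, sr

kafka_conf, sr_conf = split_config(GENERATED)
# kafka_conf -> Producer/Consumer; sr_conf -> Schema Registry client
```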
JVM settings for Java clients¶
There are two recommended JVM settings for Java clients when interacting with Confluent Cloud:
JVM Security configuration
```
java.security.Security.setProperty("networkaddress.cache.ttl", "30");
java.security.Security.setProperty("networkaddress.cache.negative.ttl", "0");
```
Kafka Producer and Consumer configuration
```
consumer.client.dns.lookup="use_all_dns_ips"
producer.client.dns.lookup="use_all_dns_ips"
```
Configure clients for cluster rolls¶
Confluent Cloud regularly rolls all clusters for upgrades and maintenance. Rolling a cluster means updating all the brokers that make up that cluster one at a time, so that the cluster remains fully available and performant throughout the update. The Kafka protocol and architecture are designed for exactly this type of highly-available, fault-tolerant operation, so correctly configured clients will gracefully handle the broker changes that happen during a roll.
During a cluster roll, clients may encounter the following retriable exceptions, which generate warnings on correctly configured clients:
- UNKNOWN_TOPIC_OR_PARTITION: "This server does not host this topic-partition."
- LEADER_NOT_AVAILABLE: "There is no leader for this topic-partition as we are in the middle of a leadership election."
- NOT_LEADER_FOR_PARTITION: "This server is not the leader for that topic-partition."
- NOT_ENOUGH_REPLICAS: "Messages are rejected since there are fewer in-sync replicas than required."
- NOT_ENOUGH_REPLICAS_AFTER_APPEND: "Messages are written to the log, but to fewer in-sync replicas than required."
By default, Kafka producer clients will retry for 2 minutes, print these warnings to logs, and recover without any intervention. Consumer and admin clients default to retrying for 1 minute.
If clients are configured with insufficient retries or retry-time, the exceptions above will be logged as errors.
If a client runs out of memory buffer space while retrying, and then runs out of time while blocked waiting for memory, timeout exceptions will occur.
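The retry windows above correspond to standard client timeout settings. As a sketch (values in milliseconds; the property names follow the Java client, and librdkafka uses message.timeout.ms for the producer's delivery timeout), the defaults behind the 2-minute and 1-minute windows are:

```python
# Producer defaults (Java client property names): retries are effectively
# unbounded, but all retrying must finish within the delivery timeout.
producer_conf = {
    "retries": 2147483647,            # retry indefinitely within the window
    "delivery.timeout.ms": 120_000,   # producer retry window: 2 minutes
    "request.timeout.ms": 30_000,     # per-request timeout
}

# Consumer/admin defaults: blocking operations retry within this window.
consumer_conf = {
    "default.api.timeout.ms": 60_000, # consumer/admin retry window: 1 minute
    "request.timeout.ms": 30_000,
}
```

Raising the delivery timeout (producers) or the default API timeout (consumers and admin clients) gives clients more headroom to ride out a broker roll before surfacing an error.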
Recommendations¶
We do not recommend triggering internal alerts on the retriable warnings listed above, because they will occur regularly as part of normal operations and will be gracefully handled by correctly-configured clients without disruption to your streaming applications. Instead, we recommend limiting alerts to client errors that cannot be automatically retried.
For additional recommendations on how to architect, monitor, and optimize your Kafka applications on Confluent Cloud, refer to Build Kafka Client Applications on Confluent Cloud.