Configure Java Clients for OAuth/OIDC on Confluent Cloud¶
Prerequisites¶
Before you begin, ensure you have the following:
Language and tooling¶
- Apache Kafka client: 4.0.0 or later.
- Confluent Platform: 7.2.1 or later, or 7.1.3 or later on the 7.1.x release line.
Confluent Cloud environment¶
- OAuth setup in Confluent Cloud - Configure OAuth authentication.
Credentials and identifiers¶
- Client ID - Your application’s identifier (like a username)
- Client secret - Your application’s password for OAuth
- Token endpoint URL - Where to request access tokens
- Scopes - Permissions your application needs (for example, kafka:read kafka:write)
- Kafka cluster ID (lkc-xxxxx) - Your Kafka cluster identifier
- Identity pool ID (pool-xxxxx) - If using identity pools
- Schema Registry logical cluster ID (lsrc-xxxxx) - For Schema Registry operations
Client library¶
- Add the Confluent OAuth client library to your project dependencies.
Note
Dependency Management Best Practice: For production applications, consider using Maven properties or the Confluent Platform BOM (Bill of Materials) to manage dependency versions consistently. This ensures all Confluent Cloud components use compatible versions.
Maven properties example:
<properties>
    <java.version>11</java.version>
    <confluent.version>7.6.1</confluent.version>
</properties>
Configure Kafka Java clients¶
The Kafka client initiates authentication by requesting a token from the configured OAuth provider using client credentials. Once obtained, this token is sent to the Kafka broker with each connection request. The broker validates the token’s authenticity, expiration, and permissions before granting access.
The implementation class is org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler. It accepts instances of the following callbacks:
- org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback
- org.apache.kafka.common.security.auth.SaslExtensionsCallback
Provide the fully-qualified class name in the client’s sasl.login.callback.handler.class configuration.
Because the HTTP call to the OAuth 2.0 provider might time out or fail transiently, the client retries and waits between attempts. The number of attempts (including the first) varies because the wait follows an exponential backoff:
- The client immediately attempts to connect to the HTTP endpoint.
- If the first attempt fails, a second attempt is made after the duration, in milliseconds, configured by sasl.login.retry.backoff.ms.
- If the second attempt fails, the duration is doubled before a third attempt.
This pattern repeats as required until the maximum duration set by sasl.login.retry.backoff.max.ms is reached.
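The doubling pattern described above can be sketched as a small calculation. This is an illustration only, not Kafka's internal retry code: the class name LoginBackoffSketch and its waits method are hypothetical, and the sketch assumes the maximum acts as a cap on each individual wait.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: computes the wait before each retry; not Kafka's internal code. */
final class LoginBackoffSketch {

    /**
     * Returns the wait (in milliseconds) before each of the first `retries` retries.
     * The wait starts at initialMs and doubles each time, capped at maxMs (assumption).
     */
    static List<Long> waits(long initialMs, long maxMs, int retries) {
        List<Long> result = new ArrayList<>();
        long wait = initialMs;
        for (int i = 0; i < retries; i++) {
            result.add(Math.min(wait, maxMs));
            // Double for the next retry, stopping at the cap so the value cannot overflow.
            wait = Math.min(wait * 2, maxMs);
        }
        return result;
    }

    public static void main(String[] args) {
        // 100 ms initial wait and 10000 ms maximum, as sample values.
        System.out.println(waits(100, 10_000, 9));
    }
}
```

With a 100 ms initial wait and a 10000 ms maximum, the wait reaches the cap at the eighth retry in this sketch, which gives a rough feel for how long a client may keep trying before a login failure surfaces.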
Configuration approach¶
Java clients use the JAAS (Java Authentication and Authorization Service) framework for SASL authentication. For the OAUTHBEARER mechanism, the recommended approach is to configure the client by setting the sasl.oauthbearer.* properties directly in your Properties object, as shown in the example below. This avoids creating complex JAAS configuration strings manually.
JAAS configuration (sensitive data)¶
- OAuth client credentials (client ID, secret)
- Token endpoint URL
- Confluent Cloud cluster and identity pool IDs
Top-level configuration (general settings)¶
- SASL mechanism and protocol
- Callback handler class
- Timeout and retry settings
Configuration example¶
Add these OAuth-specific settings to your existing Kafka configuration:
Properties props = new Properties();
// Basic Kafka configuration
props.put("bootstrap.servers", "your-bootstrap-server:9092");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "OAUTHBEARER");
props.put("sasl.login.callback.handler.class",
"io.confluent.kafka.clients.oauth.OauthLoginCallbackHandler");
// OAuth configuration
props.put("sasl.oauthbearer.token.endpoint.url", "https://your-idp.com/oauth2/token");
props.put("sasl.oauthbearer.client.id", "your-client-id");
props.put("sasl.oauthbearer.client.secret", "your-client-secret");
props.put("sasl.oauthbearer.scope", "kafka:read kafka:write");
props.put("sasl.oauthbearer.extensions", "logical_cluster=lkc-12345,identity_pool_id=pool-abcde");
// Timeout and retry settings
props.put("sasl.login.connect.timeout.ms", "10000");
props.put("sasl.login.read.timeout.ms", "10000");
props.put("sasl.login.retry.backoff.ms", "100");
props.put("sasl.login.retry.backoff.max.ms", "10000");
Note
JAAS Configuration Alternatives: For better readability, you can also construct the JAAS configuration using Java code:
Using String.join() (Java 8+):
String jaasConfig = String.join(" ",
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required",
"oauth.token.endpoint.uri=\"" + tokenEndpoint + "\"",
"oauth.client.id=\"" + clientId + "\"",
"oauth.client.secret=\"" + clientSecret + "\"",
"oauth.scope=\"" + scope + "\"",
"oauth.logical.cluster=\"" + logicalCluster + "\"",
"oauth.identity.pool.id=\"" + identityPoolId + "\";"
);
Using Text Blocks (Java 15+):
String jaasConfig = """
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.token.endpoint.uri="%s" \
oauth.client.id="%s" \
oauth.client.secret="%s" \
oauth.scope="%s" \
oauth.logical.cluster="%s" \
oauth.identity.pool.id="%s";
""".formatted(tokenEndpoint, clientId, clientSecret, scope, logicalCluster, identityPoolId);
Test your configuration¶
Enable debug logging to troubleshoot OAuth issues:
# Add to your configuration
log4j.logger.org.apache.kafka.clients.NetworkClient=DEBUG
log4j.logger.org.apache.kafka.common.security.oauthbearer=DEBUG
log4j.logger.io.confluent.kafka.clients.oauth=DEBUG
Test with a simple producer:
// Test producer (props must also contain key.serializer and value.serializer)
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "test-message");
    producer.send(record, (metadata, exception) -> {
        if (exception == null) {
            System.out.println("Message sent successfully - OAuth is working!");
        } else {
            System.err.println("Error sending message: " + exception.getMessage());
        }
    });
    // Leaving the try block closes the producer, which waits for the pending send to complete.
}
Check for common errors:
- “SASL authentication failed” - Check your OAuth credentials and endpoint
- “Invalid token” - Verify your OAuth configuration is correct
- “Connection timeout” - Check your bootstrap servers and network connectivity
Verify token refresh - The client should refresh tokens automatically before they expire.
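When checking token refresh, it can help to see when a given token expires. The following is a minimal debugging sketch, assuming your OAuth provider issues JWT access tokens with an exp claim. The class name JwtExpirySketch is hypothetical, the sample token is unsigned and made up, and the code does not verify signatures, so use it only for inspection.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Base64;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Debugging aid only: reads the exp claim of a JWT without verifying its signature. */
final class JwtExpirySketch {
    // Matches a numeric "exp" claim in the decoded JSON payload.
    private static final Pattern EXP = Pattern.compile("\"exp\"\\s*:\\s*(\\d+)");

    static Instant expiry(String jwt) {
        String[] parts = jwt.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Not a JWT: expected header.payload.signature");
        }
        // The payload is the second segment, Base64URL-encoded.
        String payload = new String(Base64.getUrlDecoder().decode(parts[1]), StandardCharsets.UTF_8);
        Matcher matcher = EXP.matcher(payload);
        if (!matcher.find()) {
            throw new IllegalArgumentException("No exp claim found in the token payload");
        }
        return Instant.ofEpochSecond(Long.parseLong(matcher.group(1)));
    }

    public static void main(String[] args) {
        // Build a made-up, unsigned sample token purely for demonstration.
        String json = "{\"sub\":\"demo\",\"exp\":1700000000}";
        String payload = Base64.getUrlEncoder().withoutPadding()
                .encodeToString(json.getBytes(StandardCharsets.UTF_8));
        System.out.println("Token expires at: " + expiry("e30." + payload + ".sig"));
    }
}
```

Comparing the printed expiry with the timestamps in the debug log gives a rough indication of whether the client obtained a new token before the old one lapsed.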
Configure Schema Registry Java clients¶
The Schema Registry Java client module includes support for the OpenID Connect (OIDC) authentication protocol and OAuth 2.0. This enables these Java clients to use token credentials to authenticate with Confluent Cloud Schema Registry.
To obtain the OAuth token, Schema Registry clients use the client credentials grant. The client credentials grant flow defined in RFC 6749 looks like this:
- The Schema Registry client makes a request to a public OIDC server or token server (for example, Okta), using the client ID and secret to authenticate with it.
- The OIDC server returns a token that the Schema Registry client can use to talk to the cloud Schema Registry server.
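The first step of the flow above can be sketched with the JDK's java.net.http API (Java 11 or later). This is an illustration of a generic client credentials request, not the code the Schema Registry client runs. The class ClientCredentialsRequestSketch and its methods are hypothetical, and the sketch assumes the provider accepts HTTP Basic client authentication.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Illustrative only: builds (but does not send) a client credentials token request. */
final class ClientCredentialsRequestSketch {

    /** Form body for the grant; the scope is optional. */
    static String formBody(String scope) {
        String body = "grant_type=client_credentials";
        if (scope != null && !scope.isEmpty()) {
            body += "&scope=" + URLEncoder.encode(scope, StandardCharsets.UTF_8);
        }
        return body;
    }

    /** HTTP Basic header value carrying the client ID and secret. */
    static String basicAuth(String clientId, String clientSecret) {
        String raw = clientId + ":" + clientSecret;
        return "Basic " + Base64.getEncoder().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
    }

    static HttpRequest build(String tokenEndpoint, String clientId, String clientSecret, String scope) {
        return HttpRequest.newBuilder(URI.create(tokenEndpoint))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .header("Authorization", basicAuth(clientId, clientSecret))
                .POST(HttpRequest.BodyPublishers.ofString(formBody(scope)))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build("https://your-idp.com/oauth2/token",
                "your-client-id", "your-client-secret", "schema:read schema:write");
        System.out.println(request.method() + " " + request.uri());
        System.out.println(formBody("schema:read schema:write"));
    }
}
```

Sending the request with java.net.http.HttpClient and reading access_token from the JSON response would complete the second step. Some providers expect the credentials in the form body instead of the Authorization header, so check your provider's documentation.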
In Schema Registry clients, the BearerAuthCredentialProvider (token) classes fetch the tokens and add them to the HTTP headers.
Configure a Schema Registry client to use a standard OAuth authorization server and protocol¶
You can authenticate Schema Registry clients to a public OAuth 2.0 server by either adding the required configuration settings directly to the clients or by enabling the clients to inherit the settings.
To add OAuth authentication to Schema Registry, you must add the following configuration settings to your Schema Registry client:
Properties srConfig = new Properties();
// Schema Registry OAuth configuration
srConfig.put("bearer.auth.credentials.source", "OAUTHBEARER");
srConfig.put("bearer.auth.issuer.endpoint.url", "https://your-idp.com/oauth2/token");
srConfig.put("bearer.auth.client.id", "your-client-id");
srConfig.put("bearer.auth.client.secret", "your-client-secret");
srConfig.put("bearer.auth.scope", "schema:read schema:write");
srConfig.put("bearer.auth.logical.cluster", "lsrc-xxxxx");
srConfig.put("bearer.auth.identity.pool.id", "pool-xxxxx");
To inherit the OAuth configuration settings from your Kafka configuration, add the following configuration settings to your Schema Registry client:
Properties srConfig = new Properties();
// Inherit OAuth configuration from Kafka client
srConfig.put("bearer.auth.credentials.source", "SASL_OAUTHBEARER_INHERIT");
srConfig.put("bearer.auth.logical.cluster", "lsrc-xxxxx");
srConfig.put("bearer.auth.identity.pool.id", "pool-xxxxx");
Configure a Schema Registry client to use a custom token provider and OAuth implementation¶
For custom OAuth implementations, use the following configuration:
bearer.auth.credentials.source=CUSTOM
bearer.auth.custom.provider.class=<fully-qualified-class-name>
bearer.auth.logical.cluster=<lsrc-resource-id>
bearer.auth.identity.pool.id=<identity-pool-id>
The last two configurations (bearer.auth.logical.cluster and bearer.auth.identity.pool.id) shown above are optional for other use cases, but required for Confluent Cloud Schema Registry.
The configurations above are for a Kafka producer. Other types of Schema Registry clients need appropriate prefixes. For example, KSQL would start with ksql.schema.registry.
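One way to apply such a prefix is to copy the settings into a new Properties object. The following is a minimal sketch that assumes the prefix is joined to each setting name with a dot. The class SchemaRegistryPrefixSketch and its withPrefix method are hypothetical helpers, not part of any Confluent library, so confirm the exact property names your client type expects.

```java
import java.util.Properties;

/** Illustrative helper: copies settings under a client-specific prefix. */
final class SchemaRegistryPrefixSketch {

    /** Returns a new Properties object whose keys are "<prefix>.<original key>". */
    static Properties withPrefix(String prefix, Properties source) {
        Properties prefixed = new Properties();
        for (String name : source.stringPropertyNames()) {
            prefixed.setProperty(prefix + "." + name, source.getProperty(name));
        }
        return prefixed;
    }

    public static void main(String[] args) {
        Properties srConfig = new Properties();
        srConfig.setProperty("bearer.auth.credentials.source", "CUSTOM");
        srConfig.setProperty("bearer.auth.logical.cluster", "lsrc-xxxxx");
        // Prints each setting under the ksql.schema.registry prefix.
        withPrefix("ksql.schema.registry", srConfig)
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```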
Note
Resource Management Best Practice: When using Schema Registry clients, always use try-with-resources to ensure proper cleanup. The SchemaRegistryClient interface implements java.io.Closeable, so it can be used in a try-with-resources block for automatic cleanup:
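// The client constructor takes a Map with String keys, so copy the Properties entries into one
Map<String, Object> srConfigMap = new HashMap<>();
srConfig.forEach((key, value) -> srConfigMap.put(String.valueOf(key), value));
try (SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(
        "https://your-schema-registry-endpoint",
        100, // cache capacity
        srConfigMap
)) {
    System.out.println("Schema Registry client configured successfully with OAuth.");
    // Use the client...
} catch (Exception e) {
    System.err.println("Error configuring Schema Registry client: " + e.getMessage());
}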
Custom OAuth implementations¶
For advanced use cases, implement custom token providers:
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.schemaregistry.client.security.bearerauth.BearerAuthCredentialProvider;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;

// Sketch of a custom provider. Depending on your Schema Registry client version, the
// BearerAuthCredentialProvider interface may also require alias() and configure(Map),
// and the client may create the class through a no-argument constructor. Check the
// interface in the version you use before relying on this shape.
public class CustomTokenProvider implements BearerAuthCredentialProvider {
    private final String tokenEndpoint;
    private final String clientId;
    private final String clientSecret;
    private final Map<String, String> additionalHeaders;

    public CustomTokenProvider(String tokenEndpoint, String clientId,
                               String clientSecret, Map<String, String> additionalHeaders) {
        this.tokenEndpoint = tokenEndpoint;
        this.clientId = clientId;
        this.clientSecret = clientSecret;
        this.additionalHeaders = additionalHeaders;
    }

    @Override
    public String getBearerToken(URL url) {
        // I/O failures are rethrown unchecked, on the assumption that the interface
        // method does not declare checked exceptions.
        try {
            // Custom token retrieval logic
            HttpURLConnection connection = (HttpURLConnection) new URL(tokenEndpoint).openConnection();
            connection.setRequestMethod("POST");
            connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
            // Add custom headers
            if (additionalHeaders != null) {
                for (Map.Entry<String, String> header : additionalHeaders.entrySet()) {
                    connection.setRequestProperty(header.getKey(), header.getValue());
                }
            }
            // Set up authentication (HTTP Basic with the client credentials)
            String auth = clientId + ":" + clientSecret;
            String encodedAuth = Base64.getEncoder().encodeToString(auth.getBytes(StandardCharsets.UTF_8));
            connection.setRequestProperty("Authorization", "Basic " + encodedAuth);
            // Send request
            String postData = "grant_type=client_credentials";
            connection.setDoOutput(true);
            try (OutputStream os = connection.getOutputStream()) {
                os.write(postData.getBytes(StandardCharsets.UTF_8));
            }
            // Parse the JSON response and extract the access token
            try (InputStream in = connection.getInputStream()) {
                JsonNode jsonNode = new ObjectMapper().readTree(in);
                return jsonNode.get("access_token").asText();
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to retrieve OAuth token", e);
        }
    }
}
Google OIDC integration¶
For Google OIDC integration with Java clients:
// Google OIDC configuration
Properties props = new Properties();
props.put("bootstrap.servers", "your-bootstrap-server:9092");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "OAUTHBEARER");
props.put("sasl.login.callback.handler.class",
"io.confluent.kafka.clients.oauth.OauthLoginCallbackHandler");
props.put("sasl.oauthbearer.token.endpoint.url", "https://oauth2.googleapis.com/token");
props.put("sasl.oauthbearer.client.id", "your-google-client-id");
props.put("sasl.oauthbearer.client.secret", "your-google-client-secret");
props.put("sasl.oauthbearer.scope", "https://www.googleapis.com/auth/cloud-platform");
props.put("sasl.oauthbearer.extensions", "logical_cluster=lkc-xxxxx,identity_pool_id=pool-xxxxx");
// Create producer with OAuth configuration
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
Troubleshoot Java OAuth clients¶
Common issues and solutions for Java OAuth clients:
Authentication failures¶
- Verify client ID and secret are correct
- Check token endpoint URL is accessible
- Ensure logical cluster ID is valid
- Validate identity pool ID if used
Network issues¶
- Confirm network connectivity to OAuth provider
- Check firewall rules allow OAuth traffic
- Verify SSL certificate validation
Configuration issues¶
- Ensure all required parameters are provided
- Validate OAuth configuration syntax
- Check timeout values are reasonable
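A quick pre-flight check can catch missing parameters before the client starts. The following is a minimal sketch: the class OAuthConfigCheckSketch is hypothetical, and the list of keys it treats as required is an assumption based on the configuration example on this page, so adjust it to match your setup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Illustrative pre-flight check for the OAuth settings used on this page. */
final class OAuthConfigCheckSketch {
    // Assumption: these keys, taken from the configuration example, are treated as required.
    static final List<String> REQUIRED = List.of(
            "bootstrap.servers",
            "security.protocol",
            "sasl.mechanism",
            "sasl.login.callback.handler.class",
            "sasl.oauthbearer.token.endpoint.url",
            "sasl.oauthbearer.client.id",
            "sasl.oauthbearer.client.secret");

    /** Returns the required keys that are absent or blank, in declaration order. */
    static List<String> missing(Properties props) {
        List<String> result = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                result.add(key);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "your-bootstrap-server:9092");
        props.setProperty("security.protocol", "SASL_SSL");
        System.out.println("Missing settings: " + missing(props));
    }
}
```

Running a check like this at startup produces a clearer message than a failed SASL handshake when a setting was simply left out.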
Debug logging¶
Enable debug logging for OAuth troubleshooting:
# Enable OAuth debug logging
log4j.logger.org.apache.kafka.common.security.oauthbearer=DEBUG
log4j.logger.io.confluent.kafka.clients.oauth=DEBUG
log4j.logger.org.apache.kafka.clients.NetworkClient=DEBUG