Configure Java Clients for OAuth/OIDC on Confluent Cloud

Prerequisites

Before you begin, ensure you have the following:

Language and tooling

  • Java 11 or later and a build tool such as Maven or Gradle (the examples below use Maven)

Confluent Cloud environment

  • OAuth setup in Confluent Cloud - Configure OAuth authentication.

Credentials and identifiers

  • Client ID - Your application’s identifier (like a username)
  • Client secret - Your application’s password for OAuth
  • Token endpoint URL - Where to request access tokens
  • Scopes - Permissions your application needs (for example, kafka:read kafka:write)
  • Kafka cluster ID (lkc-xxxxx) - Your Kafka cluster identifier
  • Identity pool ID (pool-xxxxx) - If using identity pools
  • Schema Registry logical cluster ID (lsrc-xxxxx) - For Schema Registry operations

Client library

  • Add the Confluent OAuth client library to your project dependencies.

Note

Dependency Management Best Practice: For production applications, consider using Maven properties or the Confluent Platform BOM (Bill of Materials) to manage dependency versions consistently. This ensures all Confluent Cloud components use compatible versions.

Maven properties example:

<properties>
    <java.version>11</java.version>
    <confluent.version>7.6.1</confluent.version>
</properties>
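The version property can then be referenced from the dependency section. A sketch of corresponding dependencies, assuming the standard kafka-clients and Confluent Schema Registry client artifacts (verify artifact IDs and versions against your setup; Confluent Platform 7.6.x ships with Kafka 3.6.x):

```xml
<dependencies>
    <!-- Kafka Java client (includes the OAUTHBEARER SASL support) -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.6.1</version>
    </dependency>
    <!-- Schema Registry client, versioned via the confluent.version property -->
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-schema-registry-client</artifactId>
        <version>${confluent.version}</version>
    </dependency>
</dependencies>
```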

Configure Kafka Java clients

The Kafka client initiates authentication by requesting a token from the configured OAuth provider using client credentials. Once obtained, this token is sent to the Kafka broker with each connection request. The broker validates the token’s authenticity, expiration, and permissions before granting access.

The implementation class is:

org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler

and accepts instances of the following:

org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback
org.apache.kafka.common.security.auth.SaslExtensionsCallback

The fully qualified class name is supplied to the client through the sasl.login.callback.handler.class configuration property.

Because the HTTP call to the OAuth 2.0 provider might time out or fail transiently, the login module retries with a waiting period between attempts. The total number of attempts (including the first) varies because the retries use exponential backoff:

  1. Immediately, an attempt to connect to the HTTP endpoint is initiated.
  2. If the first attempt fails, a second attempt is initiated after a configurable wait of sasl.login.retry.backoff.ms milliseconds.
  3. If the second attempt fails, the wait is doubled before a third attempt.

This pattern repeats as needed, doubling the wait each time, up to the maximum wait configured by sasl.login.retry.backoff.max.ms.
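With the values used later on this page (sasl.login.retry.backoff.ms=100, sasl.login.retry.backoff.max.ms=10000), the waits are 100 ms, 200 ms, 400 ms, and so on, capped at 10 seconds. The schedule can be sketched as follows (illustrative only; the client computes this internally and may add jitter):

```java
public class BackoffSchedule {

    // Wait (in ms) before each retry: doubles from initialMs, capped at maxMs.
    static long[] waits(long initialMs, long maxMs, int retries) {
        long[] out = new long[retries];
        long wait = initialMs;
        for (int i = 0; i < retries; i++) {
            out[i] = wait;
            wait = Math.min(wait * 2, maxMs);
        }
        return out;
    }

    public static void main(String[] args) {
        // sasl.login.retry.backoff.ms=100, sasl.login.retry.backoff.max.ms=10000
        StringBuilder sb = new StringBuilder();
        for (long w : waits(100, 10_000, 9)) {
            sb.append(w).append(' ');
        }
        System.out.println(sb.toString().trim());
        // prints: 100 200 400 800 1600 3200 6400 10000 10000
    }
}
```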

Configuration approach

Java clients use the JAAS (Java Authentication and Authorization Service) framework for SASL authentication. For the OAUTHBEARER mechanism, the recommended approach is to set the sasl.oauthbearer.* properties directly in your Properties object, as shown in the example below, rather than hand-building a JAAS configuration string. Conceptually, the settings fall into two groups:

JAAS configuration (sensitive data)

  • OAuth client credentials (client ID, secret)
  • Token endpoint URL
  • Confluent Cloud cluster and identity pool IDs

Top-level configuration (general settings)

  • SASL mechanism and protocol
  • Callback handler class
  • Timeout and retry settings

Configuration example

Add these OAuth-specific settings to your existing Kafka configuration:

Properties props = new Properties();

// Basic Kafka configuration
props.put("bootstrap.servers", "your-bootstrap-server:9092");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "OAUTHBEARER");
props.put("sasl.login.callback.handler.class",
    "io.confluent.kafka.clients.oauth.OauthLoginCallbackHandler");

// OAuth configuration
props.put("sasl.oauthbearer.token.endpoint.url", "https://your-idp.com/oauth2/token");
props.put("sasl.oauthbearer.client.id", "your-client-id");
props.put("sasl.oauthbearer.client.secret", "your-client-secret");
props.put("sasl.oauthbearer.scope", "kafka:read kafka:write");
props.put("sasl.oauthbearer.extensions", "logical_cluster=lkc-12345,identity_pool_id=pool-abcde");

// Timeout and retry settings
props.put("sasl.login.connect.timeout.ms", "10000");
props.put("sasl.login.read.timeout.ms", "10000");
props.put("sasl.login.retry.backoff.ms", "100");
props.put("sasl.login.retry.backoff.max.ms", "10000");

Note

JAAS Configuration Alternatives: For better readability, you can also construct the JAAS configuration using Java code:

Using String.join() (Java 8+):

String jaasConfig = String.join(" ",
    "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required",
    "oauth.token.endpoint.uri=\"" + tokenEndpoint + "\"",
    "oauth.client.id=\"" + clientId + "\"",
    "oauth.client.secret=\"" + clientSecret + "\"",
    "oauth.scope=\"" + scope + "\"",
    "oauth.logical.cluster=\"" + logicalCluster + "\"",
    "oauth.identity.pool.id=\"" + identityPoolId + "\";"
);

Using Text Blocks (Java 15+):

String jaasConfig = """
    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    oauth.token.endpoint.uri="%s" \
    oauth.client.id="%s" \
    oauth.client.secret="%s" \
    oauth.scope="%s" \
    oauth.logical.cluster="%s" \
    oauth.identity.pool.id="%s";
    """.formatted(tokenEndpoint, clientId, clientSecret, scope, logicalCluster, identityPoolId);

Test your configuration

  1. Enable debug logging to troubleshoot OAuth issues:

    # Add to your configuration
    log4j.logger.org.apache.kafka.clients.NetworkClient=DEBUG
    log4j.logger.org.apache.kafka.common.security.oauthbearer=DEBUG
    log4j.logger.io.confluent.kafka.clients.oauth=DEBUG
    
  2. Test with a simple producer:

    // Test producer
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
        ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "test-message");

        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                System.out.println("Message sent successfully - OAuth is working!");
            } else {
                System.err.println("Error sending message: " + exception.getMessage());
            }
        });
        // Block until the send (and the OAuth handshake) completes. Closing the
        // producer from inside the callback can deadlock, so close it here instead.
        producer.flush();
    }
    
  3. Check for common errors:

    • “SASL authentication failed” - Check your OAuth credentials and endpoint
    • “Invalid token” - Verify your OAuth configuration is correct
    • “Connection timeout” - Check your bootstrap servers and network connectivity
  4. Verify token refresh - the client library refreshes tokens automatically before they expire; no application code is required

Configure Schema Registry Java clients

The Schema Registry Java client module supports OAuth 2.0 and the OpenID Connect (OIDC) authentication protocol, enabling Java clients to authenticate to Confluent Cloud Schema Registry with token credentials.

To obtain the OAuth token, Schema Registry clients use the client credentials grant (defined in the OAuth 2.0 specification, RFC 6749). The flow for obtaining authorization looks like this:

  1. The Schema Registry client makes a request to a public OIDC server or token server (for example, Okta), using the client ID and secret to authenticate with it.
  2. The OIDC server returns a token that the Schema Registry client can use to talk to the cloud Schema Registry server.
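Step 1 is an ordinary client_credentials token request over HTTPS, which the client library performs for you. The two pieces of that request can be sketched as follows (illustrative only, not library API):

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class ClientCredentialsRequest {

    // HTTP Basic Authorization header value built from the client ID and secret.
    static String basicAuthHeader(String clientId, String clientSecret) {
        String raw = clientId + ":" + clientSecret;
        return "Basic " + Base64.getEncoder().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
    }

    // Form-encoded body of the token request.
    static String tokenRequestBody(String scope) {
        return "grant_type=client_credentials&scope="
            + URLEncoder.encode(scope, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Placeholder credentials for illustration only
        System.out.println(basicAuthHeader("id", "secret"));  // prints: Basic aWQ6c2VjcmV0
        System.out.println(tokenRequestBody("kafka:read kafka:write"));
    }
}
```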

The logic of fetching tokens and attaching them to HTTP headers in Schema Registry clients is handled by the BearerAuthCredentialProvider (token) classes.

Configure a Schema Registry client to use a standard OAuth authorization server and protocol

You can authenticate Schema Registry clients to a public OAuth 2.0 server by either adding the required configuration settings directly to the clients or by enabling the clients to inherit the settings.

To add OAuth authentication to Schema Registry, you must add the following configuration settings to your Schema Registry client:

Map<String, String> srConfig = new HashMap<>();

// Schema Registry OAuth configuration
srConfig.put("bearer.auth.credentials.source", "OAUTHBEARER");
srConfig.put("bearer.auth.issuer.endpoint.url", "https://your-idp.com/oauth2/token");
srConfig.put("bearer.auth.client.id", "your-client-id");
srConfig.put("bearer.auth.client.secret", "your-client-secret");
srConfig.put("bearer.auth.scope", "schema:read schema:write");
srConfig.put("bearer.auth.logical.cluster", "lsrc-xxxxx");
srConfig.put("bearer.auth.identity.pool.id", "pool-xxxxx");

To inherit the OAuth configuration settings from your Kafka configuration, add the following configuration settings to your Schema Registry client:

Map<String, String> srConfig = new HashMap<>();

// Inherit OAuth configuration from Kafka client
srConfig.put("bearer.auth.credentials.source", "SASL_OAUTHBEARER_INHERIT");
srConfig.put("bearer.auth.logical.cluster", "lsrc-xxxxx");
srConfig.put("bearer.auth.identity.pool.id", "pool-xxxxx");

Configure a Schema Registry client to use a custom token provider and OAuth implementation

For custom OAuth implementations, use the following configuration:

bearer.auth.credentials.source=CUSTOM
bearer.auth.custom.provider.class=<fully-qualified-class-name>
bearer.auth.logical.cluster=<lsrc-resource-id>
bearer.auth.identity.pool.id=<identity-pool-id>

The last two settings (bearer.auth.logical.cluster and bearer.auth.identity.pool.id) are optional in other contexts, but required for Confluent Cloud Schema Registry.

The configurations above are for a Kafka producer. Other types of Schema Registry clients need an appropriate prefix; for example, ksqlDB settings start with ksql.schema.registry.
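Applied to the custom-provider settings above, the ksqlDB form would look like this (same placeholder values):

```properties
ksql.schema.registry.bearer.auth.credentials.source=CUSTOM
ksql.schema.registry.bearer.auth.custom.provider.class=<fully-qualified-class-name>
ksql.schema.registry.bearer.auth.logical.cluster=<lsrc-resource-id>
ksql.schema.registry.bearer.auth.identity.pool.id=<identity-pool-id>
```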

Note

Resource Management Best Practice: When using Schema Registry clients, always use try-with-resources to ensure proper cleanup. The SchemaRegistryClient interface implements java.io.Closeable, so it can be used in a try-with-resources block for automatic cleanup:

try (SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(
    "https://your-schema-registry-endpoint",
    100, // cache capacity
    srConfig
)) {
    System.out.println("Schema Registry client configured successfully with OAuth.");
    // Use the client...
} catch (Exception e) {
    System.err.println("Error configuring Schema Registry client: " + e.getMessage());
}

Custom OAuth implementations

For advanced use cases, implement custom token providers:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.schemaregistry.client.security.bearerauth.BearerAuthCredentialProvider;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;

public class CustomTokenProvider implements BearerAuthCredentialProvider {

    private String tokenEndpoint;
    private String clientId;
    private String clientSecret;
    private Map<String, String> additionalHeaders;

    // The provider is instantiated reflectively from
    // bearer.auth.custom.provider.class, so it needs a no-argument constructor
    // and receives its settings through configure(). The key names below are
    // illustrative; use whichever keys you pass in your client configuration.
    @Override
    public void configure(Map<String, ?> configs) {
        this.tokenEndpoint = (String) configs.get("bearer.auth.issuer.endpoint.url");
        this.clientId = (String) configs.get("bearer.auth.client.id");
        this.clientSecret = (String) configs.get("bearer.auth.client.secret");
        this.additionalHeaders = null; // populate from configs if your IdP needs extra headers
    }

    // Arbitrary name for this provider; relevant when looked up by alias.
    @Override
    public String alias() {
        return "CUSTOM_TOKEN_PROVIDER";
    }

    @Override
    public String getBearerToken(URL url) {
        HttpURLConnection connection = null;
        try {
            // Custom token retrieval logic
            connection = (HttpURLConnection) new URL(tokenEndpoint).openConnection();
            connection.setRequestMethod("POST");
            connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");

            // Add custom headers
            if (additionalHeaders != null) {
                for (Map.Entry<String, String> header : additionalHeaders.entrySet()) {
                    connection.setRequestProperty(header.getKey(), header.getValue());
                }
            }

            // Authenticate with HTTP Basic using the client credentials
            String auth = clientId + ":" + clientSecret;
            String encodedAuth = Base64.getEncoder()
                .encodeToString(auth.getBytes(StandardCharsets.UTF_8));
            connection.setRequestProperty("Authorization", "Basic " + encodedAuth);

            // Send the client_credentials grant request
            connection.setDoOutput(true);
            try (OutputStream os = connection.getOutputStream()) {
                os.write("grant_type=client_credentials".getBytes(StandardCharsets.UTF_8));
            }

            // Read the response body
            StringBuilder response = new StringBuilder();
            try (BufferedReader br = new BufferedReader(
                    new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = br.readLine()) != null) {
                    response.append(line);
                }
            }

            // Extract the access token from the JSON response
            JsonNode jsonNode = new ObjectMapper().readTree(response.toString());
            return jsonNode.get("access_token").asText();
        } catch (IOException e) {
            throw new RuntimeException("Failed to fetch bearer token from " + tokenEndpoint, e);
        } finally {
            if (connection != null) {
                connection.disconnect();
            }
        }
    }
}

Google OIDC integration

For Google OIDC integration with Java clients:

// Google OIDC configuration
Properties props = new Properties();
props.put("bootstrap.servers", "your-bootstrap-server:9092");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "OAUTHBEARER");
props.put("sasl.login.callback.handler.class",
    "io.confluent.kafka.clients.oauth.OauthLoginCallbackHandler");
props.put("sasl.oauthbearer.token.endpoint.url", "https://oauth2.googleapis.com/token");
props.put("sasl.oauthbearer.client.id", "your-google-client-id");
props.put("sasl.oauthbearer.client.secret", "your-google-client-secret");
props.put("sasl.oauthbearer.scope", "https://www.googleapis.com/auth/cloud-platform");
props.put("sasl.oauthbearer.extensions", "logical_cluster=lkc-xxxxx,identity_pool_id=pool-xxxxx");

// Create producer with OAuth configuration
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

Troubleshoot Java OAuth clients

Common issues and solutions for Java OAuth clients:

Authentication failures

  • Verify client ID and secret are correct
  • Check token endpoint URL is accessible
  • Ensure logical cluster ID is valid
  • Validate identity pool ID if used

Network issues

  • Confirm network connectivity to OAuth provider
  • Check firewall rules allow OAuth traffic
  • Verify SSL certificate validation

Configuration issues

  • Ensure all required parameters are provided
  • Validate OAuth configuration syntax
  • Check timeout values are reasonable

Debug logging

Enable debug logging for OAuth troubleshooting:

# Enable OAuth debug logging
log4j.logger.org.apache.kafka.common.security.oauthbearer=DEBUG
log4j.logger.io.confluent.kafka.clients.oauth=DEBUG
log4j.logger.org.apache.kafka.clients.NetworkClient=DEBUG