Configure Clients for Confluent Cloud OAuth

Use the following information to configure your Kafka Java clients to use the OAuth 2.0 handler for connecting to Confluent Cloud clusters.

Prerequisites

  • Apache Kafka client: 3.2.1 or later;
  • Confluent Platform: 7.2.1 or later; 7.1.3 or later
  • librdkafka: 1.9.2 or later

The implementation class is

org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler

and accepts instances of the following:

org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback
org.apache.kafka.common.security.auth.SaslExtensionsCallback

The fully-qualified class name is provided to the client’s configuration of sasl.login.callback.handler.class.

Because the HTTP call made to the OAuth 2.0 provider might time out or transiently fail, a retry mechanism waits between attempts. The number of attempts made (including the first attempt) are variable as it uses an exponential backoff approach:

  1. Immediately, an attempt to connect to the HTTP endpoint is initiated.
  2. If the first attempt fails, a second attempt is initiated after the configurable duration of milliseconds–sasl.login.retry.backoff.ms.
  3. If the second attempt fails, the duration is doubled before a third attempt.

This pattern repeats as required until the maximum duration of sasl.login.retry.backoff.max.ms.

JAAS configuration options

Several configuration options are available for the callback handler. Sensitive configuration options and SASL extensions are included in the JAAS configuration file (sasl.jaas.config) while the others are top-level configurations.

JAAS configuration option Description
clientID The public identifier for an application used to support the OAuth client credentials grant type. Note that this is not the Kafka client identifier.
clientSecret The secret, known only to an application and an authorization server, associated with the clientID and used to support the OAuth client credentials grant.
scope (optional) The scope to reference in the call to the OAuth server.

Client login callback handler - top-level configuration options

The top-level configuration options available for the client login callback handler are included in the following table.

JAAS configuration option Description Default
sasl.oauthbearer.token.endpoint.url The URL for the OAuth 2.0 issuer token endpoint.  
sasl.oauthbearer.scope.claim.name (optional) The override name of the scope claim. scope
sasl.login.connect.timeout.ms (optional) The duration, in milliseconds, for HTTPS connect timeout 10000
sasl.login.read.timeout.ms (optional) The duration, in milliseconds, for HTTPS read timeout. 10000
sasl.login.retry.backoff.ms (optional) The duration, in milliseconds, to wait between HTTPS call attempts. 100
sasl.login.retry.backoff.max.ms (optional) The maximum duration, in milliseconds, for HTTPS call attempts. 10000

Here’s an example of the JAAS configuration as a part of a Java properties file:

bootstrap.servers=<bootstrap URL>
security.protocol=SASL_SSL
sasl.oauthbearer.token.endpoint.url=https://myidp.example.com/oauth2/default/v1/token
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.mechanism=OAUTHBEARER
sasl.jaas.config= \
  org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    clientId='<client ID>' \
    scope='<Requested Scope>' \
    clientSecret='<Client Secret>' \
    extension_logicalCluster='<Cluster ID>' \
    extension_identityPoolId='<Pool ID>';

In the example above:

  • Replace <bootstrap-url> with the bootstrap server URL. You can get the bootstrap server URL in the Cluster settings section of the Confluent Cloud Console or by using the confluent kafka cluster describe Confluent CLI command.
  • The OAuth provider’s sasl.oauthbearer.token.endpoint.url is specified as well as a sasl.login.connect.timeout.ms overrides the default value.
  • The clientId and clientSecret values as provided by the OAuth provider for an “API” or “machine-to-machine” account are required.
  • The optional scope value allows the inclusion of a scope parameter when requesting the token.
  • Note the SASL extension configuration for extension_logicalCluster and extension_identityPoolId. During the OAuth token retrieval step, extensions are ignored, but will be passed to the broker using the SASL extension mechanism from KIP-342.

After the client successfully logs in, the returned access token can be reused by other connections from this client. While these additional connections do not issue the token retrieval HTTP call on the client, the broker validates the token each time it is sent by a client connection.

Based on KIP-368, the OAuth token reauthentication logic is automatically “inherited” by this implementation, so no additional work is needed to support that feature.

Configure Schema Registry clients for OAuth

Requires: Kafka Schema Registry Client 7.3.1 or later (Confluent Platform 7.3.1 or later)

You can authenticate Schema Registry clients by either adding the required configuration settings directly to the clients or by enabling the clients to inherit the settings.

To add OAuth authentication to Schema Registry, you must add the following configuration settings to your Schema Registry client:

bearer.auth.credentials.source=OAUTHBEARER
bearer.auth.issuer.endpoint.url=<openid-provider-url>
bearer.auth.client.id=<client-id>
bearer.auth.client.secret=<client-secret>
bearer.auth.scope=<scope>
bearer.auth.logical.cluster=<lsrc-resource-id>
bearer.auth.identity.pool.id=<identity-pool-id>

To inherit the OAuth configuration settings from your Kafka configuration, add the following configuration settings to your Schema Registry client:

bearer.auth.credentials.source=SASL_OAUTHBEARER_INHERIT
bearer.auth.logical.cluster=<lsrc-resource-id>

# Config settings below for bearer.auth are optional when using SaslOAuthCredentialProvider
bearer.auth.issuer.endpoint.url=<openid-provider-url>
bearer.auth.client.id=<client-id>
bearer.auth.client.secret=<client-secret>
bearer.auth.scope=<scope>
bearer.auth.identity.pool.id=<identity-pool-id>