Configure Clients for Confluent Cloud OAuth

Use the following information to configure your Kafka Java clients to use the OAuth 2.0 handler for connecting to Confluent Cloud clusters.

Prerequisites

  • Apache Kafka client: 3.2.1 or later
  • Confluent Platform: 7.2.1 or later; 7.1.3 or later
  • librdkafka: 1.9.2 or later

The implementation class is

org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler

and accepts instances of the following:

org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback
org.apache.kafka.common.security.auth.SaslExtensionsCallback

Set the client’s sasl.login.callback.handler.class configuration to this fully qualified class name.

Because the HTTP call made to the OAuth 2.0 provider might time out or transiently fail, a retry mechanism waits between attempts. The number of attempts made (including the first attempt) varies because the handler uses an exponential backoff approach:

  1. Immediately, an attempt to connect to the HTTP endpoint is initiated.
  2. If the first attempt fails, a second attempt is initiated after the configurable duration, in milliseconds, set by sasl.login.retry.backoff.ms.
  3. If the second attempt fails, the duration is doubled before a third attempt.

This pattern repeats as required until the maximum duration set by sasl.login.retry.backoff.max.ms is reached.
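The retry pattern can be sketched in a few lines of Java. This is a minimal illustration, not the handler's actual code: BackoffSchedule is a hypothetical helper, and it assumes that sasl.login.retry.backoff.max.ms bounds the total time spent waiting and that the final wait is shortened to fit. It ignores the time taken by the HTTP calls themselves.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: BackoffSchedule is a hypothetical helper, not a Kafka class.
class BackoffSchedule {

    // Returns the waits (in ms) that precede attempts 2, 3, ... assuming the
    // total waiting time may not exceed maxMs (an assumption of this sketch).
    static List<Long> waits(long backoffMs, long maxMs) {
        if (backoffMs <= 0) {
            throw new IllegalArgumentException("backoffMs must be positive");
        }
        List<Long> waits = new ArrayList<>();
        long elapsed = 0;
        long next = backoffMs;
        while (elapsed < maxMs) {
            long wait = Math.min(next, maxMs - elapsed); // shorten the final wait to fit
            waits.add(wait);
            elapsed += wait;
            next *= 2; // double the wait after each failed attempt
        }
        return waits;
    }

    public static void main(String[] args) {
        // 100 ms initial backoff and a 10000 ms maximum.
        System.out.println(waits(100, 10_000));
    }
}
```

Under these assumptions, a 100 ms backoff and a 10000 ms maximum give waits of 100, 200, 400, 800, 1600, 3200, and 3700 ms, so at most eight attempts are made.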

JAAS configuration options

Several configuration options are available for the callback handler. Sensitive configuration options and SASL extensions are included in the JAAS configuration file (sasl.jaas.config) while the others are top-level configurations.

JAAS configuration option Description
clientId The public identifier for an application used to support the OAuth client credentials grant type. Note that this is not the Kafka client identifier.
clientSecret The secret, known only to an application and an authorization server, associated with the clientId and used to support the OAuth client credentials grant.
scope (optional) The scope to reference in the call to the OAuth server.

Client login callback handler - top-level configuration options

The top-level configuration options available for the client login callback handler are included in the following table.

Configuration option Description Default
sasl.oauthbearer.token.endpoint.url The URL for the OAuth 2.0 issuer token endpoint.  
sasl.oauthbearer.scope.claim.name (optional) The override name of the scope claim. scope
sasl.login.connect.timeout.ms (optional) The duration, in milliseconds, for HTTPS connect timeout. 10000
sasl.login.read.timeout.ms (optional) The duration, in milliseconds, for HTTPS read timeout. 10000
sasl.login.retry.backoff.ms (optional) The duration, in milliseconds, to wait between HTTPS call attempts. 100
sasl.login.retry.backoff.max.ms (optional) The maximum duration, in milliseconds, for HTTPS call attempts. 10000

Here’s an example of the JAAS configuration as a part of a Java properties file:

bootstrap.servers=<bootstrap URL>
security.protocol=SASL_SSL
sasl.oauthbearer.token.endpoint.url=https://myidp.example.com/oauth2/default/v1/token
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.mechanism=OAUTHBEARER
sasl.jaas.config= \
  org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    clientId='<client ID>' \
    scope='<Requested Scope>' \
    clientSecret='<Client Secret>' \
    extension_logicalCluster='<Cluster ID>' \
    extension_identityPoolId='<Pool ID>';

In the example above:

  • Replace <bootstrap URL> with the bootstrap server URL. You can get the bootstrap server URL in the Cluster settings section of the Confluent Cloud Console or by using the confluent kafka cluster describe Confluent CLI command.
  • The OAuth provider’s token endpoint is specified in sasl.oauthbearer.token.endpoint.url. To override a default such as the connect timeout, add the corresponding top-level option (for example, sasl.login.connect.timeout.ms).
  • The clientId and clientSecret values as provided by the OAuth provider for an “API” or “machine-to-machine” account are required.
  • The optional scope value allows the inclusion of a scope parameter when requesting the token.
  • Note the SASL extension configuration for extension_logicalCluster and extension_identityPoolId. During the OAuth token retrieval step, extensions are ignored, but will be passed to the broker using the SASL extension mechanism from KIP-342.
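The same settings can also be assembled in code. The following is a minimal sketch that builds a java.util.Properties object with the keys from the example above. OAuthClientProps and its parameter names are hypothetical, the optional scope is omitted, and the sketch assumes that none of the values contain a single quote, because it does no escaping.

```java
import java.util.Properties;

// Illustrative only: OAuthClientProps and its parameters are hypothetical names.
class OAuthClientProps {

    static Properties build(String bootstrapUrl, String tokenEndpointUrl,
                            String clientId, String clientSecret,
                            String clusterId, String poolId) {
        // Assumes the values contain no single quotes; no escaping is done here.
        String jaas = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
                + " clientId='" + clientId + "'"
                + " clientSecret='" + clientSecret + "'"
                + " extension_logicalCluster='" + clusterId + "'"
                + " extension_identityPoolId='" + poolId + "';";

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapUrl);
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "OAUTHBEARER");
        props.setProperty("sasl.oauthbearer.token.endpoint.url", tokenEndpointUrl);
        props.setProperty("sasl.login.callback.handler.class",
                "org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler");
        props.setProperty("sasl.jaas.config", jaas);
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("<bootstrap URL>",
                "https://myidp.example.com/oauth2/default/v1/token",
                "<client ID>", "<Client Secret>", "<Cluster ID>", "<Pool ID>");
        // Print only the keys so the secret is not written to the console.
        props.stringPropertyNames().forEach(System.out::println);
    }
}
```

The resulting Properties object, together with the usual serializer or deserializer settings, would then be passed to a client constructor such as new KafkaProducer<>(props).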

After the client successfully logs in, the returned access token can be reused by other connections from this client. While these additional connections do not issue the token retrieval HTTP call on the client, the broker validates the token each time it is sent by a client connection.

Based on KIP-368, the OAuth token reauthentication logic is automatically “inherited” by this implementation, so no additional work is needed to support that feature.

Configure Schema Registry clients for OAuth

Requires: Kafka Schema Registry Client 7.3.1 or later (Confluent Platform 7.3.1 or later)

Confluent OAuth supports the OAuth 2.0 protocol for authentication and authorization, as described in OAuth for Confluent Cloud.

The Schema Registry Java client module includes support for the OpenID Connect (OIDC) authentication protocol and OAuth 2.0. This enables these Java clients to use token credentials to authenticate with Confluent Cloud Schema Registry. The Schema Registry client is used by serializers and deserializers in producers, consumers, (self-managed) Connect, ksqlDB, and so on. Note that these serdes depend on Confluent Platform; that is, you need Confluent Platform to create them.

You have the option of configuring your Schema Registry clients to use a standard OAuth bearer token with a public OIDC server or token server (like Okta), or a custom token provider paired with your own implementation of the BearerAuthCredentialProvider interface.

How it works

To obtain the OAuth token, Schema Registry clients use the client credentials grant. The RFC client credential grant flow for obtaining authorization looks like this:

  1. The Schema Registry client makes a request to a public OIDC server or token server (such as Okta), using the client ID and secret to authenticate with it.
  2. The OIDC server returns a token that the Schema Registry client can use to talk to the cloud Schema Registry server.

The logic for fetching tokens and adding them to HTTP headers in Schema Registry clients is handled by the BearerAuthCredentialProvider (token provider) classes. For example, in a Kafka producer using a Schema Registry serialization format, this is configured as follows:

bearer.auth.credentials.source=<alias of BearerAuthCredentialProvider class>

On the other hand, a non-standard, custom implementation of OAuth and self-managed infrastructure does not follow the standard RFC client credential grant flow. For these use cases, Confluent provides a new implementation of BearerAuthCredentialProvider called CustomBearerAuthCredentialProvider (alias CUSTOM). This is available to use in Schema Registry clients and will act as a wrapper around your custom token fetching logic. CustomBearerAuthCredentialProvider doesn’t have any logic for token retrieval by itself, but it delegates that work to your custom implementation. BearerAuthCredentialProvider is a public interface and since the Schema Registry is open source, customers can create a concrete implementation with their own logic.

The following diagram shows the BearerAuthCredentialProvider (token provider) classes you can choose from, including CustomBearerAuthCredentialProvider. Each class is selected by its alias: OauthCredentialProvider by OAUTHBEARER, SaslOauthCredentialProvider by SASL_OAUTHBEARER_INHERIT, and so forth.

../../../_images/oauth-oidc-flow-sr.png

For the custom implementation, set bearer.auth.custom.provider.class to the fully qualified class name of your custom implementation. In the diagram, this is represented as MyCustomImplementation. This class also implements BearerAuthCredentialProvider. You implement the getBearerToken(URL) method, which contains the logic for retrieving the token. Ideally, getBearerToken(URL) reads the token from an internal cache that refreshes the token when it expires.
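The caching idea can be sketched independently of the Schema Registry client. In the example below, CachedTokenSource and Token are hypothetical names that are not part of any Confluent library, and the fetcher stands in for whatever call your identity provider requires. A custom getBearerToken(URL) implementation could delegate to a helper along these lines.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

// Illustrative only: CachedTokenSource and Token are hypothetical names,
// not part of the Schema Registry client.
class CachedTokenSource {

    // A token string plus the time at which it expires.
    static final class Token {
        final String value;
        final Instant expiresAt;

        Token(String value, Instant expiresAt) {
            this.value = value;
            this.expiresAt = expiresAt;
        }
    }

    private final Supplier<Token> fetcher;   // your custom token retrieval logic
    private final Supplier<Instant> clock;   // injectable so the cache can be tested
    private final Duration refreshMargin;    // refresh this long before expiry
    private Token current;

    CachedTokenSource(Supplier<Token> fetcher, Supplier<Instant> clock, Duration refreshMargin) {
        this.fetcher = fetcher;
        this.clock = clock;
        this.refreshMargin = refreshMargin;
    }

    // Returns the cached token, fetching a new one when none is cached
    // or when the cached one is within refreshMargin of expiring.
    synchronized String get() {
        Instant now = clock.get();
        boolean stale = current == null
                || !now.isBefore(current.expiresAt.minus(refreshMargin));
        if (stale) {
            current = fetcher.get();
        }
        return current.value;
    }

    public static void main(String[] args) {
        CachedTokenSource source = new CachedTokenSource(
                () -> new Token("example-token", Instant.now().plusSeconds(300)),
                Instant::now,
                Duration.ofSeconds(30));
        // The second call is served from the cache rather than the fetcher.
        System.out.println(source.get().equals(source.get()));
    }
}
```

Refreshing slightly before expiry (the refreshMargin here) is a design choice of this sketch; it reduces the chance of sending a token that expires while the request is in flight.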

Configure a Schema Registry client to use a standard OAuth authorization server and protocol

You can authenticate Schema Registry clients to a public OAuth 2.0 server by either adding the required configuration settings directly to the clients or by enabling the clients to inherit the settings.

To add OAuth authentication to Schema Registry, you must add the following configuration settings to your Schema Registry client:

bearer.auth.credentials.source=OAUTHBEARER
bearer.auth.issuer.endpoint.url=<openid-provider-url>
bearer.auth.client.id=<client-id>
bearer.auth.client.secret=<client-secret>
bearer.auth.scope=<scope>
bearer.auth.logical.cluster=<lsrc-resource-id>
bearer.auth.identity.pool.id=<identity-pool-id>

To inherit the OAuth configuration settings from your Kafka configuration, add the following configuration settings to your Schema Registry client:

bearer.auth.credentials.source=SASL_OAUTHBEARER_INHERIT
bearer.auth.logical.cluster=<lsrc-resource-id>

# The bearer.auth settings below are optional when using SaslOauthCredentialProvider
bearer.auth.issuer.endpoint.url=<openid-provider-url>
bearer.auth.client.id=<client-id>
bearer.auth.client.secret=<client-secret>
bearer.auth.scope=<scope>
bearer.auth.identity.pool.id=<identity-pool-id>

Configure a Schema Registry client to use a custom token provider and OAuth implementation

To authenticate a client to Schema Registry given a custom token provider and implementation, add the following configuration settings to the client:

bearer.auth.credentials.source=CUSTOM
bearer.auth.custom.provider.class=<fully qualified class name of custom implementation>
bearer.auth.logical.cluster=<lsrc-xxxxx>
bearer.auth.identity.pool.id=<pool ID>

The last two configs (bearer.auth.logical.cluster and bearer.auth.identity.pool.id) shown above are optional for other use cases, but required for Confluent Cloud Schema Registry.

The configurations above are for a Kafka producer. Other types of Schema Registry clients need the appropriate prefix. For example, ksqlDB settings start with ksql.schema.registry.
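As a rough illustration of the prefixing, the sketch below prepends a prefix to every key in a settings map. PrefixedConfig is a hypothetical helper, and it assumes the prefix is simply concatenated with each key; check the documentation of the specific client for the exact keys it expects.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: PrefixedConfig is a hypothetical helper.
class PrefixedConfig {

    // Returns a copy of settings with prefix prepended to every key,
    // preserving the original insertion order.
    static Map<String, String> withPrefix(String prefix, Map<String, String> settings) {
        Map<String, String> out = new LinkedHashMap<>();
        settings.forEach((key, value) -> out.put(prefix + key, value));
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> producerSettings = new LinkedHashMap<>();
        producerSettings.put("bearer.auth.credentials.source", "CUSTOM");
        producerSettings.put("bearer.auth.logical.cluster", "<lsrc-xxxxx>");

        // Assumed form of the ksqlDB keys: prefix + producer key.
        withPrefix("ksql.schema.registry.", producerSettings)
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```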