Configure Clients for OAuth-OIDC on Confluent Cloud¶
Use the following information to configure your Kafka Java clients to use the OAuth 2.0 handler for connecting to Confluent Cloud clusters.
Prerequisites¶
- Apache Kafka client: 3.2.1 or later;
- Confluent Platform: 7.2.1 or later; 7.1.3 or later
- librdkafka: 1.9.2 or later
The implementation class is
org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
and accepts instances of the following:
org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback
org.apache.kafka.common.security.auth.SaslExtensionsCallback
The fully-qualified class name is provided to the client’s configuration of
sasl.login.callback.handler.class
.
Because the HTTP call made to the OAuth 2.0 provider might time out or transiently fail, a retry mechanism waits between attempts. The number of attempts made (including the first attempt) are variable as it uses an exponential backoff approach:
- Immediately, an attempt to connect to the HTTP endpoint is initiated.
- If the first attempt fails, a second attempt is initiated after the configurable
duration of
milliseconds–sasl.login.retry.backoff.ms
. - If the second attempt fails, the duration is doubled before a third attempt.
This pattern repeats as required until the maximum duration of sasl.login.retry.backoff.max.ms
.
JAAS configuration options¶
Several configuration options are available for the callback handler. Sensitive
configuration options and SASL extensions are included in the JAAS configuration
file (sasl.jaas.config
) while the others are top-level configurations.
JAAS configuration option | Description |
---|---|
clientID |
The public identifier for an application used to support the OAuth client credentials grant type. Note that this is not the Kafka client identifier. |
clientSecret |
The secret, known only to an application and an authorization server,
associated with the clientID and used to support the OAuth client
credentials grant. |
scope |
(optional) The scope to reference in the call to the OAuth server. |
Client login callback handler - top-level configuration options¶
The top-level configuration options available for the client login callback handler are included in the following table.
JAAS configuration option | Description | Default |
---|---|---|
sasl.oauthbearer.token.endpoint.url |
The URL for the OAuth 2.0 issuer token endpoint. | |
sasl.oauthbearer.scope.claim.name |
(optional) The override name of the scope claim. |
scope |
sasl.login.connect.timeout.ms |
(optional) The duration, in milliseconds, for HTTPS connect timeout | 10000 |
sasl.login.read.timeout.ms |
(optional) The duration, in milliseconds, for HTTPS read timeout. | 10000 |
sasl.login.retry.backoff.ms |
(optional) The duration, in milliseconds, to wait between HTTPS call attempts. | 100 |
sasl.login.retry.backoff.max.ms |
(optional) The maximum duration, in milliseconds, for HTTPS call attempts. | 10000 |
Here’s an example of the JAAS configuration as a part of a Java properties file:
bootstrap.servers=<bootstrap URL>
security.protocol=SASL_SSL
sasl.oauthbearer.token.endpoint.url=https://myidp.example.com/oauth2/default/v1/token
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.mechanism=OAUTHBEARER
sasl.jaas.config= \
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
clientId='<client ID>' \
scope='<Requested Scope>' \
clientSecret='<Client Secret>' \
extension_logicalCluster='<Cluster ID>' \
extension_identityPoolId='<Pool ID>';
In the example above:
- Replace
<bootstrap-url>
with the bootstrap server URL. You can get the bootstrap server URL in the Cluster settings section of the Confluent Cloud Console or by using theconfluent kafka cluster describe
Confluent CLI command. - The OAuth provider’s
sasl.oauthbearer.token.endpoint.url
is specified as well as asasl.login.connect.timeout.ms
overrides the default value. - The
clientId
andclientSecret
values as provided by the OAuth provider for an “API” or “machine-to-machine” account are required. - The optional
scope
value allows the inclusion of ascope
parameter when requesting the token. - Note the SASL extension configuration for
extension_logicalCluster
andextension_identityPoolId
. During the OAuth token retrieval step, extensions are ignored, but will be passed to the broker using the SASL extension mechanism from KIP-342.
After the client successfully logs in, the returned access token can be reused by other connections from this client. While these additional connections do not issue the token retrieval HTTP call on the client, the broker validates the token each time it is sent by a client connection.
Based on KIP-368, the OAuth token reauthentication logic is automatically “inherited” by this implementation, so no additional work is needed to support that feature.
Configure Schema Registry clients for OAuth¶
Requires: Kafka Schema Registry Client 7.3.1 or later (Confluent Platform 7.3.1 or later)
Confluent OAuth supports the OAuth 2.0 protocol for authentication and authorization, as described in Use OAuth/OIDC to Authenticate to Confluent Cloud.
The Schema Registry Java client module includes support for the OpenID Connect (OIDC) authentication protocol and OAuth 2.0. This enables these Java clients to use token credentials to authenticate with Confluent Cloud Schema Registry. The Schema Registry client is used by serializers and deserializers in producers, consumers, (self-managed) Connect, ksqlDB, and so on. Note that these serdes have a dependency of Confluent Platform; that is, you need Confluent Platform to create them.
You have the option of configuring your Schema Registry clients to use a standard OAuth bearer token with a public OIDC server or token server (like Okta), or
a custom token provider paired with your own implementation of the BearerAuthCredentialProvider
interface.
How it works¶
To obtain the OAuth token, Schema Registry clients use the client credentials grant. The RFC client credential grant flow for obtaining authorization looks like this:
- The Schema Registry client makes a request to a public OIDC server or token server (such as, Okta), using the client ID and secret to authenticate with it.
- The OIDC server returns a token that the Schema Registry client can use to talk to the cloud Schema Registry server.
The logic of fetching tokens and providing them to HTTP headers in Schema Registry clients is done by the BearerAuthCredentialProvider
(Token) Classes.
For example, in a Kafka producer using a Schema Registry serialization format, this is configured as follows:
bearer.auth.credentials.source=<alias of BearerAuthCredentialProvider class>
On the other hand, a non-standard, custom implementation of OAuth and self-managed infrastructure does not follow the standard
RFC client credential grant flow. For these use cases, Confluent provides a new implementation of BearerAuthCredentialProvider
called CustomBearerAuthCredentialProvider
(alias CUSTOM
). This is available to use in Schema Registry clients and will act as a
wrapper around your custom token fetching logic. CustomBearerAuthCredentialProvider
doesn’t have any logic for token retrieval
by itself, but it delegates that work to your custom implementation. BearerAuthCredentialProvider
is a public interface and
since the Schema Registry is open source, customers can create a concrete implementation with their own logic.
The following diagram shows the BearerAuthCredentialProvider
or token provider classes available to choose from,
including the CustomTokenCredentialProvider
. Each of these classes is configured using your aliases for
OauthCredentialProvider
using OAUTHBEARER, SaslOauthCredentialProvider
using SASL_OAUTHBEARER_INHERIT, and so forth.
For the custom implementation, bearer.auth.custom.provider.class
is where you specify the fully qualified class name
of your custom implementation. In the diagram, this is represented as MyCustomImplementation
. This class implements BearerAuthCredentialProvider
as well.
You implement the function getBearerToken(URL)
, which contains the logic for retrieving the token. Ideally, the getBearerToken(URL)
would internally use a cache to get the token from, and this cache would have the logic of refreshing the token when it expires.
Configure a Schema Registry client to use a standard OAuth authorization server and protocol¶
You can authenticate Schema Registry clients to a public OAuth 2.0 server by either adding the required configuration settings directly to the clients or by enabling the clients to inherit the settings.
To add OAuth authentication to Schema Registry, you must add the following configuration settings to your Schema Registry client:
bearer.auth.credentials.source=OAUTHBEARER
bearer.auth.issuer.endpoint.url=<openid-provider-url>
bearer.auth.client.id=<client-id>
bearer.auth.client.secret=<client-secret>
bearer.auth.scope=<scope>
bearer.auth.logical.cluster=<lsrc-resource-id>
bearer.auth.identity.pool.id=<identity-pool-id>
To inherit the OAuth configuration settings from your Kafka configuration, add the following configuration settings to your Schema Registry client:
bearer.auth.credentials.source=SASL_OAUTHBEARER_INHERIT
bearer.auth.logical.cluster=<lsrc-resource-id>
# Config settings below for bearer.auth are optional when using SaslOAuthCredentialProvider
bearer.auth.issuer.endpoint.url=<openid-provider-url>
bearer.auth.client.id=<client-id>
bearer.auth.client.secret=<client-secret>
bearer.auth.scope=<scope>
bearer.auth.identity.pool.id=<identity-pool-id>
Configure a Schema Registry client to use a custom token provider and OAuth implementation¶
To authenticate a client to Schema Registry given a custom token provider and implementation, add the following configuration settings to the client:
bearer.auth.credentials.source=CUSTOM
bearer.auth.custom.provider.class=<Fully Qualified class name of custom Implementation>
bearer.auth.logical.cluster=<lsrc-xxxxx>
bearer.auth.identity.pool.id=<Pool ID>
The last two configs (bearer.auth.logical.cluster
and bearer.auth.identity.pool.id
) shown above
are optional for other use cases, but required for Confluent Cloud Schema Registry.
The above configs are for a Kafka producer. Other types of Schema Registry clients need appropriate prefixes. For example, KSQL would start with ksql.schema.registry
.