Configure Clients to Use Confluent Cloud OAuth¶
Use the following information to configure your Kafka Java clients to use the OAuth 2.0 handler for connecting to Confluent Cloud clusters.
Prerequisites¶
- Apache Kafka client: 3.2.1 or later
- Confluent Platform: 7.2.1 or later, or 7.1.3 or later
- librdkafka: 1.9.2 or later
The implementation class is org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler and accepts instances of the following:
- org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback
- org.apache.kafka.common.security.auth.SaslExtensionsCallback
The fully qualified class name is provided in the client's sasl.login.callback.handler.class configuration.
Because the HTTP call made to the OAuth 2.0 provider might time out or transiently fail, a retry mechanism waits between attempts. The number of attempts made (including the first attempt) varies because the mechanism uses exponential backoff:
- Immediately, an attempt to connect to the HTTP endpoint is initiated.
- If the first attempt fails, a second attempt is initiated after the configurable duration, in milliseconds, set by sasl.login.retry.backoff.ms.
- If the second attempt fails, the duration is doubled before a third attempt.
This pattern repeats as required until the maximum duration of sasl.login.retry.backoff.max.ms.
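The doubling schedule described above can be sketched as follows. This is a minimal illustration, not the client's actual retry code: the class and method names are hypothetical, the time spent in the HTTP calls themselves is ignored, and it assumes sasl.login.retry.backoff.max.ms acts as an overall budget for the waits.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: lists the waits (in ms) between successive login attempts.
class BackoffSchedule {

    // backoffMs    ~ sasl.login.retry.backoff.ms      (first wait)
    // backoffMaxMs ~ sasl.login.retry.backoff.max.ms  (assumed total wait budget)
    static List<Long> waits(long backoffMs, long backoffMaxMs) {
        if (backoffMs <= 0) {
            throw new IllegalArgumentException("backoffMs must be positive");
        }
        List<Long> waits = new ArrayList<>();
        long elapsed = 0;
        long wait = backoffMs;
        while (elapsed < backoffMaxMs) {
            // Never wait longer than what is left of the budget.
            long next = Math.min(wait, backoffMaxMs - elapsed);
            waits.add(next);
            elapsed += next;
            wait *= 2; // double the wait before the next attempt
        }
        return waits;
    }

    public static void main(String[] args) {
        // With the defaults from this page (100 ms and 10000 ms):
        System.out.println(waits(100, 10000));
        // prints [100, 200, 400, 800, 1600, 3200, 3700]
    }
}
```

Under these assumptions, the default settings allow up to eight attempts: the immediate one plus one after each of the seven waits.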
JAAS configuration options¶
Several configuration options are available for the callback handler. Sensitive
configuration options and SASL extensions are included in the JAAS configuration
(sasl.jaas.config), while the others are top-level configurations.
JAAS configuration option | Description |
---|---|
clientId | The public identifier for an application, used to support the OAuth client credentials grant type. Note that this is not the Kafka client identifier. |
clientSecret | The secret, known only to an application and an authorization server, associated with the clientId and used to support the OAuth client credentials grant. |
scope | (optional) The scope to reference in the call to the OAuth server. |
Client login callback handler - top-level configuration options¶
The top-level configuration options available for the client login callback handler are included in the following table.
Configuration option | Description | Default |
---|---|---|
sasl.oauthbearer.token.endpoint.url | The URL for the OAuth 2.0 issuer token endpoint. | |
sasl.oauthbearer.scope.claim.name | (optional) The override name of the scope claim. | scope |
sasl.login.connect.timeout.ms | (optional) The duration, in milliseconds, for HTTPS connect timeout. | 10000 |
sasl.login.read.timeout.ms | (optional) The duration, in milliseconds, for HTTPS read timeout. | 10000 |
sasl.login.retry.backoff.ms | (optional) The duration, in milliseconds, to wait between HTTPS call attempts. | 100 |
sasl.login.retry.backoff.max.ms | (optional) The maximum duration, in milliseconds, for HTTPS call attempts. | 10000 |
Here’s an example of the JAAS configuration as a part of a Java properties file:
bootstrap.servers=<bootstrap URL>
security.protocol=SASL_SSL
sasl.oauthbearer.token.endpoint.url=https://myidp.example.com/oauth2/default/v1/token
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.mechanism=OAUTHBEARER
sasl.jaas.config= \
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
clientId='<client ID>' \
scope='<Requested Scope>' \
clientSecret='<Client Secret>' \
extension_logicalCluster='<Cluster ID>' \
extension_identityPoolId='<Pool ID>';
In the example above:
- Replace <bootstrap URL> with the bootstrap server URL. You can get the bootstrap server URL in the Cluster settings section of the Confluent Cloud Console or by using the confluent kafka cluster describe Confluent CLI command.
- The OAuth provider’s token endpoint is specified in sasl.oauthbearer.token.endpoint.url. Optionally, you can also set sasl.login.connect.timeout.ms to override its default value.
- The clientId and clientSecret values, as provided by the OAuth provider for an “API” or “machine-to-machine” account, are required.
- The optional scope value allows the inclusion of a scope parameter when requesting the token.
- Note the SASL extension configuration for extension_logicalCluster and extension_identityPoolId. During the OAuth token retrieval step, extensions are ignored, but they are passed to the broker using the SASL extension mechanism from KIP-342.
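If the client is configured in code rather than from a properties file, the same settings can be assembled programmatically. The following is a minimal sketch using only java.util.Properties; the class name, method name, and parameter names are hypothetical, the optional scope is omitted, and the single-quoted JAAS values assume the inputs contain no single quotes.

```java
import java.util.Properties;

// Hypothetical helper that mirrors the properties-file example above.
class OAuthClientProps {

    static Properties build(String bootstrapUrl, String tokenEndpointUrl,
                            String clientId, String clientSecret,
                            String clusterId, String poolId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapUrl);
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "OAUTHBEARER");
        props.setProperty("sasl.oauthbearer.token.endpoint.url", tokenEndpointUrl);
        props.setProperty("sasl.login.callback.handler.class",
            "org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler");

        // Sensitive options and SASL extensions go into the JAAS configuration.
        String jaas = String.format(
            "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required "
                + "clientId='%s' clientSecret='%s' "
                + "extension_logicalCluster='%s' extension_identityPoolId='%s';",
            clientId, clientSecret, clusterId, poolId);
        props.setProperty("sasl.jaas.config", jaas);
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("<bootstrap URL>",
            "https://myidp.example.com/oauth2/default/v1/token",
            "<client ID>", "<Client Secret>", "<Cluster ID>", "<Pool ID>");
        // Print only the keys so the client secret is not written to the console.
        props.stringPropertyNames().forEach(System.out::println);
    }
}
```

The resulting Properties object would still need the usual serializer or deserializer settings before being passed to a producer or consumer constructor.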
After the client successfully logs in, the returned access token can be reused by other connections from this client. While these additional connections do not issue the token retrieval HTTP call on the client, the broker validates the token each time it is sent by a client connection.
Based on KIP-368, the OAuth token reauthentication logic is automatically “inherited” by this implementation, so no additional work is needed to support that feature.
Configure Schema Registry clients for OAuth¶
Requires: Kafka Schema Registry Client 7.3.1 or later (Confluent Platform 7.3.1 or later)
You can authenticate Schema Registry clients by either adding the required configuration settings directly to the clients or by enabling the clients to inherit the settings.
To configure OAuth authentication directly, add the following configuration settings to your Schema Registry client:
bearer.auth.credentials.source=OAUTHBEARER
bearer.auth.issuer.endpoint.url=<openid-provider-url>
bearer.auth.client.id=<client-id>
bearer.auth.client.secret=<client-secret>
bearer.auth.scope=<scope>
bearer.auth.logical.cluster=<lsrc-resource-id>
bearer.auth.identity.pool.id=<identity-pool-id>
To inherit the OAuth configuration settings from your Kafka configuration, add the following configuration settings to your Schema Registry client:
bearer.auth.credentials.source=SASL_OAUTHBEARER_INHERIT
bearer.auth.logical.cluster=<lsrc-resource-id>
# Config settings below for bearer.auth are optional when using SaslOAuthCredentialProvider
bearer.auth.issuer.endpoint.url=<openid-provider-url>
bearer.auth.client.id=<client-id>
bearer.auth.client.secret=<client-secret>
bearer.auth.scope=<scope>
bearer.auth.identity.pool.id=<identity-pool-id>
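The two approaches above can be compared side by side in code. This sketch only builds the configuration maps with the keys listed on this page; the class and method names are hypothetical, and how the map is handed to a particular Schema Registry client or serializer is not shown.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: builds Schema Registry client settings for both approaches.
class SchemaRegistryOAuthConfig {

    // Settings added directly to the Schema Registry client.
    static Map<String, String> direct(String issuerUrl, String clientId, String clientSecret,
                                      String scope, String lsrcId, String poolId) {
        Map<String, String> cfg = new LinkedHashMap<>();
        cfg.put("bearer.auth.credentials.source", "OAUTHBEARER");
        cfg.put("bearer.auth.issuer.endpoint.url", issuerUrl);
        cfg.put("bearer.auth.client.id", clientId);
        cfg.put("bearer.auth.client.secret", clientSecret);
        cfg.put("bearer.auth.scope", scope);
        cfg.put("bearer.auth.logical.cluster", lsrcId);
        cfg.put("bearer.auth.identity.pool.id", poolId);
        return cfg;
    }

    // Settings inherited from the Kafka client configuration; the remaining
    // bearer.auth.* settings are optional in this mode, so they are left out.
    static Map<String, String> inherited(String lsrcId) {
        Map<String, String> cfg = new LinkedHashMap<>();
        cfg.put("bearer.auth.credentials.source", "SASL_OAUTHBEARER_INHERIT");
        cfg.put("bearer.auth.logical.cluster", lsrcId);
        return cfg;
    }

    public static void main(String[] args) {
        // Print only the keys so no secret values reach the console.
        System.out.println(direct("<openid-provider-url>", "<client-id>", "<client-secret>",
            "<scope>", "<lsrc-resource-id>", "<identity-pool-id>").keySet());
        System.out.println(inherited("<lsrc-resource-id>").keySet());
    }
}
```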