Configure Python Clients for OAuth/OIDC on Confluent Cloud¶
Prerequisites¶
Before you begin, ensure you have the following:
Language and tooling¶
- Python 3.8 or later
- confluent-kafka-python: 2.0.0 or later
- Confluent Platform 7.2.1 or later (or 7.1.3 or later on the 7.1.x line), if also connecting to Confluent Platform
Confluent Cloud environment¶
- OAuth setup in Confluent Cloud - Configure OAuth authentication in your Confluent Cloud environment
Credentials and identifiers¶
- Client ID - Your application’s identifier (like a username)
- Client secret - Your application’s password for OAuth (keep it out of source code; see the sketch after this list)
- Token endpoint URL - Where to request access tokens
- Scopes - Permissions your application needs (for example, kafka:read kafka:write for Kafka, schema:read for Schema Registry)
- Kafka cluster ID (lkc-xxxxx) - Your Kafka cluster identifier
- Identity pool ID (pool-xxxxx) - If using identity pools
- Schema Registry logical cluster ID (lsrc-xxxxx) - For Schema Registry operations
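Keep the client secret out of source code and version control. A minimal sketch of loading these values from environment variables (the variable names are illustrative, not required by the client):

import os

# Illustrative environment variable names; any naming scheme works.
token_endpoint = os.environ['OAUTH_TOKEN_ENDPOINT']
client_id = os.environ['OAUTH_CLIENT_ID']
client_secret = os.environ['OAUTH_CLIENT_SECRET']
scope = os.environ.get('OAUTH_SCOPE', 'kafka:read kafka:write')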
Client library¶
Install the latest version with OAuth support:
pip install confluent-kafka
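To confirm which client and librdkafka versions are installed, you can use the version functions in the confluent_kafka package (each returns a tuple whose first element is the version string):

import confluent_kafka

print('confluent-kafka:', confluent_kafka.version()[0])
print('librdkafka:', confluent_kafka.libversion()[0])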
Configure Kafka Python clients¶
Python Kafka clients authenticate to Confluent Cloud clusters over SASL/OAUTHBEARER using the OAuth 2.0 client credentials flow. The client library has built-in OIDC support: when sasl.oauthbearer.method is set to oidc, the client requests access tokens from your identity provider and refreshes them automatically. If the built-in mechanism does not fit your token flow, you can instead register a custom callback function under the oauth_cb configuration key.
OAuth callback function¶
If you use the callback approach, the client invokes your function with the value of the sasl.oauthbearer.config property and expects back a tuple of the access token and its expiry time in seconds since the epoch. Give the callback robust error handling:
import time

import requests
from confluent_kafka import KafkaException

def oauth_token_refresh_cb(oauth_settings, oauthbearer_config):
    """
    OAuth token refresh callback for the Python Kafka client.

    Args:
        oauth_settings: dictionary with the token endpoint, client ID,
            client secret, and scope, bound with functools.partial
        oauthbearer_config: value of the 'sasl.oauthbearer.config'
            property, passed by the client (unused here)

    Returns:
        tuple: (access_token, expiry time in seconds since the epoch)
    """
    try:
        payload = {
            'grant_type': 'client_credentials',
            'client_id': oauth_settings['client_id'],
            'client_secret': oauth_settings['client_secret'],
            'scope': oauth_settings['scope']
        }
        response = requests.post(oauth_settings['token_endpoint'],
                                 data=payload, timeout=10)
        response.raise_for_status()
        token = response.json()
        # librdkafka expects the absolute expiry time in seconds since
        # the epoch, so add 'expires_in' to the current time.
        return token['access_token'], time.time() + float(token['expires_in'])
    except Exception as e:
        # Raising here surfaces the failure as an authentication error.
        raise KafkaException(f"OAuth token refresh failed: {e}")
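To register the callback, bind your OAuth settings with functools.partial and set the result as the oauth_cb configuration value. A minimal sketch (when oauth_cb is used, leave sasl.oauthbearer.method at its default and omit the OIDC-only sasl.oauthbearer.* keys; all values shown are placeholders):

import functools
from confluent_kafka import Producer

# Settings consumed by the callback above.
oauth_settings = {
    'token_endpoint': 'https://<your-idp.com>/oauth2/token',
    'client_id': '<your-client-id>',
    'client_secret': '<your-client-secret>',
    'scope': 'kafka:read kafka:write'
}

callback_config = {
    'bootstrap.servers': 'your-bootstrap-server:9092',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'OAUTHBEARER',
    # The client calls the partial with the 'sasl.oauthbearer.config' string.
    'oauth_cb': functools.partial(oauth_token_refresh_cb, oauth_settings)
}
producer = Producer(callback_config)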
Configuration example¶
Define a single, complete configuration dictionary for your Kafka client. These examples use the client’s built-in OIDC support (sasl.oauthbearer.method set to oidc), so no callback is needed:

from confluent_kafka import Producer, Consumer

producer_config = {
    'bootstrap.servers': 'your-bootstrap-server:9092',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'OAUTHBEARER',
    'sasl.oauthbearer.method': 'oidc',
    'sasl.oauthbearer.token.endpoint.url': 'https://<your-idp.com>/oauth2/token',
    'sasl.oauthbearer.client.id': '<your-client-id>',
    'sasl.oauthbearer.client.secret': '<your-client-secret>',
    'sasl.oauthbearer.scope': 'kafka:read kafka:write',
    'sasl.oauthbearer.extensions': 'logicalCluster=<lkc-xxxxx>,identityPoolId=<pool-yyyyy>',
    'debug': 'security,protocol,broker'  # For troubleshooting
}

# Create producer
producer = Producer(producer_config)

# For the consumer, use the same OAuth configuration plus a group ID
consumer_config = {
    'bootstrap.servers': 'your-bootstrap-server:9092',
    'group.id': 'your-consumer-group',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'OAUTHBEARER',
    'sasl.oauthbearer.method': 'oidc',
    'sasl.oauthbearer.token.endpoint.url': 'https://<your-idp.com>/oauth2/token',
    'sasl.oauthbearer.client.id': '<your-client-id>',
    'sasl.oauthbearer.client.secret': '<your-client-secret>',
    'sasl.oauthbearer.scope': 'kafka:read kafka:write',
    'sasl.oauthbearer.extensions': 'logicalCluster=<lkc-xxxxx>,identityPoolId=<pool-yyyyy>',
    'debug': 'security,protocol,broker'
}

# Create consumer
consumer = Consumer(consumer_config)
Test your configuration¶
Test with a simple producer:
# Test producer
producer = Producer(producer_config)

def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}] - OAuth is working!')

producer.produce('test-topic', 'test-message', callback=delivery_report)
producer.flush()
Check for common errors:
- “SASL authentication failed” - Check your OAuth credentials and endpoint
- “Invalid token” - Verify your callback function is returning a valid token
- “Connection timeout” - Check your bootstrap servers and network connectivity
Also verify token refresh: the client should automatically refresh tokens before they expire.
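Because authentication happens in the background, failures may only surface when the client is polled. A minimal sketch that surfaces them early (error_cb is a standard client configuration property; the handler name is illustrative):

from confluent_kafka import Producer

def on_error(err):
    # Client-level errors, including authentication failures, arrive here.
    print(f'Kafka client error: {err}')

producer = Producer({**producer_config, 'error_cb': on_error})
producer.poll(0)  # serve callbacks, including token refresh and errors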
Configure Schema Registry Python clients¶
The SchemaRegistryClient authenticates using a built-in OAuth mechanism. You only need to provide the correct configuration parameters, and the client handles the token request automatically. It does not use the oauth_cb callback.
Required parameters¶
The following parameters must be included in the configuration dictionary passed to SchemaRegistryClient.
Parameter | Description
---|---
url | The endpoint for your Schema Registry instance.
bearer.auth.credentials.source | Must be set to OAUTHBEARER.
bearer.auth.client.id | The client ID from your identity provider.
bearer.auth.client.secret | The client secret from your identity provider.
bearer.auth.issuer.endpoint.url | The token endpoint URL of your identity provider.
bearer.auth.scope | The required permissions scope (for example, schema:read).
bearer.auth.logical.cluster | The Schema Registry’s logical cluster ID (for example, lsrc-xxxxx).
bearer.auth.identity.pool.id | The identity pool ID, if using identity federation.

Note: basic.auth.user.info (a Schema Registry API key and secret in the format <api-key>:<api-secret>) configures basic authentication instead of OAuth; do not combine it with the bearer.auth.* parameters.
Configuration example¶
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

schema_registry_conf = {
    'url': 'https://<your-schema-registry-endpoint>',
    # OAuth-specific configuration for the Schema Registry client
    'bearer.auth.credentials.source': 'OAUTHBEARER',
    'bearer.auth.issuer.endpoint.url': 'https://<your-idp.com>/oauth2/token',
    'bearer.auth.client.id': '<your-client-id>',
    'bearer.auth.client.secret': '<your-client-secret>',
    'bearer.auth.scope': 'schema:read',
    'bearer.auth.logical.cluster': '<lsrc-xxxxx>',
    'bearer.auth.identity.pool.id': '<pool-yyyyy>'
}

schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# user_schema_string holds your Avro schema as a JSON string
avro_serializer = AvroSerializer(schema_registry_client,
                                 user_schema_string,
                                 conf={'auto.register.schemas': False})
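To verify the configuration end to end, serialize a record; this forces a Schema Registry request and therefore exercises the OAuth flow. A minimal sketch (the topic name and record shape are illustrative and must match your schema):

from confluent_kafka.serialization import SerializationContext, MessageField

record = {'name': 'alice'}  # must match user_schema_string
payload = avro_serializer(record, SerializationContext('test-topic', MessageField.VALUE))
print(f'Serialized {len(payload)} bytes - Schema Registry OAuth is working!')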
Google OIDC integration¶
For Google OIDC integration with Python clients:
# Google OIDC configuration for Kafka
google_oauth_config = {
    'bootstrap.servers': 'your-bootstrap-server:9092',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'OAUTHBEARER',
    'sasl.oauthbearer.method': 'oidc',
    'sasl.oauthbearer.token.endpoint.url': 'https://oauth2.googleapis.com/token',
    'sasl.oauthbearer.client.id': 'your-google-client-id',
    'sasl.oauthbearer.client.secret': 'your-google-client-secret',
    'sasl.oauthbearer.scope': 'https://www.googleapis.com/auth/cloud-platform',
    'sasl.oauthbearer.extensions': 'logicalCluster=<lkc-xxxxx>,identityPoolId=<pool-yyyyy>',
    'debug': 'security,protocol,broker'
}

# Create producer with Google OIDC configuration
producer = Producer(google_oauth_config)
Troubleshoot Python OAuth clients¶
Common issues and solutions for Python OAuth clients:
Authentication failures¶
- Verify client ID and secret are correct
- Check token endpoint URL is accessible
- Ensure logical cluster ID is valid
- Validate identity pool ID if used
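To isolate identity provider problems from Kafka problems, request a token directly with the same values the client uses (a minimal sketch; placeholders as in the configuration examples above):

import requests

# A 200 response containing an access_token confirms the IdP side works.
resp = requests.post(
    'https://<your-idp.com>/oauth2/token',
    data={
        'grant_type': 'client_credentials',
        'client_id': '<your-client-id>',
        'client_secret': '<your-client-secret>',
        'scope': 'kafka:read kafka:write'
    },
    timeout=10
)
print(resp.status_code)
print(resp.json())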
Network issues¶
- Confirm network connectivity to OAuth provider
- Check firewall rules allow OAuth traffic
- Verify SSL certificate validation
Configuration issues¶
- Ensure all required parameters are provided
- Validate OAuth callback function signature
- Check timeout values are reasonable
Debug logging¶
Enable debug logging for OAuth troubleshooting by adding the debug parameter to your configuration:
# Add to your configuration
config = {
# ... your OAuth config
'debug': 'security,protocol,broker'
}
This provides detailed librdkafka logs for authentication issues.
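To capture these logs with Python’s logging module instead of stderr, you can pass a logger to the client constructor (a minimal sketch; the logger name is illustrative):

import logging
from confluent_kafka import Producer

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('kafka.oauth')

# librdkafka log lines, including security/OAuth debug output, are
# forwarded to this logger.
producer = Producer(producer_config, logger=logger)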