Configure Python Clients for OAuth/OIDC on Confluent Cloud

Prerequisites

Before you begin, ensure you have the following:

Language and tooling

  • Python 3.8 or later
  • confluent-kafka-python: 2.0.0 or later
  • Confluent Platform: 7.2.1 or later, or 7.1.3 or later in the 7.1.x release line

Confluent Cloud environment

  • OAuth setup in Confluent Cloud - Configure OAuth authentication in your Confluent Cloud environment

Credentials and identifiers

  • Client ID - Your application’s identifier (like a username)
  • Client secret - Your application’s password for OAuth
  • Token endpoint URL - Where to request access tokens
  • Scopes - Permissions your application needs (for example, kafka:read kafka:write for Kafka, schema:read for Schema Registry)
  • Kafka cluster ID (lkc-xxxxx) - Your Kafka cluster identifier
  • Identity pool ID (pool-xxxxx) - If using identity pools
  • Schema Registry logical cluster ID (lsrc-xxxxx) - For Schema Registry operations

Client library

  • Install the latest version with OAuth support:

    pip install confluent-kafka
    

Configure Kafka Python clients

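Python Kafka clients authenticate to Confluent Cloud clusters using the OAuth 2.0 protocol with a callback function approach. When the client needs a token, it invokes your callback function, which handles the token request and returns the access token together with its expiration time. The client calls the callback with a single string argument (the value of sasl.oauthbearer.config), so any other OAuth settings the callback needs must be supplied to it separately.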

OAuth callback function

The Python client requires an OAuth callback function with robust error handling:

import time

import requests
from confluent_kafka import KafkaException

def oauth_token_refresh_cb(oauth_config):
    """
    OAuth callback function for Python Kafka client

    Args:
        oauth_config: OAuth configuration dictionary. The client itself calls
            oauth_cb with the string value of 'sasl.oauthbearer.config', so
            register this function through a wrapper (for example, a lambda
            or functools.partial) that supplies the dictionary.

    Returns:
        tuple: (access_token, expiry_time), where expiry_time is the absolute
            expiration time in seconds since the epoch, as a float
    """
    try:
        payload = {
            'grant_type': 'client_credentials',
            'client_id': oauth_config.get('sasl.oauthbearer.client.id'),
            'client_secret': oauth_config.get('sasl.oauthbearer.client.secret'),
            'scope': oauth_config.get('sasl.oauthbearer.scope')
        }

        response = requests.post(
            oauth_config.get('sasl.oauthbearer.token.endpoint.url'),
            data=payload,
            timeout=10  # Avoid blocking the client indefinitely
        )
        response.raise_for_status()
        token = response.json()

        # expires_in is a lifetime in seconds; convert it to an absolute time
        return token['access_token'], time.time() + float(token['expires_in'])
    except Exception as e:
        # Handle exceptions and signal failure
        raise KafkaException(f"OAuth token refresh failed: {e}")
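
Because the client invokes oauth_cb with one string argument (the value of sasl.oauthbearer.config), one way to give the callback its settings is to bind them ahead of time. The following is a minimal sketch of that idea using functools.partial. The names fetch_token, make_oauth_cb, and the injected http_post function are hypothetical and exist only for illustration; the fake token endpoint stands in for a real HTTP request so that the example runs offline.

```python
import functools
import time


def fetch_token(settings, http_post, config_str=None):
    """Request a token and return (access_token, expiry_seconds_since_epoch).

    settings:   dict holding the sasl.oauthbearer.* values, bound in advance
    http_post:  callable(url, data) -> dict with the parsed token response
    config_str: value of 'sasl.oauthbearer.config' passed by the client (unused)
    """
    payload = {
        'grant_type': 'client_credentials',
        'client_id': settings['sasl.oauthbearer.client.id'],
        'client_secret': settings['sasl.oauthbearer.client.secret'],
        'scope': settings['sasl.oauthbearer.scope'],
    }
    token = http_post(settings['sasl.oauthbearer.token.endpoint.url'], payload)
    # expires_in is a lifetime in seconds; convert it to an absolute time
    return token['access_token'], time.time() + float(token['expires_in'])


def make_oauth_cb(settings, http_post):
    """Bind settings and transport so the result accepts only config_str."""
    return functools.partial(fetch_token, settings, http_post)


# Offline demonstration with a fake token endpoint (all values are placeholders)
def fake_post(url, data):
    return {'access_token': 'abc123', 'expires_in': 3600}


settings = {
    'sasl.oauthbearer.token.endpoint.url': 'https://idp.example.com/oauth2/token',
    'sasl.oauthbearer.client.id': 'my-client-id',
    'sasl.oauthbearer.client.secret': 'my-client-secret',
    'sasl.oauthbearer.scope': 'kafka:read kafka:write',
}

oauth_cb = make_oauth_cb(settings, fake_post)
token, expiry = oauth_cb('')  # the client would pass sasl.oauthbearer.config here
```

Injecting the HTTP function keeps the token logic testable without network access. In a real client, http_post would wrap a call such as requests.post and return the parsed JSON body.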

Configuration example

Define a single, complete configuration dictionary for your Kafka client:

from confluent_kafka import Producer, Consumer

# Define a single, complete configuration dictionary
producer_config = {
    'bootstrap.servers': 'your-bootstrap-server:9092',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'OAUTHBEARER',
    'sasl.oauthbearer.token.endpoint.url': 'https://<your-idp.com>/oauth2/token',
    'sasl.oauthbearer.client.id': '<your-client-id>',
    'sasl.oauthbearer.client.secret': '<your-client-secret>',
    'sasl.oauthbearer.scope': 'kafka:read kafka:write',
    'sasl.oauthbearer.extensions': 'logical_cluster=<lkc-xxxxx>,identity_pool_id=<pool-yyyyy>',
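    # The client passes only the sasl.oauthbearer.config string to oauth_cb,
    # so wrap the callback to hand it this dictionary instead
    'oauth_cb': lambda _config_str: oauth_token_refresh_cb(producer_config),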
    'debug': 'security,protocol,broker'  # For troubleshooting
}

# Create producer
producer = Producer(producer_config)

# For consumer, use the same OAuth configuration
consumer_config = {
    'bootstrap.servers': 'your-bootstrap-server:9092',
    'group.id': 'your-consumer-group',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'OAUTHBEARER',
    'sasl.oauthbearer.token.endpoint.url': 'https://<your-idp.com>/oauth2/token',
    'sasl.oauthbearer.client.id': '<your-client-id>',
    'sasl.oauthbearer.client.secret': '<your-client-secret>',
    'sasl.oauthbearer.scope': 'kafka:read kafka:write',
    'sasl.oauthbearer.extensions': 'logical_cluster=<lkc-xxxxx>,identity_pool_id=<pool-yyyyy>',
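    # Wrap the callback so that it receives this dictionary rather than a string
    'oauth_cb': lambda _config_str: oauth_token_refresh_cb(consumer_config),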
    'debug': 'security,protocol,broker'
}

# Create consumer
consumer = Consumer(consumer_config)
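
The producer and consumer dictionaries above differ only in group.id, so the shared OAuth settings can be built once and copied. This is a rough sketch of that pattern, not a required structure. The helper name oauth_base_config and all argument values are hypothetical placeholders, and the oauth_cb entry is deliberately left out so that the snippet runs without the client library installed.

```python
def oauth_base_config(bootstrap_servers, token_url, client_id, client_secret,
                      scope, cluster_id, pool_id=None):
    """Return the OAuth settings shared by producers and consumers."""
    extensions = f'logical_cluster={cluster_id}'
    if pool_id:
        # The identity pool is only needed when identity pools are in use
        extensions += f',identity_pool_id={pool_id}'
    return {
        'bootstrap.servers': bootstrap_servers,
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'OAUTHBEARER',
        'sasl.oauthbearer.token.endpoint.url': token_url,
        'sasl.oauthbearer.client.id': client_id,
        'sasl.oauthbearer.client.secret': client_secret,
        'sasl.oauthbearer.scope': scope,
        'sasl.oauthbearer.extensions': extensions,
    }


base = oauth_base_config(
    bootstrap_servers='your-bootstrap-server:9092',
    token_url='https://idp.example.com/oauth2/token',
    client_id='my-client-id',
    client_secret='my-client-secret',
    scope='kafka:read kafka:write',
    cluster_id='lkc-abc12',
    pool_id='pool-xy9z',
)

# Copy the base so that changes to one client do not leak into the other
producer_conf = dict(base)
consumer_conf = {**base, 'group.id': 'your-consumer-group'}
# Add 'oauth_cb' to each dictionary before creating a client from it
```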

Test your configuration

  1. Test with a simple producer:

    # Test producer
    producer = Producer(producer_config)
    
    def delivery_report(err, msg):
        if err is not None:
            print(f'Message delivery failed: {err}')
        else:
            print(f'Message delivered to {msg.topic()} [{msg.partition()}] - OAuth is working!')
    
    producer.produce('test-topic', 'test-message', callback=delivery_report)
    producer.flush()
    
  2. Check for common errors:

    • “SASL authentication failed” - Check your OAuth credentials and endpoint
    • “Invalid token” - Verify your callback function is returning a valid token
    • “Connection timeout” - Check your bootstrap servers and network connectivity
  3. Verify token refresh - The client should call the callback again automatically to obtain a new token before the current one expires
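
When chasing an "Invalid token" error, it can help to look at what the identity provider actually issued. The sketch below decodes the claims of a token, assuming the access token is a JWT (many providers issue JWTs, but not all do). It does not verify the signature, so treat it purely as a debugging aid. The function names and the sample token are hypothetical.

```python
import base64
import json
import time


def decode_jwt_claims(token):
    """Return the claims of a JWT as a dict, without verifying the signature."""
    parts = token.split('.')
    if len(parts) != 3:
        raise ValueError('not a JWT: expected three dot-separated parts')
    payload = parts[1]
    payload += '=' * (-len(payload) % 4)  # restore the stripped base64 padding
    return json.loads(base64.urlsafe_b64decode(payload))


def seconds_until_expiry(claims, now=None):
    """Return how many seconds remain before the 'exp' claim is reached."""
    now = time.time() if now is None else now
    return claims['exp'] - now


# Build a sample unsigned token so that the example runs offline
def _b64(obj):
    raw = json.dumps(obj).encode()
    return base64.urlsafe_b64encode(raw).rstrip(b'=').decode()


sample_token = '.'.join([
    _b64({'alg': 'none'}),
    _b64({'sub': 'my-client-id', 'exp': 1700003600}),
    'signature',
])

claims = decode_jwt_claims(sample_token)
remaining = seconds_until_expiry(claims, now=1700000000)
```

A negative result from seconds_until_expiry means the token had already expired at the reference time, which usually points to clock skew or a callback that returns a stale token.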

Configure Schema Registry Python clients

The SchemaRegistryClient authenticates using a built-in OAuth mechanism. You only need to provide the correct configuration parameters, and the client handles the token request automatically. It does not use the oauth_cb callback.

Required parameters

The following parameters must be included in the configuration dictionary that you pass to SchemaRegistryClient.

Parameter                          Description
url                                The endpoint for your Schema Registry instance.
basic.auth.user.info               The API key and secret for Schema Registry, in the format <api-key>:<api-secret>.
bearer.auth.credentials.source     Must be set to OAUTHBEARER.
bearer.auth.client.id              The client ID from your identity provider.
bearer.auth.client.secret          The client secret from your identity provider.
bearer.auth.issuer.endpoint.url    The token endpoint URL of your identity provider.
bearer.auth.scope                  The required permissions scope (for example, schema:read).
bearer.auth.logical.cluster        The Schema Registry’s logical cluster ID (for example, lsrc-xxxxx).
bearer.auth.identity.pool.id       The identity pool ID, if using identity federation.
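
A missing parameter is easier to catch before the client is created than from a failed request. The following sketch checks a configuration dictionary against the parameter list above. The helper name check_sr_oauth_conf is hypothetical, and treating bearer.auth.identity.pool.id as optional is an assumption based on its "if using identity federation" description.

```python
REQUIRED_SR_OAUTH_KEYS = (
    'url',
    'basic.auth.user.info',
    'bearer.auth.credentials.source',
    'bearer.auth.client.id',
    'bearer.auth.client.secret',
    'bearer.auth.issuer.endpoint.url',
    'bearer.auth.scope',
    'bearer.auth.logical.cluster',
)


def check_sr_oauth_conf(conf):
    """Return a list of problems found in a Schema Registry OAuth config."""
    problems = [f'missing: {key}' for key in REQUIRED_SR_OAUTH_KEYS
                if not conf.get(key)]
    source = conf.get('bearer.auth.credentials.source')
    if source and source != 'OAUTHBEARER':
        problems.append(
            f'bearer.auth.credentials.source must be OAUTHBEARER, got {source}')
    return problems


# Example with placeholder values and two deliberate mistakes
example_conf = {
    'url': 'https://schema-registry.example.com',
    'basic.auth.user.info': 'key:secret',
    'bearer.auth.credentials.source': 'OAUTH',
    'bearer.auth.client.id': 'my-client-id',
    'bearer.auth.client.secret': 'my-client-secret',
    'bearer.auth.issuer.endpoint.url': 'https://idp.example.com/oauth2/token',
    'bearer.auth.logical.cluster': 'lsrc-abc12',
}

problems = check_sr_oauth_conf(example_conf)
```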

Configuration example

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry.avro import AvroSerializer

schema_registry_conf = {
    'url': 'https://<your-schema-registry-endpoint>',
    'basic.auth.user.info': '<sr-api-key>:<sr-api-secret>',
    # OAuth-specific configuration for Schema Registry client
    'bearer.auth.credentials.source': 'OAUTHBEARER',
    'bearer.auth.issuer.endpoint.url': 'https://<your-idp.com>/oauth2/token',
    'bearer.auth.client.id': '<your-client-id>',
    'bearer.auth.client.secret': '<your-client-secret>',
    'bearer.auth.scope': 'schema:read',
    'bearer.auth.logical.cluster': '<lsrc-xxxxx>',
    'bearer.auth.identity.pool.id': '<pool-yyyyy>'
}

schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# user_schema_string is a placeholder for your Avro schema definition (a JSON string)
avro_serializer = AvroSerializer(schema_registry_client,
                                 user_schema_string,
                                 conf={'auto.register.schemas': False})

Google OIDC integration

For Google OIDC integration with Python clients:

# Google OIDC configuration for Kafka
google_oauth_config = {
    'bootstrap.servers': 'your-bootstrap-server:9092',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'OAUTHBEARER',
    'sasl.oauthbearer.token.endpoint.url': 'https://oauth2.googleapis.com/token',
    'sasl.oauthbearer.client.id': 'your-google-client-id',
    'sasl.oauthbearer.client.secret': 'your-google-client-secret',
    'sasl.oauthbearer.scope': 'https://www.googleapis.com/auth/cloud-platform',
    'sasl.oauthbearer.extensions': 'logical_cluster=<lkc-xxxxx>,identity_pool_id=<pool-yyyyy>',
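    # Wrap the callback so that it receives this dictionary rather than a string
    'oauth_cb': lambda _config_str: oauth_token_refresh_cb(google_oauth_config),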
    'debug': 'security,protocol,broker'
}

# Create producer with Google OIDC configuration
producer = Producer(google_oauth_config)

Troubleshoot Python OAuth clients

Common issues and solutions for Python OAuth clients:

Authentication failures

  • Verify client ID and secret are correct
  • Check token endpoint URL is accessible
  • Ensure logical cluster ID is valid
  • Validate identity pool ID if used
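
Some of these checks can be automated. The sketch below inspects a sasl.oauthbearer.extensions string for a missing logical cluster, leftover <...> placeholders, and IDs that lack the lkc- or pool- prefixes shown in the prerequisites. The function names are hypothetical, and the prefix check is only a sanity test: it cannot confirm that an ID exists in your environment.

```python
def parse_extensions(extensions):
    """Split 'key=value,key=value' into a dict, rejecting malformed entries."""
    pairs = {}
    for item in extensions.split(','):
        key, sep, value = item.partition('=')
        if not sep or not key.strip():
            raise ValueError(f'malformed extension entry: {item!r}')
        pairs[key.strip()] = value.strip()
    return pairs


def find_id_problems(extensions):
    """Return a list of problems with the cluster and identity pool IDs."""
    expected_prefixes = {'logical_cluster': 'lkc-', 'identity_pool_id': 'pool-'}
    required = {'logical_cluster'}  # the identity pool is optional
    pairs = parse_extensions(extensions)
    problems = []
    for key, prefix in expected_prefixes.items():
        value = pairs.get(key)
        if value is None:
            if key in required:
                problems.append(f'{key} is missing')
            continue
        if '<' in value or '>' in value:
            problems.append(f'{key} still contains a placeholder: {value}')
        elif not value.startswith(prefix):
            problems.append(f'{key} should start with {prefix!r}: {value}')
    return problems


ok = find_id_problems('logical_cluster=lkc-abc12,identity_pool_id=pool-xy9z')
leftover = find_id_problems('logical_cluster=<lkc-xxxxx>,identity_pool_id=pool-xy9z')
```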

Network issues

  • Confirm network connectivity to OAuth provider
  • Check firewall rules allow OAuth traffic
  • Verify that SSL certificate validation succeeds for the token endpoint and the Kafka brokers
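
A quick way to separate network problems from OAuth problems is to test whether the token endpoint accepts a TCP connection at all. The following is a minimal sketch using only the standard library. The function names are hypothetical, and a successful connection says nothing about TLS or credentials: it only rules out basic routing and firewall issues.

```python
import socket
from urllib.parse import urlsplit


def endpoint_address(url):
    """Return the (host, port) that a token endpoint URL resolves to."""
    parts = urlsplit(url)
    if parts.scheme not in ('http', 'https') or not parts.hostname:
        raise ValueError(f'not a valid HTTP(S) URL: {url!r}')
    # Fall back to the scheme's default port when none is given
    port = parts.port or (443 if parts.scheme == 'https' else 80)
    return parts.hostname, port


def can_connect(url, timeout=5.0):
    """Return True if a TCP connection to the endpoint can be opened."""
    host, port = endpoint_address(url)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Resolving the address needs no network access; can_connect() does
address = endpoint_address('https://oauth2.googleapis.com/token')
```

Calling can_connect with your own token endpoint URL from the machine that runs the client shows whether the firewall permits the outbound connection.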

Configuration issues

  • Ensure all required parameters are provided
  • Confirm that the OAuth callback can be called with a single argument and returns a (token, expiration time) tuple
  • Check timeout values are reasonable
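
The callback signature can be checked before the client is ever created. This sketch uses inspect.signature to test whether a callable can be invoked with exactly one positional argument, which is how the client calls oauth_cb. The helper and sample function names are hypothetical.

```python
import functools
import inspect


def accepts_single_argument(func):
    """Return True if func can be called with exactly one positional argument."""
    try:
        inspect.signature(func).bind('config-string')
    except TypeError:
        return False
    return True


def good_cb(config_str):
    return 'token', 0.0


def no_args_cb():
    return 'token', 0.0


def two_args_cb(settings, config_str):
    return 'token', 0.0


# Binding the first argument turns a two-argument function into a valid callback
bound_cb = functools.partial(two_args_cb, {'scope': 'kafka:read'})

results = {
    'good_cb': accepts_single_argument(good_cb),
    'no_args_cb': accepts_single_argument(no_args_cb),
    'two_args_cb': accepts_single_argument(two_args_cb),
    'bound_cb': accepts_single_argument(bound_cb),
}
```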

Debug logging

Enable debug logging for OAuth troubleshooting by adding the debug parameter to your configuration:

# Add to your configuration
config = {
    # ... your OAuth config
    'debug': 'security,protocol,broker'
}

This provides detailed librdkafka logs for authentication issues.