OAuth Configuration Reference for Confluent Cloud Clients

This section provides a comprehensive reference for OAuth configuration parameters across different client types and platforms.

Java client parameters

| Parameter | Description | Default | Required |
|---|---|---|---|
| sasl.mechanism | SASL mechanism for OAuth authentication | OAUTHBEARER | Yes |
| sasl.jaas.config | JAAS configuration for OAuth login module | None | Yes |
| sasl.login.callback.handler.class | OAuth callback handler class | io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler | Yes |
| sasl.login.connect.timeout.ms | Connection timeout for token requests, in milliseconds | 10000 | No |
| sasl.login.read.timeout.ms | Read timeout for token requests, in milliseconds | 10000 | No |
| sasl.login.retry.backoff.ms | Initial retry backoff for failed requests, in milliseconds | 100 | No |
| sasl.login.retry.backoff.max.ms | Maximum retry backoff for failed requests, in milliseconds | 10000 | No |
| oauth.token.endpoint.uri | OAuth token endpoint URL | None | Yes |
| oauth.client.id | OAuth client ID | None | Yes |
| oauth.client.secret | OAuth client secret | None | Yes |
| oauth.scope | OAuth scopes for token request | None | No |
| oauth.logical.cluster | Logical cluster ID for Confluent Cloud. Required, except when connecting to AWS Enterprise clusters over AWS PrivateLink, where it can be omitted because of server-side mapping. | None | Conditional |
| oauth.identity.pool.id | Identity pool ID for Confluent Cloud | None | No |
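As a rough illustration of how the parameters in the table above might be assembled, the following sketch builds a `java.util.Properties` object from them. The `OAuthClientProps` class, the endpoint URL, and the IDs are hypothetical placeholders, and the bare JAAS line carries no options. Depending on the client version, the `oauth.*` entries may need to be supplied inside `sasl.jaas.config` rather than as top-level properties, so treat this as a starting point rather than a verified configuration.

```java
import java.util.Properties;

final class OAuthClientProps {

    /** Builds client properties using the parameter names from the table above. */
    static Properties build(String tokenEndpoint, String clientId,
                            String clientSecret, String logicalCluster) {
        Properties props = new Properties();

        // Required SASL settings
        props.setProperty("sasl.mechanism", "OAUTHBEARER");
        props.setProperty("sasl.jaas.config",
            "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
        props.setProperty("sasl.login.callback.handler.class",
            "io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler");

        // Optional timeouts and retry backoff, set to the documented defaults
        props.setProperty("sasl.login.connect.timeout.ms", "10000");
        props.setProperty("sasl.login.read.timeout.ms", "10000");
        props.setProperty("sasl.login.retry.backoff.ms", "100");
        props.setProperty("sasl.login.retry.backoff.max.ms", "10000");

        // OAuth settings (placeholder values are supplied by the caller)
        props.setProperty("oauth.token.endpoint.uri", tokenEndpoint);
        props.setProperty("oauth.client.id", clientId);
        props.setProperty("oauth.client.secret", clientSecret);
        // The logical cluster ID is conditional, so it is only set when provided
        if (logicalCluster != null && !logicalCluster.isEmpty()) {
            props.setProperty("oauth.logical.cluster", logicalCluster);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("https://auth.example.com/oauth2/token",
            "my-client-id", "my-client-secret", "lkc-xxxxx");
        // Print only the property names so the secret is never logged
        props.stringPropertyNames().stream().sorted()
            .forEach(name -> System.out.println(name));
    }
}
```

Passing `null` as the logical cluster leaves `oauth.logical.cluster` unset, which matches the conditional case described in the table.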

JAAS configuration parameters

| Parameter | Description | Default | Required |
|---|---|---|---|
| oauth.token.endpoint.uri | OAuth token endpoint URL | None | Yes |
| oauth.client.id | OAuth client ID | None | Yes |
| oauth.client.secret | OAuth client secret | None | Yes |
| oauth.scope | OAuth scopes for token request | None | No |
| oauth.logical.cluster | Logical cluster ID for Confluent Cloud. Required, except when connecting to AWS Enterprise clusters over AWS PrivateLink, where it can be omitted because of server-side mapping. | None | Conditional |
| oauth.identity.pool.id | Identity pool ID for Confluent Cloud | None | No |
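The JAAS parameters above end up as `key="value"` options inside a single `sasl.jaas.config` string. The sketch below shows one way to render that string from a map. `JaasConfigBuilder` is a hypothetical helper, not part of any client library; the login module class name is the one used in the integration test later in this reference, and the escaping rule (backslash-escaping quotes and backslashes) is an assumption about how quoted JAAS values are parsed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class JaasConfigBuilder {

    private static final String LOGIN_MODULE =
        "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule";

    /** Renders options as: <module> required key="value" key="value";  */
    static String build(Map<String, String> options) {
        StringBuilder sb = new StringBuilder(LOGIN_MODULE).append(" required");
        for (Map.Entry<String, String> option : options.entrySet()) {
            sb.append(' ').append(option.getKey())
              .append("=\"").append(escape(option.getValue())).append('"');
        }
        return sb.append(';').toString();
    }

    /** Escapes backslashes and double quotes so a value cannot end the quoted string early. */
    private static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order
        Map<String, String> options = new LinkedHashMap<>();
        options.put("oauth.token.endpoint.uri", "https://auth.example.com/oauth2/token");
        options.put("oauth.client.id", "my-client-id");
        options.put("oauth.logical.cluster", "lkc-xxxxx");
        System.out.println(build(options));
    }
}
```

Building the string from a map keeps quoting in one place, which helps when a client secret contains characters such as `"` or `\`.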

Python client parameters

| Parameter | Description | Default | Required |
|---|---|---|---|
| sasl.mechanism | SASL mechanism for OAuth authentication | OAUTHBEARER | Yes |
| sasl.oauth.token.endpoint.uri | OAuth token endpoint URL | None | Yes |
| sasl.oauth.client.id | OAuth client ID | None | Yes |
| sasl.oauth.client.secret | OAuth client secret | None | Yes |
| sasl.oauth.scope | OAuth scopes for token request | None | No |
| sasl.oauth.logical.cluster | Logical cluster ID for Confluent Cloud. Required, except when connecting to AWS Enterprise clusters over AWS PrivateLink, where it can be omitted because of server-side mapping. | None | Conditional |
| sasl.oauth.identity.pool.id | Identity pool ID for Confluent Cloud | None | No |
| sasl.oauth.connect.timeout.ms | Connection timeout for token requests, in milliseconds | 10000 | No |
| sasl.oauth.read.timeout.ms | Read timeout for token requests, in milliseconds | 10000 | No |

.NET client parameters

| Parameter | Description | Default | Required |
|---|---|---|---|
| sasl.mechanism | SASL mechanism for OAuth authentication | OAUTHBEARER | Yes |
| sasl.oauth.token.endpoint.uri | OAuth token endpoint URL | None | Yes |
| sasl.oauth.client.id | OAuth client ID | None | Yes |
| sasl.oauth.client.secret | OAuth client secret | None | Yes |
| sasl.oauth.scope | OAuth scopes for token request | None | No |
| sasl.oauth.logical.cluster | Logical cluster ID for Confluent Cloud. Required, except when connecting to AWS Enterprise clusters over AWS PrivateLink, where it can be omitted because of server-side mapping. | None | Conditional |
| sasl.oauth.identity.pool.id | Identity pool ID for Confluent Cloud | None | No |
| sasl.oauth.connect.timeout.ms | Connection timeout for token requests, in milliseconds | 10000 | No |
| sasl.oauth.read.timeout.ms | Read timeout for token requests, in milliseconds | 10000 | No |

Go client parameters

| Parameter | Description | Default | Required |
|---|---|---|---|
| sasl.mechanism | SASL mechanism for OAuth authentication | OAUTHBEARER | Yes |
| sasl.oauth.token.endpoint.uri | OAuth token endpoint URL | None | Yes |
| sasl.oauth.client.id | OAuth client ID | None | Yes |
| sasl.oauth.client.secret | OAuth client secret | None | Yes |
| sasl.oauth.scope | OAuth scopes for token request | None | No |
| sasl.oauth.logical.cluster | Logical cluster ID for Confluent Cloud. Required, except when connecting to AWS Enterprise clusters over AWS PrivateLink, where it can be omitted because of server-side mapping. | None | Conditional |
| sasl.oauth.identity.pool.id | Identity pool ID for Confluent Cloud | None | No |
| sasl.oauth.connect.timeout.ms | Connection timeout for token requests, in milliseconds | 10000 | No |
| sasl.oauth.read.timeout.ms | Read timeout for token requests, in milliseconds | 10000 | No |

Schema Registry client parameters

| Parameter | Description | Default | Required |
|---|---|---|---|
| bearer.auth.credentials.source | Authentication method. Use OAUTHBEARER for OAuth | None | Yes |
| bearer.auth.issuer.endpoint.url | OAuth token endpoint URL | None | Yes |
| bearer.auth.client.id | OAuth client ID | None | Yes |
| bearer.auth.client.secret | OAuth client secret | None | Yes |
| bearer.auth.scope | OAuth scopes for token request | None | No |
| bearer.auth.logical.cluster | Schema Registry logical cluster ID (lsrc-xxxxx) | None | Yes |
| bearer.auth.identity.pool.id | Identity pool ID for Confluent Cloud | None | No |
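The Schema Registry parameters above are usually handed to the client or serializer as a plain configuration map. The following sketch only assembles that map and does not call any Schema Registry API. `SchemaRegistryOAuthConfig` is a hypothetical helper, and the `lsrc-` prefix check is an assumption drawn from the ID format shown in the table.

```java
import java.util.HashMap;
import java.util.Map;

final class SchemaRegistryOAuthConfig {

    /** Assembles the bearer.auth.* settings from the table above into a map. */
    static Map<String, String> build(String issuerUrl, String clientId, String clientSecret,
                                     String logicalCluster, String identityPoolId) {
        // Assumption: Schema Registry logical cluster IDs start with "lsrc-"
        if (logicalCluster == null || !logicalCluster.startsWith("lsrc-")) {
            throw new IllegalArgumentException(
                "Expected a Schema Registry logical cluster ID (lsrc-...), got: " + logicalCluster);
        }
        Map<String, String> config = new HashMap<>();
        config.put("bearer.auth.credentials.source", "OAUTHBEARER");
        config.put("bearer.auth.issuer.endpoint.url", issuerUrl);
        config.put("bearer.auth.client.id", clientId);
        config.put("bearer.auth.client.secret", clientSecret);
        config.put("bearer.auth.logical.cluster", logicalCluster);
        // The identity pool ID is optional, so it is only set when provided
        if (identityPoolId != null && !identityPoolId.isEmpty()) {
            config.put("bearer.auth.identity.pool.id", identityPoolId);
        }
        return config;
    }

    public static void main(String[] args) {
        Map<String, String> config = build("https://auth.example.com/oauth2/token",
            "my-client-id", "my-client-secret", "lsrc-xxxxx", "pool-xxxxx");
        System.out.println(config.keySet());
    }
}
```

Rejecting a Kafka cluster ID (`lkc-...`) early catches a common copy-and-paste mistake before any token request is made.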

Configuration validation

When configuring OAuth authentication, validate your configuration:

  • Required parameters: Ensure that all required parameters are provided.

  • URL format: Verify that token endpoint URLs are properly formatted.

  • Credentials: Validate that the client ID and secret are correct.

  • Network access: Confirm network connectivity to the OAuth provider.

  • Permissions: Verify that the client has the appropriate scopes and permissions.

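// Configuration validation example
import java.util.Properties;

public static void validateOAuthConfig(Properties props) {
    // Check required parameters. Remove oauth.logical.cluster from this list
    // when the cluster ID can be omitted (AWS Enterprise clusters over AWS PrivateLink).
    String[] required = {
        "sasl.mechanism",
        "sasl.jaas.config",
        "oauth.token.endpoint.uri",
        "oauth.client.id",
        "oauth.client.secret",
        "oauth.logical.cluster"
    };

    for (String param : required) {
        String value = props.getProperty(param);
        // Treat empty or whitespace-only values as missing
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required parameter: " + param);
        }
    }

    // Validate timeout values (defaults match the parameter table)
    int connectTimeout = parseTimeout(props, "sasl.login.connect.timeout.ms");
    int readTimeout = parseTimeout(props, "sasl.login.read.timeout.ms");

    if (connectTimeout < 1000 || readTimeout < 1000) {
        throw new IllegalArgumentException("Timeout values must be at least 1000ms");
    }
}

// Reads a timeout in milliseconds and reports non-numeric values clearly
private static int parseTimeout(Properties props, String key) {
    String raw = props.getProperty(key, "10000");
    try {
        return Integer.parseInt(raw.trim());
    } catch (NumberFormatException e) {
        throw new IllegalArgumentException("Invalid value for " + key + ": " + raw, e);
    }
}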
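The URL format check from the list above could be sketched as follows. `TokenEndpointCheck` is a hypothetical helper, and requiring the `https` scheme is an assumption about what a well-formed token endpoint looks like; relax it if your provider is reached differently in test setups.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class TokenEndpointCheck {

    /** Returns true if the value parses as an absolute https URL with a host. */
    static boolean isValidTokenEndpoint(String value) {
        if (value == null || value.trim().isEmpty()) {
            return false;
        }
        try {
            URI uri = new URI(value.trim());
            // Assumption: token endpoints are served over HTTPS
            return "https".equalsIgnoreCase(uri.getScheme()) && uri.getHost() != null;
        } catch (URISyntaxException e) {
            // Not parseable as a URI at all (for example, it contains spaces)
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValidTokenEndpoint("https://auth.example.com/oauth2/token"));
        System.out.println(isValidTokenEndpoint("auth.example.com/oauth2/token"));
    }
}
```

This only checks the shape of the URL. Reachability and credentials still need a real token request to verify.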

Configuration templates

Use configuration templates for consistent setup:

# oauth-config-template.yaml
kafka:
  oauth:
    token_endpoint: ${OAUTH_TOKEN_ENDPOINT}
    client_id: ${OAUTH_CLIENT_ID}
    client_secret: ${OAUTH_CLIENT_SECRET}
    scope: ${OAUTH_SCOPE}
    logical_cluster: ${LOGICAL_CLUSTER_ID}
    identity_pool: ${IDENTITY_POOL_ID}
    timeouts:
      connect: 10000
      read: 10000
      retry_backoff: 100
      retry_max: 10000
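YAML itself does not expand `${...}` placeholders, so whatever loads the template has to substitute them. The sketch below shows one way that substitution might look. `PlaceholderResolver` is a hypothetical helper, and failing on an undefined variable (rather than leaving the placeholder in place) is a design assumption.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class PlaceholderResolver {

    // Matches ${NAME}, where NAME looks like an environment variable name
    private static final Pattern PLACEHOLDER =
        Pattern.compile("\\$\\{([A-Za-z_][A-Za-z0-9_]*)\\}");

    /** Replaces each ${NAME} in the template with the matching entry from env. */
    static String resolve(String template, Map<String, String> env) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuilder out = new StringBuilder();
        int last = 0;
        while (matcher.find()) {
            String name = matcher.group(1);
            String value = env.get(name);
            if (value == null) {
                throw new IllegalStateException("Undefined variable: " + name);
            }
            // Copy the text before the placeholder, then the substituted value
            out.append(template, last, matcher.start()).append(value);
            last = matcher.end();
        }
        return out.append(template.substring(last)).toString();
    }

    public static void main(String[] args) {
        String line = "    token_endpoint: ${OAUTH_TOKEN_ENDPOINT}";
        System.out.println(resolve(line,
            Map.of("OAUTH_TOKEN_ENDPOINT", "https://auth.example.com/oauth2/token")));
    }
}
```

In an application, `System.getenv()` can be passed as the `env` map so that the template values come from the process environment.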

Environment-specific configurations

Development environment

# Development OAuth configuration
sasl.oauthbearer.token.endpoint.url=https://dev-auth.example.com/oauth2/token
sasl.oauthbearer.client.id=dev-kafka-client
sasl.oauthbearer.client.secret=${DEV_CLIENT_SECRET}
sasl.oauthbearer.scope=kafka:dev

Staging environment

# Staging OAuth configuration
sasl.oauthbearer.token.endpoint.url=https://staging-auth.example.com/oauth2/token
sasl.oauthbearer.client.id=staging-kafka-client
sasl.oauthbearer.client.secret=${STAGING_CLIENT_SECRET}
sasl.oauthbearer.scope=kafka:staging

Production environment

# Production OAuth configuration
sasl.oauthbearer.token.endpoint.url=https://prod-auth.example.com/oauth2/token
sasl.oauthbearer.client.id=prod-kafka-client
sasl.oauthbearer.client.secret=${PROD_CLIENT_SECRET}
sasl.oauthbearer.scope=kafka:prod

Advanced configuration options

Custom token providers

For advanced use cases, implement custom token providers:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;

// BearerAuthCredentialProvider comes from the Schema Registry client library.
// Depending on the library version, the interface may require further methods
// (for example, alias and configure) in addition to getBearerToken.
public class CustomTokenProvider implements BearerAuthCredentialProvider {

    private final String tokenEndpoint;
    private final String clientId;
    private final String clientSecret;
    private final Map<String, String> additionalHeaders;

    public CustomTokenProvider(String tokenEndpoint, String clientId,
                             String clientSecret, Map<String, String> additionalHeaders) {
        this.tokenEndpoint = tokenEndpoint;
        this.clientId = clientId;
        this.clientSecret = clientSecret;
        this.additionalHeaders = additionalHeaders;
    }

    @Override
    public String getBearerToken(URL url) {
        // Wrap I/O failures so the method does not declare a checked exception
        try {
            return requestToken();
        } catch (IOException e) {
            throw new UncheckedIOException("Token request failed", e);
        }
    }

    // Custom token retrieval logic using the client credentials grant
    private String requestToken() throws IOException {
        HttpURLConnection connection = (HttpURLConnection) new URL(tokenEndpoint).openConnection();
        connection.setRequestMethod("POST");
        connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");

        // Add custom headers
        if (additionalHeaders != null) {
            for (Map.Entry<String, String> header : additionalHeaders.entrySet()) {
                connection.setRequestProperty(header.getKey(), header.getValue());
            }
        }

        // Set up HTTP Basic authentication with the client credentials
        String auth = clientId + ":" + clientSecret;
        String encodedAuth = Base64.getEncoder()
            .encodeToString(auth.getBytes(StandardCharsets.UTF_8));
        connection.setRequestProperty("Authorization", "Basic " + encodedAuth);

        // Send request
        String postData = "grant_type=client_credentials";
        connection.setDoOutput(true);
        try (OutputStream os = connection.getOutputStream()) {
            os.write(postData.getBytes(StandardCharsets.UTF_8));
        }

        // Parse the JSON response and extract the access token
        try (InputStream in = connection.getInputStream()) {
            JsonNode json = new ObjectMapper().readTree(in);
            JsonNode token = json.get("access_token");
            if (token == null) {
                throw new IOException("Token response has no access_token field");
            }
            return token.asText();
        }
    }
}

Retry and circuit breaker patterns

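import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Opens after `threshold` consecutive failures and stays open for `timeout` milliseconds
public class CircuitBreaker {
    private final AtomicInteger failureCount = new AtomicInteger(0);
    private final AtomicLong lastFailureTime = new AtomicLong(0);
    private final int threshold;
    private final long timeout; // milliseconds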

    public CircuitBreaker(int threshold, long timeout) {
        this.threshold = threshold;
        this.timeout = timeout;
    }

    public boolean isOpen() {
        long now = System.currentTimeMillis();
        if (failureCount.get() >= threshold) {
            if (now - lastFailureTime.get() < timeout) {
                return true; // Circuit is open
            } else {
                // Reset after timeout
                failureCount.set(0);
                return false;
            }
        }
        return false;
    }

    public void recordFailure() {
        failureCount.incrementAndGet();
        lastFailureTime.set(System.currentTimeMillis());
    }

    public void recordSuccess() {
        failureCount.set(0);
    }
}

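import java.util.function.Supplier;

// Retries an operation, waiting backoffMs * attempt between attempts (linear backoff)
public class RetryTemplate {
    private final int maxRetries;
    private final long backoffMs;

    public RetryTemplate(int maxRetries, long backoffMs) {
        if (maxRetries < 1) {
            throw new IllegalArgumentException("maxRetries must be at least 1");
        }
        this.maxRetries = maxRetries;
        this.backoffMs = backoffMs;
    }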

    public <T> T execute(Supplier<T> operation) throws Exception {
        int attempts = 0;
        while (attempts < maxRetries) {
            try {
                return operation.get();
            } catch (Exception e) {
                attempts++;
                if (attempts >= maxRetries) {
                    throw e;
                }
                Thread.sleep(backoffMs * attempts);
            }
        }
        throw new RuntimeException("Max retries exceeded");
    }
}
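The `RetryTemplate` above grows its delay linearly. The `sasl.login.retry.backoff.ms` and `sasl.login.retry.backoff.max.ms` parameters describe an initial and a maximum backoff, and one common way to combine two such values is capped exponential backoff. The sketch below illustrates that calculation only. `ExponentialBackoff` is a hypothetical class, and it is not a description of how any particular client computes its delays.

```java
final class ExponentialBackoff {
    private final long initialMs;
    private final long maxMs;

    ExponentialBackoff(long initialMs, long maxMs) {
        if (initialMs <= 0 || maxMs < initialMs) {
            throw new IllegalArgumentException("Require 0 < initialMs <= maxMs");
        }
        this.initialMs = initialMs;
        this.maxMs = maxMs;
    }

    /** Delay before retry number `attempt` (0-based): initialMs * 2^attempt, capped at maxMs. */
    long delayMs(int attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must not be negative");
        }
        // 2^63 and above cannot be represented in a long, so the cap always applies
        if (attempt >= 63) {
            return maxMs;
        }
        long factor = 1L << attempt;
        // Compare by division to avoid overflow in initialMs * factor
        if (initialMs > maxMs / factor) {
            return maxMs;
        }
        return initialMs * factor;
    }

    public static void main(String[] args) {
        // Uses the documented defaults: 100 ms initial, 10000 ms maximum
        ExponentialBackoff backoff = new ExponentialBackoff(100, 10000);
        for (int attempt = 0; attempt < 9; attempt++) {
            System.out.println("attempt " + attempt + ": " + backoff.delayMs(attempt) + " ms");
        }
    }
}
```

With the default values the delay doubles from 100 ms and reaches the 10000 ms cap at the eighth retry. Production implementations often add random jitter so that many clients do not retry in lockstep.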

Deployment considerations

Container deployment

# Dockerfile for OAuth-enabled Kafka client
FROM openjdk:11-jre-slim

# Copy application
COPY target/oauth-kafka-client.jar /app/

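# Set environment variables (placeholder values; override them at runtime)
ENV OAUTH_TOKEN_ENDPOINT=https://your-oauth-provider.com/oauth2/token
ENV OAUTH_CLIENT_ID=your-client-id
# Do not bake a real secret into the image; inject OAUTH_CLIENT_SECRET at runtime
ENV OAUTH_CLIENT_SECRET=your-client-secret
# Values that contain spaces must be quoted
ENV OAUTH_SCOPE="kafka:read kafka:write"
ENV LOGICAL_CLUSTER_ID=lkc-xxxxx
ENV IDENTITY_POOL_ID=pool-xxxxx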

# Run application
CMD ["java", "-jar", "/app/oauth-kafka-client.jar"]

Kubernetes deployment

# kubernetes-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: oauth-kafka-client
spec:
  replicas: 3
  selector:
    matchLabels:
      app: oauth-kafka-client
  template:
    metadata:
      labels:
        app: oauth-kafka-client
    spec:
      containers:
      - name: kafka-client
        image: your-registry/oauth-kafka-client:latest
        env:
        - name: OAUTH_TOKEN_ENDPOINT
          valueFrom:
            secretKeyRef:
              name: oauth-secrets
              key: token-endpoint
        - name: OAUTH_CLIENT_ID
          valueFrom:
            secretKeyRef:
              name: oauth-secrets
              key: client-id
        - name: OAUTH_CLIENT_SECRET
          valueFrom:
            secretKeyRef:
              name: oauth-secrets
              key: client-secret
        - name: OAUTH_SCOPE
          value: "kafka:read kafka:write"
        - name: LOGICAL_CLUSTER_ID
          value: "lkc-xxxxx"
        - name: IDENTITY_POOL_ID
          value: "pool-xxxxx"
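Inside the container, the application still has to turn these environment variables into client settings. The following sketch shows one way to do that, under the assumption that the variable names match the Dockerfile and the deployment manifest above. `EnvOAuthConfig` is a hypothetical class, and the target property names are the `oauth.*` names from the Java client table.

```java
import java.util.Map;
import java.util.Properties;

final class EnvOAuthConfig {

    /** Maps the deployment's environment variables to client properties. */
    static Properties fromEnvironment(Map<String, String> env) {
        Properties props = new Properties();
        props.setProperty("sasl.mechanism", "OAUTHBEARER");

        // Required values: fail fast with a clear message if one is missing
        props.setProperty("oauth.token.endpoint.uri", require(env, "OAUTH_TOKEN_ENDPOINT"));
        props.setProperty("oauth.client.id", require(env, "OAUTH_CLIENT_ID"));
        props.setProperty("oauth.client.secret", require(env, "OAUTH_CLIENT_SECRET"));

        // Optional or conditional values: set only when present
        copyIfPresent(env, "OAUTH_SCOPE", props, "oauth.scope");
        copyIfPresent(env, "LOGICAL_CLUSTER_ID", props, "oauth.logical.cluster");
        copyIfPresent(env, "IDENTITY_POOL_ID", props, "oauth.identity.pool.id");
        return props;
    }

    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Environment variable not set: " + name);
        }
        return value;
    }

    private static void copyIfPresent(Map<String, String> env, String name,
                                      Properties props, String key) {
        String value = env.get(name);
        if (value != null && !value.trim().isEmpty()) {
            props.setProperty(key, value);
        }
    }
}
```

At startup the application would call `EnvOAuthConfig.fromEnvironment(System.getenv())`. Taking the environment as a map keeps the method easy to unit test.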

CI/CD integration

# .github/workflows/deploy.yml
name: Deploy OAuth Kafka Client

on:
  push:
    branches: [main]

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v4

    - name: Set up JDK
      uses: actions/setup-java@v4
      with:
        distribution: 'temurin'
        java-version: '11'

    - name: Build with Maven
      run: mvn clean package

    - name: Build Docker image
      run: docker build -t oauth-kafka-client .

    - name: Deploy to Kubernetes
      run: |
        kubectl apply -f kubernetes-deployment.yaml
        kubectl rollout restart deployment/oauth-kafka-client

Integration testing

Mock OAuth server

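@Test
public void testOAuthIntegration() {
    // Start mock OAuth server (MockOAuthServer is a test helper that you provide)
    MockOAuthServer mockServer = new MockOAuthServer();
    mockServer.start();

    try {
        // Configure client with mock server
        Properties props = new Properties();
        // Placeholder broker address and protocol; match them to your test broker
        props.put("bootstrap.servers", "localhost:9092");
        props.put("security.protocol", "SASL_SSL");
        // Serializers are required to construct a KafkaProducer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("sasl.mechanism", "OAUTHBEARER");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " +
            "oauth.token.endpoint.uri=\"" + mockServer.getTokenEndpoint() + "\" " +
            "oauth.client.id=\"test-client\" " +
            "oauth.client.secret=\"test-secret\";");

        // Test authentication; try-with-resources closes the producer afterwards
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // ... test operations
        }

    } finally {
        mockServer.stop();
    }
}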
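The `MockOAuthServer` used in the test above is not defined in this reference. A minimal sketch of such a helper, built on the JDK's `com.sun.net.httpserver.HttpServer`, might look like the following. The `/oauth2/token` path, the fixed `mock-token` value, and the use of plain HTTP on the loopback address are all assumptions made for illustration. A real client will generally expect a properly formed JWT and may insist on HTTPS, so this is only enough to exercise the HTTP exchange.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

final class MockOAuthServer {
    private HttpServer server;

    /** Starts the server on a free loopback port. */
    public void start() {
        try {
            // Port 0 asks the operating system for any free port
            server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        } catch (IOException e) {
            throw new UncheckedIOException("Could not start mock OAuth server", e);
        }
        server.createContext("/oauth2/token", exchange -> {
            // Read and discard the request body (the form-encoded grant request)
            exchange.getRequestBody().readAllBytes();
            byte[] body = ("{\"access_token\":\"mock-token\","
                + "\"token_type\":\"Bearer\",\"expires_in\":3600}")
                .getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().add("Content-Type", "application/json");
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
        });
        server.start();
    }

    /** Returns the URL of the token endpoint; call start() first. */
    public String getTokenEndpoint() {
        return "http://127.0.0.1:" + server.getAddress().getPort() + "/oauth2/token";
    }

    /** Stops the server immediately. */
    public void stop() {
        if (server != null) {
            server.stop(0);
        }
    }
}
```

Binding to port 0 avoids port clashes when several tests run in parallel, and wrapping the checked `IOException` lets `start()` be called from a test method that declares no exceptions.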

Performance testing

@Test
public void testOAuthPerformance() {
    // Measure token retrieval performance
    long startTime = System.currentTimeMillis();

    for (int i = 0; i < 100; i++) {
        // Perform OAuth token request
        String token = oauthProvider.getToken();
        assertNotNull(token);
    }

    long endTime = System.currentTimeMillis();
    long duration = endTime - startTime;

    // Assert performance requirements
    assertTrue("OAuth performance below threshold", duration < 5000);
}

Azure User-Assigned Managed Identity (UAMI)

Confluent Cloud supports using Azure User-Assigned Managed Identity (UAMI) for OAuth authentication, eliminating the need for static client IDs and secrets. This feature leverages Azure’s built-in identity management to automatically retrieve authentication tokens.

For complete documentation on configuring and using UAMI with OAuth, including step-by-step setup instructions and client examples, see Configure Azure User Assigned Managed Identity OAuth for Confluent Cloud.