OAuth Configuration Reference for Confluent Cloud Clients¶
This reference lists the OAuth configuration parameters for each supported client type and platform, followed by validation, deployment, and testing examples.
Java client parameters¶
| Parameter | Description | Default | Required | 
|---|---|---|---|
| sasl.mechanism | SASL mechanism for OAuth authentication | OAUTHBEARER | Yes |
| sasl.jaas.config | JAAS configuration for OAuth login module | None | Yes |
| sasl.login.callback.handler.class | OAuth callback handler class | io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler | Yes |
| sasl.login.connect.timeout.ms | Connection timeout for token requests | 10000 | No |
| sasl.login.read.timeout.ms | Read timeout for token requests | 10000 | No |
| sasl.login.retry.backoff.ms | Initial retry backoff for failed requests | 100 | No |
| sasl.login.retry.backoff.max.ms | Maximum retry backoff for failed requests | 10000 | No |
| oauth.token.endpoint.uri | OAuth token endpoint URL | None | Yes |
| oauth.client.id | OAuth client ID | None | Yes |
| oauth.client.secret | OAuth client secret | None | Yes |
| oauth.scope | OAuth scopes for token request | None | No |
| oauth.logical.cluster | Logical cluster ID for Confluent Cloud. Required except when connecting to AWS Enterprise clusters over AWS PrivateLink; may be omitted due to server-side mapping. | None | Conditional |
| oauth.identity.pool.id | Identity pool ID for Confluent Cloud | None | No |
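The two retry backoff parameters in the table bound how long the client waits between failed token requests. As a rough illustration, the sketch below prints the wait sequence under the assumption that the delay simply doubles after each failure until it reaches the maximum. The class name `LoginRetryBackoff` is hypothetical, and the real client may add jitter or differ in detail.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: models a plain doubling backoff with a cap.
final class LoginRetryBackoff {
    /**
     * Returns the wait before each retry, assuming the delay starts at
     * initialMs, doubles after every failed attempt, and is capped at maxMs.
     */
    static List<Long> schedule(long initialMs, long maxMs, int retries) {
        List<Long> delays = new ArrayList<>();
        long delay = initialMs;
        for (int i = 0; i < retries; i++) {
            delays.add(Math.min(delay, maxMs));
            // Stop doubling once the cap is reached so the value cannot overflow.
            if (delay < maxMs) {
                delay *= 2;
            }
        }
        return delays;
    }
}
```

With the defaults from the table, `LoginRetryBackoff.schedule(100, 10_000, 9)` yields 100, 200, 400, 800, 1600, 3200, 6400, 10000, 10000 (milliseconds).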
JAAS configuration parameters¶
| Parameter | Description | Default | Required | 
|---|---|---|---|
| oauth.token.endpoint.uri | OAuth token endpoint URL | None | Yes |
| oauth.client.id | OAuth client ID | None | Yes |
| oauth.client.secret | OAuth client secret | None | Yes |
| oauth.scope | OAuth scopes for token request | None | No |
| oauth.logical.cluster | Logical cluster ID for Confluent Cloud. Required except when connecting to AWS Enterprise clusters over AWS PrivateLink; may be omitted due to server-side mapping. | None | Conditional |
| oauth.identity.pool.id | Identity pool ID for Confluent Cloud | None | No |
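The JAAS parameters above end up as quoted `key="value"` options inside a single `sasl.jaas.config` string. The following is a minimal sketch of assembling that string, assuming the `OAuthBearerLoginModule` class name used in the integration test on this page. `JaasConfigBuilder` is a hypothetical helper, not part of any client library.

```java
import java.util.Map;

// Hypothetical helper that renders JAAS options as one sasl.jaas.config value.
final class JaasConfigBuilder {
    private static final String LOGIN_MODULE =
        "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule";

    /** Builds the config string; options with null or empty values are skipped. */
    static String build(Map<String, String> options) {
        StringBuilder sb = new StringBuilder(LOGIN_MODULE).append(" required");
        for (Map.Entry<String, String> e : options.entrySet()) {
            String value = e.getValue();
            if (value == null || value.isEmpty()) {
                continue; // optional parameter that was not set
            }
            // Escape backslashes and quotes so the value stays one quoted token.
            String escaped = value.replace("\\", "\\\\").replace("\"", "\\\"");
            sb.append(' ').append(e.getKey()).append("=\"").append(escaped).append('"');
        }
        return sb.append(';').toString();
    }
}
```

Passing a `LinkedHashMap` keeps the options in insertion order, which makes the resulting string easier to compare in logs and tests.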
Python client parameters¶
| Parameter | Description | Default | Required | 
|---|---|---|---|
| sasl.mechanism | SASL mechanism for OAuth authentication | OAUTHBEARER | Yes |
| sasl.oauth.token.endpoint.uri | OAuth token endpoint URL | None | Yes |
| sasl.oauth.client.id | OAuth client ID | None | Yes |
| sasl.oauth.client.secret | OAuth client secret | None | Yes |
| sasl.oauth.scope | OAuth scopes for token request | None | No |
| sasl.oauth.logical.cluster | Logical cluster ID for Confluent Cloud. Required except when connecting to AWS Enterprise clusters over AWS PrivateLink; may be omitted due to server-side mapping. | None | Conditional |
| sasl.oauth.identity.pool.id | Identity pool ID for Confluent Cloud | None | No |
| sasl.oauth.connect.timeout.ms | Connection timeout for token requests | 10000 | No |
| sasl.oauth.read.timeout.ms | Read timeout for token requests | 10000 | No |
.NET client parameters¶
| Parameter | Description | Default | Required | 
|---|---|---|---|
| sasl.mechanism | SASL mechanism for OAuth authentication | OAUTHBEARER | Yes |
| sasl.oauth.token.endpoint.uri | OAuth token endpoint URL | None | Yes |
| sasl.oauth.client.id | OAuth client ID | None | Yes |
| sasl.oauth.client.secret | OAuth client secret | None | Yes |
| sasl.oauth.scope | OAuth scopes for token request | None | No |
| sasl.oauth.logical.cluster | Logical cluster ID for Confluent Cloud. Required except when connecting to AWS Enterprise clusters over AWS PrivateLink; may be omitted due to server-side mapping. | None | Conditional |
| sasl.oauth.identity.pool.id | Identity pool ID for Confluent Cloud | None | No |
| sasl.oauth.connect.timeout.ms | Connection timeout for token requests | 10000 | No |
| sasl.oauth.read.timeout.ms | Read timeout for token requests | 10000 | No |
Go client parameters¶
| Parameter | Description | Default | Required | 
|---|---|---|---|
| sasl.mechanism | SASL mechanism for OAuth authentication | OAUTHBEARER | Yes |
| sasl.oauth.token.endpoint.uri | OAuth token endpoint URL | None | Yes |
| sasl.oauth.client.id | OAuth client ID | None | Yes |
| sasl.oauth.client.secret | OAuth client secret | None | Yes |
| sasl.oauth.scope | OAuth scopes for token request | None | No |
| sasl.oauth.logical.cluster | Logical cluster ID for Confluent Cloud. Required except when connecting to AWS Enterprise clusters over AWS PrivateLink; may be omitted due to server-side mapping. | None | Conditional |
| sasl.oauth.identity.pool.id | Identity pool ID for Confluent Cloud | None | No |
| sasl.oauth.connect.timeout.ms | Connection timeout for token requests | 10000 | No |
| sasl.oauth.read.timeout.ms | Read timeout for token requests | 10000 | No |
Schema Registry client parameters¶
| Parameter | Description | Default | Required | 
|---|---|---|---|
| bearer.auth.credentials.source | Authentication method. Use OAUTHBEARER for OAuth | None | Yes |
| bearer.auth.issuer.endpoint.url | OAuth token endpoint URL | None | Yes |
| bearer.auth.client.id | OAuth client ID | None | Yes |
| bearer.auth.client.secret | OAuth client secret | None | Yes |
| bearer.auth.scope | OAuth scopes for token request | None | No |
| bearer.auth.logical.cluster | Schema Registry logical cluster ID (lsrc-xxxxx) | None | Yes |
| bearer.auth.identity.pool.id | Identity pool ID for Confluent Cloud | None | No |
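As an illustration of how the Schema Registry parameters fit together, the sketch below collects them into a configuration map. `SchemaRegistryOAuthConfig` is a hypothetical helper, the `lsrc-` prefix check is an assumption based on the ID format shown in the table, and all values passed in are expected to be your own.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that gathers the bearer.auth.* settings from the table.
final class SchemaRegistryOAuthConfig {
    static Map<String, Object> build(String registryUrl, String tokenEndpoint,
                                     String clientId, String clientSecret,
                                     String logicalCluster, String identityPoolId) {
        // Assumption: Schema Registry logical cluster IDs look like lsrc-xxxxx.
        if (logicalCluster == null || !logicalCluster.startsWith("lsrc-")) {
            throw new IllegalArgumentException(
                "bearer.auth.logical.cluster should be an lsrc-xxxxx ID");
        }
        Map<String, Object> config = new HashMap<>();
        config.put("schema.registry.url", registryUrl);
        config.put("bearer.auth.credentials.source", "OAUTHBEARER");
        config.put("bearer.auth.issuer.endpoint.url", tokenEndpoint);
        config.put("bearer.auth.client.id", clientId);
        config.put("bearer.auth.client.secret", clientSecret);
        config.put("bearer.auth.logical.cluster", logicalCluster);
        if (identityPoolId != null) {
            // Optional according to the table.
            config.put("bearer.auth.identity.pool.id", identityPoolId);
        }
        return config;
    }
}
```

The resulting map can be merged into the serializer or Schema Registry client configuration that your application already builds.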
Configuration validation¶
When configuring OAuth authentication, validate your configuration:
- Required parameters: Ensure all required parameters are provided.
- URL format: Verify that token endpoint URLs are properly formatted.
- Credentials: Validate that the client ID and secret are correct.
- Network access: Confirm network connectivity to the OAuth provider.
- Permissions: Verify that the client has the appropriate scopes and permissions.
 
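// Configuration validation example
import java.util.Properties;

public static void validateOAuthConfig(Properties props) {
    // Check required parameters. oauth.logical.cluster is conditional (see the
    // Java client table); drop it from this list for AWS Enterprise clusters
    // reached over AWS PrivateLink.
    String[] required = {
        "sasl.mechanism",
        "sasl.jaas.config",
        "oauth.token.endpoint.uri",
        "oauth.client.id",
        "oauth.client.secret",
        "oauth.logical.cluster"
    };
    for (String param : required) {
        // Treat blank values the same as missing ones
        String value = props.getProperty(param);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required parameter: " + param);
        }
    }
    // Validate timeout values
    int connectTimeout = parseTimeout(props, "sasl.login.connect.timeout.ms");
    int readTimeout = parseTimeout(props, "sasl.login.read.timeout.ms");
    if (connectTimeout < 1000 || readTimeout < 1000) {
        throw new IllegalArgumentException("Timeout values must be at least 1000ms");
    }
}

// Parses a timeout in milliseconds, falling back to the documented default of 10000
private static int parseTimeout(Properties props, String key) {
    String raw = props.getProperty(key, "10000");
    try {
        return Integer.parseInt(raw.trim());
    } catch (NumberFormatException e) {
        throw new IllegalArgumentException("Invalid value for " + key + ": " + raw, e);
    }
}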
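The URL format item in the checklist above can be covered with a small standalone check. This is a sketch only: `TokenEndpointCheck` is a hypothetical name, and requiring `https` is an assumption that suits hosted identity providers but not, for example, a local mock server.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper for the "URL format" validation step.
final class TokenEndpointCheck {
    /** Returns true if the endpoint parses as an absolute https URL with a host. */
    static boolean isValid(String endpoint) {
        if (endpoint == null || endpoint.isBlank()) {
            return false;
        }
        try {
            URI uri = new URI(endpoint.trim());
            // Assumption: production token endpoints are served over TLS.
            return "https".equalsIgnoreCase(uri.getScheme()) && uri.getHost() != null;
        } catch (URISyntaxException e) {
            return false; // for example, a value that contains spaces
        }
    }
}
```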
Configuration templates¶
Use configuration templates for consistent setup:
# oauth-config-template.yaml
kafka:
  oauth:
    token_endpoint: ${OAUTH_TOKEN_ENDPOINT}
    client_id: ${OAUTH_CLIENT_ID}
    client_secret: ${OAUTH_CLIENT_SECRET}
    scope: ${OAUTH_SCOPE}
    logical_cluster: ${LOGICAL_CLUSTER_ID}
    identity_pool: ${IDENTITY_POOL_ID}
    timeouts:
      connect: 10000
      read: 10000
      retry_backoff: 100
      retry_max: 10000
Environment-specific configurations¶
Development environment¶
# Development OAuth configuration
sasl.oauthbearer.token.endpoint.url=https://dev-auth.example.com/oauth2/token
sasl.oauthbearer.client.id=dev-kafka-client
sasl.oauthbearer.client.secret=${DEV_CLIENT_SECRET}
sasl.oauthbearer.scope=kafka:dev
Staging environment¶
# Staging OAuth configuration
sasl.oauthbearer.token.endpoint.url=https://staging-auth.example.com/oauth2/token
sasl.oauthbearer.client.id=staging-kafka-client
sasl.oauthbearer.client.secret=${STAGING_CLIENT_SECRET}
sasl.oauthbearer.scope=kafka:staging
Production environment¶
# Production OAuth configuration
sasl.oauthbearer.token.endpoint.url=https://prod-auth.example.com/oauth2/token
sasl.oauthbearer.client.id=prod-kafka-client
sasl.oauthbearer.client.secret=${PROD_CLIENT_SECRET}
sasl.oauthbearer.scope=kafka:prod
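The `${...}` placeholders in these files are not expanded by `java.util.Properties` when the file is loaded, so something has to substitute them before the values reach the client. One possible approach is sketched below. `PlaceholderResolver` is a hypothetical helper, and it assumes placeholders are plain environment variable names.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper that expands ${NAME} placeholders from a variable map.
final class PlaceholderResolver {
    private static final Pattern PLACEHOLDER =
        Pattern.compile("\\$\\{([A-Za-z_][A-Za-z0-9_]*)\\}");

    /** Replaces every ${NAME} in value; fails fast if a variable is not set. */
    static String resolve(String value, Map<String, String> env) {
        Matcher m = PLACEHOLDER.matcher(value);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String replacement = env.get(m.group(1));
            if (replacement == null) {
                throw new IllegalStateException(
                    "Environment variable not set: " + m.group(1));
            }
            // quoteReplacement keeps '$' and '\' in secrets from being
            // interpreted as regex group references.
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

In an application, this could be called as `PlaceholderResolver.resolve(props.getProperty("sasl.oauthbearer.client.secret"), System.getenv())` after loading the file.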
Advanced configuration options¶
Custom token providers¶
For advanced use cases, implement custom token providers:
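import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.schemaregistry.client.security.bearerauth.BearerAuthCredentialProvider;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;

public class CustomTokenProvider implements BearerAuthCredentialProvider {
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private final String tokenEndpoint;
    private final String clientId;
    private final String clientSecret;
    private final Map<String, String> additionalHeaders;
    public CustomTokenProvider(String tokenEndpoint, String clientId,
                             String clientSecret, Map<String, String> additionalHeaders) {
        this.tokenEndpoint = tokenEndpoint;
        this.clientId = clientId;
        this.clientSecret = clientSecret;
        this.additionalHeaders = additionalHeaders;
    }
    @Override
    public String alias() {
        // Name used to select this provider; choose your own value
        return "CUSTOM";
    }
    @Override
    public void configure(Map<String, ?> configs) {
        // Settings are passed through the constructor in this example
    }
    @Override
    public String getBearerToken(URL url) {
        // The interface method declares no checked exceptions, so wrap I/O failures
        try {
            HttpURLConnection connection = (HttpURLConnection) new URL(tokenEndpoint).openConnection();
            connection.setRequestMethod("POST");
            connection.setConnectTimeout(10000);
            connection.setReadTimeout(10000);
            connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
            // Add custom headers
            if (additionalHeaders != null) {
                for (Map.Entry<String, String> header : additionalHeaders.entrySet()) {
                    connection.setRequestProperty(header.getKey(), header.getValue());
                }
            }
            // Set up HTTP Basic authentication with the client credentials
            String auth = clientId + ":" + clientSecret;
            String encodedAuth = Base64.getEncoder().encodeToString(auth.getBytes(StandardCharsets.UTF_8));
            connection.setRequestProperty("Authorization", "Basic " + encodedAuth);
            // Send request
            String postData = "grant_type=client_credentials";
            connection.setDoOutput(true);
            try (OutputStream os = connection.getOutputStream()) {
                os.write(postData.getBytes(StandardCharsets.UTF_8));
            }
            // Parse the JSON response and extract the access token
            try (InputStream is = connection.getInputStream()) {
                JsonNode token = MAPPER.readTree(is).get("access_token");
                if (token == null) {
                    throw new IOException("Token response has no access_token field");
                }
                return token.asText();
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to retrieve OAuth token", e);
        }
    }
}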
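The provider above sends only `grant_type=client_credentials`. If your identity provider also expects a scope, the form body needs URL encoding. A minimal sketch, with the hypothetical helper name `TokenRequestBody`:

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that builds the form-encoded token request body.
final class TokenRequestBody {
    /** Returns a client-credentials body, adding the scope only when one is given. */
    static String clientCredentials(String scope) {
        StringBuilder body = new StringBuilder("grant_type=client_credentials");
        if (scope != null && !scope.isBlank()) {
            // Form encoding turns ':' into %3A and spaces into '+'.
            body.append("&scope=").append(URLEncoder.encode(scope, StandardCharsets.UTF_8));
        }
        return body.toString();
    }
}
```

For the scope `kafka:read kafka:write`, this produces `grant_type=client_credentials&scope=kafka%3Aread+kafka%3Awrite`.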
Retry and circuit breaker patterns¶
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Opens after `threshold` consecutive failures and stays open for `timeout` milliseconds
public class CircuitBreaker {
    private final AtomicInteger failureCount = new AtomicInteger(0);
    private final AtomicLong lastFailureTime = new AtomicLong(0);
    private final int threshold;
    private final long timeout;
    public CircuitBreaker(int threshold, long timeout) {
        this.threshold = threshold;
        this.timeout = timeout;
    }
    // Callers should skip the token request while this returns true
    public boolean isOpen() {
        long now = System.currentTimeMillis();
        if (failureCount.get() >= threshold) {
            if (now - lastFailureTime.get() < timeout) {
                return true; // Circuit is open
            } else {
                // Reset after timeout
                failureCount.set(0);
                return false;
            }
        }
        return false;
    }
    public void recordFailure() {
        failureCount.incrementAndGet();
        lastFailureTime.set(System.currentTimeMillis());
    }
    public void recordSuccess() {
        failureCount.set(0);
    }
}
import java.util.concurrent.Callable;

public class RetryTemplate {
    private final int maxRetries;
    private final long backoffMs;
    public RetryTemplate(int maxRetries, long backoffMs) {
        if (maxRetries < 1) {
            throw new IllegalArgumentException("maxRetries must be at least 1");
        }
        this.maxRetries = maxRetries;
        this.backoffMs = backoffMs;
    }
    // Callable, unlike Supplier, lets the operation throw checked exceptions such as IOException
    public <T> T execute(Callable<T> operation) throws Exception {
        int attempts = 0;
        while (true) {
            try {
                return operation.call();
            } catch (Exception e) {
                attempts++;
                if (attempts >= maxRetries) {
                    throw e; // Out of attempts: surface the last failure
                }
                // Linear backoff: wait longer after each failed attempt
                Thread.sleep(backoffMs * attempts);
            }
        }
    }
}
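The two patterns above are usually combined: retry a failing token request a few times, but stop calling the provider entirely once it has failed repeatedly. The sketch below shows one way to do that in a single self-contained class. `GuardedCall` is a hypothetical name, the class is not thread-safe, and the thresholds are illustrative.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch: retries with linear backoff behind a simple circuit breaker.
final class GuardedCall {
    private final int failureThreshold;
    private final long openMillis;
    private int consecutiveFailures;
    private long openedAt;

    GuardedCall(int failureThreshold, long openMillis) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
    }

    <T> T call(Callable<T> operation, int maxAttempts, long backoffMs) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (isOpen()) {
                // Fail fast instead of sending another request to a failing provider.
                throw new IllegalStateException("Circuit open; skipping request", last);
            }
            try {
                T result = operation.call();
                consecutiveFailures = 0; // success closes the circuit
                return result;
            } catch (Exception e) {
                last = e;
                consecutiveFailures++;
                if (consecutiveFailures >= failureThreshold) {
                    openedAt = System.currentTimeMillis();
                }
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs * attempt);
                }
            }
        }
        throw last != null ? last : new IllegalArgumentException("maxAttempts must be positive");
    }

    private boolean isOpen() {
        if (consecutiveFailures < failureThreshold) {
            return false;
        }
        if (System.currentTimeMillis() - openedAt < openMillis) {
            return true;
        }
        consecutiveFailures = 0; // cool-down elapsed; allow a new attempt
        return false;
    }
}
```

With a threshold of 2 and five allowed attempts, a provider that always fails is called twice, after which the third attempt is rejected with an `IllegalStateException` without contacting the provider.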
Deployment considerations¶
Container deployment¶
# Dockerfile for OAuth-enabled Kafka client
FROM openjdk:11-jre-slim
# Copy application
COPY target/oauth-kafka-client.jar /app/
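# Set environment variables (placeholders; in practice, inject the client secret
# at runtime instead of baking it into the image)
ENV OAUTH_TOKEN_ENDPOINT=https://your-oauth-provider.com/oauth2/token
ENV OAUTH_CLIENT_ID=your-client-id
ENV OAUTH_CLIENT_SECRET=your-client-secret
# Values that contain spaces must be quoted
ENV OAUTH_SCOPE="kafka:read kafka:write"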
ENV LOGICAL_CLUSTER_ID=lkc-xxxxx
ENV IDENTITY_POOL_ID=pool-xxxxx
# Run application
CMD ["java", "-jar", "/app/oauth-kafka-client.jar"]
Kubernetes deployment¶
# kubernetes-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: oauth-kafka-client
spec:
  replicas: 3
  selector:
    matchLabels:
      app: oauth-kafka-client
  template:
    metadata:
      labels:
        app: oauth-kafka-client
    spec:
      containers:
      - name: kafka-client
        image: your-registry/oauth-kafka-client:latest
        env:
        - name: OAUTH_TOKEN_ENDPOINT
          valueFrom:
            secretKeyRef:
              name: oauth-secrets
              key: token-endpoint
        - name: OAUTH_CLIENT_ID
          valueFrom:
            secretKeyRef:
              name: oauth-secrets
              key: client-id
        - name: OAUTH_CLIENT_SECRET
          valueFrom:
            secretKeyRef:
              name: oauth-secrets
              key: client-secret
        - name: OAUTH_SCOPE
          value: "kafka:read kafka:write"
        - name: LOGICAL_CLUSTER_ID
          value: "lkc-xxxxx"
        - name: IDENTITY_POOL_ID
          value: "pool-xxxxx"
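Inside the container, the application still has to turn these environment variables into client properties. The sketch below shows one way to do that, using the variable names from the deployment above and the parameter names from the Java client table. `EnvClientConfig` is a hypothetical helper, and the remaining required settings (such as `sasl.jaas.config`) are left to the application.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical helper that maps deployment environment variables to client properties.
final class EnvClientConfig {
    static Properties fromEnv(Map<String, String> env) {
        Properties props = new Properties();
        props.setProperty("sasl.mechanism", "OAUTHBEARER");
        props.setProperty("oauth.token.endpoint.uri", require(env, "OAUTH_TOKEN_ENDPOINT"));
        props.setProperty("oauth.client.id", require(env, "OAUTH_CLIENT_ID"));
        props.setProperty("oauth.client.secret", require(env, "OAUTH_CLIENT_SECRET"));
        // Optional or conditional settings are copied only when present.
        copyIfSet(env, "OAUTH_SCOPE", props, "oauth.scope");
        copyIfSet(env, "LOGICAL_CLUSTER_ID", props, "oauth.logical.cluster");
        copyIfSet(env, "IDENTITY_POOL_ID", props, "oauth.identity.pool.id");
        return props;
    }

    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isBlank()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value;
    }

    private static void copyIfSet(Map<String, String> env, String name,
                                  Properties props, String key) {
        String value = env.get(name);
        if (value != null && !value.isBlank()) {
            props.setProperty(key, value);
        }
    }
}
```

At startup the application would call `EnvClientConfig.fromEnv(System.getenv())`. Taking the map as a parameter keeps the mapping easy to test.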
CI/CD integration¶
# .github/workflows/deploy.yml
name: Deploy OAuth Kafka Client
on:
  push:
    branches: [main]
jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v4
    - name: Set up JDK
      uses: actions/setup-java@v4
      with:
        distribution: 'temurin'
        java-version: '11'
    - name: Build with Maven
      run: mvn clean package
    - name: Build Docker image
      run: docker build -t oauth-kafka-client .
    - name: Deploy to Kubernetes
      run: |
        kubectl apply -f kubernetes-deployment.yaml
        kubectl rollout restart deployment/oauth-kafka-client
Integration testing¶
Mock OAuth server¶
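@Test
public void testOAuthIntegration() {
    // Start mock OAuth server (MockOAuthServer is a test helper you provide)
    MockOAuthServer mockServer = new MockOAuthServer();
    mockServer.start();
    try {
        // Configure client with mock server
        Properties props = new Properties();
        // Placeholder broker address; point this at your test cluster
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // SASL_SSL matches Confluent Cloud; a local broker without TLS needs SASL_PLAINTEXT
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "OAUTHBEARER");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " +
            "oauth.token.endpoint.uri=\"" + mockServer.getTokenEndpoint() + "\" " +
            "oauth.client.id=\"test-client\" " +
            "oauth.client.secret=\"test-secret\";");
        // Test authentication; try-with-resources closes the producer
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // ... test operations
        }
    } finally {
        mockServer.stop();
    }
}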
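The test above assumes a `MockOAuthServer` helper with `start()`, `getTokenEndpoint()`, and `stop()` methods. That class is not defined on this page. One possible implementation, sketched here with the JDK's built-in `com.sun.net.httpserver.HttpServer`, returns a fixed token for every request. The token is a plain string, not a JWT, so clients that parse the token structure would need a properly formed one.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical test helper: a token endpoint that always returns the same token.
final class MockOAuthServer {
    private HttpServer server;

    void start() {
        try {
            // Port 0 lets the operating system pick a free port.
            server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        server.createContext("/oauth2/token", exchange -> {
            exchange.getRequestBody().readAllBytes(); // drain the request body
            byte[] body = ("{\"access_token\":\"test-token\","
                + "\"token_type\":\"Bearer\",\"expires_in\":3600}")
                .getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().add("Content-Type", "application/json");
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
        });
        server.start();
    }

    String getTokenEndpoint() {
        return "http://127.0.0.1:" + server.getAddress().getPort() + "/oauth2/token";
    }

    void stop() {
        if (server != null) {
            server.stop(0); // stop immediately without waiting for open exchanges
        }
    }
}
```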
Performance testing¶
@Test
public void testOAuthPerformance() {
    // Measure token retrieval performance
    long startTime = System.currentTimeMillis();
    for (int i = 0; i < 100; i++) {
        // Perform OAuth token request
        String token = oauthProvider.getToken();
        assertNotNull(token);
    }
    long endTime = System.currentTimeMillis();
    long duration = endTime - startTime;
    // Assert performance requirements
    assertTrue("OAuth performance below threshold", duration < 5000);
}
Azure User-Assigned Managed Identity (UAMI)¶
Confluent Cloud supports using Azure User-Assigned Managed Identity (UAMI) for OAuth authentication, eliminating the need for static client IDs and secrets. This feature leverages Azure’s built-in identity management to automatically retrieve authentication tokens.
For complete documentation on configuring and using UAMI with OAuth, including step-by-step setup instructions and client examples, see Configure Azure User Assigned Managed Identity OAuth for Confluent Cloud.