Configure JavaScript Clients for OAuth/OIDC on Confluent Cloud

Prerequisites

Before you begin, ensure you have the following:

Language and tooling

  • Node.js 18 or later - Provides the built-in fetch API used in the token request examples

Confluent Cloud environment

  • OAuth setup in Confluent Cloud - An OAuth/OIDC identity provider must be configured in your Confluent Cloud organization before clients can authenticate.

Credentials and identifiers

  • Client ID - Your application’s identifier
  • Client secret - Your application’s password for OAuth
  • Token endpoint URL - Where to request access tokens
  • Scopes - Permissions your application needs (for example, kafka:read kafka:write for Kafka, schema-registry for Schema Registry)
  • Kafka cluster ID (lkc-xxxxx) - Your Kafka cluster identifier
  • Schema Registry cluster ID (lsrc-xxxxx) - Your Schema Registry cluster identifier
  • Identity pool ID (pool-xxxxx) - Required only if your OAuth provider is configured with identity pools

Client library

  • Install required packages:

    npm install @confluentinc/kafka-javascript @confluentinc/schemaregistry
    

Configure Kafka clients

JavaScript Kafka clients use a callback function approach for OAuth token retrieval.

OAuth callback function

The JavaScript client requires an OAuth callback function that handles token requests:

/**
 * OAuth callback function for the JavaScript Kafka client.
 *
 * Requests an access token from the configured token endpoint using the
 * client_credentials grant.
 *
 * @param {Object} oauthConfig - OAuth configuration object
 * @returns {Promise<string>} Access token
 */
async function oauthCallback(oauthConfig) {
    const tokenEndpoint = oauthConfig['sasl.oauth.token.endpoint.uri'];
    const clientId = oauthConfig['sasl.oauth.client.id'];
    const clientSecret = oauthConfig['sasl.oauth.client.secret'];
    const scope = oauthConfig['sasl.oauth.scope'] || '';

    // Prepare token request
    const data = new URLSearchParams({
        grant_type: 'client_credentials',
        client_id: clientId,
        client_secret: clientSecret
    });

    if (scope) {
        data.append('scope', scope);
    }

    // Make token request
    const response = await fetch(tokenEndpoint, {
        method: 'POST',
        headers: {
            'Content-Type': 'application/x-www-form-urlencoded'
        },
        body: data
    });

    if (!response.ok) {
        throw new Error(`HTTP error! status: ${response.status}`);
    }

    const tokenData = await response.json();
    return tokenData.access_token;
}
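
Token responses from the client_credentials grant also include an expires_in lifetime (in seconds). If the client invokes the callback often, caching the token until shortly before expiry avoids a network round trip on every call. A minimal sketch under that assumption; makeCachedTokenProvider is a helper written here for illustration, not part of the client library:

```javascript
// Wraps a token-fetching function with an in-memory cache.
// `fetchToken` must resolve to { access_token, expires_in } (seconds),
// the shape returned by the token endpoint above.
// `marginMs` refreshes slightly early so a token never expires mid-request.
function makeCachedTokenProvider(fetchToken, marginMs = 30000) {
    let cached = null;   // last token value
    let expiresAt = 0;   // epoch ms after which a refresh is required

    return async function getToken() {
        if (cached && Date.now() < expiresAt - marginMs) {
            return cached;   // still fresh: no network call
        }
        const tokenData = await fetchToken();
        cached = tokenData.access_token;
        expiresAt = Date.now() + tokenData.expires_in * 1000;
        return cached;
    };
}
```

The OAuth callback can then delegate to the returned getToken function instead of calling fetch on every invocation.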

Kafka producer configuration

const { Kafka } = require('@confluentinc/kafka-javascript');

// OAuth Configuration
const oauthConfig = {
    'sasl.mechanism': 'OAUTHBEARER',
    'sasl.oauth.token.endpoint.uri': 'https://your-oauth-provider.com/oauth2/token',
    'sasl.oauth.client.id': 'your-client-id',
    'sasl.oauth.client.secret': 'your-client-secret',
    'sasl.oauth.scope': 'kafka:read kafka:write',
    'sasl.oauth.logical.cluster': 'lkc-xxxxx',
    'sasl.oauth.identity.pool.id': 'pool-xxxxx',
    'oauth_cb': oauthCallback
};

// Kafka Producer Configuration
const producerConfig = {
    'bootstrap.servers': 'pkc-xxxxx.us-west-2.aws.confluent.cloud:9092',
    'security.protocol': 'SASL_SSL',
    ...oauthConfig
};

// Create producer
const producer = new Kafka.Producer(producerConfig);

Kafka consumer configuration

// Kafka Consumer Configuration
const consumerConfig = {
    'bootstrap.servers': 'pkc-xxxxx.us-west-2.aws.confluent.cloud:9092',
    'group.id': 'your-consumer-group',
    'security.protocol': 'SASL_SSL',
    ...oauthConfig
};

// Create consumer
const consumer = new Kafka.Consumer(consumerConfig);

Configure Schema Registry clients

The Schema Registry client does not use a callback. Instead, you pass bearer-authentication credentials in its configuration and the client retrieves and refreshes tokens itself.

Modern Schema Registry client configuration

Use the modern @confluentinc/schemaregistry package for better TypeScript support:

const { SchemaRegistryClient } = require('@confluentinc/schemaregistry');

// Schema Registry Configuration
const schemaRegistryConfig = {
    baseURLs: ['https://psrc-xxxxx.us-west-2.aws.confluent.cloud'],
    bearerAuthCredentials: {
        credentialsSource: 'OAUTHBEARER',
        issuerEndpointUrl: 'https://your-oauth-provider.com/oauth2/token',
        clientId: 'your-client-id',
        clientSecret: 'your-client-secret',
        scope: 'schema-registry',
        identityPoolId: 'pool-xxxxx',
        logicalCluster: 'lsrc-xxxxx'  // Your Schema Registry cluster ID
    }
};

// Create Schema Registry client
const schemaRegistry = new SchemaRegistryClient(schemaRegistryConfig);

Complete working example

Here’s a complete example showing how to use both Kafka and Schema Registry clients together:

const { Kafka } = require('@confluentinc/kafka-javascript');
const { SchemaRegistryClient } = require('@confluentinc/schemaregistry');

// --- 1. Define the Kafka OAuth Callback ---
async function oauthCallback(oauthConfig) {
    const tokenEndpoint = oauthConfig['sasl.oauth.token.endpoint.uri'];
    const clientId = oauthConfig['sasl.oauth.client.id'];
    const clientSecret = oauthConfig['sasl.oauth.client.secret'];
    const scope = oauthConfig['sasl.oauth.scope'] || '';

    const data = new URLSearchParams({
        grant_type: 'client_credentials',
        client_id: clientId,
        client_secret: clientSecret
    });

    if (scope) {
        data.append('scope', scope);
    }

    const response = await fetch(tokenEndpoint, {
        method: 'POST',
        headers: {
            'Content-Type': 'application/x-www-form-urlencoded'
        },
        body: data
    });

    if (!response.ok) {
        throw new Error(`HTTP error! status: ${response.status}`);
    }

    const tokenData = await response.json();
    return tokenData.access_token;
}

// --- 2. Configure the Kafka Client ---
const producerConfig = {
    'bootstrap.servers': 'pkc-xxxxx.us-west-2.aws.confluent.cloud:9092',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'OAUTHBEARER',
    'sasl.oauth.token.endpoint.uri': 'https://your-oauth-provider.com/oauth2/token',
    'sasl.oauth.client.id': 'your-client-id',
    'sasl.oauth.client.secret': 'your-client-secret',
    'sasl.oauth.scope': 'kafka:read kafka:write',
    'sasl.oauth.logical.cluster': 'lkc-xxxxx',
    'sasl.oauth.identity.pool.id': 'pool-xxxxx',
    'oauth_cb': oauthCallback
};

// --- 3. Configure the Schema Registry Client ---
const srConfig = {
    baseURLs: ['https://psrc-xxxxx.us-west-2.aws.confluent.cloud'],
    bearerAuthCredentials: {
        credentialsSource: 'OAUTHBEARER',
        issuerEndpointUrl: 'https://your-oauth-provider.com/oauth2/token',
        clientId: 'your-client-id',
        clientSecret: 'your-client-secret',
        scope: 'schema-registry',
        identityPoolId: 'pool-xxxxx',
        logicalCluster: 'lsrc-xxxxx'
    }
};

// --- 4. Instantiate and Use Clients ---
const producer = new Kafka.Producer(producerConfig);
producer.connect();  // Establish the broker connection; wait for 'ready' before producing
const schemaRegistry = new SchemaRegistryClient(srConfig);

// Example: Get all subjects from Schema Registry
async function getSubjects() {
    try {
        const subjects = await schemaRegistry.getAllSubjects();
        console.log('Subjects:', subjects);
        return subjects;
    } catch (error) {
        console.error('Error fetching subjects:', error);
        throw error;
    }
}

// Example: Produce a message (assumes the producer has connected and emitted 'ready')
async function produceMessage(topic, message) {
    try {
        // Message values must be Buffers; convert strings before producing
        producer.produce(topic, null, Buffer.from(message), null, Date.now());
        producer.flush(10000, () => {
            console.log('Message sent successfully!');
        });
    } catch (error) {
        console.error('Error producing message:', error);
        throw error;
    }
}

Troubleshoot client issues

Common issues and solutions

Authentication failures (401/403)

Problem: OAuth authentication fails with 401 or 403 errors.

Solutions:

  • Verify all OAuth parameters are correct (client ID, secret, endpoint URL)
  • Ensure the identity pool ID and cluster IDs are valid
  • Check that the OAuth provider is properly configured in Confluent Cloud
  • Verify the scope includes the necessary permissions
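
If the token request itself succeeds but Kafka still returns 403, it can help to inspect the claims the provider actually put in the token. Most providers issue JWT access tokens, whose payload segment can be decoded without verifying the signature. A minimal debugging sketch (decodeJwtPayload is a helper written here, not part of either client library); use it only for local inspection, never in place of real token validation:

```javascript
// Decodes the payload segment of a JWT for inspection (no signature check).
// Useful for checking that claims such as `scope`, `aud`, and `exp`
// match what the cluster expects.
function decodeJwtPayload(token) {
    const parts = token.split('.');
    if (parts.length !== 3) {
        throw new Error('Not a JWT: expected three dot-separated segments');
    }
    // The payload is base64url-encoded JSON
    const json = Buffer.from(parts[1], 'base64url').toString('utf8');
    return JSON.parse(json);
}
```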

Content-type header issues

Problem: Authentication fails when manually setting Content-Type headers.

Solution: Remove any manual Content-Type header settings. The clients handle these automatically for OAuth requests.

Cluster ID issues

Problem: Using incorrect cluster IDs.

Solutions:

  • Use the correct Kafka cluster ID (lkc-xxxxx) for Kafka clients
  • Use the correct Schema Registry cluster ID (lsrc-xxxxx) for Schema Registry clients
  • Find these IDs in the Confluent Cloud Console
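
Because the two IDs differ only by prefix, they are easy to swap. A small sanity check run at startup can catch a swapped or malformed ID before any connection attempt. This is an illustrative helper (looksLikeClusterId is not part of either client library), assuming the standard lkc-, lsrc-, and pool- prefixes:

```javascript
// Maps each resource type to the ID prefix Confluent Cloud uses for it.
const ID_PREFIXES = {
    kafka: 'lkc-',
    schemaRegistry: 'lsrc-',
    identityPool: 'pool-'
};

// Returns true if `id` looks like a valid ID for the given resource type.
function looksLikeClusterId(resource, id) {
    const prefix = ID_PREFIXES[resource];
    if (!prefix) {
        throw new Error(`Unknown resource type: ${resource}`);
    }
    // Must carry the right prefix and have something after it
    return typeof id === 'string'
        && id.startsWith(prefix)
        && id.length > prefix.length;
}
```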

Debug configuration

Enable debug logging to troubleshoot OAuth issues:

// For Kafka client
const debugConfig = {
    ...producerConfig,
    'debug': 'security,protocol,broker'
};

// For Schema Registry client
const srDebugConfig = {
    ...srConfig,
    debug: true
};

Test your configuration

  1. Test Kafka client:

    // Test producer
    const testProducer = new Kafka.Producer(producerConfig);

    testProducer.connect();
    testProducer.on('ready', () => {
        testProducer.produce('test-topic', null, Buffer.from('test-message'), null, Date.now());
        testProducer.flush(10000, () => {
            console.log('Message sent successfully - OAuth is working!');
            testProducer.disconnect();
        });
    });
    
  2. Test Schema Registry client:

    // Test Schema Registry
    async function testSchemaRegistry() {
        try {
            const subjects = await schemaRegistry.getAllSubjects();
            console.log('Schema Registry connection successful!');
            console.log('Subjects:', subjects);
        } catch (error) {
            console.error('Schema Registry test failed:', error);
        }
    }

    testSchemaRegistry();
    

Best practices

  • Use environment variables for sensitive configuration like client secrets
  • Implement proper error handling for authentication failures
  • Monitor token expiration and implement retry logic
  • Use the correct cluster IDs for both Kafka and Schema Registry
  • Avoid manual header manipulation when using OAuth authentication
  • Test authentication in a development environment before production
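
As an illustration of the first practice, the OAuth settings can be read from environment variables so that secrets never appear in source control. The variable names below are arbitrary choices for this sketch, not a convention either library requires:

```javascript
// Builds the OAuth portion of the client configuration from environment
// variables, failing fast at startup if a required variable is missing
// rather than at connect time.
function oauthConfigFromEnv(env = process.env) {
    const required = ['OAUTH_TOKEN_ENDPOINT', 'OAUTH_CLIENT_ID', 'OAUTH_CLIENT_SECRET'];
    for (const name of required) {
        if (!env[name]) {
            throw new Error(`Missing required environment variable: ${name}`);
        }
    }
    return {
        'sasl.mechanism': 'OAUTHBEARER',
        'sasl.oauth.token.endpoint.uri': env.OAUTH_TOKEN_ENDPOINT,
        'sasl.oauth.client.id': env.OAUTH_CLIENT_ID,
        'sasl.oauth.client.secret': env.OAUTH_CLIENT_SECRET,
        'sasl.oauth.scope': env.OAUTH_SCOPE || ''
    };
}
```

The result can be spread into the producer or consumer configuration in place of the hard-coded values shown earlier.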