Configure JavaScript Clients for OAuth/OIDC on Confluent Cloud¶
Prerequisites¶
Before you begin, ensure you have the following:
Language and tooling¶
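- Node.js 16 or later (the token request examples call the global fetch function, which is available by default from Node.js 18; on Node.js 16, use a fetch polyfill)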
- confluent-kafka-javascript: 2.0.0 or later
- @confluentinc/schemaregistry: Latest version
- Confluent Platform: 7.2.1 or later, or 7.1.3 or later on the 7.1.x line
Confluent Cloud environment¶
- OAuth set up in Confluent Cloud (see Configure OAuth authentication)
Credentials and identifiers¶
- Client ID - Your application’s identifier
- Client secret - Your application’s password for OAuth
- Token endpoint URL - Where to request access tokens
- Scopes - Permissions your application needs (for example, kafka:read kafka:write for Kafka, schema-registry for Schema Registry)
- Kafka cluster ID (lkc-xxxxx) - Your Kafka cluster identifier
- Schema Registry cluster ID (lsrc-xxxxx) - Your Schema Registry cluster identifier
- Identity pool ID (pool-xxxxx) - If using identity pools
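One way to keep these values in one place is to read them from environment variables at startup. The following is a minimal sketch: the `loadOAuthSettings` helper and every variable name in it (`OAUTH_CLIENT_ID`, `KAFKA_CLUSTER_ID`, and so on) are hypothetical choices for illustration, not names defined by any Confluent library.

```javascript
// Hypothetical helper: gathers the identifiers listed above from environment
// variables. The variable names are assumptions made for this sketch.
const REQUIRED_VARS = [
  'OAUTH_CLIENT_ID',
  'OAUTH_CLIENT_SECRET',
  'OAUTH_TOKEN_ENDPOINT',
  'KAFKA_CLUSTER_ID',
];

function loadOAuthSettings(env = process.env) {
  // Fail fast with a single message that names every missing variable.
  const missing = REQUIRED_VARS.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing required environment variables: ${missing.join(', ')}`);
  }
  return {
    clientId: env.OAUTH_CLIENT_ID,
    clientSecret: env.OAUTH_CLIENT_SECRET,
    tokenEndpoint: env.OAUTH_TOKEN_ENDPOINT,
    scope: env.OAUTH_SCOPE || '',
    kafkaClusterId: env.KAFKA_CLUSTER_ID,
    // Optional values: only needed for Schema Registry or identity pools.
    schemaRegistryClusterId: env.SCHEMA_REGISTRY_CLUSTER_ID || null,
    identityPoolId: env.IDENTITY_POOL_ID || null,
  };
}
```

Calling `loadOAuthSettings()` once at startup surfaces a missing credential before any client is created, rather than as an authentication failure later.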
Client library¶
Install required packages:
npm install confluent-kafka-javascript @confluentinc/schemaregistry
Configure Kafka clients¶
JavaScript Kafka clients retrieve OAuth tokens through a callback function.
OAuth callback function¶
The JavaScript client requires an OAuth callback function that handles token requests:
/**
 * OAuth callback function for JavaScript Kafka client
 *
 * @param {Object} oauthConfig - OAuth configuration object
 * @returns {Promise<string>} Access token
 */
async function oauthCallback(oauthConfig) {
  const tokenEndpoint = oauthConfig['sasl.oauth.token.endpoint.uri'];
  const clientId = oauthConfig['sasl.oauth.client.id'];
  const clientSecret = oauthConfig['sasl.oauth.client.secret'];
  const scope = oauthConfig['sasl.oauth.scope'] || '';

  // Prepare token request (client credentials grant)
  const data = new URLSearchParams({
    grant_type: 'client_credentials',
    client_id: clientId,
    client_secret: clientSecret
  });
  if (scope) {
    data.append('scope', scope);
  }

  // Make token request (fetch is a global in Node.js 18 and later)
  const response = await fetch(tokenEndpoint, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/x-www-form-urlencoded'
    },
    body: data
  });
  if (!response.ok) {
    throw new Error(`HTTP error! status: ${response.status}`);
  }

  const tokenData = await response.json();
  return tokenData.access_token;
}
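The callback above returns only the access token. A token response normally also carries an `expires_in` field (in seconds, per RFC 6749), which is useful for deciding when to request a new token. The sketch below shows one way to keep that information; `toTokenWithExpiry`, `isExpiringSoon`, the 5-minute fallback, and the 60-second safety margin are all assumptions made for illustration, not part of the client library.

```javascript
// Converts a parsed token response into a token plus an absolute expiry time.
function toTokenWithExpiry(tokenData, nowMs = Date.now()) {
  if (!tokenData || typeof tokenData.access_token !== 'string') {
    throw new Error('Token response does not contain an access_token');
  }
  const expiresInSec = Number(tokenData.expires_in);
  // Assumption: fall back to 5 minutes when the provider omits expires_in.
  const lifetimeSec =
    Number.isFinite(expiresInSec) && expiresInSec > 0 ? expiresInSec : 300;
  return {
    token: tokenData.access_token,
    expiresAtMs: nowMs + lifetimeSec * 1000,
  };
}

// Returns true when the token expires within skewMs (default: 60 seconds).
function isExpiringSoon(entry, nowMs = Date.now(), skewMs = 60000) {
  return entry.expiresAtMs - nowMs <= skewMs;
}
```

Inside the callback, `toTokenWithExpiry(await response.json())` could replace the direct read of `access_token` if the surrounding code needs the expiry time.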
Kafka producer configuration¶
const { Kafka } = require('confluent-kafka-javascript');

// OAuth Configuration
const oauthConfig = {
  'sasl.mechanism': 'OAUTHBEARER',
  'sasl.oauth.token.endpoint.uri': 'https://your-oauth-provider.com/oauth2/token',
  'sasl.oauth.client.id': 'your-client-id',
  'sasl.oauth.client.secret': 'your-client-secret',
  'sasl.oauth.scope': 'kafka:read kafka:write',
  'sasl.oauth.logical.cluster': 'lkc-xxxxx',
  'sasl.oauth.identity.pool.id': 'pool-xxxxx',
  'oauth_cb': oauthCallback
};

// Kafka Producer Configuration
const producerConfig = {
  'bootstrap.servers': 'pkc-xxxxx.us-west-2.aws.confluent.cloud:9092',
  'security.protocol': 'SASL_SSL',
  ...oauthConfig
};

// Create producer
const producer = new Kafka.Producer(producerConfig);
Kafka consumer configuration¶
// Kafka Consumer Configuration
const consumerConfig = {
  'bootstrap.servers': 'pkc-xxxxx.us-west-2.aws.confluent.cloud:9092',
  'group.id': 'your-consumer-group',
  'security.protocol': 'SASL_SSL',
  ...oauthConfig
};

// Create consumer
const consumer = new Kafka.Consumer(consumerConfig);
Configure Schema Registry clients¶
The Schema Registry client is configured differently: instead of a callback function, it takes a bearerAuthCredentials object.
Modern Schema Registry client configuration¶
Use the modern @confluentinc/schemaregistry package for better TypeScript support:
const { SchemaRegistryClient } = require('@confluentinc/schemaregistry');

// Schema Registry Configuration
const schemaRegistryConfig = {
  baseURLs: ['https://psrc-xxxxx.us-west-2.aws.confluent.cloud'],
  bearerAuthCredentials: {
    credentialsSource: 'OAUTHBEARER',
    issuerEndpointUrl: 'https://your-oauth-provider.com/oauth2/token',
    clientId: 'your-client-id',
    clientSecret: 'your-client-secret',
    scope: 'schema-registry',
    identityPoolId: 'pool-xxxxx',
    logicalCluster: 'lsrc-xxxxx' // Your Schema Registry cluster ID
  }
};

// Create Schema Registry client
const schemaRegistry = new SchemaRegistryClient(schemaRegistryConfig);
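Because the Kafka and Schema Registry configurations repeat the same endpoint, client ID, and secret but take different cluster IDs, it can help to derive both from one settings object. The sketch below reuses only the configuration keys shown in this section; the `buildClientConfigs` helper and the shape of its `settings` argument are hypothetical.

```javascript
// Hypothetical helper: builds both client configurations from one settings
// object so shared values are written once and cluster IDs cannot be swapped.
function buildClientConfigs(settings, oauthCallback) {
  const kafka = {
    'bootstrap.servers': settings.bootstrapServers,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'OAUTHBEARER',
    'sasl.oauth.token.endpoint.uri': settings.tokenEndpoint,
    'sasl.oauth.client.id': settings.clientId,
    'sasl.oauth.client.secret': settings.clientSecret,
    'sasl.oauth.scope': settings.kafkaScope,
    'sasl.oauth.logical.cluster': settings.kafkaClusterId, // lkc-xxxxx
    'sasl.oauth.identity.pool.id': settings.identityPoolId,
    'oauth_cb': oauthCallback,
  };
  const schemaRegistry = {
    baseURLs: [settings.schemaRegistryUrl],
    bearerAuthCredentials: {
      credentialsSource: 'OAUTHBEARER',
      issuerEndpointUrl: settings.tokenEndpoint,
      clientId: settings.clientId,
      clientSecret: settings.clientSecret,
      scope: settings.schemaRegistryScope,
      identityPoolId: settings.identityPoolId,
      logicalCluster: settings.schemaRegistryClusterId, // lsrc-xxxxx
    },
  };
  return { kafka, schemaRegistry };
}
```

The returned `kafka` object can be passed where `producerConfig` is used above, and `schemaRegistry` where `schemaRegistryConfig` is used.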
Complete working example¶
Here’s a complete example showing how to use both Kafka and Schema Registry clients together:
const { Kafka } = require('confluent-kafka-javascript');
const { SchemaRegistryClient } = require('@confluentinc/schemaregistry');

// --- 1. Define the Kafka OAuth Callback ---
async function oauthCallback(oauthConfig) {
  const tokenEndpoint = oauthConfig['sasl.oauth.token.endpoint.uri'];
  const clientId = oauthConfig['sasl.oauth.client.id'];
  const clientSecret = oauthConfig['sasl.oauth.client.secret'];
  const scope = oauthConfig['sasl.oauth.scope'] || '';

  const data = new URLSearchParams({
    grant_type: 'client_credentials',
    client_id: clientId,
    client_secret: clientSecret
  });
  if (scope) {
    data.append('scope', scope);
  }

  const response = await fetch(tokenEndpoint, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/x-www-form-urlencoded'
    },
    body: data
  });
  if (!response.ok) {
    throw new Error(`HTTP error! status: ${response.status}`);
  }

  const tokenData = await response.json();
  return tokenData.access_token;
}

// --- 2. Configure the Kafka Client ---
const producerConfig = {
  'bootstrap.servers': 'pkc-xxxxx.us-west-2.aws.confluent.cloud:9092',
  'security.protocol': 'SASL_SSL',
  'sasl.mechanism': 'OAUTHBEARER',
  'sasl.oauth.token.endpoint.uri': 'https://your-oauth-provider.com/oauth2/token',
  'sasl.oauth.client.id': 'your-client-id',
  'sasl.oauth.client.secret': 'your-client-secret',
  'sasl.oauth.scope': 'kafka:read kafka:write',
  'sasl.oauth.logical.cluster': 'lkc-xxxxx',
  'sasl.oauth.identity.pool.id': 'pool-xxxxx',
  'oauth_cb': oauthCallback
};

// --- 3. Configure the Schema Registry Client ---
const srConfig = {
  baseURLs: ['https://psrc-xxxxx.us-west-2.aws.confluent.cloud'],
  bearerAuthCredentials: {
    credentialsSource: 'OAUTHBEARER',
    issuerEndpointUrl: 'https://your-oauth-provider.com/oauth2/token',
    clientId: 'your-client-id',
    clientSecret: 'your-client-secret',
    scope: 'schema-registry',
    identityPoolId: 'pool-xxxxx',
    logicalCluster: 'lsrc-xxxxx'
  }
};
// --- 4. Instantiate and Use Clients ---
const producer = new Kafka.Producer(producerConfig);
const schemaRegistry = new SchemaRegistryClient(srConfig);

// Example: Get all subjects from Schema Registry
async function getSubjects() {
  try {
    const subjects = await schemaRegistry.getAllSubjects();
    console.log('Subjects:', subjects);
    return subjects;
  } catch (error) {
    console.error('Error fetching subjects:', error);
    throw error;
  }
}
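
// Example: Produce a message
// Note: the producer must already be connected before this function is called.
async function produceMessage(topic, message) {
  try {
    // Pass the value as a Buffer (node-rdkafka-style produce() rejects plain strings)
    producer.produce(topic, null, Buffer.from(message), null, Date.now());
    // flush() reports failures through its callback, so wrap it in a Promise
    await new Promise((resolve, reject) => {
      producer.flush(10000, (err) => (err ? reject(err) : resolve()));
    });
    console.log('Message sent successfully!');
  } catch (error) {
    console.error('Error producing message:', error);
    throw error;
  }
}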
Troubleshoot client issues¶
Common issues and solutions¶
Authentication failures (401/403)¶
Problem: OAuth authentication fails with 401 or 403 errors.
Solutions:
- Verify all OAuth parameters are correct (client ID, secret, endpoint URL)
- Ensure the identity pool ID and cluster IDs are valid
- Check that the OAuth provider is properly configured in Confluent Cloud
- Verify the scope includes necessary permissions
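When a 401 or 403 persists, inspecting the claims in the token itself often shows which parameter is wrong. The sketch below assumes the identity provider issues JWT access tokens; `decodeJwtPayload` and `describeToken` are hypothetical helpers, and the claim names read here (`iss`, `aud`, `sub`, `scope`, `exp`) are the standard JWT ones, which a given provider may or may not populate. The signature is not verified, so this is for debugging only.

```javascript
// Decodes the payload (second segment) of a JWT without verifying the signature.
// Debugging aid only: never base authorization decisions on unverified claims.
function decodeJwtPayload(token) {
  const parts = String(token).split('.');
  if (parts.length !== 3) {
    throw new Error('Not a JWT: expected three dot-separated segments');
  }
  const json = Buffer.from(parts[1], 'base64url').toString('utf8');
  return JSON.parse(json);
}

// Summarizes the claims that most often explain an authentication failure.
function describeToken(token, nowMs = Date.now()) {
  const claims = decodeJwtPayload(token);
  return {
    issuer: claims.iss,
    audience: claims.aud,
    subject: claims.sub,
    scope: claims.scope,
    // exp is expressed in seconds since the epoch; null when the claim is absent
    expired: typeof claims.exp === 'number' ? claims.exp * 1000 <= nowMs : null,
  };
}
```

For example, `console.log(describeToken(await oauthCallback(oauthConfig)))` prints the issuer, audience, and expiry status, which can then be compared with the identity provider settings in Confluent Cloud. Avoid logging the raw token itself.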
Content-type header issues¶
Problem: Authentication fails when manually setting Content-Type headers.
Solution: Remove any manual Content-Type header settings. The clients handle these automatically for OAuth requests.
Cluster ID issues¶
Problem: Using incorrect cluster IDs.
Solution:
- Use the correct Kafka cluster ID (lkc-xxxxx) for Kafka clients
- Use the correct Schema Registry cluster ID (lsrc-xxxxx) for Schema Registry clients
- Find these IDs in the Confluent Cloud Console
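A swapped or mistyped ID can be caught before any client is created with a simple prefix check. This is a minimal sketch: `findIdProblems` is a hypothetical helper, and it checks only the prefixes shown in this document (`lkc-`, `lsrc-`, `pool-`), not whether the ID actually exists.

```javascript
// Expected prefixes, taken from the placeholder IDs used in this document.
const EXPECTED_PREFIXES = {
  kafkaClusterId: 'lkc-',
  schemaRegistryClusterId: 'lsrc-',
  identityPoolId: 'pool-',
};

// Returns a list of human-readable problems; an empty list means no issue found.
function findIdProblems(ids) {
  const problems = [];
  for (const [field, prefix] of Object.entries(EXPECTED_PREFIXES)) {
    const value = ids[field];
    if (value === undefined || value === null) {
      continue; // Field not provided: nothing to check.
    }
    const text = String(value);
    if (!text.startsWith(prefix) || text.length === prefix.length) {
      problems.push(`${field} should look like ${prefix}xxxxx but got "${text}"`);
    }
  }
  return problems;
}
```

Passing the Kafka cluster ID where the Schema Registry one belongs, for example, yields a message naming the field, which is quicker to act on than a generic authentication error.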
Debug configuration¶
Enable debug logging to troubleshoot OAuth issues:
// For Kafka client
const debugConfig = {
  ...producerConfig,
  'debug': 'security,protocol,broker'
};

// For Schema Registry client
const srDebugConfig = {
  ...srConfig,
  debug: true
};
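When debugging, it is tempting to print the whole configuration object, which would expose the client secret. The sketch below shows one way to redact it first; `redactConfig` is a hypothetical helper, and the rule it applies (mask any key containing "secret" or "password") is an assumption that may need extending for other sensitive keys.

```javascript
// Returns a copy of a configuration object that is safe to log:
// secret-like values are masked and functions are replaced by a placeholder.
function redactConfig(value) {
  if (typeof value === 'function') {
    return '[Function]';
  }
  if (Array.isArray(value)) {
    return value.map(redactConfig);
  }
  if (value === null || typeof value !== 'object') {
    return value;
  }
  const redacted = {};
  for (const [key, inner] of Object.entries(value)) {
    // Assumption: keys containing "secret" or "password" hold sensitive values.
    redacted[key] = /secret|password/i.test(key) ? '***' : redactConfig(inner);
  }
  return redacted;
}
```

For example, `console.log(redactConfig(debugConfig))` prints the endpoint, client ID, and cluster IDs while masking `sasl.oauth.client.secret`; nested objects such as `bearerAuthCredentials` are handled the same way.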
Test your configuration¶
Test Kafka client:
// Test producer
const testProducer = new Kafka.Producer(producerConfig);
// Pass the value as a Buffer (node-rdkafka-style produce() rejects plain strings)
testProducer.produce('test-topic', null, Buffer.from('test-message'), null, Date.now());
testProducer.flush(10000, () => {
  console.log('Message sent successfully - OAuth is working!');
  testProducer.close();
});
Test Schema Registry client:
// Test Schema Registry
async function testSchemaRegistry() {
  try {
    const subjects = await schemaRegistry.getAllSubjects();
    console.log('Schema Registry connection successful!');
    console.log('Subjects:', subjects);
  } catch (error) {
    console.error('Schema Registry test failed:', error);
  }
}
Best practices¶
- Use environment variables for sensitive configuration like client secrets
- Implement proper error handling for authentication failures
- Monitor token expiration and implement retry logic
- Use the correct cluster IDs for both Kafka and Schema Registry
- Avoid manual header manipulation when using OAuth authentication
- Test authentication in a development environment before production
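The retry logic mentioned above can be sketched as a small wrapper with exponential backoff. `withRetry` is a hypothetical helper, and the defaults (3 attempts, 200 ms base delay, doubling each time) are illustrative assumptions rather than recommended values.

```javascript
// Default delay implementation; injectable so tests need not actually wait.
const defaultSleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Runs an async operation, retrying on failure with exponential backoff.
// Rethrows the last error once all attempts are used up.
async function withRetry(operation, options = {}) {
  const { attempts = 3, baseDelayMs = 200, sleep = defaultSleep } = options;
  let lastError;
  for (let attempt = 1; attempt <= attempts; attempt += 1) {
    try {
      return await operation(attempt);
    } catch (error) {
      lastError = error;
      if (attempt < attempts) {
        // Delay doubles after each failed attempt: 200 ms, 400 ms, ...
        await sleep(baseDelayMs * 2 ** (attempt - 1));
      }
    }
  }
  throw lastError;
}
```

A token request could then be wrapped as `withRetry(() => oauthCallback(oauthConfig))`. Retrying makes sense for transient failures such as network errors; a 401 caused by a wrong client secret will fail on every attempt, so a production version would likely inspect the error before retrying.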