Configure Go Clients for OAuth/OIDC on Confluent Cloud
Prerequisites
Before you begin, ensure you have the following:
Language and tooling
Go 1.18+
confluent-kafka-go: v2.0.0 or later
Confluent Platform: 7.2.1 or later; 7.1.3 or later
Confluent Cloud environment
OAuth setup in Confluent Cloud - Configure OAuth authentication
Credentials and identifiers
Client ID - Your application’s identifier (like a username)
Client secret - Your application’s password for OAuth
Token endpoint URL - Where to request access tokens
Scopes - Permissions your application needs (for example,
kafka:read kafka:write)Cluster ID (
lkc-xxxxx) - Your Kafka cluster identifierIdentity pool ID (
pool-xxxxx) - If using identity pools
Client library
Install the latest version with OAuth support:
go get github.com/confluentinc/confluent-kafka-go/v2/kafka
Configure Kafka Go clients
Go Kafka clients can authenticate to Confluent Cloud clusters using the OAuth 2.0 protocol. The Go client uses the confluent-kafka-go library which is based on librdkafka.
Configuration approach
Required parameters
Parameter | Description | Required Value |
|---|---|---|
| Sets the authentication method for OAuth. | OAUTHBEARER |
| OAuth token endpoint URL from your identity provider. | (Provided by IdP) |
| OAuth client ID from your identity provider. | (Provided by IdP) |
| OAuth client secret from your identity provider. | (Provided by IdP) |
| OAuth scopes for token request (optional). | (Optional) |
| Logical cluster ID for Confluent Cloud. | lkc-xxxxx |
| Identity pool ID for Confluent Cloud (optional). | (Optional) |
| Connection timeout for token requests (optional). | 10000 (default) |
| Read timeout for token requests (optional). | 10000 (default) |
| Go function for OAuth token retrieval. | Your callback function |
OAuth callback function
The Go client requires an OAuth callback function with the following signature:
func oauthCallback(oauthConfig string) (string, error) {
// Implementation here
return "", nil
}
The callback function receives a JSON string containing the OAuth configuration parameters from librdkafka. This includes all the sasl.oauth.* parameters that were set in the client configuration. The function should parse this JSON, make the OAuth token request, and return the access token as a string.
Configuration example
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
type OAuthConfig struct {
TokenEndpoint string `json:"token_endpoint"`
ClientID string `json:"client_id"`
ClientSecret string `json:"client_secret"`
Scope string `json:"scope"`
}
type TokenResponse struct {
AccessToken string `json:"access_token"`
TokenType string `json:"token_type"`
ExpiresIn int `json:"expires_in"`
}
func oauthCallback(oauthConfig string) (string, error) {
// Parse OAuth configuration from JSON string
var config OAuthConfig
if err := json.Unmarshal([]byte(oauthConfig), &config); err != nil {
return "", fmt.Errorf("failed to parse OAuth config: %v", err)
}
// Prepare the token request
data := url.Values{}
data.Set("grant_type", "client_credentials")
data.Set("client_id", config.ClientID)
data.Set("client_secret", config.ClientSecret)
if config.Scope != "" {
data.Set("scope", config.Scope)
}
// Create HTTP request
req, err := http.NewRequest("POST", config.TokenEndpoint, strings.NewReader(data.Encode()))
if err != nil {
return "", fmt.Errorf("failed to create request: %v", err)
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
// Send request with timeout
client := &http.Client{
Timeout: 10 * time.Second,
}
resp, err := client.Do(req)
if err != nil {
return "", fmt.Errorf("failed to send request: %v", err)
}
defer resp.Body.Close()
// Read the body ONCE
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("failed to read response body: %v", err)
}
if resp.StatusCode != http.StatusOK {
// Use the body we already read for the error message
return "", fmt.Errorf("OAuth request failed with status %d: %s", resp.StatusCode, string(body))
}
// Parse the body we already read
var tokenResp TokenResponse
if err := json.Unmarshal(body, &tokenResp); err != nil {
return "", fmt.Errorf("failed to parse token response: %v", err)
}
return tokenResp.AccessToken, nil
}
// Enhanced OAuth callback with retry logic
func oauthCallbackWithRetry(oauthConfig string) (string, error) {
maxRetries := 3
baseDelay := time.Second
for attempt := 1; attempt <= maxRetries; attempt++ {
token, err := oauthCallback(oauthConfig)
if err == nil {
return token, nil
}
fmt.Printf("OAuth token request failed (attempt %d/%d): %v\n", attempt, maxRetries, err)
if attempt < maxRetries {
delay := time.Duration(float64(baseDelay) * float64(1<<(attempt-1))) // Exponential backoff
time.Sleep(delay)
}
}
return "", fmt.Errorf("failed to obtain OAuth token after %d attempts", maxRetries)
}
func main() {
// Producer configuration with integrated retry logic
producerConfig := &kafka.ConfigMap{
"bootstrap.servers": "your-cluster-endpoint:9092",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "OAUTHBEARER",
"sasl.oauth.token.endpoint.uri": "https://your-oauth-provider.com/oauth2/token",
"sasl.oauth.client.id": "your-client-id",
"sasl.oauth.client.secret": "your-client-secret",
"sasl.oauth.scope": "kafka:read kafka:write",
"sasl.oauth.logical.cluster": "lkc-xxxxx",
"sasl.oauth.identity.pool.id": "pool-xxxxx",
"sasl.oauth.connect.timeout.ms": 10000,
"sasl.oauth.read.timeout.ms": 10000,
"oauth_cb": oauthCallbackWithRetry, // Using enhanced callback with retry
}
// Create producer
producer, err := kafka.NewProducer(producerConfig)
if err != nil {
panic(fmt.Sprintf("Failed to create producer: %v", err))
}
defer producer.Close()
// Consumer configuration with integrated retry logic
consumerConfig := &kafka.ConfigMap{
"bootstrap.servers": "your-cluster-endpoint:9092",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "OAUTHBEARER",
"sasl.oauth.token.endpoint.uri": "https://your-oauth-provider.com/oauth2/token",
"sasl.oauth.client.id": "your-client-id",
"sasl.oauth.client.secret": "your-client-secret",
"sasl.oauth.scope": "kafka:read kafka:write",
"sasl.oauth.logical.cluster": "lkc-xxxxx",
"sasl.oauth.identity.pool.id": "pool-xxxxx",
"oauth_cb": oauthCallbackWithRetry, // Using enhanced callback with retry
"group.id": "your-consumer-group",
"auto.offset.reset": "earliest",
}
// Create consumer
consumer, err := kafka.NewConsumer(consumerConfig)
if err != nil {
panic(fmt.Sprintf("Failed to create consumer: %v", err))
}
defer consumer.Close()
}
Test your configuration
Enable debug logging to troubleshoot OAuth issues:
// Add to your configuration config := &kafka.ConfigMap{ // ... your OAuth config "debug": "security,protocol,broker", }
Test with a simple producer:
// Test producer producer, err := kafka.NewProducer(config) if err != nil { log.Fatal("Failed to create producer: ", err) } defer producer.Close() topic := "test-topic" message := "test-message" producer.ProduceChannel() <- &kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(message), } // Wait for delivery for e := range producer.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n", ev.TopicPartition.Error) } else { fmt.Printf("Message delivered to %s [%d] - OAuth is working!\n", *ev.TopicPartition.Topic, ev.TopicPartition.Partition) } return } }
Check for common errors: - “SASL authentication failed” - Check your OAuth credentials and endpoint - “Invalid token” - Verify your callback function is returning a valid token - “Connection timeout” - Check your bootstrap servers and network connectivity
Verify token refresh - The client should automatically refresh tokens when they expire
Custom OAuth implementations
For advanced use cases, implement custom token providers:
import (
"crypto/rsa"
"crypto/x509"
"encoding/pem"
"fmt"
"os"
"time"
"github.com/golang-jwt/jwt/v4"
)
type JWTCustomTokenProvider struct {
privateKey *rsa.PrivateKey
clientID string
tokenEndpoint string
}
func NewJWTCustomTokenProvider(privateKeyPath, clientID, tokenEndpoint string) (*JWTCustomTokenProvider, error) {
// Load private key
keyBytes, err := os.ReadFile(privateKeyPath)
if err != nil {
return nil, fmt.Errorf("failed to read private key: %v", err)
}
block, _ := pem.Decode(keyBytes)
if block == nil {
return nil, fmt.Errorf("failed to decode PEM block")
}
privateKey, err := x509.ParsePKCS1PrivateKey(block.Bytes)
if err != nil {
return nil, fmt.Errorf("failed to parse private key: %v", err)
}
return &JWTCustomTokenProvider{
privateKey: privateKey,
clientID: clientID,
tokenEndpoint: tokenEndpoint,
}, nil
}
func (p *JWTCustomTokenProvider) CreateJwtAssertion(audience, issuer string) (string, error) {
now := time.Now()
claims := jwt.MapClaims{
"iss": issuer,
"sub": p.clientID,
"aud": audience,
"iat": now.Unix(),
"exp": now.Add(time.Hour).Unix(),
"jti": fmt.Sprintf("jwt-%d", now.Unix()),
}
token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims)
return token.SignedString(p.privateKey)
}
func (p *JWTCustomTokenProvider) GetToken(audience, issuer string) (string, error) {
assertion, err := p.CreateJwtAssertion(audience, issuer)
if err != nil {
return "", fmt.Errorf("failed to create JWT assertion: %v", err)
}
data := url.Values{}
data.Set("grant_type", "urn:ietf:params:oauth:grant-type:jwt-bearer")
data.Set("assertion", assertion)
req, err := http.NewRequest("POST", p.tokenEndpoint, strings.NewReader(data.Encode()))
if err != nil {
return "", fmt.Errorf("failed to create request: %v", err)
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
return "", fmt.Errorf("failed to send request: %v", err)
}
defer resp.Body.Close()
// Read the body ONCE
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("failed to read response body: %v", err)
}
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("token request failed with status %d: %s", resp.StatusCode, string(body))
}
var tokenResp TokenResponse
if err := json.Unmarshal(body, &tokenResp); err != nil {
return "", fmt.Errorf("failed to parse token response: %v", err)
}
return tokenResp.AccessToken, nil
}
// NewCustomOAuthCallback creates and returns a Kafka OAuth callback function
// that uses a JWT provider for token retrieval.
func NewCustomOAuthCallback(privateKeyPath, clientID, tokenEndpoint string) (func(string) (string, error), error) {
jwtProvider, err := NewJWTCustomTokenProvider(
privateKeyPath,
clientID,
tokenEndpoint,
)
if err != nil {
return nil, fmt.Errorf("failed to create JWT provider: %v", err)
}
// Return a closure that has access to the jwtProvider
return func(oauthConfig string) (string, error) {
return jwtProvider.GetToken(
"https://your-oauth-provider.com", // audience
"your-client-id", // issuer
)
}, nil
}
// --- In your main application setup ---
// 1. Create the callback function once
customCallback, err := NewCustomOAuthCallback(
"/path/to/private_key.pem",
"your-client-id",
"https://your-oauth-provider.com/oauth2/token",
)
if err != nil {
panic(err)
}
// 2. Use it in the Kafka configuration
producerConfig := &kafka.ConfigMap{
// ... other properties
"oauth_cb": customCallback,
}
Google OIDC integration
For Google OIDC integration with Go clients:
import (
"context"
"fmt"
"time"
"golang.org/x/oauth2/google"
"google.golang.org/api/option"
)
func googleOAuthCallback(oauthConfig string) (string, error) {
// Load service account credentials
credentials, err := google.CredentialsFromFile(
"path/to/service-account-key.json",
"https://www.googleapis.com/auth/cloud-platform",
)
if err != nil {
return "", fmt.Errorf("failed to load credentials: %v", err)
}
// Get token
token, err := credentials.TokenSource.Token()
if err != nil {
return "", fmt.Errorf("failed to get token: %v", err)
}
return token.AccessToken, nil
}
// Google OIDC configuration
googleConfig := &kafka.ConfigMap{
"bootstrap.servers": "your-cluster-endpoint:9092",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "OAUTHBEARER",
"sasl.oauth.logical.cluster": "lkc-xxxxx",
"sasl.oauth.identity.pool.id": "pool-xxxxx",
"oauth_cb": googleOAuthCallback,
}
Note
The googleOAuthCallback function loads credentials directly from the service account JSON file and ignores the oauthConfig string passed by the client. For this reason, properties like sasl.oauth.token.endpoint.uri are not needed in the ConfigMap when using this callback.
Configure Schema Registry Go clients
The Schema Registry Go client authenticates using a built-in OAuth mechanism. You only need to provide the correct configuration parameters, and the client handles the token request automatically. It does not use the oauth_cb callback.
Required parameters
The following parameters must be included in the Schema Registry client configuration:
Parameter | Description |
|---|---|
| Set to |
| OAuth token endpoint URL from your identity provider |
| OAuth client ID from your identity provider |
| OAuth client secret from your identity provider |
| OAuth scopes for token request (optional) |
| Logical cluster ID for Schema Registry (lsrc-xxxxx) |
| Identity pool ID for Confluent Cloud (optional) |
Configuration example
import (
"github.com/confluentinc/confluent-kafka-go/schemaregistry"
)
// Schema Registry configuration with OAuth
schemaRegistryConfig := map[string]string{
"url": "https://<your-schema-registry-endpoint>",
"bearer.auth.credentials.source": "OAUTHBEARER",
"bearer.auth.issuer.endpoint.url": "https://<your-idp.com>/oauth2/token",
"bearer.auth.client.id": "<your-client-id>",
"bearer.auth.client.secret": "<your-client-secret>",
"bearer.auth.scope": "schema:read",
"bearer.auth.logical.cluster": "<lsrc-xxxxx>",
"bearer.auth.identity.pool.id": "<pool-yyyyy>",
}
// Create Schema Registry client
schemaRegistryClient, err := schemaregistry.NewClient(schemaRegistryConfig)
if err != nil {
panic(fmt.Sprintf("Failed to create Schema Registry client: %v", err))
}
Inherit OAuth configuration from Kafka client
You can also inherit the OAuth configuration from your Kafka client:
// Schema Registry configuration inheriting from Kafka client
schemaRegistryConfig := map[string]string{
"url": "https://<your-schema-registry-endpoint>",
"bearer.auth.credentials.source": "SASL_OAUTHBEARER_INHERIT",
"bearer.auth.logical.cluster": "<lsrc-xxxxx>",
"bearer.auth.identity.pool.id": "<pool-yyyyy>",
}
Custom token provider
For custom OAuth implementations, use the following configuration:
schemaRegistryConfig := map[string]string{
"url": "https://<your-schema-registry-endpoint>",
"bearer.auth.credentials.source": "CUSTOM",
"bearer.auth.custom.provider.class": "<fully-qualified-class-name>",
"bearer.auth.logical.cluster": "<lsrc-resource-id>",
"bearer.auth.identity.pool.id": "<identity-pool-id>",
}
Troubleshoot Go OAuth clients
Common issues and solutions for Go OAuth clients:
Authentication failures
Verify client ID and secret are correct
Check token endpoint URL is accessible
Ensure logical cluster ID is valid
Validate identity pool ID if used
Network issues
Confirm network connectivity to OAuth provider
Check firewall rules allow OAuth traffic
Verify SSL certificate validation
Configuration issues
Ensure all required parameters are provided
Validate OAuth callback function signature
Check timeout values are reasonable
Debug logging
Enable librdkafka’s internal debug logs for troubleshooting OAuth issues by setting the debug property in your ConfigMap. This provides far more detail than standard Go logging for client-broker communication.
producerConfig := &kafka.ConfigMap{
// ... other properties
"debug": "security,protocol,broker",
}
Common Go-specific issues
Import errors: Ensure
confluent-kafka-gois properly installedCallback function: Verify the OAuth callback function signature matches requirements
Token refresh: Implement proper token refresh logic with error handling
Thread safety: Ensure OAuth callback is thread-safe for multi-threaded applications
Performance considerations
Implement token caching to avoid repeated OAuth requests
Use connection pooling for HTTP requests to OAuth provider
Consider implementing token refresh before expiration