Use OAuth with ksqlDB on Confluent Platform

The ksqlDB server bridges real-time data streams in Kafka and the analytical power of SQL. This page explains how to configure OAuth authentication for ksqlDB. You must configure both the ksqlDB server and each ksqlDB client to use OAuth. Once configured, the ksqlDB server accepts OAuth tokens issued by your identity provider (IdP) to authenticate clients.

ksqlDB server-side configuration

  1. Edit the ksql-server.properties file on the ksqlDB server.

  2. Specify the authentication plugin that handles OAuth authentication.

    ksql.authentication.plugin.class=io.confluent.ksql.security.VertxAuthenticationHandlerPlugin
    
  3. Set the following OAuth properties in the ksql-server.properties file to enable OAuth.

    oauthbearer.jwks.endpoint.url=<url>
    oauthbearer.expected.issuer=<issuer>
    oauthbearer.expected.audience=<audience>
    oauthbearer.sub.claim.name=<sub>
    oauthbearer.groups.claim.name=<groups>
    
  4. Configure the connection to the Metadata Service (MDS).

    confluent.metadata.bootstrap.server.urls=http://<mds_url>:8090
    confluent.metadata.http.auth.credentials.provider=OAUTHBEARER
    confluent.metadata.oauthbearer.token.endpoint.url=<token-endpoint-url>
    confluent.metadata.oauthbearer.login.client.id=<client-id>
    confluent.metadata.oauthbearer.login.client.secret=<client-secret>
    
  5. Save and close the ksql-server.properties file.

  6. Restart the ksqlDB server to apply the changes.

    $ confluent local stop ksql-server
    $ confluent local start ksql-server
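
Before restarting, it can help to confirm that none of the properties from steps 2 to 4 were left unset or still hold a `<placeholder>` value. The following is a minimal sketch of such a check. The class `OAuthServerConfigCheck` is a hypothetical helper and not part of ksqlDB, and treating every listed key as required is an assumption made for illustration; some of these properties may be optional in your deployment.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of ksqlDB: reports which of the properties
// listed in the steps above are missing, blank, or still a <placeholder>.
class OAuthServerConfigCheck {

  // Keys copied from steps 2 to 4. Treating all of them as required is an
  // assumption for illustration only.
  static final List<String> LISTED_KEYS = List.of(
      "ksql.authentication.plugin.class",
      "oauthbearer.jwks.endpoint.url",
      "oauthbearer.expected.issuer",
      "oauthbearer.expected.audience",
      "oauthbearer.sub.claim.name",
      "oauthbearer.groups.claim.name",
      "confluent.metadata.bootstrap.server.urls",
      "confluent.metadata.http.auth.credentials.provider",
      "confluent.metadata.oauthbearer.token.endpoint.url",
      "confluent.metadata.oauthbearer.login.client.id",
      "confluent.metadata.oauthbearer.login.client.secret");

  static List<String> unsetKeys(Properties props) {
    List<String> unset = new ArrayList<>();
    for (String key : LISTED_KEYS) {
      String value = props.getProperty(key);
      boolean blank = value == null || value.isBlank();
      // A value such as "<issuer>" means the placeholder was never replaced.
      boolean placeholder = !blank
          && value.strip().startsWith("<") && value.strip().endsWith(">");
      if (blank || placeholder) {
        unset.add(key);
      }
    }
    return unset;
  }

  public static void main(String[] args) throws IOException {
    if (args.length != 1) {
      System.err.println("usage: java OAuthServerConfigCheck.java <path-to-ksql-server.properties>");
      return;
    }
    Properties props = new Properties();
    try (Reader reader = Files.newBufferedReader(Path.of(args[0]))) {
      props.load(reader);
    }
    List<String> unset = unsetKeys(props);
    if (unset.isEmpty()) {
      System.out.println("All listed OAuth keys are set.");
    } else {
      unset.forEach(key -> System.out.println("Not set: " + key));
    }
  }
}
```

The check only looks at whether values are present. It does not contact the IdP or MDS, so a passing result does not guarantee that the URLs or credentials are valid.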
    

ksqlDB migrations tool configuration

In this section, you configure the client-side connections.

  1. Open the ksql-migrations.properties file.

  2. Add the following OAuth properties for the ksqlDB migrations tool.

    bearer.auth.issuer.endpoint.url=<url>
    bearer.auth.client.id=<client-id>
    bearer.auth.client.secret=<client-secret>
    bearer.auth.scope=<scope>
    bearer.auth.scope.claim.name=<name>
    bearer.auth.sub.claim.name=<sub>
    bearer.auth.cache.expiry.buffer.seconds=300
    
  3. Save and close the ksql-migrations.properties file.
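
The `bearer.auth.cache.expiry.buffer.seconds` setting controls how long before token expiry a cached token is renewed. As a rough illustration of that idea, the sketch below decides whether a token should be renewed given its expiry time and a buffer. The class and method names are hypothetical, and the actual caching logic inside the client may differ.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration of an expiry buffer; not the ksqlDB implementation.
class TokenRenewalSketch {

  // Returns true once the current time has reached (expiry - buffer),
  // that is, when the token is within the buffer window before it expires.
  static boolean shouldRenew(Instant now, Instant expiresAt, Duration buffer) {
    Instant renewAt = expiresAt.minus(buffer);
    return !now.isBefore(renewAt);
  }

  public static void main(String[] args) {
    Instant issued = Instant.parse("2024-01-01T00:00:00Z");
    Instant expiresAt = issued.plusSeconds(3600);      // token valid for one hour
    Duration buffer = Duration.ofSeconds(300);         // matches the 300 in the example config

    // 50 minutes after issue: still outside the 5-minute buffer window.
    System.out.println(shouldRenew(issued.plusSeconds(3000), expiresAt, buffer));
    // 55 minutes after issue: the buffer window has started.
    System.out.println(shouldRenew(issued.plusSeconds(3300), expiresAt, buffer));
  }
}
```

With a one-hour token and a 300-second buffer, renewal in this sketch starts 55 minutes after the token was issued. A larger buffer renews earlier and reduces the chance of sending a token that expires in flight.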

Confluent Control Center configuration

  1. Edit the Control Center control-center.properties file and add the following properties.

    confluent.metadata.oauthbearer.token.endpoint.url=<idp-token-endpoint-url>
    
    # When RBAC is enabled, the confluent.metadata.oauthbearer.* properties are sufficient
    confluent.metadata.oauthbearer.login.client.id=<client-id>
    confluent.metadata.oauthbearer.login.client.secret=<client-secret>
    confluent.metadata.oauthbearer.login.oauth.scope=<scope>
    
    # When RBAC is not enabled, also add the following properties
    confluent.controlcenter.ksql.<ksql-cluster-name>.oauthbearer.login.client.id=<client-id>
    confluent.controlcenter.ksql.<ksql-cluster-name>.oauthbearer.login.client.secret=<client-secret>
    confluent.controlcenter.ksql.<ksql-cluster-name>.oauthbearer.login.oauth.scope=<scope>
    
  2. Save and close the control-center.properties file.

  3. Restart Control Center for your changes to take effect.

ksqlDB Java client configuration

Create an IdpConfig object in the ksqlDB Java client. This object retrieves a token from the IdP.

IdpConfig idpConfig = new IdpConfig.Builder()
  .withTokenEndpointUrl("<idp-token-endpoint-url>")
  .withClientId("<client-id>")
  .withClientSecret("<client-secret>")
  // scope is optional
  .withScope("<scope>")
  // scope claim name is optional, default is "scope"
  .withScopeClaimName("<scope-claim-name>")
  // sub claim name is optional, default is "sub"
  .withSubClaimName("<sub>")
  // number of seconds before token expiry at which the token should be renewed;
  // optional, default is 300
  .withCacheExpiryBufferSeconds((short) 300)
  .build();
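
The text above says only that the IdpConfig object retrieves a token from the IdP. To make that step more concrete, the sketch below builds the kind of token request defined by the OAuth 2.0 client credentials grant (RFC 6749, section 4.4) using the JDK's `java.net.http` API. Whether the ksqlDB client uses exactly this request shape is an assumption. The class name and the endpoint URL `https://idp.example.com/oauth2/token` are hypothetical, and some IdPs expect the client credentials in an HTTP Basic header instead of the form body.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of a client credentials token request; not the
// ksqlDB client's internal code.
class ClientCredentialsRequestSketch {

  // Builds the application/x-www-form-urlencoded body. The scope is left out
  // when it is null or blank, mirroring the "scope is optional" note above.
  static String formBody(String clientId, String clientSecret, String scope) {
    Map<String, String> params = new LinkedHashMap<>();
    params.put("grant_type", "client_credentials");
    params.put("client_id", clientId);
    params.put("client_secret", clientSecret);
    if (scope != null && !scope.isBlank()) {
      params.put("scope", scope);
    }
    return params.entrySet().stream()
        .map(e -> URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8)
            + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
        .collect(Collectors.joining("&"));
  }

  // Builds the POST request without sending it.
  static HttpRequest tokenRequest(String tokenEndpointUrl, String body) {
    return HttpRequest.newBuilder(URI.create(tokenEndpointUrl))
        .header("Content-Type", "application/x-www-form-urlencoded")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();
  }

  public static void main(String[] args) {
    String body = formBody("my-client", "my-secret", "read write");
    HttpRequest request = tokenRequest("https://idp.example.com/oauth2/token", body);
    // Print only the method and URI; the body contains the client secret.
    System.out.println(request.method() + " " + request.uri());
  }
}
```

The sketch stops at building the request so that it runs without a live IdP. Sending it with `java.net.http.HttpClient` and reading `access_token` and `expires_in` from the JSON response would be the next step in a real client.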