KSQL and role-based access control (RBAC)

Starting in Confluent Platform 5.3, KSQL supports role-based access control (RBAC) to cluster resources.

Note

KSQL supports RBAC to provide authorization to KSQL clusters when running in interactive mode. RBAC for users isn't used in KSQL clusters that are deployed in headless mode.

Use the Confluent CLI to create roles for users. You need the cluster ID of the Kafka cluster, the service ID of the KSQL cluster, and the cluster ID of the Schema Registry cluster, if your topics have Avro schemas. For more information, see Discover Identifiers for Clusters.

Tip

To get started, try the RBAC automated demo that showcases the RBAC functionality in Confluent Platform.

KSQL service principal

KSQL applications run with the roles granted to a privileged KSQL user service principal. By default, the KSQL user is named "ksql". The roles granted to the KSQL user by default are required to run KSQL and are distinct from the roles granted to an interactive user who wants to connect and contribute to KSQL.

When you grant roles to an interactive user, you must also grant the same roles to the KSQL user service principal. KSQL applications execute user commands as the KSQL service principal, in the background and on other servers in the KSQL cluster, and some KSQL statements start long-running persistent queries. KSQL checks that the interactive user has the required permissions to access any resources required, but the query itself runs in the security context of the KSQL service principal. For this reason, it's important that both the interactive user and the KSQL service principal have the correct permissions to all resources required by any KSQL statements.

The following example commands show how to grant a user and the KSQL user read-only access to a topic. To grant these roles, you must be logged in to the Confluent CLI with sufficient permissions, for example as UserAdmin or ResourceOwner.

# Grant read-only access for a user to a topic.
confluent iam rolebinding create \
  --principal User:<user-name> \
  --role DeveloperRead \
  --resource Topic:<topic-name> \
  --kafka-cluster-id <kafka-id>

# Grant read-only access for the ksql user to a topic.
confluent iam rolebinding create \
  --principal User:ksql \
  --role DeveloperRead \
  --resource Topic:<topic-name> \
  --kafka-cluster-id <kafka-id>

Configure KSQL Server for RBAC

Assign the following configuration settings in the KSQL Server properties file to enable KSQL in an RBAC environment.

To access the Confluent security service, you must generate a public/private key pair. Specify the path to the public key file by assigning the public.key.path setting. For more information, see Configure Metadata Service (MDS).

Tip

You can store passwords and other configuration data securely by using the confluent secret commands. For more information see Secrets.

Append the following settings to the KSQL Server configuration file, which by default is named ${CONFLUENT_HOME}/etc/ksql/ksql-server.properties.

#------ External service configuration -------

# The set of Kafka brokers to bootstrap Kafka cluster information from
bootstrap.servers=<kafka-hostname>:9092

# Schema Registry authentication
ksql.schema.registry.url=http://<sr-hostname>:8081
# Credentials for accessing Schema Registry
ksql.schema.registry.basic.auth.user.info=<ksql-user>:<ksql-user-password>

############ KSQL security settings for Confluent RBAC ######################

# Set the KSQL cluster ID to work on specific roles and permissions for this KSQL cluster scope.
ksql.service.id=<ksql-cluster-id>

# Enable the KSQL Confluent security plugin.
ksql.security.extension.class=io.confluent.ksql.security.KsqlConfluentSecurityExtension

# Assign the metadata URL and access credentials, which are shared by authentication security handlers and the KSQL security extension.
# May be the same as the Kafka cluster hostname.
confluent.metadata.bootstrap.server.urls=http://<metadata-server-hostname>:8090
confluent.metadata.http.auth.credentials.provider=BASIC
# Credentials for accessing the metadata server
confluent.metadata.basic.auth.user.info=<ksql-user>:<ksql-user-password>

# Enable KSQL Basic+Bearer authentication.
rest.servlet.initializor.classes=io.confluent.common.security.jetty.initializer.InstallBearerOrBasicSecurityHandler
websocket.servlet.initializor.classes=io.confluent.common.security.jetty.initializer.InstallBearerOrBasicSecurityHandler
public.key.path=<path/to/publickey.pem>

# Assign credentials for Kafka access.
# These settings may vary, depending on your Kafka cluster authentication settings.
security.protocol=SASL_PLAINTEXT
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler
# Credentials for the KSQL user, which has the default user name of "ksql".
# These credentials authorize KSQL Server to access the Kafka cluster.
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    metadataServerUrls="http://<metadata-server-hostname>" username="<ksql-user>" password="<ksql-user-password>";

Save the file and restart KSQL Server.

RBAC for KSQL depends on the Confluent Platform Metadata Service (MDS) and the Confluent Server Authorizer. The confluent.metadata settings configure the Metadata Service. The ksql.security.extension.class setting configures KSQL for the Confluent Server Authorizer. For more information, see Confluent Server Authorizer.

Use the KSQL service principal credentials for the following settings.

  • sasl.jaas.config for authorizing to the Kafka cluster with Confluent Server Authorizer
  • confluent.metadata.basic.auth.user.info for authorizing to MDS
  • ksql.schema.registry.basic.auth.user.info for authorizing to Schema Registry

Assign roles for KSQL access

The following section defines the minimal set of roles and resources required for a user to connect to KSQL and execute KSQL statements, including transient queries.

As a best practice for security, give users the minimum necessary access level to resources. For example, prefer granting an interactive user the DeveloperRead role over the ResourceOwner role for reading from a topic.

Important

Users require permissions in Kafka to use any KSQL statements that attempt to read, write, list, and create topics. Additional access to resources on the Kafka cluster and Schema Registry must be granted explicitly. For more information, see Grant a user access to resources.

To grant these roles, you must be logged in to the Confluent CLI as a UserAdmin, ResourceOwner, or other role with sufficient permissions.

Cluster identifiers

To create role bindings, you need the cluster identifiers for the components in your Confluent Platform deployment. For more information, see Discover Identifiers for Clusters.

The following commands use these environment variables for the cluster IDs. Variables for the interactive user name and topic name are also defined.

# Cluster identifiers
export KAFKA_ID=<kafka-cluster-id>
export KSQL_ID=<ksql-cluster-id>
export SR_ID=<schema-registry-cluster-id>

# The name of the interactive user and
# the name of a topic for the user to access.
export USER_NAME=<interactive-user-name>
export TOPIC_NAME=<kafka-topic-name>

Assign roles for starting KSQL Server

To start KSQL Server and connect to a secured Kafka cluster, a user must have the following roles granted by a UserAdmin.

KSQL cluster scope

Run the following commands to assign the ResourceOwner role on the KSQL cluster, which binds ownership of the KSQL cluster's resources to the user. A user who has the ResourceOwner or ClusterAdmin role on a KSQL cluster is a cluster owner.

confluent iam rolebinding create \
  --principal User:<cluster-owner-name> \
  --role ResourceOwner \
  --kafka-cluster-id $KAFKA_ID \
  --ksql-cluster-id $KSQL_ID \
  --resource KsqlCluster:ksql-cluster

confluent iam rolebinding create \
  --principal User:<cluster-owner-name> \
  --role SecurityAdmin \
  --kafka-cluster-id $KAFKA_CLUSTER_ID \
  --ksql-cluster-id $KSQL_SERVICE_ID

Run the following commands to assign the ResourceOwner role to the KSQL user service principal.

confluent iam rolebinding create \
  --principal User:ksql \
  --role ResourceOwner \
  --kafka-cluster-id $KAFKA_ID \
  --ksql-cluster-id $KSQL_ID \
  --resource KsqlCluster:ksql-cluster

Kafka cluster scope

Grant the ResourceOwner role to the KSQL service principal on the following internal KSQL resources. By default, these roles are granted to the KSQL user named "ksql".

  • KSQL command topic: _confluent-ksql-${KSQL_ID}_command_topic
  • KSQL processing log topic: ${KSQL_ID}ksql_processing_log
  • Consumer groups: Prefixed with _confluent-ksql-${KSQL_ID}

Use the following command to grant full access for the KSQL user to the KSQL command topic.

# Grant full access for the cluster owner to the KSQL command topic.
confluent iam rolebinding create \
  --principal User:ksql \
  --role ResourceOwner \
  --resource Topic:_confluent-ksql-${KSQL_ID}_command_topic \
  --kafka-cluster-id $KAFKA_ID

Use the following command to grant full access for the KSQL user to the KSQL processing log topic.

# Grant full access for the cluster owner to the KSQL processing log topic.
confluent iam rolebinding create \
  --principal User:ksql \
  --role ResourceOwner \
  --resource Topic:${KSQL_ID}ksql_processing_log \
  --kafka-cluster-id $KAFKA_ID

Use the following command to grant full access for the KSQL user to the KSQL consumer groups.

# Grant full access for the cluster owner to the KSQL consumer groups.
confluent iam rolebinding create \
  --principal User:ksql \
  --role ResourceOwner \
  --resource Group:_confluent-ksql-${KSQL_ID} \
  --prefix \
  --kafka-cluster-id $KAFKA_ID

Assign roles for terminating a KSQL cluster

To terminate a KSQL cluster by calling the REST endpoint at /ksql/terminate, a user must have the DeveloperManage role granted by a UserAdmin. For more information, see Terminate a Cluster.

KSQL cluster scope

Use the following command to assign a DeveloperManage role on the KSQL cluster.

confluent iam rolebinding create \
    --principal User:<cluster-owner-name> \
    --role DeveloperManage \
    --kafka-cluster-id $KAFKA_ID \
    --ksql-cluster-id $KSQL_ID \
    --resource KsqlCluster:ksql-cluster

Assign roles for contributing to a KSQL application

To access the KSQL cluster for development, an interactive user must have the following roles.

KSQL cluster scope

Use the following command to assign the DeveloperWrite role on the KSQL cluster.

confluent iam rolebinding create \
    --principal User:$USER_NAME \
    --role DeveloperWrite \
    --kafka-cluster-id $KAFKA_ID \
    --ksql-cluster-id $KSQL_ID \
    --resource KsqlCluster:ksql-cluster

Kafka cluster scope

Use the following command to assign the DeveloperRead role on the internal consumer groups for transient queries.

confluent iam rolebinding create \
  --principal User:$USER_NAME \
  --role DeveloperRead \
  --resource Group:_confluent-ksql-${KSQL_SERVICE_ID} \
  --prefix \
  --kafka-cluster-id $KAFKA_ID

Assign the following role if the interactive user needs access to the KSQL processing log.

# Grant access for an interactive user to the processing log.
confluent iam rolebinding create \
  --principal User:$USER_NAME \
  --role DeveloperRead \
  --resource Topic:${KSQL_SERVICE_ID}ksql_processing_log \
  --kafka-cluster-id $KAFKA_CLUSTER_ID

Grant a user access to resources

In addition to the minimum necessary roles, an interactive user needs roles to access specific resources like topics, consumer groups, and subjects.

Important

When you grant roles to a user, remember to grant the same roles to the KSQL user service principal, which is named "ksql" by default.

Prefixed resource names

Use the prefix option in the Confluent CLI to grant access to all resources with names that start with the specified prefix string. For example, to grant read-only access to all topics that start with <topic-name-prefix->, you could use the following command.

# Grant read-only access for a user to a topic.
confluent iam rolebinding create \
  --principal User:$USER_NAME \
  --role DeveloperRead \
  --resource Topic:<topic-name-prefix-> \
  --prefix \
  --kafka-cluster-id $KAFKA_ID

Roles for accessing topics, streams, and tables

Use the following Confluent CLI commands to give an interactive user the necessary roles for creating streams and table.

SHOW or PRINT a topic
  • DeveloperRead role on the Kafka topic
  • DeveloperRead role on the Schema Registry subject, if the topic has an Avro schema

This role enables an interactive user to display the specified topic by using the SHOW and PRINT statements. Also, the user can CREATE streams and tables from these topics, and the user can execute SELECT statements on their streams and tables.

The KSQL user doesn't need a role on the topic for these statements.

# Grant read-only access for a user to read a topic.
confluent iam rolebinding create \
  --principal User:$USER_NAME \
  --role DeveloperRead \
  --resource Topic:$TOPIC_NAME \
  --kafka-cluster-id $KAFKA_ID
# Grant read-only access for a user to read a subject.
confluent iam rolebinding create \
  --principal User:$USER_NAME \
  --role DeveloperRead \
  --resource Subject:${TOPIC_NAME}-value
  --kafka-cluster-id $KAFKA_ID \
  --schema-registry-cluster-id $SR_ID
SELECT from a stream or table
  • DeveloperRead role on the source topic
  • DeveloperRead role on the _confluent-ksql-${KSQL_ID} consumer group

These roles enable a user to read from a stream or a table by using the SELECT statement. If a SELECT statement contains a JOIN that uses an unauthorized topic, the SELECT fails with an authorization error.

confluent iam rolebinding create \
  --principal User:$USER_NAME \
  --role DeveloperRead \
  --resource Topic:$TOPIC_NAME \
  --kafka-cluster-id $KAFKA_ID
confluent iam rolebinding create \
  --principal User:$USER_NAME \
  --role ResourceOwner \
  --resource Topic:_confluent-ksql-${KSQL_SERVICE_ID}transient \
  --prefix \
  --kafka-cluster-id $KAFKA_CLUSTER_ID
Write to a topic with INSERT
  • DeveloperWrite role on the Kafka topic
  • DeveloperWrite role on the Schema Registry subject, if the topic has an Avro schema

These roles enable a user to write data by using INSERT statements. The INSERT INTO statement contains a SELECT clause that requires the user to have read permissions on the topic in the query.

# Grant write access for a user to write to a topic.
confluent iam rolebinding create \
  --principal User:$USER_NAME \
  --role DeveloperWrite \
  --resource Topic:$TOPIC_NAME \
  --kafka-cluster-id $KAFKA_ID
# Grant write access for a user to write to a subject.
confluent iam rolebinding create \
  --principal User:$USER_NAME \
  --role DeveloperWrite \
  --resource Subject:${TOPIC_NAME}-value
  --kafka-cluster-id $KAFKA_ID \
  --schema-registry-cluster-id $SR_ID
CREATE STREAM
  • DeveloperRead role on the source topic
  • DeveloperRead role on the _confluent-ksql-${KSQL_SERVICE_ID} consumer groups
  • DeveloperRead role on the Schema Registry sink subject, if the topic has an Avro schema

These roles enable an interactive user to register a stream or table on the specified topic by using the CREATE STREAM statement.

# Grant permissions to read from a topic.
confluent iam rolebinding create \
  --principal User:$USER_NAME \
  --role DeveloperRead \
  --resource Topic:$TOPIC_NAME \
  --kafka-cluster-id $KAFKA_ID

confluent iam rolebinding create \
  --principal User:ksql \
  --role DeveloperRead \
  --resource Topic:$TOPIC_NAME \
  --kafka-cluster-id $KAFKA_ID
confluent iam rolebinding create \
  --principal User:$USER_NAME \
  --role DeveloperRead \
  --resource Group:_confluent-ksql-${KSQL_SERVICE_ID} \
  --prefix \
  --kafka-cluster-id $KAFKA_ID

confluent iam rolebinding create \
  --principal User:ksql \
  --role DeveloperRead \
  --resource Group:_confluent-ksql-${KSQL_SERVICE_ID} \
  --prefix \
  --kafka-cluster-id $KAFKA_ID
# If the topic has an Avro schema, both users need permissions for the subject in Schema Registry.
confluent iam rolebinding create \
  --principal User:$USER_NAME \
  --role DeveloperRead \
  --resource Subject:${TOPIC_NAME}-value \
  --kafka-cluster-id $KAFKA_ID \
  --schema-registry-cluster-id $SR_ID

confluent iam rolebinding create \
  --principal User:ksql \
  --role DeveloperRead \
  --resource Subject:${TOPIC_NAME}-value \
  --kafka-cluster-id $KAFKA_ID \
  --schema-registry-cluster-id $SR_ID
CREATE TABLE
  • DeveloperRead role on the source topic
  • ResourceOwner role on the _confluent-ksql-${KSQL_ID}transient transient topics
  • DeveloperRead role on the Schema Registry sink subject, if the topic has an Avro schema

These roles enable an interactive user to register a table on the specified topic by using the CREATE TABLE statement.

Note

The ResourceOwner role on the transient topics is a limitation of KSQL tables. Giving this permission to the prefixed topics lets the user view tables from other queries.

# Grant permissions to read from a topic.
confluent iam rolebinding create \
  --principal User:$USER_NAME \
  --role DeveloperRead \
  --resource Topic:$TOPIC_NAME \
  --kafka-cluster-id $KAFKA_ID

confluent iam rolebinding create \
  --principal User:ksql \
  --role DeveloperRead \
  --resource Topic:$TOPIC_NAME \
  --kafka-cluster-id $KAFKA_ID
# Grant access to the transient query topics.
# This is a limitation of KSQL tables. Giving this permission to
# the prefixed topics lets the user view tables from other queries.
confluent iam rolebinding create \
  --principal User:$USER_NAME \
  --role ResourceOwner \
  --resource Topic:_confluent-ksql-${KSQL_ID}transient \
  --prefix \
  --kafka-cluster-id $KAFKA_ID

confluent iam rolebinding create \
  --principal User:ksql \
  --role ResourceOwner \
  --resource Topic:_confluent-ksql-${KSQL_ID}transient \
  --prefix \
  --kafka-cluster-id $KAFKA_ID
# If the topic has an Avro schema, both users need permissions for the subject in Schema Registry.
confluent iam rolebinding create \
  --principal User:$USER_NAME \
  --role DeveloperRead \
  --resource Subject:${TOPIC_NAME}-value \
  --kafka-cluster-id $KAFKA_ID \
  --schema-registry-cluster-id $SR_ID

confluent iam rolebinding create \
  --principal User:ksql \
  --role DeveloperRead \
  --resource Subject:${TOPIC_NAME}-value \
  --kafka-cluster-id $KAFKA_ID \
  --schema-registry-cluster-id $SR_ID
Create a stream or table with a persistent query
  • DeveloperRead role on the source topic
  • ResourceOwner role on the KSQL sink topic
  • DeveloperRead role on the Schema Registry sink subject, if the topic has an Avro schema

These roles enable a user to create streams and tables with persistent queries. Because KSQL creates a new topic and a new Schema Registry schema, the user must have sufficient permissions to create, read, and write to the topic and Schema Registry subject. The ResourceOwner role is necessary on the sink topic, because the user and the KSQL user need permission to create the sink topic if it doesn't exist already.

Note

The sink topic has the same name as the stream or table and is all uppercase.

If the topic has an Avro schema, the interactive user and the KSQL user need DeveloperRead permission on the sink topic's subject in Schema Registry. The subject's name is the sink topic's name appended with -value.

confluent iam rolebinding create \
  --principal User:$USER_NAME \
  --role DeveloperRead \
  --resource Topic:$SOURCE_TOPIC_NAME \
  --kafka-cluster-id $KAFKA_ID

confluent iam rolebinding create \
  --principal User:ksql \
  --role DeveloperRead \
  --resource Topic:$SOURCE_TOPIC_NAME \
  --kafka-cluster-id $KAFKA_ID
confluent iam rolebinding create \
  --principal User:$USER_NAME \
  --role ResourceOwner \
  --resource Topic:$SINK_TOPIC_NAME \
  --kafka-cluster-id $KAFKA_ID

confluent iam rolebinding create \
  --principal User:ksql \
  --role ResourceOwner \
  --resource Topic:$SINK_TOPIC_NAME \
  --kafka-cluster-id $KAFKA_ID
# Grant roles for a user to access the sink topic's subject in Schema Registry.
confluent iam rolebinding create \
  --principal User:$USER_NAME \
  --role DeveloperRead \
  --resource Subject:${SINK_TOPIC_NAME}-value \
  --kafka-cluster-id $KAFKA_ID \
  --schema-registry-cluster-id $SR_ID

confluent iam rolebinding create \
  --principal User:ksql \
  --role DeveloperRead \
  --resource Subject:${SINK_TOPIC_NAME}-value \
  --kafka-cluster-id $KAFKA_ID \
  --schema-registry-cluster-id $SR_ID
Full control over a topic and schema
  • ResourceOwner role on the Kafka topic
  • ResourceOwner role on the Schema Registry subject, if the topic has an Avro schema

Use the following commands to grant a user full control over a topic and its schema, including permissions to delete the topic and schema.

# Grant full access for a user to manage a topic.
confluent iam rolebinding create \
  --principal User:<topic-owner-name> \
  --role ResourceOwner \
  --resource Topic:$TOPIC_NAME \
  --kafka-cluster-id $KAFKA_ID
# Grant full access for a user to manage a subject.
confluent iam rolebinding create \
  --principal User:<topic-owner-name> \
  --role ResourceOwner \
  --resource Subject:${TOPIC_NAME}-value
  --kafka-cluster-id $KAFKA_ID \
  --schema-registry-cluster-id $SR_ID
Delete a topic
  • DeveloperManage role on the Kafka topic
  • DeveloperManage role on the Schema Registry subject, if the topic has an Avro schema

These roles enable a user to delete a topic by using the DROP STREAM/TABLE [DELETE TOPIC] statements.

Use the following commands to grant a user delete access to a topic and corresponding schema.

# Grant delete access for a user to a topic.
confluent iam rolebinding create \
  --principal User:$USER_NAME \
  --role DeveloperManage \
  --resource Topic:$TOPIC_NAME \
  --kafka-cluster-id $KAFKA_ID
# Grant delete access for a user to a subject.
confluent iam rolebinding create \
  --principal User:$USER_NAME \
  --role DeveloperManage \
  --resource Subject:${TOPIC_NAME}-value
  --kafka-cluster-id $KAFKA_ID \
  --schema-registry-cluster-id $SR_ID