Example - Enable RBAC for Deployment in Confluent Platform

This example shows how to enable role-based access control (RBAC) functionality across Confluent Platform. It is for users who have downloaded Confluent Platform to their local hosts.

See also

For an RBAC example that is more representative of a real deployment of a Kafka event streaming application, see Scripted Confluent Platform Demo, a Docker-based example with RBAC and other Confluent Platform security features and LDAP integration.

Run example on local install of Confluent Platform

Caveats

  • For simplicity, this example does not use LDAP. Instead, it uses the Hash Login service with statically defined users and passwords. Additional configurations would be required if you wanted to extend the example to connect to your LDAP server.
  • The RBAC configurations and role bindings in this example are not comprehensive. They provide the minimum RBAC functionality set up across all the services in Confluent Platform. Refer to the RBAC documentation for comprehensive configuration and production guidance.

Prerequisites

Confluent Platform is supported on various operating systems and software versions (see Supported Versions and Interoperability for Confluent Platform for details). This example has been validated with the specific configuration described below. If you are running the example on Windows, which is not officially supported, the example may still work if you update the example code in GitHub, replacing the symlink .env with the contents of config.env.

  • macOS 10.15.3
  • Confluent Platform 7.7.1
  • Java 11.0.6 2020-01-14 LTS
  • bash version 3.2.57
  • jq 1.6
  • (Docker-based examples) Docker version 19.03.8
  • (Docker-based examples) Docker Compose docker-compose version 1.25.4

Run example

  1. Clone the confluentinc/examples GitHub repository, and check out the 7.7.1-post branch.

    git clone https://github.com/confluentinc/examples.git
    cd examples
    git checkout 7.7.1-post
    
  2. Navigate to the security/rbac/scripts directory.

    cd security/rbac/scripts
    
  3. You have two options to run the example.

    • Option 1: run the example end-to-end for all services

      ./run.sh
      
    • Option 2: step through it one service at a time

      ./init.sh
      ./enable-rbac-broker.sh
      ./enable-rbac-schema-registry.sh
      ./enable-rbac-connect.sh
      ./enable-rbac-rest-proxy.sh
      ./enable-rbac-ksqldb-server.sh
      ./enable-rbac-control-center.sh
      
  4. After you run the example, view the configuration files:

    # The original configuration bundled with Confluent Platform
    ls /tmp/original_configs/
    
    # Configurations added to each service's properties file
    ls ../delta_configs/
    
    # The modified configuration = original + delta
    ls /tmp/rbac_configs/
    
  5. After you run the example, view the log files for each of the services. All logs are saved in the temporary directory /tmp/rbac_logs/.

    In that directory, you can step through the configuration properties for each of the services:

    connect
    control-center
    kafka
    kafka-rest
    ksql-server
    schema-registry
    zookeeper
    
  6. In this example, the metadata service (MDS) logs are saved under your Confluent Platform installation directory.

    cat $CONFLUENT_HOME/logs/metadata-service.log
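
The relationship in step 4 (modified configuration = original + delta) can be sketched as follows. This is a minimal illustration, assuming that each modified file is built by appending the delta to the original, so that a key repeated in the delta overrides the earlier value when the properties file is loaded. The file names and sample values are placeholders, workdir and effective_value are hypothetical names, and a scratch directory is used instead of the /tmp directories listed above.

```shell
#!/usr/bin/env bash
# Sketch only: sample files in a scratch directory, not the real /tmp/original_configs.
workdir=$(mktemp -d)

# Hypothetical original and delta contents for illustration.
printf 'broker.id=0\nlisteners=PLAINTEXT://:9092\n' > "$workdir/server.properties.original"
printf 'listeners=TOKEN://:9092,PLAINTEXT://:9093\nsuper.users=User:ANONYMOUS;User:mds\n' > "$workdir/server.properties.delta"

# modified = original + delta (assumed here to be plain concatenation)
cat "$workdir/server.properties.original" "$workdir/server.properties.delta" > "$workdir/server.properties"

# Print the effective value of a key: the last occurrence in the file wins.
effective_value() {
  local key=$1 file=$2
  awk -F= -v k="$key" '$1 == k { v = substr($0, length(k) + 2) } END { print v }' "$file"
}

effective_value listeners "$workdir/server.properties"
```

Comparing the effective value of a key in the original and in the modified file is a quick way to see which settings a delta actually changes.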
    

Stop example

To stop the example, stop Confluent Platform, and delete files in /tmp/.

cd scripts
./cleanup.sh

Summary of Configurations and Role Bindings

Here is a summary of the delta configurations and required role bindings, by service.

Note

For simplicity, this example uses the Hash Login service instead of LDAP. If you are using LDAP in your environment, extra configurations are required.

Broker

  • Additional RBAC configurations required for server.properties

    # Confluent Authorizer Settings
    # Semicolon-separated list of super users in the format <principalType>:<principalName>
    # For example super.users=User:admin;User:mds
    super.users=User:ANONYMOUS;User:mds
    
    # MDS Server Settings
    confluent.metadata.topic.replication.factor=1
    
    # MDS Token Service Settings
    confluent.metadata.server.token.key.path=/tmp/tokenKeypair.pem
    
    # Configure the RBAC Metadata Service authorizer
    authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer
    confluent.authorizer.access.rule.providers=CONFLUENT,ZK_ACL
    
    # Bind Metadata Service HTTP service to port 8090
    confluent.metadata.server.listeners=http://0.0.0.0:8090
    # Configure HTTP service advertised hostname. Set this to http://127.0.0.1:8090 if running locally.
    confluent.metadata.server.advertised.listeners=http://127.0.0.1:8090
    
    # HashLoginService Initializer
    confluent.metadata.server.authentication.method=BEARER
    confluent.metadata.server.user.store=FILE
    confluent.metadata.server.user.store.file.path=/tmp/login.properties
    
    # Add named listener TOKEN to existing listeners and advertised.listeners
    listeners=TOKEN://:9092,PLAINTEXT://:9093
    advertised.listeners=TOKEN://localhost:9092,PLAINTEXT://localhost:9093
     
    # Add protocol mapping for newly added named listener TOKEN
    listener.security.protocol.map=PLAINTEXT:PLAINTEXT,TOKEN:SASL_PLAINTEXT
    
    listener.name.token.sasl.enabled.mechanisms=OAUTHBEARER
     
    # Configure the public key used to verify tokens
    # Note: username, password and metadataServerUrls must be set if used for inter-broker communication
    listener.name.token.oauthbearer.sasl.jaas.config= \
        org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
            publicKeyPath="/tmp/tokenPublicKey.pem";
    # Set SASL callback handler for verifying authentication token signatures
    listener.name.token.oauthbearer.sasl.server.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerValidatorCallbackHandler
    # Set SASL callback handler for handling tokens on login. This is essentially a noop if not used for inter-broker communication.
    listener.name.token.oauthbearer.sasl.login.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerServerLoginCallbackHandler
    
    # Settings for Self-Balancing Clusters
    confluent.balancer.topic.replication.factor=1
    
    # Settings for Audit Logging
    confluent.security.event.logger.exporter.kafka.topic.replicas=1
    
  • Role bindings:

    # Broker Admin
    confluent iam rbac role-binding create --principal User:$USER_ADMIN_SYSTEM --role SystemAdmin --kafka-cluster $KAFKA_CLUSTER_ID
    
    # Producer/Consumer
    confluent iam rbac role-binding create --principal User:$USER_CLIENT_A --role ResourceOwner --resource Topic:$TOPIC1 --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:$USER_CLIENT_A --role DeveloperRead --resource Group:console-consumer- --prefix --kafka-cluster $KAFKA_CLUSTER_ID
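
The broker settings above reference a key pair file (/tmp/tokenKeypair.pem) for the MDS token service and a public key file (/tmp/tokenPublicKey.pem) for verifying tokens. A minimal sketch of creating such a pair with openssl follows. It assumes openssl is installed and that a 2048-bit RSA key is acceptable, and it writes to a scratch directory (KEY_DIR is a hypothetical variable) rather than /tmp. The example's own scripts may create these files differently.

```shell
#!/usr/bin/env bash
# Scratch directory for the sketch; the example itself uses /tmp
KEY_DIR=$(mktemp -d)

# Private/public key pair, the kind of file confluent.metadata.server.token.key.path points at
openssl genrsa -out "$KEY_DIR/tokenKeypair.pem" 2048 2>/dev/null

# Public key extracted from the pair, the kind of file publicKeyPath and public.key.path point at
openssl rsa -in "$KEY_DIR/tokenKeypair.pem" -outform PEM -pubout \
  -out "$KEY_DIR/tokenPublicKey.pem" 2>/dev/null

# The key pair contains the private key, so restrict it to the owner
chmod 600 "$KEY_DIR/tokenKeypair.pem"

# Show the PEM header of the public key as a quick sanity check
head -n 1 "$KEY_DIR/tokenPublicKey.pem"
```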
    

Schema Registry

  • Additional RBAC configurations required for schema-registry.properties

    kafkastore.bootstrap.servers=localhost:9092
    kafkastore.security.protocol=SASL_PLAINTEXT
    kafkastore.sasl.mechanism=OAUTHBEARER
    kafkastore.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler
    kafkastore.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required username="sr" password="sr1" metadataServerUrls="http://localhost:8090";
    
    # Schema Registry group id, which is the cluster id
    schema.registry.group.id=schema-registry-demo
    
    # These properties install the Schema Registry security plugin, and configure it to use RBAC for authorization and OAuth for authentication
    resource.extension.class=io.confluent.kafka.schemaregistry.security.SchemaRegistrySecurityResourceExtension
    confluent.schema.registry.authorizer.class=io.confluent.kafka.schemaregistry.security.authorizer.rbac.RbacAuthorizer
    rest.servlet.initializor.classes=io.confluent.common.security.jetty.initializer.InstallBearerOrBasicSecurityHandler
    
    # The location of a running metadata service; used to verify that requests are authorized by the users that make them
    confluent.metadata.bootstrap.server.urls=http://localhost:8090
    
    # Credentials to use when communicating with the MDS; these should usually match the ones used for communicating with Kafka
    confluent.metadata.basic.auth.user.info=sr:sr1
    confluent.metadata.http.auth.credentials.provider=BASIC
    
    # The path to public keys that should be used to verify JSON Web Tokens during authentication
    public.key.path=/tmp/tokenPublicKey.pem
    
    # This enables anonymous access with a principal of User:ANONYMOUS
    confluent.schema.registry.anonymous.principal=true
    authentication.skip.paths=/*
    
  • Role bindings:

    # Schema Registry Admin
    confluent iam rbac role-binding create --principal User:$USER_ADMIN_SCHEMA_REGISTRY --role ResourceOwner --resource Topic:_schemas --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:$USER_ADMIN_SCHEMA_REGISTRY --role SecurityAdmin --kafka-cluster $KAFKA_CLUSTER_ID --schema-registry-cluster $SCHEMA_REGISTRY_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:$USER_ADMIN_SCHEMA_REGISTRY --role ResourceOwner --resource Group:$SCHEMA_REGISTRY_CLUSTER_ID --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:$USER_ADMIN_SCHEMA_REGISTRY --role DeveloperRead --resource Topic:$LICENSE_TOPIC --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:$USER_ADMIN_SCHEMA_REGISTRY --role DeveloperWrite --resource Topic:$LICENSE_TOPIC --kafka-cluster $KAFKA_CLUSTER_ID
    
    # Client connecting to Schema Registry
    confluent iam rbac role-binding create --principal User:$USER_CLIENT_A --role ResourceOwner --resource Subject:$SUBJECT --kafka-cluster $KAFKA_CLUSTER_ID --schema-registry-cluster $SCHEMA_REGISTRY_CLUSTER_ID
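
The kafkastore.sasl.jaas.config value above follows a fixed pattern: login module, username, password, and MDS URL. The same pattern recurs for the other services in this document. As a rough sketch, a small helper (jaas_config is a hypothetical name, not part of the example scripts) could generate the value so that credentials are supplied in one place:

```shell
#!/usr/bin/env bash
# Print an OAUTHBEARER JAAS value for a service principal.
# Arguments: username, password, MDS URL (all supplied by the caller).
jaas_config() {
  local user=$1 password=$2 mds_url=$3
  printf 'org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required username="%s" password="%s" metadataServerUrls="%s";\n' \
    "$user" "$password" "$mds_url"
}

# Reproduces the Schema Registry line shown above
echo "kafkastore.sasl.jaas.config=$(jaas_config sr sr1 http://localhost:8090)"
```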
    

Connect

  • Additional RBAC configurations required for connect-avro-distributed.properties

    bootstrap.servers=localhost:9092
    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=OAUTHBEARER
    sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required username="connect" password="connect1" metadataServerUrls="http://localhost:8090";
    
    ## Connector client (producer, consumer, admin client) properties ##
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=io.confluent.connect.avro.AvroConverter
    group.id=connect-cluster
    offset.storage.topic=connect-offsets
    offset.storage.replication.factor=1
    config.storage.topic=connect-configs
    config.storage.replication.factor=1
    status.storage.topic=connect-statuses
    status.storage.replication.factor=1
    
    
    
    # Allow producer/consumer/admin client overrides (this enables per-connector principals)
    connector.client.config.override.policy=All
    
    producer.security.protocol=SASL_PLAINTEXT
    producer.sasl.mechanism=OAUTHBEARER
    producer.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler
    # Intentionally omitting `producer.sasl.jaas.config` to force connectors to use their own
    
    consumer.security.protocol=SASL_PLAINTEXT
    consumer.sasl.mechanism=OAUTHBEARER
    consumer.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler
    # Intentionally omitting `consumer.sasl.jaas.config` to force connectors to use their own
    
    admin.security.protocol=SASL_PLAINTEXT
    admin.sasl.mechanism=OAUTHBEARER
    admin.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler
    # Intentionally omitting `admin.sasl.jaas.config` to force connectors to use their own
    
    ## REST extensions: RBAC and Secret Registry ##
    
    # Installs the RBAC and Secret Registry REST extensions
    rest.extension.classes=io.confluent.connect.security.ConnectSecurityExtension,io.confluent.connect.secretregistry.ConnectSecretRegistryExtension
    
    ## RBAC Authentication ##
    
    # Enables basic and bearer authentication for requests made to the worker
    rest.servlet.initializor.classes=io.confluent.common.security.jetty.initializer.InstallBearerOrBasicSecurityHandler
    
    # The path to a directory containing public keys that should be used to verify JSON Web Tokens during authentication
    public.key.path=/tmp/tokenPublicKey.pem
    
    ## RBAC Authorization ##
    
    # The location of a running metadata service; used to verify that requests are authorized by the users that make them
    confluent.metadata.bootstrap.server.urls=http://localhost:8090
    
    # Credentials to use when communicating with the MDS; these should usually match the ones used for communicating with Kafka
    confluent.metadata.basic.auth.user.info=connect:connect1
    confluent.metadata.http.auth.credentials.provider=BASIC
    
    ## Secret Registry Secret Provider ##
    
    config.providers=secret
    config.providers.secret.class=io.confluent.connect.secretregistry.rbac.config.provider.InternalSecretConfigProvider
    config.providers.secret.param.master.encryption.key=password1234
    
    config.providers.secret.param.kafkastore.bootstrap.servers=localhost:9092
    config.providers.secret.param.kafkastore.security.protocol=SASL_PLAINTEXT
    config.providers.secret.param.kafkastore.sasl.mechanism=OAUTHBEARER
    config.providers.secret.param.kafkastore.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler
    config.providers.secret.param.kafkastore.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required username="connect" password="connect1" metadataServerUrls="http://localhost:8090";
    
  • Additional RBAC configurations required for a source connector

    producer.override.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required username="connector" password="connector1" metadataServerUrls="http://localhost:8090";
    
  • Additional RBAC configurations required for a sink connector

    consumer.override.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required username="connector" password="connector1" metadataServerUrls="http://localhost:8090";
    
  • Role bindings:

    # Connect Admin
    confluent iam rbac role-binding create --principal User:$USER_ADMIN_CONNECT --role ResourceOwner --resource Topic:connect-configs --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:$USER_ADMIN_CONNECT --role ResourceOwner --resource Topic:connect-offsets --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:$USER_ADMIN_CONNECT --role ResourceOwner --resource Topic:connect-statuses --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:$USER_ADMIN_CONNECT --role ResourceOwner --resource Group:connect-cluster --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:$USER_ADMIN_CONNECT --role ResourceOwner --resource Topic:_confluent-secrets --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:$USER_ADMIN_CONNECT --role ResourceOwner --resource Group:secret-registry --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:$USER_ADMIN_CONNECT --role SecurityAdmin --kafka-cluster $KAFKA_CLUSTER_ID --connect-cluster $CONNECT_CLUSTER_ID
    
    # Connector Submitter
    confluent iam rbac role-binding create --principal User:$USER_CONNECTOR_SUBMITTER --role ResourceOwner --resource Connector:$CONNECTOR_NAME --kafka-cluster $KAFKA_CLUSTER_ID --connect-cluster $CONNECT_CLUSTER_ID
    
    # Connector
    confluent iam rbac role-binding create --principal User:$USER_CONNECTOR --role ResourceOwner --resource Topic:$TOPIC2_AVRO --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:$USER_CONNECTOR --role ResourceOwner --resource Subject:${TOPIC2_AVRO}-value --kafka-cluster $KAFKA_CLUSTER_ID --schema-registry-cluster $SCHEMA_REGISTRY_CLUSTER_ID
    # Sink Connector
    confluent iam rbac role-binding create --principal User:$USER_CONNECTOR --role DeveloperRead --resource Group:$CONNECTOR_CONSUMER_GROUP_ID --prefix --kafka-cluster $KAFKA_CLUSTER_ID
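
The first three Connect Admin role bindings above correspond to the internal topics named by the *.storage.topic settings in the worker configuration. The sketch below makes that link explicit by deriving the commands from the settings. It only prints the commands and does not run them. worker_props and print_storage_topic_bindings are hypothetical names, and the property values are copied from the configuration above.

```shell
#!/usr/bin/env bash
# Sample of the worker settings shown above (normally read from the properties file)
worker_props='group.id=connect-cluster
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-statuses
status.storage.replication.factor=1'

# Print (not run) one ResourceOwner binding per internal storage topic.
# $USER_ADMIN_CONNECT and $KAFKA_CLUSTER_ID are printed literally, unexpanded.
print_storage_topic_bindings() {
  printf '%s\n' "$1" | awk -F= '$1 ~ /\.storage\.topic$/ {
    print "confluent iam rbac role-binding create --principal User:$USER_ADMIN_CONNECT --role ResourceOwner --resource Topic:" $2 " --kafka-cluster $KAFKA_CLUSTER_ID"
  }'
}

print_storage_topic_bindings "$worker_props"
```

If the topic names in the worker configuration are changed, the role bindings need to change with them, which is the reason for deriving one from the other in this sketch.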
    

REST Proxy

  • Additional RBAC configurations required for kafka-rest.properties

    # Configure connections to other Confluent Platform services
    bootstrap.servers=localhost:9092
    schema.registry.url=http://localhost:8081
    
    client.security.protocol=SASL_PLAINTEXT
    client.sasl.mechanism=OAUTHBEARER
    client.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler
    client.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required username="clientrp" password="clientrp1" metadataServerUrls="http://localhost:8090";
    
    kafka.rest.resource.extension.class=io.confluent.kafkarest.security.KafkaRestSecurityResourceExtension
    rest.servlet.initializor.classes=io.confluent.common.security.jetty.initializer.InstallBearerOrBasicSecurityHandler
    
    public.key.path=/tmp/tokenPublicKey.pem
    
    # Credentials to use with the MDS
    confluent.metadata.bootstrap.server.urls=http://localhost:8090
    confluent.metadata.basic.auth.user.info=rp:rp1
    confluent.metadata.http.auth.credentials.provider=BASIC
    
  • Role bindings:

    # REST Proxy Admin: role bindings for license management. No additional administrative role bindings are required because REST Proxy only does impersonation.
    confluent iam rbac role-binding create --principal User:$USER_CLIENT_RP --role DeveloperRead --resource Topic:$LICENSE_TOPIC --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:$USER_CLIENT_RP --role DeveloperWrite --resource Topic:$LICENSE_TOPIC --kafka-cluster $KAFKA_CLUSTER_ID
    
    # Producer/Consumer
    confluent iam rbac role-binding create --principal User:$USER_CLIENT_RP --role ResourceOwner --resource Topic:$TOPIC3 --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:$USER_CLIENT_RP --role DeveloperRead --resource Group:$CONSUMER_GROUP --kafka-cluster $KAFKA_CLUSTER_ID
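
The REST Proxy configuration above installs a handler that accepts bearer or basic authentication. As a sketch of what basic authentication means on the wire, the helper below builds the HTTP Authorization header for a user. It assumes the clientrp/clientrp1 credentials from the configuration above are the ones a client would present, and basic_auth_header is a hypothetical name. Tools such as curl build this header themselves when given a username and password.

```shell
#!/usr/bin/env bash
# Build the value of an HTTP Basic Authorization header for a user.
basic_auth_header() {
  local user=$1 password=$2
  # printf avoids the trailing newline that echo would add to the encoded input
  printf 'Authorization: Basic %s\n' "$(printf '%s:%s' "$user" "$password" | base64)"
}

basic_auth_header clientrp clientrp1
```

Base64 is an encoding, not encryption, so basic authentication over plain HTTP as in this local example exposes the password to anyone who can read the traffic.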
    

ksqlDB

  • Additional RBAC configurations required for ksql-server.properties

    bootstrap.servers=localhost:9092
    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=OAUTHBEARER
    sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required username="ksqlDBserver" password="ksqlDBserver1" metadataServerUrls="http://localhost:8090";
    
    # Specify KSQL service id used to bind user/roles to this cluster
    ksql.service.id=rbac-ksql
    
    # Enable KSQL authorization and impersonation
    ksql.security.extension.class=io.confluent.ksql.security.KsqlConfluentSecurityExtension
    
    # Enable KSQL Basic+Bearer authentication
    ksql.authentication.plugin.class=io.confluent.ksql.security.VertxBearerOrBasicAuthenticationPlugin
    public.key.path=/tmp/tokenPublicKey.pem
    
    # Metadata URL and access credentials
    confluent.metadata.bootstrap.server.urls=http://localhost:8090
    confluent.metadata.http.auth.credentials.provider=BASIC
    confluent.metadata.basic.auth.user.info=ksqlDBserver:ksqlDBserver1
    
    # Credentials for Schema Registry access
    ksql.schema.registry.url=http://localhost:8081
    ksql.schema.registry.basic.auth.user.info=ksqlDBserver:ksqlDBserver1
    
  • Role bindings:

    # ksqlDB Server Admin
    confluent iam rbac role-binding create --principal User:$USER_ADMIN_KSQLDB --role ResourceOwner --resource Topic:_confluent-ksql-${KSQL_SERVICE_ID}_command_topic --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:$USER_ADMIN_KSQLDB --role ResourceOwner --resource Topic:${KSQL_SERVICE_ID}ksql_processing_log --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:$USER_ADMIN_KSQLDB --role SecurityAdmin --kafka-cluster $KAFKA_CLUSTER_ID --ksql-cluster $KSQL_SERVICE_ID
    confluent iam rbac role-binding create --principal User:$USER_ADMIN_KSQLDB --role ResourceOwner --resource KsqlCluster:ksql-cluster --kafka-cluster $KAFKA_CLUSTER_ID --ksql-cluster $KSQL_SERVICE_ID
    
    # ksqlDB CLI queries
    confluent iam rbac role-binding create --principal User:${USER_KSQLDB} --role DeveloperWrite --resource KsqlCluster:ksql-cluster --kafka-cluster $KAFKA_CLUSTER_ID --ksql-cluster $KSQL_SERVICE_ID
    confluent iam rbac role-binding create --principal User:${USER_KSQLDB} --role DeveloperRead --resource Topic:$TOPIC1 --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:${USER_KSQLDB} --role DeveloperRead --resource Group:_confluent-ksql-${KSQL_SERVICE_ID} --prefix --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:${USER_KSQLDB} --role DeveloperRead --resource Topic:${KSQL_SERVICE_ID}ksql_processing_log --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:${USER_ADMIN_KSQLDB} --role DeveloperRead --resource Group:_confluent-ksql-${KSQL_SERVICE_ID} --prefix --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:${USER_ADMIN_KSQLDB} --role DeveloperRead --resource Topic:$TOPIC1 --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:${USER_ADMIN_KSQLDB} --role ResourceOwner --resource TransactionalId:${KSQL_SERVICE_ID} --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:${USER_KSQLDB} --role ResourceOwner --resource Topic:_confluent-ksql-${KSQL_SERVICE_ID}transient --prefix --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:${USER_ADMIN_KSQLDB} --role ResourceOwner --resource Topic:_confluent-ksql-${KSQL_SERVICE_ID}transient --prefix --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:${USER_KSQLDB} --role ResourceOwner --resource Topic:${CSAS_STREAM1} --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:${USER_ADMIN_KSQLDB} --role ResourceOwner --resource Topic:${CSAS_STREAM1} --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:${USER_KSQLDB} --role ResourceOwner --resource Topic:${CTAS_TABLE1} --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:${USER_ADMIN_KSQLDB} --role ResourceOwner --resource Topic:${CTAS_TABLE1} --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:${USER_ADMIN_KSQLDB} --role ResourceOwner --resource Topic:_confluent-ksql-${KSQL_SERVICE_ID} --prefix --kafka-cluster $KAFKA_CLUSTER_ID
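
Several resource names in the ksqlDB role bindings above are built from the service ID. The sketch below expands them, assuming KSQL_SERVICE_ID holds the same value as ksql.service.id in the configuration above (rbac-ksql). The variable names other than KSQL_SERVICE_ID are hypothetical.

```shell
#!/usr/bin/env bash
# Assumed to match ksql.service.id from the configuration above
KSQL_SERVICE_ID=rbac-ksql

# Resource names as they are written in the role bindings above
command_topic="_confluent-ksql-${KSQL_SERVICE_ID}_command_topic"
processing_log_topic="${KSQL_SERVICE_ID}ksql_processing_log"
internal_prefix="_confluent-ksql-${KSQL_SERVICE_ID}"

printf '%s\n' "$command_topic" "$processing_log_topic" "$internal_prefix"
```

Printing the expanded names before creating bindings helps catch a service ID mismatch, because a binding on the wrong topic name is created without error but grants nothing useful.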
    

Control Center

  • Additional RBAC configurations required for control-center-dev.properties

    confluent.controlcenter.rest.authentication.method=BEARER
    confluent.controlcenter.streams.security.protocol=SASL_PLAINTEXT
    public.key.path=/tmp/tokenPublicKey.pem
    
    confluent.metadata.basic.auth.user.info=c3:c31
    confluent.metadata.bootstrap.server.urls=http://localhost:8090
    
  • Role bindings:

    # Control Center Admin
    confluent iam rbac role-binding create --principal User:$USER_ADMIN_C3 --role SystemAdmin --kafka-cluster $KAFKA_CLUSTER_ID
    
    # Control Center user
    confluent iam rbac role-binding create --principal User:$USER_CLIENT_C --role DeveloperRead --resource Topic:$TOPIC1 --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:$USER_CLIENT_C --role DeveloperRead --resource Topic:$TOPIC2_AVRO --kafka-cluster $KAFKA_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:$USER_CLIENT_C --role DeveloperRead --resource Subject:${TOPIC2_AVRO}-value --kafka-cluster $KAFKA_CLUSTER_ID --schema-registry-cluster $SCHEMA_REGISTRY_CLUSTER_ID
    confluent iam rbac role-binding create --principal User:$USER_CLIENT_C --role DeveloperRead --resource Connector:$CONNECTOR_NAME --kafka-cluster $KAFKA_CLUSTER_ID --connect-cluster $CONNECT_CLUSTER_ID
    

General Rolebinding Syntax

  1. The general role binding syntax is:

    confluent iam rbac role-binding create --role [role name] --principal User:[username] --resource [resource type]:[resource name] --[cluster type]-cluster [cluster id]
    
  2. For the available role types and their permissions, see the RBAC documentation.

  3. Resource types include: Cluster, Group, Subject, Connector, TransactionalId, Topic.
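
As a sketch of how the pieces of a role binding fit together, the helper below assembles a command from its parts and prints it for review instead of running it. It uses the --kafka-cluster flag as in the role bindings earlier in this document. role_binding_cmd is a hypothetical helper, and the topic name and cluster ID are made-up values.

```shell
#!/usr/bin/env bash
# Print a role-binding command for review instead of running it.
# Arguments: role, username, resource (type:name), Kafka cluster ID.
role_binding_cmd() {
  local role=$1 user=$2 resource=$3 kafka_cluster=$4
  printf 'confluent iam rbac role-binding create --role %s --principal User:%s --resource %s --kafka-cluster %s\n' \
    "$role" "$user" "$resource" "$kafka_cluster"
}

# Hypothetical values for illustration
role_binding_cmd DeveloperRead bender Topic:test-topic abc123
```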

Listing Roles for a User

General listing syntax:

confluent iam rbac role-binding list --principal User:[username] [clusters and resources you want to view their roles on]

For example, list the roles of User:bender on the Kafka cluster identified by $KAFKA_CLUSTER_ID:

confluent iam rbac role-binding list --principal User:bender --kafka-cluster $KAFKA_CLUSTER_ID
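
The command above expects $KAFKA_CLUSTER_ID to be set already. One way to populate it is sketched below, assuming the cluster ID is available as a one-line JSON document with an "id" field (for instance, the value ZooKeeper stores at /cluster/id). The sample payload and ID are made up, extract_cluster_id is a hypothetical helper, and sed is used here so the sketch has no extra dependencies, although jq from the prerequisites would work as well.

```shell
#!/usr/bin/env bash
# Extract the "id" field from a one-line JSON document on stdin.
extract_cluster_id() {
  sed -n 's/.*"id"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p'
}

# Made-up sample payload; a real one would come from your cluster
sample='{"version":"1","id":"abc123XYZ"}'
KAFKA_CLUSTER_ID=$(printf '%s\n' "$sample" | extract_cluster_id)

# Print the listing command for review
echo "confluent iam rbac role-binding list --principal User:bender --kafka-cluster $KAFKA_CLUSTER_ID"
```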

Run example in Docker

For a Docker-based RBAC example, see Scripted Confluent Platform Demo. It is representative of a real deployment of a Kafka event streaming application, with RBAC and other Confluent Platform security features and LDAP integration.