Configure RBAC for a Connect Worker

Tip

Before configuring RBAC for Kafka Connect, read the white paper Role-Based Access Control (RBAC) for Kafka Connect. This white paper covers basic RBAC concepts and provides a deep dive into using RBAC with Kafka Connect and connectors. It also contains a link to a GitHub demo so you can see how it all works on a local Confluent Platform installation.

In an RBAC-enabled environment, each Connect worker configuration file needs several additional RBAC settings. Complete the following steps for every Connect worker file.

  1. Add the following parameter to enable per-connector principals.

    connector.client.config.override.policy=All
    
  2. Add the following parameters to enable the Connect framework to authenticate with Kafka using a service principal. Connect uses the service principal to read from and write to its internal configuration topics. Note that <username> and <password> are the credentials of the service principal that was granted permissions when the service principal was set up.

    # Or SASL_SSL if using TLS/SSL
    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=OAUTHBEARER
    sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      username="<username>" \
      password="<password>" \
      metadataServerUrls="http(s)://<host>:<port>";
    
  3. Add the following parameters to establish worker-wide default properties for each type of Kafka client used by connectors in the cluster.

    producer.security.protocol=SASL_PLAINTEXT
    producer.sasl.mechanism=OAUTHBEARER
    producer.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler
    

    Note

    Any principal used by idempotent producers must be granted IdempotentWrite on the cluster or Write permission on any topic to initialize the producer client. Binding either the DeveloperWrite or the ResourceOwner RBAC role on the Kafka cluster grants Write permission. DeveloperWrite is the less permissive of the two roles and is the recommended first choice. Consumers do not require any additional Kafka permissions. The following role binding grants the principal Write permission on the cluster:

    confluent iam rbac role-binding create \
      --principal $PRINCIPAL \
      --role DeveloperWrite \
      --resource Cluster:kafka-cluster \
      --kafka-cluster $KAFKA_CLUSTER_ID
    
    consumer.security.protocol=SASL_PLAINTEXT
    consumer.sasl.mechanism=OAUTHBEARER
    consumer.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler
    
    admin.security.protocol=SASL_PLAINTEXT
    admin.sasl.mechanism=OAUTHBEARER
    admin.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler
    
  4. Add the following Metadata Service (MDS) parameters to require user RBAC authentication for Connect. RBAC authentication is required to allow users to create connectors, read connector configurations, and delete connectors.

    # Adds the RBAC REST extension to the Connect worker
    rest.extension.classes=io.confluent.connect.security.ConnectSecurityExtension
    
    # The location of a running metadata service
    confluent.metadata.bootstrap.server.urls=<mds_server_url>
    
    # Credentials to use when communicating with the MDS
    confluent.metadata.basic.auth.user.info=<username>:<password>
    confluent.metadata.http.auth.credentials.provider=BASIC
    

    Note

    For additional configurations available to any client communicating with MDS, see REST client configurations in the Confluent Platform Security documentation.

  5. Add the following parameters to have Connect use basic authentication for user requests and token authentication for impersonated requests (for example, requests from REST Proxy).

    rest.servlet.initializor.classes=io.confluent.common.security.jetty.initializer.InstallBearerOrBasicSecurityHandler
    
    # The path to a directory containing the public keys used to verify
    # JSON Web Tokens (JWTs) during authentication
    public.key.path=<public key path>
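
As a quick sanity check on the five steps above, the following shell sketch reports which of the listed keys are absent from a worker file. The function name check_worker_rbac_config and the path in the usage comment are hypothetical, and the key list simply mirrors the parameters shown on this page. It checks only that each key is present, not that its value is valid.

```shell
# Sketch only: check_worker_rbac_config is a hypothetical helper, not a Confluent tool.
# It reports every key from steps 1-5 that is missing from the given worker file.
check_worker_rbac_config() (
  worker_file="$1"
  status=0

  keys="connector.client.config.override.policy
    security.protocol sasl.mechanism
    sasl.login.callback.handler.class sasl.jaas.config
    rest.extension.classes
    confluent.metadata.bootstrap.server.urls
    confluent.metadata.basic.auth.user.info
    confluent.metadata.http.auth.credentials.provider
    rest.servlet.initializor.classes public.key.path"

  # Step 3: the same three settings, once per client type.
  for prefix in producer consumer admin; do
    keys="$keys $prefix.security.protocol $prefix.sasl.mechanism"
    keys="$keys $prefix.sasl.login.callback.handler.class"
  done

  for key in $keys; do
    # Compare the text before the first "=" (whitespace trimmed) with the key,
    # so commented-out lines and prefixed keys do not count as matches.
    if ! awk -F= -v k="$key" '
          { name = $1; gsub(/^[ \t]+|[ \t]+$/, "", name) }
          name == k { found = 1 }
          END { exit !found }' "$worker_file"; then
      echo "missing: $key"
      status=1
    fi
  done

  exit "$status"
)

# Usage (hypothetical path):
#   check_worker_rbac_config /path/to/connect-worker.properties
```

The function runs in a subshell, so its variables do not leak into the calling shell, and it returns a non-zero status when any key is missing, which makes it usable in a deployment script.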
    

See Secret Registry if you are using a Secret Registry for connector credentials.
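
To illustrate what the override policy in step 1 makes possible, the sketch below writes a connector configuration that supplies its own principal through a consumer.override.sasl.jaas.config entry (sink connectors use a consumer). The connector name, output file name, and the choice of FileStreamSinkConnector are assumptions for illustration only, and every value in angle brackets is a placeholder to replace.

```shell
# Hypothetical file name and connector settings, for illustration only.
connector_json="${TMPDIR:-/tmp}/rbac-sink-connector.json"

# The quoted heredoc delimiter keeps the JSON escapes (\") exactly as written.
cat > "$connector_json" <<'EOF'
{
  "name": "rbac-file-sink",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max": "1",
    "topics": "<topic>",
    "file": "<output file path>",
    "consumer.override.sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required username=\"<connector principal>\" password=\"<password>\" metadataServerUrls=\"http(s)://<host>:<port>\";"
  }
}
EOF

# Submitting it needs a running, RBAC-enabled Connect worker, so the request
# is left commented out. The -u credentials are those of the requesting user.
# curl -u "<user>:<password>" -X POST -H "Content-Type: application/json" \
#   --data @"$connector_json" "http(s)://<connect host>:<port>/connectors"
```

Because the worker-wide consumer.* defaults from step 3 already set the security protocol, SASL mechanism, and callback handler, only the JAAS configuration needs to be overridden per connector in this sketch.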