Configuring RBAC for connectors

Tip

Before configuring RBAC for Kafka Connect, read the white paper Role-Based Access Control (RBAC) for Kafka Connect. This white paper covers basic RBAC concepts and provides a deep dive into using RBAC with Kafka Connect and connectors. It also contains a link to a GitHub demo so you can see how it all works on a local Confluent Platform installation.

In an RBAC-enabled environment, individual connectors can override the Connect worker principal configuration. This allows each connector to use a separate principal whose access privileges are limited to specific topics, which improves security for your Kafka environment. Using per-connector principals is recommended for production Kafka environments that use RBAC.

See Secret Registry if you are using a Secret Registry for connector credentials.

Important

The configuration steps in the following sections assume you have included worker-wide default properties.

Source connectors

Add the following lines to every source connector created in the Connect cluster, replacing the placeholders with the credentials of a valid service principal.

producer.override.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
   username="<username>" \
   password="<password>" \
   metadataServerUrls="<metadata_server_urls>";
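
When a connector is created by submitting JSON to the Connect REST API, the backslash line continuations shown above are not available: the JAAS value has to be a single JSON string with the inner double quotes escaped. The following is a minimal sketch of that form, not a definitive configuration. The connector name, the FileStreamSource connector class, the file path, and the topic are illustrative stand-ins that do not come from this page; only the producer.override.sasl.jaas.config property is taken from the example above.

```json
{
  "name": "example-source-connector",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "/tmp/example-input.txt",
    "topic": "example-topic",
    "producer.override.sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required username=\"<username>\" password=\"<password>\" metadataServerUrls=\"<metadata_server_urls>\";"
  }
}
```

The trailing semicolon is part of the JAAS value and stays inside the string.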

Sink connectors

Add the following lines to every sink connector created in the Connect cluster, replacing the placeholders with the credentials of a valid service principal.

consumer.override.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
   username="<username>" \
   password="<password>" \
   metadataServerUrls="<metadata_server_urls>";

Additionally, if the connector uses the dead letter queue feature, you must add a configuration block for both a Producer and an Admin Client in the connector. Invalid (dropped) sink messages are passed to a Producer that is constructed to send records to the dead letter queue, and the Admin Client creates the dead letter queue topic. Both clients need service principals to function.

To use the dead letter queue feature, add two additional configuration sections as shown below.

producer.override.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
   username="<username>" \
   password="<password>" \
   metadataServerUrls="<metadata_server_urls>";
admin.override.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
   username="<username>" \
   password="<password>" \
   metadataServerUrls="<metadata_server_urls>";
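
Putting the sink pieces together, a connector that uses a dead letter queue carries three overrides: consumer, producer, and admin. The sketch below shows one way this could look as a JSON connector configuration. It assumes the standard Kafka Connect error-handling properties errors.tolerance and errors.deadletterqueue.topic.name to turn on the dead letter queue; those two properties, the connector name, the FileStreamSink connector class, the file path, and the topic names are illustrative and are not taken from this page.

```json
{
  "name": "example-sink-connector",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max": "1",
    "topics": "example-topic",
    "file": "/tmp/example-output.txt",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "example-dlq-topic",
    "consumer.override.sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required username=\"<username>\" password=\"<password>\" metadataServerUrls=\"<metadata_server_urls>\";",
    "producer.override.sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required username=\"<username>\" password=\"<password>\" metadataServerUrls=\"<metadata_server_urls>\";",
    "admin.override.sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required username=\"<username>\" password=\"<password>\" metadataServerUrls=\"<metadata_server_urls>\";"
  }
}
```

The service principal used here needs permission to read the source topic and to create and write to the dead letter queue topic.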

Schema Registry-based converters

The following Schema Registry-based converters are available for Connect:

  • Avro converter: io.confluent.connect.avro.AvroConverter
  • Protobuf converter: io.confluent.connect.protobuf.ProtobufConverter
  • JSON Schema converter: io.confluent.connect.json.JsonSchemaConverter

To use a Schema Registry-based converter with an RBAC-enabled Schema Registry, first add the key.converter or value.converter property to the connector configuration. The following lines show the value.converter setting for Avro, Protobuf, and JSON Schema; use only the one that matches your data format:

"value.converter": "io.confluent.connect.avro.AvroConverter"
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter"
"value.converter": "io.confluent.connect.json.JsonSchemaConverter"

Then, to authenticate with Schema Registry, add the following properties. The <username> and <password> values are the username and password of the connector's service principal. These properties are the same for all three converters:

"value.converter.schema.registry.url": "<schema-registry-url>",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.basic.auth.user.info": "<username>:<password>"

Using Avro as an example, a complete converter configuration snippet is shown below:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "<schema-registry-url>",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.basic.auth.user.info": "<username>:<password>"

For additional information about Connect converters, see Configuring Key and Value Converters.
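
The examples in this section configure only the value converter. If record keys also use a Schema Registry-based converter, the same authentication properties are repeated with the key.converter prefix. A sketch, assuming Avro is used for both keys and values and both converters authenticate as the same connector service principal:

```json
{
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "<schema-registry-url>",
  "key.converter.basic.auth.credentials.source": "USER_INFO",
  "key.converter.basic.auth.user.info": "<username>:<password>",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "<schema-registry-url>",
  "value.converter.basic.auth.credentials.source": "USER_INFO",
  "value.converter.basic.auth.user.info": "<username>:<password>"
}
```

Each converter reads only the properties under its own prefix, so the key and value settings must both be present even when they are identical.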