Important
You are viewing documentation for an older version of Confluent Platform. For the latest, click here.
Configuring RBAC for connectors¶
In an RBAC-enabled environment, individual connectors can override the Connect worker principal configuration. This allows each connector to use a separate principal with specific access privileges for specific topics, increasing security for your Kafka environment. This is recommended for Kafka production environments using RBAC.
Note
See Secret Registry if you are using a Secret Registry for connector credentials.
Important
The configuration steps in the following sections assume you have included worker-wide default properties.
Tip
Before configuring RBAC for Kafka Connect, read the white paper Role-Based Access Control (RBAC) for Kafka Connect. This white paper covers basic RBAC concepts and provides a deep dive into using RBAC with Kafka Connect and connectors. It also contains a link to a GitHub demo so you can see how it all works on a local Confluent Platform installation.
Source connectors¶
Add the following lines and a valid service principal to every Source connector created in the Connect cluster.
producer.override.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
username="<username>" \
password="<password>" \
metadataServerUrls="<metadata_server_urls>";
Sink connectors¶
Add the following lines and a valid service principal to every Sink connector created in the Connect cluster.
consumer.override.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
username="<username>" \
password="<password>" \
metadataServerUrls="<metadata_server_urls>";
Additionally, if the connector is using the dead letter queue feature, you need to add a configuration block for both a Producer and Admin Client in the connector. The reason for this is that invalid (dropped) sink messages are passed to a Producer constructed to send records to the dead letter queue and then the Admin Client creates the dead letter queue topic. Both of these need to have service principals to function.
To use the dead letter queue feature, add two additional configuration sections as shown below.
producer.override.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
username="<username>" \
password="<password>" \
metadataServerUrls="<metadata_server_urls>";
admin.override.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
username="<username>" \
password="<password>" \
metadataServerUrls="<metadata_server_urls>";
Schema Registry-based converters¶
The following Schema Registry-based converters are available for Connect:
- Avro converter:
io.confluent.connect.avro.AvroConverter
- Protobuf converter:
io.confluent.connect.protobuf.ProtobufConverter
- JSON Schema converter:
io.confluent.connect.json.JsonSchemaConverter
To use an Schema Registry-based converter with an RBAC-enabled Schema Registry, you first
add the key.converter
or value converter
property to the connector
configuration. The following example shows the value.converter
property for
Avro, Protobuf, and JSON Schema:
"value.converter": "io.confluent.connect.avro.AvroConverter"
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter"
"value.converter": "io.confluent.connect.json.JsonSchemaConverter"
Then, to properly authenticate with Schema Registry, you add the following properties. Note
that the <username
and <password>
entered are the connector’s service principal
username and password. The following properties are used interchangeably between
the three converters:
"value.converter.schema.registry.url": "<schema-registry-url>",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.basic.auth.user.info": "<username>:<password>"
Using Avro as an example, a complete converter configuration snippet is shown below:
"value.converter": "io.confluent.connect.avro.AvroConverter"
"value.converter.schema.registry.url": "<schema-registry-url>",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.basic.auth.user.info": "<username>:<password>"
For additional information about Connect converters, see Configuring Key and Value Converters.