Kafka Connect Security

Encryption

If you have enabled SSL encryption in your Kafka cluster, you must also configure Kafka Connect to use SSL encryption.

Authentication

If you have enabled authentication in your Kafka cluster, you must also configure Kafka Connect to authenticate.

Separate principals

There is currently no way to change the configuration for individual connectors. However, if your server supports client authentication over SSL, the worker and the connectors can use separate principals. In this case, generate a separate certificate for each principal and install each certificate in its own keystore.

The key Connect configuration _differences_ are as follows. Note the distinct keystore location, keystore password, and key password:

# Authentication settings for Connect workers
ssl.keystore.location=/var/private/ssl/kafka.worker.keystore.jks
ssl.keystore.password=worker1234
ssl.key.password=worker1234

Connect workers manage the producers used by source connectors and the consumers used by sink connectors. So, for the connectors to leverage security, you also have to override the default producer/consumer configuration that the worker uses.

# Authentication settings for Connect producers used with source connectors
producer.ssl.keystore.location=/var/private/ssl/kafka.source.keystore.jks
producer.ssl.keystore.password=connector1234
producer.ssl.key.password=connector1234

# Authentication settings for Connect consumers used with sink connectors
consumer.ssl.keystore.location=/var/private/ssl/kafka.sink.keystore.jks
consumer.ssl.keystore.password=connector1234
consumer.ssl.key.password=connector1234
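
The separate keystores referenced in the settings above have to be created first. The following is a minimal sketch of one way to do that with the JDK `keytool` utility, not a definitive procedure. The aliases and common names (`connect-worker`, `connect-source`, `connect-sink`) are hypothetical, the key size and validity are arbitrary choices, and the paths and passwords simply mirror the example settings. Getting each certificate signed by a CA that the brokers trust is not shown. The script only prints the commands unless `KEYTOOL` is set.

```shell
#!/bin/sh
# Dry run by default: prints each keytool command instead of running it.
# Set KEYTOOL=keytool in the environment to actually create the keystores.
gen_keystore() {
  # $1 = alias and CN (hypothetical principal name)
  # $2 = keystore path
  # $3 = password used for both the keystore and the key
  ${KEYTOOL:-echo keytool} -genkeypair -alias "$1" -keyalg RSA -keysize 2048 \
    -validity 365 -storetype JKS -dname "CN=$1" \
    -keystore "$2" -storepass "$3" -keypass "$3"
}

# One key pair and keystore per principal, matching the paths used above.
gen_keystore connect-worker /var/private/ssl/kafka.worker.keystore.jks worker1234
gen_keystore connect-source /var/private/ssl/kafka.source.keystore.jks connector1234
gen_keystore connect-sink   /var/private/ssl/kafka.sink.keystore.jks   connector1234
```

Passing passwords on the command line keeps the sketch short; in practice you may prefer to let `keytool` prompt for them.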

ACL Considerations

Using separate principals for the connectors allows you to define access control lists (ACLs) with finer granularity. For example, you can use this capability to prevent the connectors themselves from writing to any of the internal topics used by the Connect cluster. Additionally, you can use different keystores for source and sink connectors and enable scenarios where source connectors have only write access to a topic but sink connectors have only read access to the same topic.
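
As a rough illustration of the write-only and read-only scenario described above, the following sketch grants a source connector principal producer access and a sink connector principal consumer access to the same topic. The principal names (`source-connector`, `sink-connector`), the topic `logs`, the group `connect-hdfs-logs`, and the ZooKeeper address are assumptions made for illustration. The script only prints the commands unless `KAFKA_ACLS` is set to the path of the real tool.

```shell
#!/bin/sh
# Hypothetical values; replace them with your own.
ZK="${ZK:-localhost:2181}"
TOPIC="logs"

# Dry run by default: prints the kafka-acls command instead of running it.
# Set KAFKA_ACLS=bin/kafka-acls in the environment to execute it.
acls() {
  ${KAFKA_ACLS:-echo bin/kafka-acls} --authorizer-properties "zookeeper.connect=$ZK" "$@"
}

# Source connector principal: --producer adds the ACLs a producer needs,
# including Write on the topic, and no Read access.
acls --add --allow-principal User:source-connector --producer --topic "$TOPIC"

# Sink connector principal: --consumer adds the ACLs a consumer needs,
# including Read on the topic and Read on the consumer group.
acls --add --allow-principal User:sink-connector --consumer \
  --topic "$TOPIC" --group connect-hdfs-logs
```

Because neither connector principal is granted anything on the internal Connect topics, an authorizer that denies by default would also keep the connectors from writing to them.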

Note that if you are using SASL for authentication, you must use the same principal for workers and connectors, because only a single JAAS configuration is currently supported on the client side.

Worker ACL Requirements

Workers must be given access to the common group that all workers in a cluster join, and to all the internal topics required by Connect. Read and write access to the internal topics is always required, but create access is only required if the internal topics don’t yet exist and Kafka Connect is to automatically create them. The table below shows each required permission and the relevant configuration setting used to define its value.

Operation(s)   Resource   Configuration Item
Create         Cluster    config.storage.topic
Create         Cluster    config.storage.replication.factor
Create         Cluster    offset.storage.topic
Create         Cluster    offset.storage.partitions
Create         Cluster    offset.storage.replication.factor
Create         Cluster    status.storage.topic
Create         Cluster    status.storage.partitions
Create         Cluster    status.storage.replication.factor
Read/Write     Topic      config.storage.topic
Read/Write     Topic      offset.storage.topic
Read/Write     Topic      status.storage.topic
Read           Group      group.id

See Adding ACLs for documentation on creating new ACLs from the command line.
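
The worker permissions listed in the table could be granted along the following lines. This is a sketch under stated assumptions: the worker principal `connect-worker`, the group ID `connect-cluster`, the internal topic names, and the ZooKeeper address are all hypothetical and should be replaced with the values from your own worker configuration. The script only prints the commands unless `KAFKA_ACLS` is set to the path of the real tool.

```shell
#!/bin/sh
# Hypothetical values; take the real ones from your worker configuration.
ZK="${ZK:-localhost:2181}"
WORKER="User:connect-worker"
GROUP_ID="connect-cluster"   # value of group.id
# Values of config.storage.topic, offset.storage.topic, status.storage.topic
INTERNAL_TOPICS="connect-configs connect-offsets connect-status"

# Dry run by default: prints the kafka-acls command instead of running it.
# Set KAFKA_ACLS=bin/kafka-acls in the environment to execute it.
acls() {
  ${KAFKA_ACLS:-echo bin/kafka-acls} --authorizer-properties "zookeeper.connect=$ZK" "$@"
}

# Read/Write on each internal topic.
for topic in $INTERNAL_TOPICS; do
  acls --add --allow-principal "$WORKER" --operation Read --operation Write --topic "$topic"
done

# Read on the group that all workers in the cluster join.
acls --add --allow-principal "$WORKER" --operation Read --group "$GROUP_ID"

# Create on the cluster: only needed if Connect should create the internal topics itself.
acls --add --allow-principal "$WORKER" --operation Create --cluster
```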

Connector ACL Requirements

Source connectors must be given WRITE permission to any topics that they need to write to. Similarly, sink connectors need READ permission to any topics they will read from. Sink connectors also need Group READ permission, since sink tasks depend on consumer groups internally. By convention, Connect sets the consumer group.id for each sink connector to connect-{name}, where {name} is the name of the connector. For example, if your sink connector is named “hdfs-logs” and it reads from a topic named “logs,” then you could add an ACL with the following command:

$ bin/kafka-acls --authorizer-properties zookeeper.connect=<Zk host:port> \
   --add --allow-principal User:<Sink Connector Principal> \
   --consumer --topic logs --group connect-hdfs-logs
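
The naming convention above can be captured in a small helper so that the group name passed to `--group` always matches the connector name. The function name `sink_group_id` is hypothetical and used only for illustration.

```shell
#!/bin/sh
# Derive the consumer group ID that Connect uses for a sink connector:
# connect-{name}, where {name} is the connector name.
sink_group_id() {
  printf 'connect-%s\n' "$1"
}

sink_group_id hdfs-logs   # prints connect-hdfs-logs
```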