If you have enabled SSL or SASL in your Kafka cluster, you need to make sure that the Connect workers are configured to use them as well. Under the covers, a Connect worker is essentially an advanced Kafka client, so you configure security for it with the same options used by the standard producer and consumer. For example, to enable SSL (without client authentication), you could add the following options to the worker’s configuration:

    # Worker security settings are located at the top level
    security.protocol=SSL
    ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
    ssl.truststore.password=test1234

See the security documentation for more information on the different options that are available. The top-level settings are used by the worker for group coordination and to read and write to the internal topics which are used to track the cluster’s state (e.g. configs and offsets).
For the connectors to leverage security, however, you also need to override the default producer/consumer configuration that the worker uses. To do so, prefix each configuration option with “producer.” for the settings the worker should use for source connectors, and with “consumer.” for the settings it should use for sink connectors. Continuing the same example, you might add the following to the worker’s configuration:

    # Source security settings are prefixed with "producer"
    producer.security.protocol=SSL
    producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
    producer.ssl.truststore.password=test1234

    # Sink security settings are prefixed with "consumer"
    consumer.security.protocol=SSL
    consumer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
    consumer.ssl.truststore.password=test1234

As of now, there is no way to change the configuration for connectors individually, but if your server supports client authentication over SSL, it is possible to use a separate principal for the worker and the connectors. In this case, you need to generate a separate certificate for each of them and install them in separate keystores. In the example below, we show what the worker configuration might look like in this case:

    # Worker authentication settings
    security.protocol=SSL
    ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
    ssl.truststore.password=test1234
    ssl.keystore.location=/var/private/ssl/kafka.worker.keystore.jks
    ssl.keystore.password=worker1234
    ssl.key.password=worker1234

    # Source authentication settings
    producer.security.protocol=SSL
    producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
    producer.ssl.truststore.password=test1234
    producer.ssl.keystore.location=/var/private/ssl/kafka.connector.keystore.jks
    producer.ssl.keystore.password=connector1234
    producer.ssl.key.password=connector1234

    # Sink authentication settings
    consumer.security.protocol=SSL
    consumer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
    consumer.ssl.truststore.password=test1234
    consumer.ssl.keystore.location=/var/private/ssl/kafka.connector.keystore.jks
    consumer.ssl.keystore.password=connector1234
    consumer.ssl.key.password=connector1234

Using separate principals for the connectors allows you to define access control lists (ACLs) with finer granularity. For example, you can use this capability to prevent the connectors themselves from writing to any of the internal topics used by the Connect cluster. Although we have used the same principal for sources and sinks in this example, they could be different as well. This would allow you to prevent sink connectors from writing to any topics.
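If sources and sinks should run under different principals, the producer and consumer overrides can point at different keystores. The following is a minimal sketch, assuming two additional certificates have been generated. The keystore file names and passwords are hypothetical and only follow the naming pattern of the example above:

```properties
# Source connectors: hypothetical keystore holding the source principal's certificate
producer.security.protocol=SSL
producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
producer.ssl.truststore.password=test1234
producer.ssl.keystore.location=/var/private/ssl/kafka.source.keystore.jks
producer.ssl.keystore.password=source1234
producer.ssl.key.password=source1234

# Sink connectors: hypothetical keystore holding the sink principal's certificate
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
consumer.ssl.truststore.password=test1234
consumer.ssl.keystore.location=/var/private/ssl/kafka.sink.keystore.jks
consumer.ssl.keystore.password=sink1234
consumer.ssl.key.password=sink1234
```

With a layout like this, the sink principal could be granted read permissions only, which is what would keep sink connectors from writing to any topic.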
Note that if you are using SASL for authentication, you must use the same principal for workers and connectors.
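For illustration, a SASL setup might repeat the same settings at all three levels. This is only a sketch under stated assumptions: it assumes Kerberos (GSSAPI) over SSL and a broker service name of “kafka”, neither of which comes from the text above, and it assumes the JAAS login configuration is supplied to the worker JVM separately.

```properties
# Worker settings (assumed: Kerberos over SSL; adjust to your cluster)
security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234

# Source settings: same protocol and, per the note above, the same principal
producer.security.protocol=SASL_SSL
producer.sasl.kerberos.service.name=kafka
producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
producer.ssl.truststore.password=test1234

# Sink settings: same protocol and, per the note above, the same principal
consumer.security.protocol=SASL_SSL
consumer.sasl.kerberos.service.name=kafka
consumer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
consumer.ssl.truststore.password=test1234
```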
At a minimum, the worker must be given access to all of the internal topics it uses and to the common group which all workers in the cluster join. The table below shows each required permission and the relevant configuration setting used to define its value.
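The resource names that the worker needs access to come from ordinary distributed-worker settings. As a rough sketch, the property names below are standard Connect worker settings, while the values are hypothetical and chosen only for illustration:

```properties
# Group that all workers in the cluster join (hypothetical value)
group.id=connect-cluster

# Internal topics used to track the cluster's state (hypothetical names)
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
# Only relevant on Connect versions that keep a status topic
status.storage.topic=connect-status
```

Whatever values your own worker configuration uses for these settings are the group and topic names the worker principal must be allowed to access.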
See Adding ACLs for documentation on creating new ACLs from the command line.
Source connectors must be given WRITE permission to any topics that they need to write to. Similarly, sink connectors need READ permission to any topics they will read from. They also need Group READ permission since sink tasks depend on consumer groups internally. Connect defines the consumer group.id for each sink connector as connect-{name}, where {name} is substituted by the name of the connector. For example, if your sink connector is named “hdfs-logs” and it reads from a topic named “logs,” then you could add an ACL with the following command:

    $ bin/kafka-acls --authorizer-properties zookeeper.connect=<Zk host:port> \
        --add --allow-principal User:<Sink Connector Principal> \
        --consumer --topic logs --group connect-hdfs-logs