Kafka Connect Security Basics¶
Encryption¶
If you have enabled SSL encryption in your Apache Kafka® cluster, then you must make sure that Kafka Connect is also configured for security. Click on the section to configure encryption in Kafka Connect:
Authentication¶
If you have enabled authentication in your Kafka cluster, then you must make sure that Kafka Connect is also configured for security. Click on the section to configure authentication in Kafka Connect:
Separate principals¶
Within the Connect worker configuration, all properties having a prefix of
producer.
and consumer.
are applied to all source and sink connectors
created in the worker. The admin.
prefix is used for error reporting in sink connectors. The following describes how these prefixes are used:
- The
consumer.
prefix controls consumer behavior for sink connectors. - The
producer.
prefix controls producer behavior for source connectors. - Both the
producer.
andadmin.
prefixes control producer and client behavior for sink connector error reporting.
You can override these properties for individual connectors using the
producer.override.
, consumer.override.
, and admin.override.
prefixes. This includes overriding the worker service principal configuration to
create separate service principals for each connector. Overrides are disabled by
default. They are enabled using the connector.client.config.override.policy
worker property. This property sets the per-connector overrides the worker
permits. The out-of-the-box (OOTB) options for the override policy are:
connector.client.config.override.policy=None
- Default. Does not allow any configuration overrides.
connector.client.config.override.policy=Principal
- Allows overrides for the
security.protocol
,sasl.jaas.config
, andsasl.mechanism
configuration properties, using theproducer.override.
,consumer.override
, andadmin.override
prefixes.
connector.client.config.override.policy=All
- Allows overrides for all configuration properties using the
producer.override.
,consumer.override
, andadmin.override
prefixes.
Tip
You can write your own implementation of the ConnectorClientConfigOverridePolicy class if any of the OOTB policies don’t meet your needs.
If your Kafka broker supports client authentication over SSL, you can configure a separate principal for the worker and the connectors. In this case, you need to generate a separate certificate for each of them and install them in separate keystores.
The key Connect configuration differences are as follows, notice the unique password, keystore location, and keystore password:
# Authentication settings for Connect workers
ssl.keystore.location=/var/private/ssl/kafka.worker.keystore.jks
ssl.keystore.password=worker1234
ssl.key.password=worker1234
Connect workers manage the producers used by source connectors and the consumers used by sink connectors. So, for the connectors to leverage security, you also have to override the default producer/consumer configuration that the worker uses.
# Authentication settings for Connect producers used with source connectors
producer.ssl.keystore.location=/var/private/ssl/kafka.source.keystore.jks
producer.ssl.keystore.password=connector1234
producer.ssl.key.password=connector1234
# Authentication settings for Connect consumers used with sink connectors
consumer.ssl.keystore.location=/var/private/ssl/kafka.sink.keystore.jks
consumer.ssl.keystore.password=connector1234
consumer.ssl.key.password=connector1234
ACL Considerations¶
Using separate principals for the connectors allows you to define access control lists (ACLs) with finer granularity. For example, you can use this capability to prevent the connectors themselves from writing to any of internal topics used by the Connect cluster. Additionally, you can use different keystores for source and sink connectors and enable scenarios where source connectors have only write access to a topic but sink connectors have only read access to the same topic.
Worker ACL Requirements¶
Workers must be given access to the common group that all workers in a cluster join, and to all the internal topics required by Connect. Read and write access to the internal topics are always required, but create access is only required if the internal topics don’t yet exist and Kafka Connect is to automatically create them. The table below shows each required permission and the relevant configuration setting used to define its value.
Operation(s) | Resource | Configuration Item |
---|---|---|
Create | Cluster | config.storage.topic |
Create | Cluster | config.storage.replication.factor |
Create | Cluster | offset.storage.topic |
Create | Cluster | offset.storage.partitions |
Create | Cluster | offset.storage.replication.factor |
Create | Cluster | status.storage.topic |
Create | Cluster | status.storage.partitions |
Create | Cluster | status.storage.replication.factor |
Read/Write | Topic | config.storage.topic |
Read/Write | Topic | offsets.storage.topic |
Read/Write | Topic | status.storage.topic |
Read | Group | group.id |
See Adding ACLs for documentation on creating new ACLs from the command line.
Connector ACL Requirements¶
Source connectors must be given WRITE
permission to any topics
that they need to write to. Similarly, sink connectors need READ
permission to any topics they will read from. They also need Group
READ
permission since sink tasks depend on consumer groups
internally. Connect defines the consumer group.id
conventionally
for each sink connector as connect-{name}
where {name}
is
substituted by the name of the connector. For example, if your sink
connector is named “hdfs-logs” and it reads from a topic named “logs,”
then you could add an ACL with the following command:
bin/kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \
--add --allow-principal User:<Sink Connector Principal> \
--consumer --topic logs --group connect-hdfs-logs
Connectors that access the _confluent-command
topic must configure the
following ACLs:
- CREATE and DESCRIBE on the resource cluster, if the connector needs to create the topic.
- DESCRIBE, READ, and WRITE on the
_confluent-command
topic.
See License topic ACLs
for details about configuring ACLs for the _confluent-command
topic.
Enterprise Connector ACL Requirements¶
The _confluent-command
topic contains the license that corresponds to the
license key supplied through the confluent.license
property and is created
by default. Connectors that access the _confluent-command
topic must have
the following ACLs configured:
CREATE
andDESCRIBE
on the resource cluster, if the connector needs to create the topic.DESCRIBE
,READ
, andWRITE
on the_confluent-command
topic.
Connect Reporter¶
The Kafka Connect Reporter submits the result of a sink operation to a reporter topic. After successfully sinking a record or following an error condition, the Connect Reporter is called to submit the result report. The report is constructed to include details about how the original record was handled along with additional information about the sink event. These records are written to configurable success and error topics for further consumption. The following is an example of the basic Connect Reporter configuration properties added to a sink connector configuration:
reporter.bootstrap.servers=localhost:9092
reporter.result.topic.name=success-responses
reporter.result.topic.replication.factor=1
reporter.error.topic.name=error-responses
reporter.error.topic.replication.factor=1
To completely disable Connect Reporter, see Disabling Connect Reporter.
If you have a secure environment, you use configuration blocks for both an Admin Client and Producer. A Producer is constructed to send records to the reporter topic. The Admin Client creates the topic. Credentials need to be added in a secure environment. Example Admin and Producer properties are shown below:
reporter.admin.bootstrap.servers=localhost:9092
reporter.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="<username>" password="<password>";
reporter.admin.security.protocol=SASL_SSL
reporter.admin.sasl.mechanism=PLAIN
reporter.producer.bootstrap.servers=localhost:9092
reporter.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="<username>" password="<password>";
reporter.producer.security.protocol=SASL_SSL
reporter.producer.sasl.mechanism=PLAIN
Additional Reporter configuration property examples are provided in each applicable Kafka Connect sink connector document. For an example, see the Reporter properties in the HTTP Sink connecter.
Reporter and Kerberos security¶
The following configuration example shows a sink connector with all the necessary configuration properties for Reporter and Kerberos security. This example shows the Prometheus Metrics Sink Connector for Confluent Platform, but can be modified for any applicable sink connector.
{
"name" : "prometheus-connector",
"config" : {
"topics":"prediction-metrics",
"connector.class" : "io.confluent.connect.prometheus.PrometheusMetricsSinkConnector",
"tasks.max" : "1",
"confluent.topic.bootstrap.servers":"localhost:9092",
"confluent.topic.ssl.truststore.location":"/etc/pki/hadoop/kafkalab.jks",
"confluent.topic.ssl.truststore.password":"xxxx",
"confluent.topic.ssl.keystore.location":"/etc/pki/hadoop/kafkalab.jks",
"confluent.topic.ssl.keystore.password":"xxxx",
"confluent.topic.ssl.key.password":"xxxx",
"confluent.topic.security.protocol":"SASL_SSL",
"confluent.topic.replication.factor": "3",
"confluent.topic.sasl.kerberos.service.name":"kafka",
"confluent.topic.sasl.jaas.config":"com.sun.security.auth.module.Krb5LoginModule required \nuseKeyTab=true \nstoreKey=true \nkeyTab=\"/etc/security/keytabs/svc.kfkconnect.lab.keytab\" \nprincipal=\"svc.kfkconnect.lab@DS.DTVENG.NET\";",
"prometheus.scrape.url": "http://localhost:8889/metrics",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"behavior.on.error": "LOG",
"reporter.result.topic.replication.factor": "3",
"reporter.error.topic.replication.factor": "3",
"reporter.bootstrap.servers":"localhost:9092",
"reporter.producer.ssl.truststore.location":"/etc/pki/hadoop/kafkalab.jks",
"reporter.producer.ssl.truststore.password":"xxxx",
"reporter.producer.ssl.keystore.location":"/etc/pki/hadoop/kafkalab.jks",
"reporter.producer.ssl.keystore.password":"xxxx",
"reporter.producer.ssl.key.password":"xxxx",
"reporter.producer.security.protocol":"SASL_SSL",
"reporter.producer.sasl.kerberos.service.name":"kafka",
"reporter.producer.sasl.jaas.config":"com.sun.security.auth.module.Krb5LoginModule required \nuseKeyTab=true \nstoreKey=true \nkeyTab=\"/etc/security/keytabs/svc.kfkconnect.lab.keytab\" \nprincipal=\"svc.kfkconnect.lab@DS.DTVENG.NET\";",
"reporter.admin.ssl.truststore.location":"/etc/pki/hadoop/kafkalab.jks",
"reporter.admin.ssl.truststore.password":"xxxx",
"reporter.admin.ssl.keystore.location":"/etc/pki/hadoop/kafkalab.jks",
"reporter.admin.ssl.keystore.password":"xxxx",
"reporter.admin.ssl.key.password":"xxxx",
"reporter.admin.security.protocol":"SASL_SSL",
"reporter.admin.sasl.kerberos.service.name":"kafka",
"reporter.admin.sasl.jaas.config":"com.sun.security.auth.module.Krb5LoginModule required \nuseKeyTab=true \nstoreKey=true \nkeyTab=\"/etc/security/keytabs/svc.kfkconnect.lab.keytab\" \nprincipal=\"svc.kfkconnect.lab@DS.DTVENG.NET\";",
"confluent.license":"eyJ0eXAiOiJK ...omitted"
}
Role-Based Access Control¶
If your organization has enabled Role-Based Access Control (RBAC), you need to review your user principal, RBAC role, and RBAC role permissions before performing any Kafka Connect or Apache Kafka® cluster operations. Refer to Kafka Connect and RBAC to learn more about how RBAC is configured for Kafka Connect to protect your Kafka cluster.
Externalizing Secrets¶
You can use the ConfigProvider class interface to prevent secrets from appearing in cleartext in connector configurations.
The ConfigProvider class interface allows you to use variables in your worker configuration that are dynamically resolved upon startup. It also allows you to use variables in your connector configurations that are dynamically resolved when the connector is (re)started. You can use variables within configuration property values in place of secrets, or in place of any information that should be resolved dynamically at runtime.
Note
Connector configurations are persisted and shared over the Connect REST API with the variables. Only when the connector starts does it transiently resolve and replace variables in-memory. Secrets are never persisted in connector configs, logs, or in REST API requests and responses.
The Connect worker relies upon the named ConfigProviders defined in the worker configuration to resolve the variables. Each variable specifies the name of the ConfigProvider that should be used, and the information the ConfigProvider uses to resolve the variable into a replacement string.
All ConfigProvider
implementations are discovered using the standard Java
ServiceLoader
mechanism. To create a custom implementation of
ConfigProvider
, implement the ConfigProvider
interface. Package the implementation class(es) and a file named
META-INF/services/org.apache.kafka.common.config.provider.ConfigProvider
containing the fully qualified name of the ConfigProvider
implementation
class into a JAR file. Note that the JAR file can use third-party libraries
other than those provided by the Connect framework, but they must be
installed with the JAR file as described below.
To install the custom implementation, add a new subdirectory
containing the JAR files to the directory that is in Connect’s
plugin.path
and (re)start the Connect workers.
To implement a custom ConfigProvider
, add a new subdirectory containing the
JAR files under the default CLASSPATH
location—for example,
/usr/local/share/java
—and restart the Connect workers. Note that the
JAR files should not be placed under a subdirectory in Connect’s
plugin.path
. The plugin.path
locations are for connectors and their
specific dependent JARs and not for config.providers
.
When the Connect worker starts, it instantiates all ConfigProvider
implementations specified in the worker configuration. All properties prefixed
with config.providers.[provider].param.*
are passed to the configure()
method of the ConfigProvider
. When the Connect worker shuts down, it
calls the close()
method of the ConfigProvider
.
Important
Any worker in a Connect cluster must be able to resolve every variable in the worker configuration, and must be able to resolve all variables used in every connector configuration. This requires the following:
- All of the workers in a Connect cluster must have the same set of named config providers.
- Each provider on every worker must have access to any resources required to resolve variables used in the worker config or in the connector configs.
The following configuration properties are added to the distributed worker configuration to make this work:
config.providers
: A comma-separated list of names for providers.config.providers.{name}.class
: The Java class name for a provider.config.providers.{name}.param.{param-name}
: A parameter (or parameters) to be passed to the above Java class on initialization.
FileConfigProvider¶
Kafka provides an implementation of ConfigProvider
called
FileConfigProvider
that allows variable references to be replaced with
values from local files on each worker. For example, rather than having a secret
in a configuration property, you can put the secret in a local file and use a
variable in connector configurations. When the connector is started, Connect
will use the file config provider to resolve and replace the variable with the
actual secret, ensuring that the connector configuration does not include the
secret when it is persisted and shared over the Connect REST API.
Important
Every worker in the Connect cluster must have access to the files referenced by all variables referencing the config provider.
Variables that refer by name to a FileConfigProvider
should be in the form
${provider:[path:]key}
. The path is the fully-qualified path of the
property file on each Connect worker; the key is the name of the key within
that property file. Note that the FileConfigProvider
supports reading any
file, where the path (and property key in that file) is specified in each
variable. When Connect resolves one of these variables, it will read the
properties file, extract the value for the corresponding key, and replace the
whole variable with that value.
The following shows a JDBC connector configuration that includes the database URL, username, and password:
connection.url=jdbc:oracle:thin:@myhost:1521:orcl
connection.user=scott
connection.password=<my-secret-password>
Instead of having these details exposed in the connector configuration, you can
use FileConfigProvider
and store them in a file accessible to each
Connect worker and protect them from other OS users. In the following
examples, the separate file is named /opt/connect-secrets.properties
. The
properties added to /opt/connect-secrets.properties
are listed below:
productsdb-url=jdbc:oracle:thin:@myhost:1521:orcl
productsdb-username=scott
productsdb-password=my-secret-password
other-connector-url=jdbc:oracle:thin:@myhost:1521:orcl
other-connector-username=customers
other-connector-password=superSecret!
Then, you can configure each Connect worker to use FileConfigProvider
. The worker configuration would include the following properties:
# Additional properties added to the worker configuration
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
The JDBC connector configuration can now use variables in place of the secrets:
# Additional properties added to the connector configuration
connection.url=${file:/opt/connect-secrets.properties:productsdb-url}
connection.user=${file:/opt/connect-secrets.properties:productsdb-username}
connection.password=${file:/opt/connect-secrets.properties:productsdb-password}
Another connector configuration could use variables with a different file, or variables that use different properties in the same file:
# Additional properties added to another connector configuration
connection.url=${file:/opt/connect-secrets.properties:other-connector-url}
connection.user=${file:/opt/connect-secrets.properties:other-connector-username}
connection.password=${file:/opt/connect-secrets.properties:other-connector-password}
InternalSecretConfigProvider¶
Confluent Platform provides another implementation of ConfigProvider
named
InternalSecretConfigProvider
which is used with the Connect
Secret Registry. The
InternalSecretConfigProvider
requires Role-based access control (RBAC) with Secret Registry. The Secret Registry is a secret serving
layer that enables Connect to store encrypted Connect credentials in a
topic exposed through a REST API. This eliminates any unencrypted credentials
being located in the actual connector configuration. The following example shows
how InternalSecretConfigProvider
is configured in the worker configuration
file:
### Secret Provider
config.providers=secret
config.providers.secret.class=io.confluent.connect.secretregistry.rbac.config.provider.InternalSecretConfigProvider
config.providers.secret.param.master.encryption.key=<encryption key>
config.providers.secret.param.kafkastore.bootstrap.servers=SASL_PLAINTEXT://<Kafka broker URLs>
config.providers.secret.param.kafkastore.security.protocol=SASL_PLAINTEXT
config.providers.secret.param.kafkastore.sasl.mechanism=OAUTHBEARER
config.providers.secret.param.kafkastore.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler
config.providers.secret.param.kafkastore.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
username="<service-principal-username>" \
password="<service-principal-password>" \
metadataServerUrls="<metadata server URLs>";
Using Secrets with a JSON Connector Configuration¶
Use the following steps to use encrypted secrets for a connector JSON configuration.
Create the
security.properties
file to store the master encryption key. You can use an existingsecurity.properties
file if the Secrets feature is already enabled for the Confluent Platform environment. Note that secrets are managed using confluent secret commands.If you created a new
security.properties
file, create a directory to store the file. For example:mkdir /usr/secrets
Choose a location for the secrets file on the local host and not a location where Confluent Platform services run. The secrets file contains encrypted secrets for the master encryption key, data encryption key, configuration parameters, and metadata such as the cipher used for encryption.
Generate the master encryption key based on a passphrase. For example:
confluent secret master-key generate \ --local-secrets-file /usr/secrets/security.properties \ --passphrase @<passphrase.txt>
Save the master key. It cannot be retrieved later.
+------------+----------------------------------------------+ | Master Key | abc123def456ghi789JKLMNOP012346Qrst789123ab= | +------------+----------------------------------------------+
Export the master key using the following environment variable. You can also add the master key to a bash script. If the master key is not exported, any subsequent secret commands will fail.
export CONFLUENT_SECURITY_MASTER_KEY=abc123def456ghi789JKLMNOP012346Qrst789123ab=
Create a working properties file that contains only the properties you want to encrypt. The name of the file is not important. It’s used to create the property key and encrypted secret added to the
security.properties
file. For example, createmy-jdbc-connector.properties
with the following secret:database.password=super-secret
Encrypt the properties in the working
my-jdbc-connector.properties
file.confluent secret file encrypt --config-file my-jdbc-connect.properties \ --local-secrets-file /usr/secrets/security.properties \ --remote-secrets-file /usr/secrets/security.properties \ --config "database.password"
This command updates the
/usr/secrets/security.properties
file with the encrypteddatabase.password
. For example:my-jdbc-connector.properties/database.password = ENC[AES/CBC/PKCS5Padding,data:CUpHh5lRDfIfqaL49V3iGw==,iv:vPBmPkctA+yYGVQuOFmQJw==,type:str]
You use the property key (that is,
my-jdbc-connector.properties/database.password
) in the connector configuration. The working properties filemy-jdbc-connector.properties
file can be discarded.Distribute the updated
security.properties
file to all Connect worker nodes where you want the connector to run. This assumes that Connect is configured and restarted using the master password.Add the variable in place of the secret to the JSON connector configuration file. The variable is in the form
{{$ {securepass:<path-to-secret-file>:<secret-key>} }}
, where<path-to-secret-file>
is the path to the sharedsecurity.properties
file containing the encrypted secret. For example:{ "name": "my-jdbc-connector", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:oracle:thin:@//xx.xx.xx.xx:xxxx/xe", "connection.user": "<username>", "connection.password": "${securepass:/usr/secrets/security.properties:my-jdbc-connector.properties/database.password}", "mode": "bulk", "query": "select * from Foo" } }
You can use any number of secrets in the connector configuration file. The variables are generally set to the entire configuration property value as shown in the example configuration above.
Enable externalized secrets in the distributed Connect worker properties.
### enable Externalized Secrets ### config.providers=securepass config.providers.securepass.class=io.confluent.kafka.security.config.provider.SecurePassConfigProvider
Deploy the connector using the JSON configuration file.
When Connect starts the connector, it resolves the variable by looking up the secret with a matching secret key in the secrets file, uses the master key to decrypt the secret, replaces the variable(s) with the decrypted secret(s), and passes this configuration to the connector.
Configuring the Connect REST API for HTTP or HTTPS¶
By default you can make REST API calls over HTTP with Kafka Connect. You can also configure Connect to allow either HTTP or HTTPS, or both.
The listeners configuration parameter determines the protocol used by Kafka Connect. This configuration should contain a
list of listeners in this format: protocol://host:port,protocol2://host2:port2
. For example:
listeners=http://localhost:8080,https://localhost:8443
By default, if no listeners are specified, the REST server runs on port 8083 using the HTTP protocol. When using HTTPS,
the configuration must include the SSL configuration. By default, it will use the ssl.*
settings. You can use a different
configuration for the REST API than for the Kafka brokers, by using the listeners.https
prefix. If you use the listeners.https
prefix, the ssl.*
options are ignored.
You can use the following fields to configure HTTPS for the REST API:
ssl.keystore.location
ssl.keystore.password
ssl.keystore.type
ssl.key.password
ssl.truststore.location
ssl.truststore.password
ssl.truststore.type
ssl.enabled.protocols
ssl.provider
ssl.protocol
ssl.cipher.suites
ssl.keymanager.algorithm
ssl.secure.random.implementation
ssl.trustmanager.algorithm
ssl.endpoint.identification.algorithm
ssl.client.auth
For more information, see Distributed Worker Configuration.
The REST API is used to monitor and manage Kafka Connect and for Kafka Connect cross-cluster communication. Requests that are
received on the follower nodes REST API are forwarded on to the leader node REST API. If the URI host is different from the
URI that it listens on, you can change the URI with the rest.advertised.host.name
, rest.advertised.port
and
rest.advertised.listener
configuration options. This URI will be used by the follower nodes to connect with the leader.
When using both HTTP and HTTPS listeners, you can use the rest.advertised.listener
option to define which listener is
used for the cross-cluster communication. When using HTTPS for communication between nodes, the same ssl.*
or listeners.https
options are used to configure the HTTPS client.
These are the currently supported REST API endpoints:
GET /connectors
- Return a list of active connectors.POST /connectors
- Create a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters.GET /connectors/{name}
- Get information about a specific connector.GET /connectors/{name}/config
- Get the configuration parameters for a specific connector.PUT /connectors/{name}/config
- Update the configuration parameters for a specific connector.GET /connectors/{name}/status
- Get the current status of the connector, including whether it is running, failed, or paused; which worker it is assigned to, error information if it has failed, and the state of all its tasks.GET /connectors/{name}/tasks
- Get a list of tasks currently running for a connector.GET /connectors/{name}/tasks/{taskid}/status
- Get current status of the task, including if it is running, failed, or paused; which worker it is assigned to, and error information if it has failed.PUT /connectors/{name}/pause
- Pause the connector and its tasks, which stops message processing until the connector is resumed.PUT /connectors/{name}/resume
- Resumes a paused connector or does nothing if the connector is not paused.POST /connectors/{name}/restart
- Restart a connector. This is typically used because it has failed.POST /connectors/{name}/tasks/{taskId}/restart
- Restart an individual task. This is typically used because it has failed.DELETE /connectors/{name}
- Delete a connector, halting all tasks and deleting its configuration.
You can also use Kafka Connect REST API to get information about connector plugins:
GET /connector-plugins
- Returns a list of connector plugins installed in the Kafka Connect cluster. The API only checks for connectors on the worker that handles the request. This means you might see inconsistent results, especially during a rolling upgrade if you add new connector JARs.PUT /connector-plugins/{connector-type}/config/validate
- Validate the provided configuration values against the configuration definition. This API performs per config validation, returns suggested values and error messages during validation.
For more information, see REST API.
For demo of Kafka Connect configured with an HTTPS endpoint, and Confluent Control Center connecting to it, check out Confluent Platform demo.