.. _connect_security:

|kconnect-long| Security Basics
-------------------------------

----------
Encryption
----------

If you have enabled TLS/SSL encryption in your |ak-tm| cluster, then you must
make sure that |kconnect-long| is also configured for security. Click on the
section to configure encryption in |kconnect-long|:

:ref:`Encryption with TLS/SSL `

--------------
Authentication
--------------

If you have enabled authentication in your |ak| cluster, then you must make
sure that |kconnect-long| is also configured for security. Click on the
section to configure authentication in |kconnect-long|:

* :ref:`Authentication with TLS/SSL `
* :ref:`Authentication with SASL/GSSAPI `
* :ref:`Authentication with SASL/SCRAM `
* :ref:`Authentication with SASL/PLAIN `

.. _separate-principals:

-------------------
Separate principals
-------------------

Within the |kconnect| worker configuration, all properties having a prefix of
``producer.`` and ``consumer.`` are applied to all source and sink connectors
created in the worker. The ``admin.`` prefix is used for error reporting in
sink connectors. The following describes how these prefixes are used:

* The ``consumer.`` prefix controls consumer behavior for sink connectors.
* The ``producer.`` prefix controls producer behavior for source connectors.
* Both the ``producer.`` and ``admin.`` prefixes control producer and client
  behavior for sink connector error reporting.

You can override these properties for individual connectors using the
``producer.override.``, ``consumer.override.``, and ``admin.override.``
prefixes. This includes overriding the worker service principal configuration
to create separate service principals for each connector. Overrides are
disabled by default. They are enabled using the
``connector.client.config.override.policy`` worker property. This property
sets the per-connector overrides the worker permits. The out-of-the-box
(OOTB) options for the override policy are:

* ``connector.client.config.override.policy=None``

  Default. Does not allow any configuration overrides.

* ``connector.client.config.override.policy=Principal``

  Allows overrides for the ``security.protocol``, ``sasl.jaas.config``, and
  ``sasl.mechanism`` configuration properties, using the
  ``producer.override.``, ``consumer.override.``, and ``admin.override.``
  prefixes.

* ``connector.client.config.override.policy=All``

  Allows overrides for all configuration properties using the
  ``producer.override.``, ``consumer.override.``, and ``admin.override.``
  prefixes.

.. tip:: You can write your own implementation of the
   `ConnectorClientConfigOverridePolicy `__ class if any of the OOTB
   policies don't meet your needs.
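For example, with the ``Principal`` policy enabled on the worker, a source
connector can supply its own SASL credentials instead of inheriting the
worker principal. The following is a minimal sketch; the SASL/PLAIN
mechanism, username, and password shown are illustrative values, not
requirements:

.. codewithvars:: bash

   # Worker configuration: permit per-connector overrides of the security principal
   connector.client.config.override.policy=Principal

.. codewithvars:: bash

   # Source connector configuration: producer settings for a connector-specific principal (illustrative)
   producer.override.security.protocol=SASL_SSL
   producer.override.sasl.mechanism=PLAIN
   producer.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
     username="my-source-connector" \
     password="my-source-connector-secret";

A sink connector would use the ``consumer.override.`` prefix in the same way.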
If your |ak| broker supports client authentication over SSL, you can
configure a separate principal for the worker and the connectors. In this
case, you need to :ref:`generate a separate certificate ` for each of them
and install them in separate keystores. The key |kconnect| configuration
differences are shown below; notice the unique password, keystore location,
and keystore password:

.. codewithvars:: bash

   # Authentication settings for Connect workers
   ssl.keystore.location=/var/private/ssl/kafka.worker.keystore.jks
   ssl.keystore.password=worker1234
   ssl.key.password=worker1234

|kconnect| workers manage the producers used by source connectors and the
consumers used by sink connectors. So, for the connectors to leverage
security, you also have to override the default producer/consumer
configuration that the worker uses.

.. codewithvars:: bash

   # Authentication settings for Connect producers used with source connectors
   producer.ssl.keystore.location=/var/private/ssl/kafka.source.keystore.jks
   producer.ssl.keystore.password=connector1234
   producer.ssl.key.password=connector1234

   # Authentication settings for Connect consumers used with sink connectors
   consumer.ssl.keystore.location=/var/private/ssl/kafka.sink.keystore.jks
   consumer.ssl.keystore.password=connector1234
   consumer.ssl.key.password=connector1234

.. _connect-acl-considerations:

------------------
ACL Considerations
------------------

Using separate principals for the connectors allows you to define
:ref:`access control lists ` (ACLs) with finer granularity. For example, you
can use this capability to prevent the connectors themselves from writing to
any of the internal topics used by the |kconnect| cluster. Additionally, you
can use different keystores for source and sink connectors and enable
scenarios where source connectors have only write access to a topic but sink
connectors have only read access to the same topic.

Worker ACL Requirements
~~~~~~~~~~~~~~~~~~~~~~~

Workers must be given access to the common group that all workers in a
cluster join, and to all the :connect-common:`internal topics required by
Connect|userguide.html#worker-configuration-properties-file`. Read and write
access to the internal topics is always required, but create access is only
required if the internal topics don't yet exist and |kconnect-long| is to
automatically create them. The table below shows each required permission and
the relevant configuration setting used to define its value.

============== ============ =====================================
Operation(s)   Resource     Configuration Item
============== ============ =====================================
Create         Cluster      ``config.storage.topic``
Create         Cluster      ``config.storage.replication.factor``
Create         Cluster      ``offset.storage.topic``
Create         Cluster      ``offset.storage.partitions``
Create         Cluster      ``offset.storage.replication.factor``
Create         Cluster      ``status.storage.topic``
Create         Cluster      ``status.storage.partitions``
Create         Cluster      ``status.storage.replication.factor``
Read/Write     Topic        ``config.storage.topic``
Read/Write     Topic        ``offset.storage.topic``
Read/Write     Topic        ``status.storage.topic``
Read           Group        ``group.id``
============== ============ =====================================

See :ref:`kafka_adding_acls` for documentation on creating new ACLs from the
command line.

Connector ACL Requirements
~~~~~~~~~~~~~~~~~~~~~~~~~~

Source connectors must be given ``WRITE`` permission to any topics that they
need to write to. Similarly, sink connectors need ``READ`` permission to any
topics they read from. They also need Group ``READ`` permission since sink
tasks depend on consumer groups internally. |kconnect| defines the consumer
``group.id`` conventionally for each sink connector as ``connect-{name}``
where ``{name}`` is substituted by the name of the connector. For example, if
your sink connector is named "hdfs-logs" and it reads from a topic named
"logs," then you could add an ACL with the following command:

.. codewithvars:: bash

   kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \
     --add --allow-principal User: \
     --consumer --topic logs --group connect-hdfs-logs

Connectors that access the ``_confluent-command`` topic must configure the
following ACLs:

- CREATE and DESCRIBE on the resource cluster, if the connector needs to
  create the topic.
- DESCRIBE, READ, and WRITE on the ``_confluent-command`` topic.
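The following sketch shows how these ACLs might be granted with the
``kafka-acls`` tool; the principal name ``connect-license-manager`` and the
admin client configuration file are illustrative:

.. codewithvars:: bash

   # Allow the connector principal to create and describe the cluster resource (illustrative principal)
   kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \
     --add --allow-principal User:connect-license-manager \
     --operation Create --operation Describe --cluster

   # Allow the same principal to describe, read, and write the _confluent-command topic
   kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \
     --add --allow-principal User:connect-license-manager \
     --operation Describe --operation Read --operation Write --topic _confluent-command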
See `License topic ACLs `__ for details about configuring ACLs for the
``_confluent-command`` topic.

Enterprise Connector ACL Requirements
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The ``_confluent-command`` topic contains the license that corresponds to the
license key supplied through the ``confluent.license`` property and is
created by default. Connectors that access the ``_confluent-command`` topic
must have the following ACLs configured:

- ``CREATE`` and ``DESCRIBE`` on the resource cluster, if the connector needs
  to create the topic.
- ``DESCRIBE``, ``READ``, and ``WRITE`` on the ``_confluent-command`` topic.

.. _connect_security-reporter:

-------------------
|kconnect| Reporter
-------------------

.. include:: includes/connect-reporter.rst

Reporter and Kerberos security
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. include:: includes/reporter-security-properties.rst

-------------------------
Role-Based Access Control
-------------------------

.. include:: rbac/includes/connect-rbac-intro.rst

.. _connect_externalize-secrets:

-------------------
Externalize Secrets
-------------------

You can use the
:platform:`ConfigProvider|clients/javadocs/javadoc/org/apache/kafka/common/config/provider/ConfigProvider.html`
class interface to prevent secrets from appearing in cleartext in connector
configurations.

.. include:: includes/config-provider.rst

Use Secrets with a JSON Connector Configuration
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Use the following steps to use encrypted secrets for a connector JSON
configuration.

#. Create the ``security.properties`` file to store the master encryption
   key. You can use an existing ``security.properties`` file if the
   :ref:`Secrets ` feature is already enabled for the |cp| environment. Note
   that secrets are managed using
   :confluent-cli:`confluent secret|command-reference/secret/index.html`
   commands.

#. If you created a new ``security.properties`` file, create a directory to
   store the file. For example:

   .. code-block:: bash

      mkdir /usr/secrets

   Choose a location for the secrets file on the local host and not a
   location where |cp| services run. The secrets file contains encrypted
   secrets for the master encryption key, data encryption key, configuration
   parameters, and metadata such as the cipher used for encryption.

#. Generate the master encryption key based on a passphrase. For example:

   .. code-block:: bash

      confluent secret master-key generate \
        --local-secrets-file /usr/secrets/security.properties \
        --passphrase @

#. Save the master key. It cannot be retrieved later.

   .. code-block:: bash

      +------------+----------------------------------------------+
      | Master Key | abc123def456ghi789JKLMNOP012346Qrst789123ab= |
      +------------+----------------------------------------------+

#. Export the master key using the following environment variable. You can
   also add the master key to a bash script. If the master key is not
   exported, any subsequent secret commands fail.

   .. code-block:: bash

      export CONFLUENT_SECURITY_MASTER_KEY=abc123def456ghi789JKLMNOP012346Qrst789123ab=

#. Create a working properties file that contains only the properties you
   want to encrypt. The name of the file is not important. It's used to
   create the property key and encrypted secret added to the
   ``security.properties`` file. For example, create
   ``my-jdbc-connector.properties`` with the following secret:

   .. code-block:: bash

      database.password=super-secret

#. Encrypt the properties in the working ``my-jdbc-connector.properties``
   file.
   .. code-block:: bash

      confluent secret file encrypt --config-file my-jdbc-connector.properties \
        --local-secrets-file /usr/secrets/security.properties \
        --remote-secrets-file /usr/secrets/security.properties \
        --config "database.password"

   This command updates the ``/usr/secrets/security.properties`` file with
   the encrypted ``database.password``. For example:

   .. code-block:: bash

      my-jdbc-connector.properties/database.password = ENC[AES/GCM/NoPadding,data:CUpHh5lRDfIfqaL49V3iGw==,iv:vPBmPkctA+yYGVQuOFmQJw==,type:str]

   You use the property key (that is,
   ``my-jdbc-connector.properties/database.password``) in the connector
   configuration. The working ``my-jdbc-connector.properties`` file can be
   discarded.

#. Distribute the updated ``security.properties`` file to all |kconnect|
   worker nodes where you want the connector to run. This assumes that
   |kconnect| is configured and restarted using the master password.

#. Add the variable in place of the secret to the JSON connector
   configuration file. The variable is in the form
   ``${securepass:<path-to-secrets-file>:<secret-key>}``, where
   ``<path-to-secrets-file>`` is the path to the shared
   ``security.properties`` file containing the encrypted secret. For example:

   .. code-block:: json

      {
        "name": "my-jdbc-connector",
        "config": {
          "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
          "connection.url": "jdbc:oracle:thin:@//xx.xx.xx.xx:xxxx/xe",
          "connection.user": "",
          "connection.password": "${securepass:/usr/secrets/security.properties:my-jdbc-connector.properties/database.password}",
          "mode": "bulk",
          "query": "select * from Foo"
        }
      }

   You can use any number of secrets in the connector configuration file. The
   variables are generally set to the entire configuration property value as
   shown in the example configuration above.

#. Enable externalized secrets in the distributed |kconnect| worker
   properties.

   .. code-block:: properties

      ### enable Externalized Secrets ###
      config.providers=securepass
      config.providers.securepass.class=io.confluent.kafka.security.config.provider.SecurePassConfigProvider

#. Deploy the connector using the JSON configuration file.

   When |kconnect| starts the connector, it resolves the variable by looking
   up the secret with a matching secret key in the secrets file, uses the
   master key to decrypt the secret, replaces the variable(s) with the
   decrypted secret(s), and passes this configuration to the connector.

.. _connect-rest-api-http:

------------------------------------------------------
Configuring the |kconnect| REST API for HTTP or HTTPS
------------------------------------------------------

By default you can make REST API calls over HTTP with |kconnect-long|. You
can also configure |kconnect| to allow either HTTP or HTTPS, or both.

The ``listeners`` configuration parameter determines the protocol used by
|kconnect-long|. This configuration should contain a list of listeners in
this format: ``protocol://host:port,protocol2://host2:port2``. For example:

.. codewithvars:: bash

   listeners=http://localhost:8080,https://localhost:8443

By default, if no listeners are specified, the REST server runs on port 8083
using the HTTP protocol. When using HTTPS, the configuration must include the
TLS/SSL configuration. By default, it uses the ``ssl.*`` settings. You can
use a different configuration for the REST API than for the |ak| brokers, by
using the ``listeners.https`` prefix. If you use the ``listeners.https``
prefix, the ``ssl.*`` options are ignored.
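For example, a worker that serves the REST API over both HTTP and HTTPS with
REST-specific TLS settings might be configured as in the following sketch.
The ports, keystore and truststore paths, and passwords are illustrative, and
the ``listeners.https.`` prefixed form of the ``ssl.*`` properties is
assumed:

.. codewithvars:: bash

   # Serve the REST API over HTTP and HTTPS (illustrative ports)
   listeners=http://localhost:8080,https://localhost:8443

   # TLS settings used only by the HTTPS REST listener (illustrative paths and passwords)
   listeners.https.ssl.keystore.location=/var/private/ssl/kafka.worker.rest.keystore.jks
   listeners.https.ssl.keystore.password=worker1234
   listeners.https.ssl.key.password=worker1234
   listeners.https.ssl.truststore.location=/var/private/ssl/kafka.worker.rest.truststore.jks
   listeners.https.ssl.truststore.password=worker1234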
You can use the following fields to configure HTTPS for the REST API:

- ``ssl.keystore.location``
- ``ssl.keystore.password``
- ``ssl.keystore.type``
- ``ssl.key.password``
- ``ssl.truststore.location``
- ``ssl.truststore.password``
- ``ssl.truststore.type``
- ``ssl.enabled.protocols``
- ``ssl.provider``
- ``ssl.protocol``
- ``ssl.cipher.suites``
- ``ssl.keymanager.algorithm``
- ``ssl.secure.random.implementation``
- ``ssl.trustmanager.algorithm``
- ``ssl.endpoint.identification.algorithm``
- ``ssl.client.auth``

For ``ssl.*`` configuration property definitions and a list of all |kconnect|
configuration properties, see :ref:`cp-config-connect`.

The REST API is used to monitor and manage |kconnect-long| and for
|kconnect-long| cross-cluster communication. Requests received on the REST
API of follower nodes are forwarded to the REST API of the leader node. If
the URI that a node should advertise to other workers is different from the
URI that it listens on, you can change it with the
``rest.advertised.host.name``, ``rest.advertised.port``, and
``rest.advertised.listener`` configuration options. This URI is used by the
follower nodes to connect with the leader. When using both HTTP and HTTPS
listeners, you can use the ``rest.advertised.listener`` option to define
which listener is used for the cross-cluster communication. When using HTTPS
for communication between nodes, the same ``ssl.*`` or ``listeners.https``
options are used to configure the HTTPS client.

These are the currently supported REST API endpoints:

- ``GET /connectors`` - Return a list of active connectors.
- ``POST /connectors`` - Create a new connector; the request body should be a
  JSON object containing a string ``name`` field and an object ``config``
  field with the connector configuration parameters.
- ``GET /connectors/{name}`` - Get information about a specific connector.
- ``GET /connectors/{name}/config`` - Get the configuration parameters for a
  specific connector.
- ``PUT /connectors/{name}/config`` - Update the configuration parameters for
  a specific connector.
- ``GET /connectors/{name}/status`` - Get the current status of the
  connector, including whether it is running, failed, or paused; which worker
  it is assigned to; error information if it has failed; and the state of all
  its tasks.
- ``GET /connectors/{name}/tasks`` - Get a list of tasks currently running
  for a connector.
- ``GET /connectors/{name}/tasks/{taskid}/status`` - Get the current status
  of the task, including whether it is running, failed, or paused; which
  worker it is assigned to; and error information if it has failed.
- ``PUT /connectors/{name}/pause`` - Pause the connector and its tasks, which
  stops message processing until the connector is resumed.
- ``PUT /connectors/{name}/resume`` - Resume a paused connector or do nothing
  if the connector is not paused.
- ``POST /connectors/{name}/restart`` - Restart a connector. This is
  typically used because it has failed.
- ``POST /connectors/{name}/tasks/{taskId}/restart`` - Restart an individual
  task. This is typically used because it has failed.
- ``DELETE /connectors/{name}`` - Delete a connector, halting all tasks and
  deleting its configuration.

You can also use the |kconnect-long| REST API to get information about
connector plugins:

- ``GET /connector-plugins`` - Return a list of connector plugins installed
  in the |kconnect-long| cluster. The API only checks for connectors on the
  worker that handles the request, which means you might see inconsistent
  results, especially during a rolling upgrade if you add new connector JARs.
- ``PUT /connector-plugins/{connector-type}/config/validate`` - Validate the
  provided configuration values against the configuration definition. This
  API performs per-configuration validation and returns suggested values and
  error messages.

For more information, see :ref:`REST API `.

For a demo of |kconnect-long| configured with an HTTPS endpoint, and |c3|
connecting to it, check out the :ref:`Confluent Platform demo`.
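As a quick sketch of calling the REST API over an HTTPS listener with
``curl``, the commands below list active connectors and validate a connector
configuration. The host, port, CA certificate path, connector class, and JSON
file name are illustrative:

.. codewithvars:: bash

   # List active connectors over HTTPS (illustrative host, port, and CA certificate)
   curl --cacert /var/private/ssl/ca.pem https://localhost:8443/connectors

   # Validate connector configuration values against the plugin's configuration definition
   curl --cacert /var/private/ssl/ca.pem -X PUT -H "Content-Type: application/json" \
        --data @my-connector-config.json \
        https://localhost:8443/connector-plugins/FileStreamSinkConnector/config/validate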