.. title:: Encrypt and authenticate Confluent Platform and resources with TLS

.. meta::
   :description: Follow the procedures in this section to encrypt and authenticate Confluent Platform and its components using TLS.

.. _kafka_ssl_authentication:

Encrypt and Authenticate with TLS
---------------------------------

TLS Overview
~~~~~~~~~~~~

|cp| supports `Transport Layer Security (TLS) `__ encryption based on `OpenSSL `__, an open source cryptography toolkit that provides an implementation of the Transport Layer Security (TLS) and Secure Sockets Layer (SSL) protocols. With TLS authentication, the server authenticates the client (also called "two-way authentication").

.. note:: You can learn more about securing your Kafka deployment in the free course, `Apache Kafka Security `_.

Because TLS authentication requires TLS encryption, this page shows you how to configure both at the same time and is a superset of the configurations required just for :ref:`TLS encryption `.

.. include:: includes/intro_ssl.rst

.. include:: ../includes/cp-demo-tip.rst

Creating TLS Keys and Certificates
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Refer to the :ref:`security_tutorial`, which describes how to :ref:`create TLS keys and certificates `.

Brokers
~~~~~~~

.. include:: includes/intro_brokers.rst

* :ref:`Confluent Metrics Reporter `

.. note:: For details on all required and optional broker configuration properties, see :ref:`cp-config-brokers`.

#. Configure the truststore, keystore, and password in the ``server.properties`` file of every broker. Because this stores passwords directly in the broker configuration file, it is important to restrict access to these files using file system permissions.

   .. codewithvars:: bash

      ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
      ssl.truststore.password=test1234
      ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
      ssl.keystore.password=test1234
      ssl.key.password=test1234

   Note that ``ssl.truststore.password`` is technically optional, but strongly recommended. If a password is not set, access to the truststore is still available, but integrity checking is disabled.

#. If you want to enable TLS for interbroker communication, add the following to the broker properties file (the setting defaults to ``PLAINTEXT``):

   .. codewithvars:: bash

      security.inter.broker.protocol=SSL

#. Configure the ports on which the |ak-tm| brokers listen for client and interbroker TLS (``SSL``) connections. You must configure ``listeners``, and optionally ``advertised.listeners`` if the value is different from ``listeners``.

   .. codewithvars:: bash

      listeners=SSL://kafka1:9093
      advertised.listeners=SSL://localhost:9093

#. Configure both TLS (``SSL``) ports and ``PLAINTEXT`` ports if:

   * TLS is not enabled for interbroker communication
   * Some clients connecting to the cluster do not use TLS

   .. codewithvars:: bash

      listeners=PLAINTEXT://kafka1:9092,SSL://kafka1:9093
      advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093

   Note that ``advertised.host.name`` and ``advertised.port`` configure a single ``PLAINTEXT`` port and are incompatible with secure protocols. Use ``advertised.listeners`` instead.

#. To enable the broker to authenticate clients (two-way authentication), you must configure all the brokers for client authentication. Configure this to use ``required`` rather than ``requested``, because misconfigured clients can still connect successfully with ``requested``, which provides a false sense of security.

   .. codewithvars:: bash

      ssl.client.auth=required

   .. note:: If you specify ``ssl.client.auth=required``, client authentication fails if valid client certificates are not provided.
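The keystores and truststores referenced above are created as described in the :ref:`security_tutorial`, typically with ``keytool``. Purely as a hedged illustration of the underlying certificate-authority flow, the following ``openssl`` sketch creates a toy CA, issues a certificate for a hypothetical broker host ``kafka1``, and verifies the chain. All file names and the ``test1234`` password are placeholders, and the resulting PEM files would still need to be imported into JKS stores with ``keytool`` as the tutorial shows.

```shell
# Hypothetical illustration only: a toy CA and a CA-signed broker certificate.
# Paths and passwords are placeholders; real deployments follow the tutorial.

# 1. Create a self-signed CA certificate and key
openssl req -new -x509 -keyout /tmp/ca-key.pem -out /tmp/ca-cert.pem -days 365 \
  -subj "/CN=Toy-Kafka-CA" -passout pass:test1234

# 2. Create a key pair and signing request for a broker whose host name is kafka1
openssl req -new -newkey rsa:2048 -keyout /tmp/broker-key.pem -out /tmp/broker.csr \
  -subj "/CN=kafka1" -passout pass:test1234

# 3. Sign the broker certificate with the toy CA
openssl x509 -req -CA /tmp/ca-cert.pem -CAkey /tmp/ca-key.pem -passin pass:test1234 \
  -in /tmp/broker.csr -out /tmp/broker-cert.pem -days 365 -CAcreateserial

# 4. Confirm the certificate chains back to the CA
openssl verify -CAfile /tmp/ca-cert.pem /tmp/broker-cert.pem
```

In this sketch, ``ca-cert.pem`` is what every truststore would need to contain, and ``broker-cert.pem`` plus ``broker-key.pem`` is what the broker's keystore would hold.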
SASL listeners can be enabled in parallel with mTLS if you have defined SASL listeners with the following listener prefix:

.. code-block:: bash

   listener.name..ssl.client.auth

For details, see `KIP-684 `__.

Optional settings
^^^^^^^^^^^^^^^^^

``ssl.endpoint.identification.algorithm``
   The endpoint identification algorithm used by clients to validate the server host name. The default value is ``https``. Clients, including client connections created by the broker for interbroker communication, verify that the broker host name matches the host name in the broker's certificate. Disable server host name verification by setting ``ssl.endpoint.identification.algorithm`` to an empty string.

   * Type: string
   * Default: https
   * Importance: medium

``ssl.cipher.suites``
   A cipher suite is a named combination of authentication, encryption, MAC, and key exchange algorithms used to negotiate the security settings for a network connection (using the TLS network protocol).

   * Type: list
   * Default: null (by default, all supported cipher suites are enabled)
   * Importance: medium

``ssl.enabled.protocols``
   The comma-separated list of protocols enabled for TLS connections. The default value is ``TLSv1.2,TLSv1.3`` when running with Java 11 or later, and ``TLSv1.2`` otherwise. With the Java 11 default (``TLSv1.2,TLSv1.3``), |ak| clients and brokers prefer TLSv1.3 if both support it, and fall back to TLSv1.2 otherwise (assuming both support at least TLSv1.2).

   * Type: list
   * Default: ``TLSv1.2,TLSv1.3``
   * Importance: medium

``ssl.truststore.type``
   The file format of the truststore file.

   * Type: string
   * Default: JKS
   * Importance: medium

Due to import regulations in some countries, the Oracle implementation limits the strength of cryptographic algorithms available by default. If stronger algorithms are needed (for example, AES with 256-bit keys), the `JCE Unlimited Strength Jurisdiction Policy Files `_ must be obtained and installed in the JDK/JRE.
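``ssl.truststore.type`` (and the corresponding ``ssl.keystore.type``) also accept ``PKCS12``, a store format that can be produced without ``keytool``. As a hedged sketch, with hypothetical file names and the ``test1234`` placeholder password, a PKCS12 keystore could be assembled from a PEM certificate and key like this:

```shell
# Hypothetical illustration: build a PKCS12 store with openssl alone.
# A broker configured with ssl.keystore.type=PKCS12 could point at the result.

# Self-signed certificate and unencrypted key for a placeholder host kafka1
openssl req -new -x509 -newkey rsa:2048 -nodes -keyout /tmp/demo-key.pem \
  -out /tmp/demo-cert.pem -days 365 -subj "/CN=kafka1"

# Package certificate and key into a PKCS12 store under the alias "kafka1"
openssl pkcs12 -export -in /tmp/demo-cert.pem -inkey /tmp/demo-key.pem \
  -name kafka1 -out /tmp/kafka.keystore.p12 -passout pass:test1234

# List the store's certificate bag to confirm the "kafka1" entry exists
openssl pkcs12 -in /tmp/kafka.keystore.p12 -passin pass:test1234 \
  -nokeys -clcerts | grep -i "friendlyName"
```

The store would then be referenced with ``ssl.keystore.location=/tmp/kafka.keystore.p12``, ``ssl.keystore.type=PKCS12``, and the matching ``ssl.keystore.password``.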
See the `JCA Providers Documentation `_ for more information.

.. _authentication-ssl-clients:

Clients
~~~~~~~

.. important:: If you are configuring this for |sr| or |crest|, you must prefix each parameter with ``confluent.license``. For example, ``sasl.mechanism`` becomes ``confluent.license.sasl.mechanism``. For additional information, see :ref:`kafka-rest-and-sasl-ssl-configs`.

.. include:: includes/intro_clients.rst

In the following configuration example, the underlying assumption is that client authentication is required by the broker, so the settings can be stored in a client properties file, ``client-ssl.properties``. Because this stores passwords directly in the client configuration file, it is important to restrict access to these files using file system permissions.

.. codewithvars:: bash

   bootstrap.servers=kafka1:9093
   security.protocol=SSL
   ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
   ssl.truststore.password=test1234
   ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
   ssl.keystore.password=test1234
   ssl.key.password=test1234

Note that ``ssl.truststore.password`` is technically optional, but strongly recommended. If a password is not set, access to the truststore is still available, but integrity checking is disabled.

The following examples use ``kafka-console-producer`` and ``kafka-console-consumer``, and pass in the ``client-ssl.properties`` defined above:

.. codewithvars:: bash

   kafka-console-producer --broker-list kafka1:9093 --topic test --producer.config client-ssl.properties
   kafka-console-consumer --bootstrap-server kafka1:9093 --topic test --consumer.config client-ssl.properties --from-beginning

Optional settings
^^^^^^^^^^^^^^^^^

``ssl.endpoint.identification.algorithm``
   The endpoint identification algorithm used by clients to validate the server host name. The default value is ``https``.
   Clients, including client connections created by the broker for interbroker communication, verify that the broker host name matches the host name in the broker's certificate. Disable server host name verification by setting ``ssl.endpoint.identification.algorithm`` to an empty string.

   * Type: string
   * Default: https
   * Importance: medium

``ssl.provider``
   The name of the security provider used for TLS connections. The default value is the default security provider of the JVM.

   * Type: string
   * Default: null
   * Importance: medium

``ssl.cipher.suites``
   A cipher suite is a named combination of authentication, encryption, MAC, and key exchange algorithms used to negotiate the security settings for a network connection (using the TLS network protocol).

   * Type: list
   * Default: null (by default, all supported cipher suites are enabled)
   * Importance: medium

``ssl.enabled.protocols``
   The comma-separated list of protocols enabled for TLS connections. The default value is ``TLSv1.2,TLSv1.3`` when running with Java 11 or later, and ``TLSv1.2`` otherwise. With the Java 11 default (``TLSv1.2,TLSv1.3``), |ak| clients and brokers prefer TLSv1.3 if both support it, and fall back to TLSv1.2 otherwise (assuming both support at least TLSv1.2).

   * Type: list
   * Default: ``TLSv1.2,TLSv1.3``
   * Importance: medium

``ssl.truststore.type``
   The file format of the truststore file.

   * Type: string
   * Default: JKS
   * Importance: medium

.. _authentication-ssl-zookeeper:

|zk|
~~~~

Starting in |cp| version 5.5.0, the version of |zk| bundled with |ak| supports TLS. For details, refer to :ref:`kafka_adding_security`.

.. important::

   .. include:: ../includes/zk-deprecation.rst

.. _authentication-ssl-connect:

|kconnect-long|
~~~~~~~~~~~~~~~

.. include:: includes/intro_connect.rst

* :ref:`Confluent Monitoring Interceptors `
* :ref:`Confluent Metrics Reporter `

.. _authentication-ssl-connect-workers:

Configure the top-level settings in the |kconnect| workers to use TLS by adding these properties in ``connect-distributed.properties``. These top-level settings are used by the |kconnect| worker for group coordination and to read and write to the internal topics that are used to track the cluster's state (for example, configs and offsets). The assumption here is that client authentication is required by the brokers.

.. codewithvars:: bash

   bootstrap.servers=kafka1:9093
   security.protocol=SSL
   ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
   ssl.truststore.password=test1234
   ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
   ssl.keystore.password=test1234
   ssl.key.password=test1234

|kconnect| workers manage the producers used by source connectors and the consumers used by sink connectors. So, for the connectors to leverage security, you must also override the default producer/consumer configuration that the worker uses. The assumption here is that client authentication is required by the brokers.

* For source connectors: configure the same properties, adding the ``producer`` prefix.

  .. codewithvars:: bash

     producer.bootstrap.servers=kafka1:9093
     producer.security.protocol=SSL
     producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
     producer.ssl.truststore.password=test1234
     producer.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
     producer.ssl.keystore.password=test1234
     producer.ssl.key.password=test1234

* For sink connectors: configure the same properties, adding the ``consumer`` prefix.

  .. codewithvars:: bash

     consumer.bootstrap.servers=kafka1:9093
     consumer.security.protocol=SSL
     consumer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
     consumer.ssl.truststore.password=test1234
     consumer.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
     consumer.ssl.keystore.password=test1234
     consumer.ssl.key.password=test1234

.. important:: **Updating the certificate authority (CA)**

   When you update the certificate authority (CA), the |kconnect| workers must be restarted. You can pass the CA at the JVM level as shown here:

   .. code-block:: bash

      KAFKA_OPTS="-Djavax.net.ssl.trustStore=${CERTS_PATH}/truststore.jks \
        -Djavax.net.ssl.trustStorePassword=$TRUSTSTORE_PASSWORD \
        -Djavax.net.ssl.keyStore=${CERTS_PATH}/keystore.jks \
        -Djavax.net.ssl.keyStorePassword=$KEYSTORE_PASSWORD"

For more information, see :ref:`connect_security`.

.. _authentication-ssl-replicator:

|crep-full|
~~~~~~~~~~~

.. include:: includes/intro_replicator.rst

* :ref:`Kafka Connect `

To add TLS to the |crep-full| embedded consumer, modify the |crep| JSON properties file. This example is a subset of configuration properties to add for TLS encryption and authentication. The assumption here is that client authentication is required by the brokers.

.. codewithvars:: bash

   {
     "name":"replicator",
     "config":{
       ....
       "src.kafka.ssl.truststore.location":"/etc/kafka/secrets/kafka.connect.truststore.jks",
       "src.kafka.ssl.truststore.password":"confluent",
       "src.kafka.ssl.keystore.location":"/etc/kafka/secrets/kafka.connect.keystore.jks",
       "src.kafka.ssl.keystore.password":"confluent",
       "src.kafka.ssl.key.password":"confluent",
       "src.kafka.security.protocol":"SSL"
       ....
     }
   }

.. seealso:: To see an example |crep-full| configuration, refer to the :devx-examples:`TLS source authentication demo script|/replicator-security/scripts/submit_replicator_source_ssl_auth.sh`. For demos of common security configurations, refer to :devx-examples:`Replicator security demos|/replicator-security`.

To configure |crep-full| for a destination cluster with TLS authentication, modify the |crep| JSON configuration to include the following:

.. codewithvars:: bash

   {
     "name":"replicator",
     "config":{
       ....
       "dest.kafka.ssl.truststore.location":"/etc/kafka/secrets/kafka.connect.truststore.jks",
       "dest.kafka.ssl.truststore.password":"confluent",
       "dest.kafka.ssl.keystore.location":"/etc/kafka/secrets/kafka.connect.keystore.jks",
       "dest.kafka.ssl.keystore.password":"confluent",
       "dest.kafka.ssl.key.password":"confluent",
       "dest.kafka.security.protocol":"SSL"
       ....
     }
   }

Additionally, the following properties are required in the |kconnect| worker:

.. codewithvars:: bash

   security.protocol=SSL
   ssl.truststore.location=/etc/kafka/secrets/kafka.connect.truststore.jks
   ssl.truststore.password=confluent
   ssl.keystore.location=/etc/kafka/secrets/kafka.connect.keystore.jks
   ssl.keystore.password=confluent
   ssl.key.password=confluent
   producer.security.protocol=SSL
   producer.ssl.truststore.location=/etc/kafka/secrets/kafka.connect.truststore.jks
   producer.ssl.truststore.password=confluent
   producer.ssl.keystore.location=/etc/kafka/secrets/kafka.connect.keystore.jks
   producer.ssl.keystore.password=confluent
   producer.ssl.key.password=confluent

For more details, see :ref:`general security configuration for Connect workers `.

.. seealso:: To see an example |crep-full| configuration, see the :devx-examples:`TLS destination authentication demo script|/replicator-security/scripts/submit_replicator_dest_ssl_auth.sh`. For demos of common security configurations, see :devx-examples:`Replicator security demos|/replicator-security`.

.. _authentication-ssl-c3:

|c3|
~~~~

You can configure TLS for |c3-short| so that access is secured through HTTPS. In addition, |c3-short| uses |kstreams| as a state store, so if all the |ak| brokers in the cluster backing |c3-short| are secured, then |c3-short| also needs to be secured. Also, because |c3-short| acts as a proxy server for other components, you can configure TLS for |c3-short| to secure its communication with other secured |cp| components.

Enable TLS for |c3-short| in the ``etc/confluent-control-center/control-center.properties`` file.
The assumption here is that client authentication is required by the brokers. For details on how to enable TLS for |c3-short| as a server or a proxy server, see :ref:`controlcenter_security_ssl`.

.. _authentication-ssl-metrics-reporter:

|cmetric-full|
~~~~~~~~~~~~~~

This section describes how to enable TLS encryption and authentication for |cmetric-full|, which is used for |c3| and Auto Data Balancer. To add TLS for |cmetric-full|, add the following to ``server.properties`` on the brokers in the |ak| cluster being monitored. The assumption here is that client authentication is required by the brokers.

.. codewithvars:: bash

   confluent.metrics.reporter.bootstrap.servers=kafka1:9093
   confluent.metrics.reporter.security.protocol=SSL
   confluent.metrics.reporter.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
   confluent.metrics.reporter.ssl.truststore.password=test1234
   confluent.metrics.reporter.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
   confluent.metrics.reporter.ssl.keystore.password=test1234
   confluent.metrics.reporter.ssl.key.password=test1234

.. _authentication-ssl-interceptors:

Confluent Monitoring Interceptors
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. include:: includes/intro_interceptors.rst

Interceptors for General Clients
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

For |c3| stream monitoring to work with |ak| clients, you must configure TLS encryption and authentication for the Confluent Monitoring Interceptors in each client.

#. Verify that the client has configured interceptors.

   * Producer:

     .. codewithvars:: bash

        interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor

   * Consumer:

     .. codewithvars:: bash

        interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

#. Configure TLS encryption and authentication for the interceptor. The assumption here is that client authentication is required by the brokers.

   .. codewithvars:: bash

      confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
      confluent.monitoring.interceptor.security.protocol=SSL
      confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
      confluent.monitoring.interceptor.ssl.truststore.password=test1234
      confluent.monitoring.interceptor.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
      confluent.monitoring.interceptor.ssl.keystore.password=test1234
      confluent.monitoring.interceptor.ssl.key.password=test1234

.. _authentication-ssl-kafka-connect-monitoring:

Interceptors for |kconnect-long|
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

For |c3| stream monitoring to work with |kconnect-long|, you must configure TLS for the Confluent Monitoring Interceptors in |kconnect-long|. The assumption here is that client authentication is required by the brokers. Configure the |kconnect| workers by adding these properties in ``connect-distributed.properties``, depending on whether the connectors are sources or sinks.

* Source connector: configure the Confluent Monitoring Interceptors for TLS encryption and authentication with the ``producer`` prefix.

  .. codewithvars:: bash

     producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
     producer.confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
     producer.confluent.monitoring.interceptor.security.protocol=SSL
     producer.confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
     producer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
     producer.confluent.monitoring.interceptor.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
     producer.confluent.monitoring.interceptor.ssl.keystore.password=test1234
     producer.confluent.monitoring.interceptor.ssl.key.password=test1234

* Sink connector: configure the Confluent Monitoring Interceptors for TLS encryption and authentication with the ``consumer`` prefix.

  .. codewithvars:: bash

     consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
     consumer.confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
     consumer.confluent.monitoring.interceptor.security.protocol=SSL
     consumer.confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
     consumer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
     consumer.confluent.monitoring.interceptor.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
     consumer.confluent.monitoring.interceptor.ssl.keystore.password=test1234
     consumer.confluent.monitoring.interceptor.ssl.key.password=test1234

.. _authentication-ssl-replicator-monitoring:

Interceptors for Replicator
^^^^^^^^^^^^^^^^^^^^^^^^^^^

For |c3| stream monitoring to work with Replicator, you must configure TLS for the Confluent Monitoring Interceptors in the Replicator JSON configuration file. Here is an example subset of configuration properties to add for TLS encryption and authentication:

.. codewithvars:: bash

   {
     "name":"replicator",
     "config":{
       ....
       "src.consumer.group.id": "replicator",
       "src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
       "src.consumer.confluent.monitoring.interceptor.security.protocol": "SSL",
       "src.consumer.confluent.monitoring.interceptor.bootstrap.servers": "kafka1:9093",
       "src.consumer.confluent.monitoring.interceptor.ssl.truststore.location": "/var/private/ssl/kafka.client.truststore.jks",
       "src.consumer.confluent.monitoring.interceptor.ssl.truststore.password": "test1234",
       "src.consumer.confluent.monitoring.interceptor.ssl.keystore.location": "/var/private/ssl/kafka.client.keystore.jks",
       "src.consumer.confluent.monitoring.interceptor.ssl.keystore.password": "test1234",
       "src.consumer.confluent.monitoring.interceptor.ssl.key.password": "test1234",
       ....
     }
   }

.. _authentication-encryption-ssl-self-balancing:

Enable TLS in a Self-Balancing cluster
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

To enable TLS encryption in a Self-Balancing cluster, add the following to the ``server.properties`` file on the brokers in the |ak| cluster.

.. codewithvars:: bash

   confluent.rebalancer.metrics.security.protocol=SSL
   confluent.rebalancer.metrics.ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks
   confluent.rebalancer.metrics.ssl.truststore.password=confluent
   confluent.rebalancer.metrics.ssl.keystore.location=/etc/kafka/secrets/kafka.client.keystore.jks
   confluent.rebalancer.metrics.ssl.keystore.password=confluent
   confluent.rebalancer.metrics.ssl.key.password=confluent

.. _authentication-ssl-schema-registry:

|sr|
~~~~

.. include:: includes/intro_sr.rst

The following is an example subset of ``schema-registry.properties`` configuration parameters to add for TLS encryption and authentication. The assumption here is that client authentication is required by the brokers.

.. codewithvars:: bash

   kafkastore.bootstrap.servers=SSL://kafka1:9093
   kafkastore.security.protocol=SSL
   kafkastore.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
   kafkastore.ssl.truststore.password=test1234
   kafkastore.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
   kafkastore.ssl.keystore.password=test1234
   kafkastore.ssl.key.password=test1234

.. _authentication-ssl-rest-proxy:

REST Proxy
~~~~~~~~~~

Securing |crest-long| with TLS encryption and authentication requires that you configure security between:

#. REST clients and the REST Proxy (HTTPS)
#. The REST Proxy and the |ak| cluster

Also, refer to the complete list of :ref:`REST Proxy configuration options `.

#. Configure HTTPS between REST clients and the REST Proxy. The following is an example subset of ``kafka-rest.properties`` configuration parameters to configure HTTPS.

   .. codewithvars:: bash

      ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
      ssl.truststore.password=test1234
      ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
      ssl.keystore.password=test1234
      ssl.key.password=test1234

#. Configure TLS encryption and authentication between the REST Proxy and the |ak| cluster. The following is an example subset of ``kafka-rest.properties`` configuration parameters to add for TLS encryption and authentication. The assumption here is that client authentication is required by the brokers.

   .. codewithvars:: bash

      client.bootstrap.servers=kafka1:9093
      client.security.protocol=SSL
      client.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
      client.ssl.truststore.password=test1234
      client.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
      client.ssl.keystore.password=test1234
      client.ssl.key.password=test1234

.. include:: includes/ssl_logging.rst
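Across all of the components above, a missing or mistyped ``ssl.*`` key typically surfaces only at service start or at first connection. Purely as a hypothetical sanity-check sketch (the file contents and key list below are illustrative placeholders, not a Confluent tool), a properties file can be checked for the required TLS settings before restarting a service:

```shell
# Hypothetical sanity check: confirm a properties file defines the TLS keys
# used throughout this page. File path, contents, and key list are placeholders.
cat > /tmp/client-ssl.properties <<'EOF'
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
EOF

for key in security.protocol ssl.truststore.location ssl.truststore.password \
           ssl.keystore.location ssl.keystore.password ssl.key.password; do
  if grep -q "^${key}=" /tmp/client-ssl.properties; then
    echo "${key}: present"
  else
    echo "${key}: MISSING" >&2
  fi
done
```

The same loop works for prefixed variants (``producer.ssl.*``, ``consumer.ssl.*``, ``confluent.metrics.reporter.ssl.*``, and so on) by adjusting the key list.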