.. _kafka_ssl_authentication:

Encryption and Authentication with SSL
--------------------------------------

SSL Overview
~~~~~~~~~~~~

With SSL authentication, the server authenticates the client (also called "2-way authentication"). Because SSL authentication requires SSL encryption, this page shows you how to configure both at the same time. It is a superset of the configuration required for :ref:`SSL encryption ` alone.

.. include:: includes/intro_ssl.rst

Creating SSL Keys and Certificates
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Refer to the Security Tutorial, which describes how to :ref:`create SSL keys and certificates `.

Brokers
~~~~~~~

.. include:: includes/intro_brokers.rst

* :ref:`Confluent Metrics Reporter `

1. Configure the truststore, keystore, and password in the ``server.properties`` file of every broker. Because this stores passwords directly in the broker configuration file, it is important to restrict access to these files via file system permissions.

   .. codewithvars:: bash

      ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
      ssl.truststore.password=test1234
      ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
      ssl.keystore.password=test1234
      ssl.key.password=test1234

   Note: ``ssl.truststore.password`` is technically optional but highly recommended. If a password is not set, access to the truststore is still available, but integrity checking is disabled.

2. To enable SSL for inter-broker communication, add the following to the broker properties file (it defaults to ``PLAINTEXT``):

   .. codewithvars:: bash

      security.inter.broker.protocol=SSL

3. Configure the ports on which the |ak-tm| brokers listen for client and inter-broker ``SSL`` connections. You must configure ``listeners``, and optionally ``advertised.listeners`` if the value is different from ``listeners``. Note that the wildcard address ``0.0.0.0`` is valid only in ``listeners``; ``advertised.listeners`` must be a host name or address that clients can resolve.

   .. codewithvars:: bash

      listeners=SSL://0.0.0.0:9093
      advertised.listeners=SSL://kafka1:9093

4. Configure both ``SSL`` ports and ``PLAINTEXT`` ports if:

   * SSL is not enabled for inter-broker communication
   * Some clients connecting to the cluster do not use SSL

   .. codewithvars:: bash

      listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093
      advertised.listeners=PLAINTEXT://kafka1:9092,SSL://kafka1:9093

   Note that ``advertised.host.name`` and ``advertised.port`` configure a single ``PLAINTEXT`` port and are *incompatible* with secure protocols. Use ``advertised.listeners`` instead.

5. To enable the broker to authenticate clients (2-way authentication), configure all brokers for client authentication. We recommend setting this to ``required``; we discourage ``requested`` because misconfigured clients can still connect successfully, which creates a false sense of security.

   .. codewithvars:: bash

      ssl.client.auth=required

   .. note:: If any SASL authentication mechanism is enabled for a given listener, SSL client authentication is disabled on that listener even if ``ssl.client.auth=required`` is configured, and the broker authenticates clients only via SASL on that listener.

Optional settings
^^^^^^^^^^^^^^^^^

``ssl.endpoint.identification.algorithm``
   The endpoint identification algorithm used by clients to validate the server host name. The default value is ``https``. Clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the broker's certificate. Server host name verification may be disabled by setting ``ssl.endpoint.identification.algorithm`` to an empty string.

   * Type: string
   * Default: https
   * Importance: medium

``ssl.cipher.suites``
   A cipher suite is a named combination of authentication, encryption, MAC, and key exchange algorithms used to negotiate the security settings for a network connection using the TLS or SSL network protocol.

   * Type: list
   * Default: null (by default, all supported cipher suites are enabled)
   * Importance: medium

``ssl.enabled.protocols``
   The list of protocols enabled for SSL connections.

   * Type: list
   * Default: TLSv1.2,TLSv1.1,TLSv1
   * Importance: medium

``ssl.truststore.type``
   The file format of the truststore file.

   * Type: string
   * Default: JKS
   * Importance: medium

Due to import regulations in some countries, the Oracle implementation limits the strength of cryptographic algorithms available by default. If stronger algorithms are needed (for example, AES with 256-bit keys), the `JCE Unlimited Strength Jurisdiction Policy Files `_ must be obtained and installed in the JDK/JRE. See the `JCA Providers Documentation `_ for more information.

.. _authentication-ssl-clients:

Clients
~~~~~~~

.. include:: includes/intro_clients.rst

The following configuration example, which you can store in a client properties file ``client-ssl.properties``, assumes that the broker requires client authentication. Because this stores passwords directly in the client configuration file, it is important to restrict access to these files via file system permissions.

.. codewithvars:: bash

   bootstrap.servers=kafka1:9093
   security.protocol=SSL
   ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
   ssl.truststore.password=test1234
   ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
   ssl.keystore.password=test1234
   ssl.key.password=test1234

Note: ``ssl.truststore.password`` is technically optional but highly recommended. If a password is not set, access to the truststore is still available, but integrity checking is disabled.
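Because these properties files hold keystore passwords in plain text, restricting them to the owning user is worth automating. The following is a minimal sketch, not part of the product: the file name follows the examples on this page, and the ``stat -c`` syntax assumes GNU coreutils (Linux).

```shell
#!/usr/bin/env sh
set -e

# Stand-in for your real client-ssl.properties; adjust the path as needed.
CONFIG=client-ssl.properties
touch "$CONFIG"

# Owner-only read/write, since the file contains keystore passwords.
chmod 600 "$CONFIG"

# Refuse to continue if the file is still readable by group or other.
perms=$(stat -c '%a' "$CONFIG")
if [ "$perms" != "600" ]; then
  echo "refusing to use $CONFIG: permissions are $perms, expected 600" >&2
  exit 1
fi
echo "permissions OK: $perms"
```

The same check applies to ``server.properties`` on the brokers and to the keystore and truststore files themselves.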
Examples using ``kafka-console-producer`` and ``kafka-console-consumer``, passing in the ``client-ssl.properties`` file with the properties defined above:

.. codewithvars:: bash

   bin/kafka-console-producer --broker-list kafka1:9093 --topic test --producer.config client-ssl.properties
   bin/kafka-console-consumer --bootstrap-server kafka1:9093 --topic test --consumer.config client-ssl.properties --from-beginning

Optional settings
^^^^^^^^^^^^^^^^^

``ssl.endpoint.identification.algorithm``
   The endpoint identification algorithm used by clients to validate the server host name. The default value is ``https``. Clients verify that the broker host name matches the host name in the broker's certificate. Server host name verification may be disabled by setting ``ssl.endpoint.identification.algorithm`` to an empty string.

   * Type: string
   * Default: https
   * Importance: medium

``ssl.provider``
   The name of the security provider used for SSL connections. The default value is the default security provider of the JVM.

   * Type: string
   * Default: null
   * Importance: medium

``ssl.cipher.suites``
   A cipher suite is a named combination of authentication, encryption, MAC, and key exchange algorithms used to negotiate the security settings for a network connection using the TLS or SSL network protocol.

   * Type: list
   * Default: null (by default, all supported cipher suites are enabled)
   * Importance: medium

``ssl.enabled.protocols``
   The list of protocols enabled for SSL connections.

   * Type: list
   * Default: TLSv1.2,TLSv1.1,TLSv1
   * Importance: medium

``ssl.truststore.type``
   The file format of the truststore file.

   * Type: string
   * Default: JKS
   * Importance: medium

.. _authentication-ssl-zookeeper:

|zk|
~~~~

The version of |zk| that is bundled with |ak| does not support SSL.

.. _authentication-ssl-connect:

|kconnect-long|
~~~~~~~~~~~~~~~

.. include:: includes/intro_connect.rst

* :ref:`Confluent Monitoring Interceptors `
* :ref:`Confluent Metrics Reporter `

.. _authentication-ssl-connect-workers:

Configure the top-level settings in the Connect workers to use SSL by adding these properties in ``connect-distributed.properties``. These top-level settings are used by the Connect worker for group coordination and to read and write to the internal topics that track the cluster's state (for example, configs and offsets). Here we assume client authentication is required by the brokers.

.. codewithvars:: bash

   bootstrap.servers=kafka1:9093
   security.protocol=SSL
   ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
   ssl.truststore.password=test1234
   ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
   ssl.keystore.password=test1234
   ssl.key.password=test1234

Connect workers manage the producers used by source connectors and the consumers used by sink connectors, so for the connectors to leverage security you must also override the default producer/consumer configuration that the worker uses. Here we assume client authentication is required by the brokers. Depending on whether the connector is a source or sink connector:

* For source connectors: configure the same properties, adding the ``producer`` prefix.

  .. codewithvars:: bash

     producer.bootstrap.servers=kafka1:9093
     producer.security.protocol=SSL
     producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
     producer.ssl.truststore.password=test1234
     producer.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
     producer.ssl.keystore.password=test1234
     producer.ssl.key.password=test1234

* For sink connectors: configure the same properties, adding the ``consumer`` prefix.

  .. codewithvars:: bash

     consumer.bootstrap.servers=kafka1:9093
     consumer.security.protocol=SSL
     consumer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
     consumer.ssl.truststore.password=test1234
     consumer.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
     consumer.ssl.keystore.password=test1234
     consumer.ssl.key.password=test1234

.. _authentication-ssl-replicator:

|crep-full|
~~~~~~~~~~~

.. include:: includes/intro_replicator.rst

* :ref:`Kafka Connect `

To add SSL to the |crep-full| embedded consumer, modify the Replicator JSON properties file. Here is an example subset of configuration properties to add for SSL encryption and authentication. Here we assume client authentication is required by the brokers.

.. codewithvars:: bash

   {
     "name":"replicator",
     "config":{
       ....
       "src.kafka.bootstrap.servers" : "kafka1:9093",
       "src.kafka.security.protocol" : "SSL",
       "src.kafka.ssl.truststore.location" : "/var/private/ssl/kafka.client.truststore.jks",
       "src.kafka.ssl.truststore.password" : "test1234",
       "src.kafka.ssl.keystore.location" : "/var/private/ssl/kafka.client.keystore.jks",
       "src.kafka.ssl.keystore.password" : "test1234",
       "src.kafka.ssl.key.password" : "test1234",
       ....
     }
   }

.. _authentication-ssl-c3:

|c3|
~~~~

.. include:: includes/intro_c3.rst

* :ref:`Confluent Metrics Reporter `: required on the production cluster being monitored
* :ref:`Confluent Monitoring Interceptors `: optional if you are using Control Center streams monitoring

Enable SSL for Control Center in the ``etc/confluent-control-center/control-center.properties`` file. Here we assume client authentication is required by the brokers.

.. codewithvars:: bash

   confluent.controlcenter.streams.security.protocol=SSL
   confluent.controlcenter.streams.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
   confluent.controlcenter.streams.ssl.truststore.password=test1234
   confluent.controlcenter.streams.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
   confluent.controlcenter.streams.ssl.keystore.password=test1234
   confluent.controlcenter.streams.ssl.key.password=test1234

.. _authentication-ssl-metrics-reporter:

|cmetric-full|
~~~~~~~~~~~~~~

This section describes how to enable SSL encryption and authentication for |cmetric-full|, which is used for |c3| and Auto Data Balancer. To add SSL for the |cmetric-full|, add the following to the ``server.properties`` file on the brokers in the |ak| cluster being monitored. Here we assume client authentication is required by the brokers.

.. codewithvars:: bash

   confluent.metrics.reporter.bootstrap.servers=kafka1:9093
   confluent.metrics.reporter.security.protocol=SSL
   confluent.metrics.reporter.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
   confluent.metrics.reporter.ssl.truststore.password=test1234
   confluent.metrics.reporter.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
   confluent.metrics.reporter.ssl.keystore.password=test1234
   confluent.metrics.reporter.ssl.key.password=test1234

.. _authentication-ssl-interceptors:

Confluent Monitoring Interceptors
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. include:: includes/intro_interceptors.rst

Interceptors for General Clients
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

For |c3| stream monitoring to work with |ak| clients, you must configure SSL encryption and authentication for the Confluent Monitoring Interceptors in each client.

1. Verify that the client has configured interceptors.

   * Producer:

     .. codewithvars:: bash

        interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor

   * Consumer:

     .. codewithvars:: bash

        interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

2. Configure SSL encryption and authentication for the interceptor. Here we assume client authentication is required by the brokers.

   .. codewithvars:: bash

      confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
      confluent.monitoring.interceptor.security.protocol=SSL
      confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
      confluent.monitoring.interceptor.ssl.truststore.password=test1234
      confluent.monitoring.interceptor.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
      confluent.monitoring.interceptor.ssl.keystore.password=test1234
      confluent.monitoring.interceptor.ssl.key.password=test1234

Interceptors for |kconnect-long|
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

For |c3| stream monitoring to work with |kconnect-long|, you must configure SSL for the Confluent Monitoring Interceptors in |kconnect-long|. Here we assume client authentication is required by the brokers. Configure the Connect workers by adding these properties in ``connect-distributed.properties``, depending on whether the connectors are sources or sinks.

* Source connector: configure the Confluent Monitoring Interceptors for SSL encryption and authentication with the ``producer`` prefix.

  .. codewithvars:: bash

     producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
     producer.confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
     producer.confluent.monitoring.interceptor.security.protocol=SSL
     producer.confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
     producer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
     producer.confluent.monitoring.interceptor.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
     producer.confluent.monitoring.interceptor.ssl.keystore.password=test1234
     producer.confluent.monitoring.interceptor.ssl.key.password=test1234

* Sink connector: configure the Confluent Monitoring Interceptors for SSL encryption and authentication with the ``consumer`` prefix.

  .. codewithvars:: bash

     consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
     consumer.confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
     consumer.confluent.monitoring.interceptor.security.protocol=SSL
     consumer.confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
     consumer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
     consumer.confluent.monitoring.interceptor.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
     consumer.confluent.monitoring.interceptor.ssl.keystore.password=test1234
     consumer.confluent.monitoring.interceptor.ssl.key.password=test1234

Interceptors for Replicator
^^^^^^^^^^^^^^^^^^^^^^^^^^^

For |c3| stream monitoring to work with Replicator, you must configure SSL for the Confluent Monitoring Interceptors in the Replicator JSON configuration file. Here is an example subset of configuration properties to add for SSL encryption and authentication.

.. codewithvars:: bash

   {
     "name":"replicator",
     "config":{
       ....
       "src.consumer.group.id": "replicator",
       "src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
       "src.consumer.confluent.monitoring.interceptor.security.protocol": "SSL",
       "src.consumer.confluent.monitoring.interceptor.bootstrap.servers": "kafka1:9093",
       "src.consumer.confluent.monitoring.interceptor.ssl.truststore.location": "/var/private/ssl/kafka.client.truststore.jks",
       "src.consumer.confluent.monitoring.interceptor.ssl.truststore.password": "test1234",
       "src.consumer.confluent.monitoring.interceptor.ssl.keystore.location": "/var/private/ssl/kafka.client.keystore.jks",
       "src.consumer.confluent.monitoring.interceptor.ssl.keystore.password": "test1234",
       "src.consumer.confluent.monitoring.interceptor.ssl.key.password": "test1234",
       ....
     }
   }

.. _authentication-ssl-schema-registry:

|sr|
~~~~

.. include:: includes/intro_sr.rst

The following is an example subset of ``schema-registry.properties`` configuration parameters to add for SSL encryption and authentication. Here we assume client authentication is required by the brokers.

.. codewithvars:: bash

   kafkastore.bootstrap.servers=SSL://kafka1:9093
   kafkastore.security.protocol=SSL
   kafkastore.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
   kafkastore.ssl.truststore.password=test1234
   kafkastore.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
   kafkastore.ssl.keystore.password=test1234
   kafkastore.ssl.key.password=test1234

.. _authentication-ssl-rest-proxy:

REST Proxy
~~~~~~~~~~

Securing Confluent REST Proxy with SSL encryption and authentication requires that you configure security between:

#. REST clients and the REST Proxy (HTTPS)
#. REST Proxy and the |ak| cluster

You may also refer to the complete list of `REST Proxy configuration options `_.

First, configure HTTPS between REST clients and the REST Proxy.
The following is an example subset of ``kafka-rest.properties`` configuration parameters to configure HTTPS.

.. codewithvars:: bash

   ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
   ssl.truststore.password=test1234
   ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
   ssl.keystore.password=test1234
   ssl.key.password=test1234

Then, configure SSL encryption and authentication between the REST Proxy and the |ak| cluster. The following is an example subset of ``kafka-rest.properties`` configuration parameters to add for SSL encryption and authentication. Here we assume client authentication is required by the brokers.

.. codewithvars:: bash

   client.bootstrap.servers=kafka1:9093
   client.security.protocol=SSL
   client.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
   client.ssl.truststore.password=test1234
   client.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
   client.ssl.keystore.password=test1234
   client.ssl.key.password=test1234

.. include:: includes/ssl_logging.rst
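A quick way to sanity-check any of the SSL listeners configured on this page is to connect with ``openssl s_client`` and confirm that the port completes a TLS handshake and presents the expected certificate. The sketch below is illustrative only: it stands up a throwaway ``openssl s_server`` with a self-signed certificate in place of a broker (``localhost`` and port ``19093`` are arbitrary stand-ins); against a real cluster you would skip the server setup and point ``s_client`` at ``kafka1:9093``.

```shell
#!/usr/bin/env sh
set -e
dir=$(mktemp -d)

# Throwaway self-signed certificate, standing in for a broker's keystore.
openssl req -x509 -newkey rsa:2048 -nodes -days 1 \
  -keyout "$dir/key.pem" -out "$dir/cert.pem" -subj "/CN=localhost" 2>/dev/null

# Stand-in TLS server; with a real cluster, use kafka1:9093 instead.
openssl s_server -accept 19093 -key "$dir/key.pem" -cert "$dir/cert.pem" -quiet &
server=$!
trap 'kill $server' EXIT
sleep 1

# The actual check: complete a handshake and print the certificate subject
# the server presents, so you can confirm it matches the expected host name.
subject=$(openssl s_client -connect localhost:19093 </dev/null 2>/dev/null \
  | openssl x509 -noout -subject)
echo "$subject"
```

If host name verification fails in a client, comparing this subject (and the certificate's SANs, via ``openssl x509 -noout -text``) against the advertised listener host name is usually the fastest diagnosis.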