.. _kafka_ssl_encryption:

Encryption with SSL
--------------------

**Table of Contents**

.. contents::
   :local:
   :depth: 1

SSL Overview
~~~~~~~~~~~~~~~~~~~

SSL can be configured for encryption or authentication. You may configure just SSL encryption and independently choose a separate mechanism for client authentication, for example :ref:`SSL ` or :ref:`SASL `. This section focuses on SSL encryption.

.. include:: includes/intro_ssl.rst

Creating SSL Keys and Certificates
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Refer to the Security Tutorial, which describes how to :ref:`create SSL keys and certificates `.

.. _encryption-ssl-brokers:

Brokers
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. include:: includes/intro_brokers.rst

* :ref:`Confluent Metrics Reporter `

1. Configure the truststore, keystore, and passwords in the ``server.properties`` file of every broker. Because this stores passwords directly in the broker configuration file, it is important to restrict access to these files via file system permissions.

   .. sourcecode:: bash

      ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
      ssl.truststore.password=test1234
      ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
      ssl.keystore.password=test1234
      ssl.key.password=test1234

2. If you want to enable SSL for inter-broker communication, add the following to the broker properties file (it defaults to ``PLAINTEXT``):

   .. sourcecode:: bash

      security.inter.broker.protocol=SSL

3. Tell the Kafka brokers on which ports to listen for client and inter-broker ``SSL`` connections. You must configure ``listeners``, and optionally ``advertised.listeners`` if the value is different from ``listeners``. Note that ``advertised.listeners`` must be an address that clients can resolve and connect to, not the wildcard address ``0.0.0.0``.

   .. sourcecode:: bash

      listeners=SSL://kafka1:9093
      advertised.listeners=SSL://kafka1:9093

4. Configure both ``SSL`` ports and ``PLAINTEXT`` ports if:

   * SSL is not enabled for inter-broker communication
   * Some clients connecting to the cluster do not use SSL

   .. sourcecode:: bash

      listeners=PLAINTEXT://kafka1:9092,SSL://kafka1:9093
      advertised.listeners=PLAINTEXT://kafka1:9092,SSL://kafka1:9093

Note that ``advertised.host.name`` and ``advertised.port`` configure a single ``PLAINTEXT`` port and are incompatible with secure protocols. Use ``advertised.listeners`` instead.

Optional settings
^^^^^^^^^^^^^^^^^^^^^^

Here are some optional settings:

``ssl.cipher.suites``
   A cipher suite is a named combination of authentication, encryption, MAC, and key exchange algorithms used to negotiate the security settings for a network connection using the TLS or SSL network protocol.

   * Type: list
   * Default: null (by default, all supported cipher suites are enabled)
   * Importance: medium

``ssl.enabled.protocols``
   The list of protocols enabled for SSL connections.

   * Type: list
   * Default: TLSv1.2,TLSv1.1,TLSv1
   * Importance: medium

``ssl.truststore.type``
   The file format of the truststore file.

   * Type: string
   * Default: JKS
   * Importance: medium

Due to import regulations in some countries, the Oracle implementation limits the strength of cryptographic algorithms available by default. If stronger algorithms are needed (for example, AES with 256-bit keys), the `JCE Unlimited Strength Jurisdiction Policy Files `_ must be obtained and installed in the JDK/JRE. See the `JCA Providers Documentation `_ for more information.
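As an illustration of the optional settings above, a broker that should accept only TLSv1.2 connections with a specific cipher suite might add the following to ``server.properties``. These values are illustrative only; choose protocols and cipher suites that your JVM supports and your security policy requires.

.. sourcecode:: bash

   # Illustrative values only -- not required for SSL encryption to work
   ssl.enabled.protocols=TLSv1.2
   ssl.cipher.suites=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
   ssl.truststore.type=JKS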
.. _encryption-ssl-clients:

Clients
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. include:: includes/intro_clients.rst

If client authentication is not required by the broker, the following is a minimal configuration example that you can store in a client properties file ``client-ssl.properties``. Because this stores passwords directly in the client configuration file, it is important to restrict access to these files via file system permissions.

.. sourcecode:: bash

   bootstrap.servers=kafka1:9093
   security.protocol=SSL
   ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
   ssl.truststore.password=test1234

If client authentication via SSL is required, the client must provide the keystore as well. See :ref:`SSL Authentication ` for the additional configuration that is required.

Examples using ``kafka-console-producer`` and ``kafka-console-consumer``, passing in the ``client-ssl.properties`` file with the properties defined above:

.. sourcecode:: bash

   bin/kafka-console-producer --broker-list kafka1:9093 --topic test --producer.config client-ssl.properties
   bin/kafka-console-consumer --bootstrap-server kafka1:9093 --topic test --new-consumer --consumer.config client-ssl.properties --from-beginning

Optional settings
^^^^^^^^^^^^^^^^^^^^

Here are some optional settings:

``ssl.provider``
   The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.

   * Type: string
   * Default: null
   * Importance: medium

``ssl.cipher.suites``
   A cipher suite is a named combination of authentication, encryption, MAC, and key exchange algorithms used to negotiate the security settings for a network connection using the TLS or SSL network protocol.

   * Type: list
   * Default: null (by default, all supported cipher suites are enabled)
   * Importance: medium

``ssl.enabled.protocols``
   The list of protocols enabled for SSL connections.

   * Type: list
   * Default: TLSv1.2,TLSv1.1,TLSv1
   * Importance: medium

``ssl.truststore.type``
   The file format of the truststore file.

   * Type: string
   * Default: JKS
   * Importance: medium
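The same properties apply when you configure a client programmatically. The following is a minimal, illustrative Java producer that uses the SSL settings from ``client-ssl.properties`` above; it assumes the ``kafka-clients`` library is on the classpath, and the class name, broker address, topic, and file paths are example values to substitute with your own.

.. sourcecode:: java

   import java.util.Properties;
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.ProducerRecord;

   public class SslProducerExample {
       public static void main(String[] args) {
           Properties props = new Properties();
           // Same settings as client-ssl.properties above (example values)
           props.put("bootstrap.servers", "kafka1:9093");
           props.put("security.protocol", "SSL");
           props.put("ssl.truststore.location", "/var/private/ssl/kafka.client.truststore.jks");
           props.put("ssl.truststore.password", "test1234");
           props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
           props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

           // All traffic to the broker's SSL listener on port 9093 is encrypted
           try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
               producer.send(new ProducerRecord<>("test", "key", "value"));
           }
       }
   }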
.. _encryption-ssl-zookeeper:

ZooKeeper
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The version of ZooKeeper that is bundled with Apache Kafka does not support SSL.

.. _encryption-ssl-connect:

Kafka Connect
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. include:: includes/intro_connect.rst

* :ref:`Confluent Monitoring Interceptors `
* :ref:`Confluent Metrics Reporter `

.. _encryption-ssl-connect-workers:

Configure the top-level settings in the Connect workers to use SSL by adding these properties in ``connect-distributed.properties``. These top-level settings are used by the Connect worker for group coordination and to read and write to the internal topics that track the cluster's state (e.g. configs and offsets).

.. sourcecode:: bash

   bootstrap.servers=kafka1:9093
   security.protocol=SSL
   ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
   ssl.truststore.password=test1234

Connect workers manage the producers used by source connectors and the consumers used by sink connectors. So, for the connectors to leverage security, you also have to override the default producer/consumer configuration that the worker uses, depending on whether the connector is a source or a sink connector:

* For source connectors: configure the same properties, adding the ``producer`` prefix.

  .. sourcecode:: bash

     producer.bootstrap.servers=kafka1:9093
     producer.security.protocol=SSL
     producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
     producer.ssl.truststore.password=test1234

* For sink connectors: configure the same properties, adding the ``consumer`` prefix.

  .. sourcecode:: bash

     consumer.bootstrap.servers=kafka1:9093
     consumer.security.protocol=SSL
     consumer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
     consumer.ssl.truststore.password=test1234
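For reference, here is what the worker, source, and sink settings look like combined in a single ``connect-distributed.properties`` for a worker that runs both source and sink connectors against an SSL-only cluster. The values simply repeat the examples above.

.. sourcecode:: bash

   # Worker group coordination and internal topics
   bootstrap.servers=kafka1:9093
   security.protocol=SSL
   ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
   ssl.truststore.password=test1234

   # Producers used by source connectors
   producer.bootstrap.servers=kafka1:9093
   producer.security.protocol=SSL
   producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
   producer.ssl.truststore.password=test1234

   # Consumers used by sink connectors
   consumer.bootstrap.servers=kafka1:9093
   consumer.security.protocol=SSL
   consumer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
   consumer.ssl.truststore.password=test1234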
.. _encryption-ssl-replicator:

Confluent Replicator
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. include:: includes/intro_replicator.rst

* :ref:`Kafka Connect `

To add SSL to the Confluent Replicator embedded consumer, modify the Replicator JSON properties file. Here is an example subset of configuration properties to add for SSL encryption:

.. sourcecode:: bash

   {
     "name":"replicator",
     "config":{
       ....
       "src.kafka.bootstrap.servers" : "kafka1:9093",
       "src.kafka.security.protocol" : "SSL",
       "src.kafka.ssl.truststore.location" : "/var/private/ssl/kafka.server.truststore.jks",
       "src.kafka.ssl.truststore.password" : "test1234",
       ....
     }
   }

.. _encryption-ssl-c3:

Confluent Control Center
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. include:: includes/intro_c3.rst

* :ref:`Confluent Metrics Reporter `: required on the production cluster being monitored
* :ref:`Confluent Monitoring Interceptors `: optional, if you are using Control Center streams monitoring

Enable SSL for Control Center in the ``etc/confluent-control-center/control-center.properties`` file.

.. sourcecode:: bash

   confluent.controlcenter.streams.security.protocol=SSL
   confluent.controlcenter.streams.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
   confluent.controlcenter.streams.ssl.truststore.password=test1234

.. _encryption-ssl-metrics-reporter:

Confluent Metrics Reporter
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This section describes how to enable SSL encryption for Confluent Metrics Reporter, which is used for Confluent Control Center and Auto Data Balancer. To add SSL for the Confluent Metrics Reporter, add the following to the ``server.properties`` file on the brokers in the Kafka cluster being monitored.

.. sourcecode:: bash

   confluent.metrics.reporter.bootstrap.servers=kafka1:9093
   confluent.metrics.reporter.security.protocol=SSL
   confluent.metrics.reporter.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
   confluent.metrics.reporter.ssl.truststore.password=test1234

.. _encryption-ssl-interceptors:

Confluent Monitoring Interceptors
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. include:: includes/intro_interceptors.rst

Interceptors for General Clients
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

For Confluent Control Center stream monitoring to work with Kafka clients, you must configure SSL encryption for the Confluent Monitoring Interceptors in each client.

1. Verify that the client has configured interceptors.

   * Producer:

     .. sourcecode:: bash

        interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor

   * Consumer:

     .. sourcecode:: bash

        interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

2. Configure SSL encryption for the interceptor.

   .. sourcecode:: bash

      confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
      confluent.monitoring.interceptor.security.protocol=SSL
      confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
      confluent.monitoring.interceptor.ssl.truststore.password=test1234

Interceptors for Kafka Connect
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

For Confluent Control Center stream monitoring to work with Kafka Connect, you must configure SSL for the Confluent Monitoring Interceptors in Kafka Connect. Configure the Connect workers by adding these properties in ``connect-distributed.properties``, depending on whether the connectors are sources or sinks.

* Source connector: configure the Confluent Monitoring Interceptors for SSL encryption with the ``producer`` prefix.

  .. sourcecode:: bash

     producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
     producer.confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
     producer.confluent.monitoring.interceptor.security.protocol=SSL
     producer.confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
     producer.confluent.monitoring.interceptor.ssl.truststore.password=test1234

* Sink connector: configure the Confluent Monitoring Interceptors for SSL encryption with the ``consumer`` prefix.

  .. sourcecode:: bash

     consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
     consumer.confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
     consumer.confluent.monitoring.interceptor.security.protocol=SSL
     consumer.confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
     consumer.confluent.monitoring.interceptor.ssl.truststore.password=test1234

Interceptors for Replicator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

For Confluent Control Center stream monitoring to work with Replicator, you must configure SSL for the Confluent Monitoring Interceptors in the Replicator JSON configuration file. Here is an example subset of configuration properties to add for SSL encryption.

.. sourcecode:: bash

   {
     "name":"replicator",
     "config":{
       ....
       "src.consumer.group.id": "replicator",
       "src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
       "src.consumer.confluent.monitoring.interceptor.bootstrap.servers": "kafka1:9093",
       "src.consumer.confluent.monitoring.interceptor.security.protocol": "SSL",
       "src.consumer.confluent.monitoring.interceptor.ssl.truststore.location": "/var/private/ssl/kafka.client.truststore.jks",
       "src.consumer.confluent.monitoring.interceptor.ssl.truststore.password": "test1234",
       ....
     }
   }

.. _encryption-ssl-schema-registry:

Schema Registry
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. include:: includes/intro_sr.rst

Here is an example subset of ``schema-registry.properties`` configuration parameters to add for SSL encryption:

.. sourcecode:: bash

   kafkastore.bootstrap.servers=SSL://kafka1:9093
   kafkastore.security.protocol=SSL
   kafkastore.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
   kafkastore.ssl.truststore.password=test1234

.. _encryption-ssl-rest-proxy:

REST Proxy
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Securing Confluent REST Proxy with SSL encryption requires that you configure security between:

#. REST clients and the REST Proxy (HTTPS)
#. REST Proxy and the Kafka cluster

You may also refer to the complete list of `REST Proxy configuration options `_.

First, configure HTTPS between REST clients and the REST Proxy. Here is an example subset of ``kafka-rest.properties`` configuration parameters to configure HTTPS:

.. sourcecode:: bash

   ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
   ssl.truststore.password=test1234
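The truststore shown above lets the REST Proxy validate certificates presented to it. To terminate HTTPS connections from REST clients, the REST Proxy also needs an HTTPS listener and a keystore containing its own certificate. The snippet below is an illustrative sketch only; the listener address, keystore path, and passwords are placeholder values, so consult the REST Proxy configuration options referenced above for the authoritative property names and defaults.

.. sourcecode:: bash

   # Illustrative HTTPS listener settings -- placeholder host, port, paths, and passwords
   listeners=https://0.0.0.0:8082
   ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
   ssl.keystore.password=test1234
   ssl.key.password=test1234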
Then, configure SSL encryption between the REST Proxy and the Kafka cluster. Here is an example subset of ``kafka-rest.properties`` configuration parameters to add for SSL encryption:

.. sourcecode:: bash

   client.bootstrap.servers=kafka1:9093
   client.security.protocol=SSL
   client.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
   client.ssl.truststore.password=test1234

.. _ssl_logging:

SSL Logging
~~~~~~~~~~~~~

Enable SSL debug logging at the JVM level by starting the Kafka broker and/or clients with the ``javax.net.debug`` system property. For example:

.. sourcecode:: bash

   $ export KAFKA_OPTS=-Djavax.net.debug=all
   $ bin/kafka-server-start etc/kafka/server.properties

Once you start the broker, you should be able to see the following in ``server.log``:

.. sourcecode:: bash

   with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)

To verify that the server's keystore and truststore are set up correctly, you can run the following command:

.. sourcecode:: bash

   openssl s_client -debug -connect localhost:9093 -tls1

Note: TLSv1 should be listed under ``ssl.enabled.protocols``.

In the output of this command you should see the server's certificate:

.. sourcecode:: bash

   -----BEGIN CERTIFICATE-----
   {variable sized random bytes}
   -----END CERTIFICATE-----
   subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Joe Smith
   issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com

You can find more details on this in `the Oracle documentation `__ on `debugging SSL/TLS connections `__.

If the certificate does not show up with the ``openssl`` command, or if there are any other error messages, then your keys or certificates are not set up correctly. Review your configurations.
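You can also inspect the certificates stored in a keystore or truststore directly with the JDK's ``keytool`` utility. The path and password below are the example values used throughout this page; substitute your own.

.. sourcecode:: bash

   # List the entries and certificate details in the broker keystore
   keytool -list -v -keystore /var/private/ssl/kafka.server.keystore.jks -storepass test1234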