.. _kafka_ssl_encryption:

Encryption with SSL
--------------------

SSL Overview
~~~~~~~~~~~~~~~~~~~

SSL can be configured for encryption or authentication. You may configure just SSL encryption and
independently choose a separate mechanism for client authentication, for example :ref:`SSL `,
:ref:`SASL `, and so on. This section focuses on SSL encryption.

.. include:: includes/intro_ssl.rst

Creating SSL Keys and Certificates
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Refer to the Security Tutorial, which describes how to :ref:`create SSL keys and certificates `.

.. _encryption-ssl-brokers:

Brokers
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. include:: includes/intro_brokers.rst

* :ref:`Confluent Metrics Reporter `

1. Configure the password, truststore, and keystore in the ``server.properties`` file of every broker.
   Since this stores passwords directly in the broker configuration file, it is important to restrict
   access to these files via file system permissions.

   .. codewithvars:: bash

      ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
      ssl.truststore.password=test1234
      ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
      ssl.keystore.password=test1234
      ssl.key.password=test1234

2. If you want to enable SSL for inter-broker communication, add the following to the broker properties
   file (it defaults to ``PLAINTEXT``):

   .. codewithvars:: bash

      security.inter.broker.protocol=SSL

3. Tell the |ak-tm| brokers on which ports to listen for client and inter-broker ``SSL`` connections.
   You must configure ``listeners``, and optionally ``advertised.listeners`` if the value is different
   from ``listeners``. Note that ``advertised.listeners`` must contain an address that clients can
   resolve and reach; unlike ``listeners``, it cannot use the wildcard address ``0.0.0.0``.

   .. codewithvars:: bash

      listeners=SSL://kafka1:9093
      advertised.listeners=SSL://localhost:9093

4. Configure both ``SSL`` ports and ``PLAINTEXT`` ports if:

   * SSL is not enabled for inter-broker communication
   * Some clients connecting to the cluster do not use SSL

   .. codewithvars:: bash

      listeners=PLAINTEXT://kafka1:9092,SSL://kafka1:9093
      advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093

Note that ``advertised.host.name`` and ``advertised.port`` configure a single ``PLAINTEXT`` port and are
*incompatible* with secure protocols. Use ``advertised.listeners`` instead.

Optional settings
^^^^^^^^^^^^^^^^^^^^^^

Here are some optional settings:

``ssl.cipher.suites``
  A cipher suite is a named combination of authentication, encryption, MAC, and key exchange algorithms
  used to negotiate the security settings for a network connection using the TLS or SSL network protocol.

  * Type: list
  * Default: null (by default, all supported cipher suites are enabled)
  * Importance: medium

``ssl.enabled.protocols``
  The list of protocols enabled for SSL connections.

  * Type: list
  * Default: TLSv1.2,TLSv1.1,TLSv1
  * Importance: medium

``ssl.truststore.type``
  The file format of the truststore file.

  * Type: string
  * Default: JKS
  * Importance: medium

Due to import regulations in some countries, the Oracle implementation limits the strength of
cryptographic algorithms available by default. If stronger algorithms are needed (for example, AES with
256-bit keys), the `JCE Unlimited Strength Jurisdiction Policy Files `_ must be obtained and installed
in the JDK/JRE. See the `JCA Providers Documentation `_ for more information.
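
As an illustration of the ``ssl.enabled.protocols`` and ``ssl.cipher.suites`` settings described above,
the following sketch restricts a broker to TLSv1.2 and a single cipher suite by adding two properties to
``server.properties``. The values shown are examples only; choose protocols and cipher suites appropriate
for your JVM and security policy, and remember that brokers and clients must share at least one enabled
protocol and cipher suite to complete the TLS handshake.

.. codewithvars:: bash

   ssl.enabled.protocols=TLSv1.2
   ssl.cipher.suites=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384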

.. _encryption-ssl-clients:

Clients
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. include:: includes/intro_clients.rst

If client authentication is not required by the broker, the following is a minimal configuration example
that you can store in a client properties file ``client-ssl.properties``. Since this stores passwords
directly in the client configuration file, it is important to restrict access to these files via file
system permissions.

.. codewithvars:: bash

   bootstrap.servers=kafka1:9093
   security.protocol=SSL
   ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
   ssl.truststore.password=test1234

If client authentication via SSL is required, the client must provide the keystore as well. Please read
the additional configurations required in :ref:`SSL Authentication `.

Examples using ``kafka-console-producer`` and ``kafka-console-consumer``, passing in the
``client-ssl.properties`` file with the properties defined above:

.. codewithvars:: bash

   bin/kafka-console-producer --broker-list kafka1:9093 --topic test --producer.config client-ssl.properties
   bin/kafka-console-consumer --bootstrap-server kafka1:9093 --topic test --consumer.config client-ssl.properties --from-beginning

.. _clients-optional-settings:

Optional settings
^^^^^^^^^^^^^^^^^^^^

Here are some optional settings:

``ssl.provider``
  The name of the security provider used for SSL connections. Default value is the default security
  provider of the JVM.

  * Type: string
  * Default: null
  * Importance: medium

``ssl.cipher.suites``
  A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm
  used to negotiate the security settings for a network connection using TLS or SSL network protocol.

  * Type: list
  * Default: null (by default, all supported cipher suites are enabled)
  * Importance: medium

``ssl.enabled.protocols``
  The list of protocols enabled for SSL connections.

  * Type: list
  * Default: TLSv1.2,TLSv1.1,TLSv1
  * Importance: medium

``ssl.truststore.type``
  The file format of the truststore file.

  * Type: string
  * Default: JKS
  * Importance: medium

.. _encryption-ssl-zookeeper:

|zk|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The version of |zk| that is bundled with |ak| does not support SSL.

.. _encryption-ssl-connect:

|kconnect-long|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. include:: includes/intro_connect.rst

* :ref:`Confluent Monitoring Interceptors `
* :ref:`Confluent Metrics Reporter `

.. _encryption-ssl-connect-workers:

Configure the top-level settings in the Connect workers to use SSL by adding these properties in
``connect-distributed.properties``. These top-level settings are used by the Connect worker for group
coordination and to read and write to the internal topics which are used to track the cluster's state
(e.g. configs and offsets).

.. codewithvars:: bash

   bootstrap.servers=kafka1:9093
   security.protocol=SSL
   ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
   ssl.truststore.password=test1234

Connect workers manage the producers used by source connectors and the consumers used by sink connectors.
So, for the connectors to leverage security, you also have to override the default producer/consumer
configuration that the worker uses. Depending on whether the connector is a source or sink connector:

* For source connectors: configure the same properties adding the ``producer`` prefix.

  .. codewithvars:: bash

     producer.bootstrap.servers=kafka1:9093
     producer.security.protocol=SSL
     producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
     producer.ssl.truststore.password=test1234

* For sink connectors: configure the same properties adding the ``consumer`` prefix.

  .. codewithvars:: bash

     consumer.bootstrap.servers=kafka1:9093
     consumer.security.protocol=SSL
     consumer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
     consumer.ssl.truststore.password=test1234

.. _encryption-ssl-rest:

* For SSL with the REST API, configure the following additional properties:

  .. list-table::
     :widths: 25 75
     :header-rows: 1

     * - Property
       - Note
     * - ``listeners``
       - List of REST listeners in the format ``protocol://host:port,protocol2://host2:port``, where the
         protocol is either ``http`` or ``https``. When this property is defined, the properties
         ``rest.host.name`` and ``rest.port`` will be ignored.
     * - ``rest.advertised.listener``
       - Configures the listener used for communication between workers. Valid values are either ``http``
         or ``https``. If the ``listeners`` property is not defined or if it contains an ``http`` listener,
         the default value for this field is ``http``. When the ``listeners`` property is defined and
         contains only ``https`` listeners, the default value is ``https``.
     * - ``ssl.client.auth``
       - Valid values are ``none``, ``requested``, and ``required``, and control whether the client is
         required to perform SSL/TLS client authentication (``required``), can choose to skip it
         (``requested``), or has it disabled entirely (``none``).
     * - ``listeners.https.ssl.*``
       - You can use the ``listeners.https.`` prefix with an SSL configuration parameter to override the
         default SSL configuration, which is shared with the connections to the |ak| broker. If at least
         one parameter with this prefix exists, the implementation uses only the SSL parameters with this
         prefix and ignores all SSL parameters without the prefix. If no parameter with the
         ``listeners.https.`` prefix exists, the parameters without a prefix are used.

Note that if the ``listeners.https.ssl.*`` properties are not defined, then the ``ssl.*`` properties will
be used. For a list of all ``ssl.*`` properties, see :ref:`Configuration Options`.
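
Once an ``https`` listener is configured (the examples below show the relevant worker settings), REST
clients must use TLS to reach the Connect REST API. As a minimal sketch, assuming the worker listens on
``https://myhost:8443`` as in the examples below and that the CA certificate which signed the worker's
certificate is available in PEM format as ``ca-cert.pem`` (a hypothetical file name), a request to list
connectors might look like this:

.. codewithvars:: bash

   # List connectors over HTTPS, validating the worker certificate against the CA
   curl --cacert ca-cert.pem https://myhost:8443/connectors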

Here is an example that sets the ``ssl.*`` properties to use SSL connections to the broker; because
``listeners`` includes ``https``, these same settings are used to configure Connect's SSL endpoint:

.. codewithvars:: bash

   listeners=https://myhost:8443
   rest.advertised.listener=https
   rest.advertised.host.name=0.0.0.0
   rest.advertised.port=8083
   ssl.client.auth=requested
   ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
   ssl.truststore.password=test1234
   ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
   ssl.keystore.password=test1234
   ssl.key.password=test1234

To configure Connect's SSL endpoint differently from the SSL connections to the broker, define the
``listeners.https.ssl.*`` properties with the correct settings. Note that as soon as any
``listeners.https.ssl.*`` properties are specified, none of the top-level ``ssl.*`` properties apply, so
be sure to define all of the necessary ``listeners.https.ssl.*`` properties:

.. codewithvars:: bash

   listeners=https://myhost:8443
   rest.advertised.listener=https
   rest.advertised.host.name=0.0.0.0
   rest.advertised.port=8083
   listeners.https.ssl.client.auth=requested
   listeners.https.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
   listeners.https.ssl.truststore.password=test1234
   listeners.https.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
   listeners.https.ssl.keystore.password=test1234
   listeners.https.ssl.key.password=test1234

.. _encryption-ssl-replicator:

|crep-full|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. include:: includes/intro_replicator.rst

* :ref:`Kafka Connect `

To add SSL to the |crep-full| embedded consumer, modify the Replicator JSON properties file. Here is an
example subset of configuration properties to add for SSL encryption:

.. codewithvars:: bash

   {
     "name":"replicator",
     "config":{
       ....
       "src.kafka.bootstrap.servers" : "kafka1:9093",
       "src.kafka.security.protocol" : "SSL",
       "src.kafka.ssl.truststore.location" : "/var/private/ssl/kafka.server.truststore.jks",
       "src.kafka.ssl.truststore.password" : "test1234",
       ....
     }
   }

.. _encryption-ssl-c3:

|c3|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. include:: includes/intro_c3.rst

* :ref:`Confluent Metrics Reporter `: required on the production cluster being monitored
* :ref:`Confluent Monitoring Interceptors `: optional if you are using Control Center streams monitoring

Enable SSL for Control Center in the ``etc/confluent-control-center/control-center.properties`` file.

.. codewithvars:: bash

   confluent.controlcenter.streams.security.protocol=SSL
   confluent.controlcenter.streams.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
   confluent.controlcenter.streams.ssl.truststore.password=test1234

.. _encryption-ssl-metrics-reporter:

|cmetric-full|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This section describes how to enable SSL encryption for |cmetric-full|, which is used for |c3| and Auto
Data Balancer. To add SSL for |cmetric-full|, add the following to the ``server.properties`` file on the
brokers in the |ak| cluster being monitored.

.. codewithvars:: bash

   confluent.metrics.reporter.bootstrap.servers=kafka1:9093
   confluent.metrics.reporter.security.protocol=SSL
   confluent.metrics.reporter.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
   confluent.metrics.reporter.ssl.truststore.password=test1234

.. _encryption-ssl-interceptors:

Confluent Monitoring Interceptors
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. include:: includes/intro_interceptors.rst

Interceptors for General Clients
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

For |c3| stream monitoring to work with |ak| clients, you must configure SSL encryption for the Confluent
Monitoring Interceptors in each client.

1. Verify that the client has configured interceptors.

   * Producer:

     .. codewithvars:: bash

        interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor

   * Consumer:

     .. codewithvars:: bash

        interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

2. Configure SSL encryption for the interceptor.

   .. codewithvars:: bash

      confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
      confluent.monitoring.interceptor.security.protocol=SSL
      confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
      confluent.monitoring.interceptor.ssl.truststore.password=test1234

Interceptors for |kconnect-long|
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

For |c3| stream monitoring to work with |kconnect-long|, you must configure SSL for the Confluent
Monitoring Interceptors in |kconnect-long|. Configure the Connect workers by adding these properties in
``connect-distributed.properties``, depending on whether the connectors are sources or sinks.

* Source connector: configure the Confluent Monitoring Interceptors for SSL encryption with the
  ``producer`` prefix.

  .. codewithvars:: bash

     producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
     producer.confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
     producer.confluent.monitoring.interceptor.security.protocol=SSL
     producer.confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
     producer.confluent.monitoring.interceptor.ssl.truststore.password=test1234

* Sink connector: configure the Confluent Monitoring Interceptors for SSL encryption with the
  ``consumer`` prefix.

  .. codewithvars:: bash

     consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
     consumer.confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
     consumer.confluent.monitoring.interceptor.security.protocol=SSL
     consumer.confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
     consumer.confluent.monitoring.interceptor.ssl.truststore.password=test1234

Interceptors for Replicator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

For |c3| stream monitoring to work with Replicator, you must configure SSL for the Confluent Monitoring
Interceptors in the Replicator JSON configuration file. Here is an example subset of configuration
properties to add for SSL encryption.

.. codewithvars:: bash

   {
     "name":"replicator",
     "config":{
       ....
       "src.consumer.group.id": "replicator",
       "src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
       "src.consumer.confluent.monitoring.interceptor.bootstrap.servers": "kafka1:9093",
       "src.consumer.confluent.monitoring.interceptor.security.protocol": "SSL",
       "src.consumer.confluent.monitoring.interceptor.ssl.truststore.location": "/var/private/ssl/kafka.client.truststore.jks",
       "src.consumer.confluent.monitoring.interceptor.ssl.truststore.password": "test1234",
       ....
     }
   }

.. _encryption-ssl-schema-registry:

|sr|
~~~~

.. include:: includes/intro_sr.rst

Here is an example subset of ``schema-registry.properties`` configuration parameters to add for SSL
encryption:

.. codewithvars:: bash

   kafkastore.bootstrap.servers=SSL://kafka1:9093
   kafkastore.security.protocol=SSL
   kafkastore.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
   kafkastore.ssl.truststore.password=test1234

.. _encryption-ssl-rest-proxy:

REST Proxy
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Securing Confluent REST Proxy with SSL encryption requires that you configure security between:

#. REST clients and the REST Proxy (HTTPS)
#. The REST Proxy and the |ak| cluster

You may also refer to the complete list of `REST Proxy configuration options `_.

First, configure HTTPS between REST clients and the REST Proxy. Here is an example subset of
``kafka-rest.properties`` configuration parameters to configure HTTPS:

.. codewithvars:: bash

   ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
   ssl.truststore.password=test1234

Then, configure SSL encryption between the REST Proxy and the |ak| cluster. Here is an example subset of
``kafka-rest.properties`` configuration parameters to add for SSL encryption:

.. codewithvars:: bash

   client.bootstrap.servers=kafka1:9093
   client.security.protocol=SSL
   client.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
   client.ssl.truststore.password=test1234

.. include:: includes/ssl_logging.rst