.. _kafka_ssl_authentication:

Encryption and Authentication with SSL
--------------------------------------

SSL Overview
~~~~~~~~~~~~

With SSL authentication, the server authenticates the client (also called "2-way authentication").
Because SSL authentication requires SSL encryption, this page shows you how to configure both at
the same time and is a superset of the configurations required just for :ref:`SSL encryption `.

.. include:: includes/intro_ssl.rst

.. include:: ../includes/cp-demo-tip.rst

Creating SSL Keys and Certificates
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Refer to the :ref:`security_tutorial`, which describes how to :ref:`create SSL keys and certificates `.

Brokers
~~~~~~~

.. include:: includes/intro_brokers.rst

* :ref:`Confluent Metrics Reporter `

#. Configure the truststore, keystore, and passwords in the ``server.properties`` file of every
   broker. Because this stores passwords directly in the broker configuration file, it is
   important to restrict access to these files using file system permissions.

   .. codewithvars:: bash

      ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
      ssl.truststore.password=test1234
      ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
      ssl.keystore.password=test1234
      ssl.key.password=test1234

   Note that ``ssl.truststore.password`` is technically optional, but strongly recommended. If a
   password is not set, access to the truststore is still available, but integrity checking is
   disabled.

#. To enable SSL for inter-broker communication, add the following setting to the broker
   properties file (it defaults to ``PLAINTEXT``):

   .. codewithvars:: bash

      security.inter.broker.protocol=SSL

#. Configure the ports on which the |ak-tm| brokers listen for client and inter-broker ``SSL``
   connections. You should configure ``listeners``, and optionally ``advertised.listeners`` if
   its value is different from ``listeners``.

   .. codewithvars:: bash

      listeners=SSL://kafka1:9093
      advertised.listeners=SSL://localhost:9093

#. Configure both ``SSL`` ports and ``PLAINTEXT`` ports if:

   * SSL is not enabled for inter-broker communication
   * Some clients connecting to the cluster do not use SSL

   .. codewithvars:: bash

      listeners=PLAINTEXT://kafka1:9092,SSL://kafka1:9093
      advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093

   Note that ``advertised.host.name`` and ``advertised.port`` configure a single ``PLAINTEXT``
   port and are incompatible with secure protocols. Use ``advertised.listeners`` instead.

#. To enable the broker to authenticate clients (2-way authentication), you must configure all
   the brokers for client authentication. Use ``required`` rather than ``requested``, because
   with ``requested`` misconfigured clients can still connect successfully, which provides a
   false sense of security.

   .. codewithvars:: bash

      ssl.client.auth=required

   .. note::

      If any SASL authentication mechanisms are enabled for a given listener, then SSL client
      authentication is disabled, even if you have specified ``ssl.client.auth=required``, and
      the broker authenticates clients only using SASL on that listener.
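After you restart a broker with the ``SSL`` listener configured above, you can optionally run a
quick handshake check against the listener port before moving on to client configuration. The
following is a minimal sketch, assuming the broker from the examples above is reachable as
``kafka1:9093`` and that the ``openssl`` CLI is installed on the host; the broker's certificate
chain should appear in the output.

.. codewithvars:: bash

   # Attempt a TLS handshake against the broker's SSL listener and print the certificate chain.
   # This only verifies the TLS layer, not Kafka-level authentication or authorization.
   openssl s_client -connect kafka1:9093 </dev/null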
Optional settings
^^^^^^^^^^^^^^^^^

``ssl.endpoint.identification.algorithm``
   The endpoint identification algorithm used by clients to validate the server host name. The
   default value is ``https``. Clients, including client connections created by the broker for
   inter-broker communication, verify that the broker host name matches the host name in the
   broker's certificate. Disable server host name verification by setting
   ``ssl.endpoint.identification.algorithm`` to an empty string.

   * Type: string
   * Default: https
   * Importance: medium

``ssl.cipher.suites``
   A cipher suite is a named combination of authentication, encryption, MAC, and key exchange
   algorithms used to negotiate the security settings for a network connection (using the TLS or
   SSL network protocol).

   * Type: list
   * Default: null (by default, all supported cipher suites are enabled)
   * Importance: medium

``ssl.enabled.protocols``
   The list of protocols enabled for SSL connections.

   * Type: list
   * Default: TLSv1.2,TLSv1.1,TLSv1
   * Importance: medium

``ssl.truststore.type``
   The file format of the truststore file.

   * Type: string
   * Default: JKS
   * Importance: medium

Due to import regulations in some countries, the Oracle implementation limits the strength of
cryptographic algorithms available by default. If stronger algorithms are needed (for example,
AES with 256-bit keys), the `JCE Unlimited Strength Jurisdiction Policy Files `_ must be obtained
and installed in the JDK/JRE. See the `JCA Providers Documentation `_ for more information.

.. _authentication-ssl-clients:

Clients
~~~~~~~

.. include:: includes/intro_clients.rst

The following example configuration assumes that client authentication is required by the broker.
You can store it in a client properties file, ``client-ssl.properties``. Because this stores
passwords directly in the client configuration file, it is important to restrict access to these
files using file system permissions.

.. codewithvars:: bash

   bootstrap.servers=kafka1:9093
   security.protocol=SSL
   ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
   ssl.truststore.password=test1234
   ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
   ssl.keystore.password=test1234
   ssl.key.password=test1234

Note that ``ssl.truststore.password`` is technically optional, but strongly recommended. If a
password is not set, access to the truststore is still available, but integrity checking is
disabled.

The following examples use ``kafka-console-producer`` and ``kafka-console-consumer``, and pass in
the ``client-ssl.properties`` defined above:

.. codewithvars:: bash

   bin/kafka-console-producer --broker-list kafka1:9093 --topic test --producer.config client-ssl.properties
   bin/kafka-console-consumer --bootstrap-server kafka1:9093 --topic test --consumer.config client-ssl.properties --from-beginning
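If a client fails the TLS handshake against the broker, a useful first check is to confirm that
the keystore and truststore referenced in ``client-ssl.properties`` actually contain the expected
entries (the client certificate with its chain, and the CA certificate, respectively). The
following is a minimal sketch using the JDK ``keytool`` utility, assuming the file locations and
passwords from the example above.

.. codewithvars:: bash

   # List the client keystore entries; the client certificate and its chain should be present.
   keytool -list -v -keystore /var/private/ssl/kafka.client.keystore.jks -storepass test1234

   # List the truststore entries; the CA certificate that signed the broker certificates should be present.
   keytool -list -v -keystore /var/private/ssl/kafka.client.truststore.jks -storepass test1234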
Optional settings
^^^^^^^^^^^^^^^^^

``ssl.endpoint.identification.algorithm``
   The endpoint identification algorithm used by clients to validate the server host name. The
   default value is ``https``. Clients, including client connections created by the broker for
   inter-broker communication, verify that the broker host name matches the host name in the
   broker's certificate. Disable server host name verification by setting
   ``ssl.endpoint.identification.algorithm`` to an empty string.

   * Type: string
   * Default: https
   * Importance: medium

``ssl.provider``
   The name of the security provider used for SSL connections. The default value is the default
   security provider of the JVM.

   * Type: string
   * Default: null
   * Importance: medium

``ssl.cipher.suites``
   A cipher suite is a named combination of authentication, encryption, MAC, and key exchange
   algorithms used to negotiate the security settings for a network connection (using the TLS or
   SSL network protocol).

   * Type: list
   * Default: null (by default, all supported cipher suites are enabled)
   * Importance: medium

``ssl.enabled.protocols``
   The list of protocols enabled for SSL connections.

   * Type: list
   * Default: TLSv1.2,TLSv1.1,TLSv1
   * Importance: medium

``ssl.truststore.type``
   The file format of the truststore file.

   * Type: string
   * Default: JKS
   * Importance: medium

.. _authentication-ssl-zookeeper:

|zk|
~~~~

The version of |zk| bundled with |ak| does not support SSL.

.. _authentication-ssl-connect:

|kconnect-long|
~~~~~~~~~~~~~~~

.. include:: includes/intro_connect.rst

* :ref:`Confluent Monitoring Interceptors `
* :ref:`Confluent Metrics Reporter `

.. _authentication-ssl-connect-workers:

Configure the top-level settings in the Connect workers to use SSL by adding these properties in
``connect-distributed.properties``. These top-level settings are used by the Connect worker for
group coordination and to read and write to the internal topics that are used to track the
cluster's state (for example, configs and offsets). The assumption here is that client
authentication is required by the brokers.

.. codewithvars:: bash

   bootstrap.servers=kafka1:9093
   security.protocol=SSL
   ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
   ssl.truststore.password=test1234
   ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
   ssl.keystore.password=test1234
   ssl.key.password=test1234

Connect workers manage the producers used by source connectors and the consumers used by sink
connectors. So, for the connectors to leverage security, you must also override the default
producer/consumer configuration that the worker uses. The assumption here is that client
authentication is required by the brokers.

* For source connectors: configure the same properties, adding the ``producer`` prefix.

  .. codewithvars:: bash

     producer.bootstrap.servers=kafka1:9093
     producer.security.protocol=SSL
     producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
     producer.ssl.truststore.password=test1234
     producer.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
     producer.ssl.keystore.password=test1234
     producer.ssl.key.password=test1234

* For sink connectors: configure the same properties, adding the ``consumer`` prefix.

  .. codewithvars:: bash

     consumer.bootstrap.servers=kafka1:9093
     consumer.security.protocol=SSL
     consumer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
     consumer.ssl.truststore.password=test1234
     consumer.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
     consumer.ssl.keystore.password=test1234
     consumer.ssl.key.password=test1234

.. tip:: For more information, see :ref:`connect_security`.
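After the worker restarts with these settings, one way to confirm that it can reach the brokers
over SSL is to check that its internal topics exist on the cluster. The following is a minimal
sketch that reuses the ``client-ssl.properties`` file from the Clients section; it assumes a
|cp| version in which ``kafka-topics`` supports ``--bootstrap-server`` with ``--command-config``,
and that the worker uses the common default internal topic names (``connect-configs``,
``connect-offsets``, ``connect-status``), which may differ in your configuration.

.. codewithvars:: bash

   # List topics over the SSL listener; the Connect internal topics should appear once the worker has started.
   bin/kafka-topics --list --bootstrap-server kafka1:9093 --command-config client-ssl.properties | grep connect-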
.. _authentication-ssl-replicator:

|crep-full|
~~~~~~~~~~~

.. include:: includes/intro_replicator.rst

* :ref:`Kafka Connect `

To add SSL to the |crep-full| embedded consumer, modify the Replicator JSON properties file. The
following example shows a subset of the configuration properties to add for SSL encryption and
authentication. The assumption here is that client authentication is required by the brokers.

.. codewithvars:: bash

   {
     "name":"replicator",
     "config":{
       ....
       "src.kafka.ssl.truststore.location":"/etc/kafka/secrets/kafka.connect.truststore.jks",
       "src.kafka.ssl.truststore.password":"confluent",
       "src.kafka.ssl.keystore.location":"/etc/kafka/secrets/kafka.connect.keystore.jks",
       "src.kafka.ssl.keystore.password":"confluent",
       "src.kafka.ssl.key.password":"confluent",
       "src.kafka.security.protocol":"SSL"
       ....
     }
   }

.. seealso::

   To see an example |crep-full| configuration, refer to the :devx-examples:`SSL source authentication demo script|/replicator-security/scripts/submit_replicator_source_ssl_auth.sh`.
   For demos of common security configurations, refer to :devx-examples:`Replicator security demos|/replicator-security`.

To configure |crep-full| for a destination cluster with SSL authentication, modify the |crep|
JSON configuration to include the following:

.. codewithvars:: bash

   {
     "name":"replicator",
     "config":{
       ....
       "dest.kafka.ssl.truststore.location":"/etc/kafka/secrets/kafka.connect.truststore.jks",
       "dest.kafka.ssl.truststore.password":"confluent",
       "dest.kafka.ssl.keystore.location":"/etc/kafka/secrets/kafka.connect.keystore.jks",
       "dest.kafka.ssl.keystore.password":"confluent",
       "dest.kafka.ssl.key.password":"confluent",
       "dest.kafka.security.protocol":"SSL"
       ....
     }
   }

Additionally, the following properties are required in the Connect worker:

.. codewithvars:: bash

   security.protocol=SSL
   ssl.truststore.location=/etc/kafka/secrets/kafka.connect.truststore.jks
   ssl.truststore.password=confluent
   ssl.keystore.location=/etc/kafka/secrets/kafka.connect.keystore.jks
   ssl.keystore.password=confluent
   ssl.key.password=confluent
   producer.security.protocol=SSL
   producer.ssl.truststore.location=/etc/kafka/secrets/kafka.connect.truststore.jks
   producer.ssl.truststore.password=confluent
   producer.ssl.keystore.location=/etc/kafka/secrets/kafka.connect.keystore.jks
   producer.ssl.keystore.password=confluent
   producer.ssl.key.password=confluent

For more details, see the general security configuration for Connect workers :ref:`here `.

.. seealso::

   To see an example |crep-full| configuration, see the :devx-examples:`SSL destination authentication demo script|/replicator-security/scripts/submit_replicator_dest_ssl_auth.sh`.
   For demos of common security configurations, see :devx-examples:`Replicator security demos|/replicator-security`.
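Replicator runs as a connector, so the JSON configuration above is typically submitted to the
Connect worker's REST API. The following is a minimal sketch, assuming the JSON has been saved to
a hypothetical file named ``replicator-ssl.json`` and that the worker's REST interface is
reachable at the default ``http://localhost:8083``; the demo scripts referenced above show
complete, end-to-end examples.

.. codewithvars:: bash

   # Submit the Replicator connector configuration to the Connect worker.
   # replicator-ssl.json is a placeholder name for a file containing the JSON shown above.
   curl -X POST -H "Content-Type: application/json" --data @replicator-ssl.json http://localhost:8083/connectors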
.. _authentication-ssl-c3:

|c3|
~~~~

.. include:: includes/intro_c3.rst

* :ref:`Confluent Metrics Reporter `: required on the production cluster being monitored
* :ref:`Confluent Monitoring Interceptors `: optional if you are using Control Center streams monitoring

Enable SSL for |c3| in the ``etc/confluent-control-center/control-center.properties`` file. The
assumption here is that client authentication is required by the brokers.

.. codewithvars:: bash

   confluent.controlcenter.streams.security.protocol=SSL
   confluent.controlcenter.streams.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
   confluent.controlcenter.streams.ssl.truststore.password=test1234
   confluent.controlcenter.streams.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
   confluent.controlcenter.streams.ssl.keystore.password=test1234
   confluent.controlcenter.streams.ssl.key.password=test1234

.. _authentication-ssl-metrics-reporter:

|cmetric-full|
~~~~~~~~~~~~~~

This section describes how to enable SSL encryption and authentication for |cmetric-full|, which
is used for |c3| and Auto Data Balancer. To add SSL for the |cmetric-full|, add the following to
``server.properties`` on the brokers in the |ak| cluster being monitored. The assumption here is
that client authentication is required by the brokers.

.. codewithvars:: bash

   confluent.metrics.reporter.bootstrap.servers=kafka1:9093
   confluent.metrics.reporter.security.protocol=SSL
   confluent.metrics.reporter.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
   confluent.metrics.reporter.ssl.truststore.password=test1234
   confluent.metrics.reporter.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
   confluent.metrics.reporter.ssl.keystore.password=test1234
   confluent.metrics.reporter.ssl.key.password=test1234
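After the brokers restart with these settings, one way to confirm that the metrics reporter can
produce over SSL is to consume a single record from its destination topic. The following is a
minimal sketch that reuses ``client-ssl.properties`` from the Clients section and assumes the
default metrics topic name ``_confluent-metrics``; the records are in a binary format, so the
output is not meant to be human readable.

.. codewithvars:: bash

   # Read one record from the metrics topic to confirm that metrics are flowing over SSL.
   # _confluent-metrics is the assumed default topic name; adjust if it has been overridden.
   bin/kafka-console-consumer --bootstrap-server kafka1:9093 --topic _confluent-metrics \
     --consumer.config client-ssl.properties --from-beginning --max-messages 1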
.. _authentication-ssl-interceptors:

Confluent Monitoring Interceptors
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. include:: includes/intro_interceptors.rst

Interceptors for General Clients
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

For |c3| stream monitoring to work with |ak| clients, you must configure SSL encryption and
authentication for the Confluent Monitoring Interceptors in each client.

#. Verify that the client has configured interceptors.

   * Producer:

     .. codewithvars:: bash

        interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor

   * Consumer:

     .. codewithvars:: bash

        interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

#. Configure SSL encryption and authentication for the interceptor. The assumption here is that
   client authentication is required by the brokers.

   .. codewithvars:: bash

      confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
      confluent.monitoring.interceptor.security.protocol=SSL
      confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
      confluent.monitoring.interceptor.ssl.truststore.password=test1234
      confluent.monitoring.interceptor.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
      confluent.monitoring.interceptor.ssl.keystore.password=test1234
      confluent.monitoring.interceptor.ssl.key.password=test1234

.. _authentication-ssl-kafka-connect-monitoring:

Interceptors for |kconnect-long|
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

For |c3| stream monitoring to work with |kconnect-long|, you must configure SSL for the Confluent
Monitoring Interceptors in |kconnect-long|. The assumption here is that client authentication is
required by the brokers. Configure the Connect workers by adding these properties in
``connect-distributed.properties``, depending on whether the connectors are sources or sinks.

* Source connector: configure the Confluent Monitoring Interceptors for SSL encryption and
  authentication with the ``producer`` prefix.

  .. codewithvars:: bash

     producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
     producer.confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
     producer.confluent.monitoring.interceptor.security.protocol=SSL
     producer.confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
     producer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
     producer.confluent.monitoring.interceptor.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
     producer.confluent.monitoring.interceptor.ssl.keystore.password=test1234
     producer.confluent.monitoring.interceptor.ssl.key.password=test1234

* Sink connector: configure the Confluent Monitoring Interceptors for SSL encryption and
  authentication with the ``consumer`` prefix.

  .. codewithvars:: bash

     consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
     consumer.confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
     consumer.confluent.monitoring.interceptor.security.protocol=SSL
     consumer.confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
     consumer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
     consumer.confluent.monitoring.interceptor.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
     consumer.confluent.monitoring.interceptor.ssl.keystore.password=test1234
     consumer.confluent.monitoring.interceptor.ssl.key.password=test1234

.. _authentication-ssl-replicator-monitoring:

Interceptors for Replicator
^^^^^^^^^^^^^^^^^^^^^^^^^^^

For |c3| stream monitoring to work with Replicator, you must configure SSL for the Confluent
Monitoring Interceptors in the Replicator JSON configuration file. Here is an example subset of
configuration properties to add for SSL encryption and authentication:

.. codewithvars:: bash

   {
     "name":"replicator",
     "config":{
       ....
       "src.consumer.group.id": "replicator",
       "src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
       "src.consumer.confluent.monitoring.interceptor.security.protocol": "SSL",
       "src.consumer.confluent.monitoring.interceptor.bootstrap.servers": "kafka1:9093",
       "src.consumer.confluent.monitoring.interceptor.ssl.truststore.location": "/var/private/ssl/kafka.client.truststore.jks",
       "src.consumer.confluent.monitoring.interceptor.ssl.truststore.password": "test1234",
       "src.consumer.confluent.monitoring.interceptor.ssl.keystore.location": "/var/private/ssl/kafka.client.keystore.jks",
       "src.consumer.confluent.monitoring.interceptor.ssl.keystore.password": "test1234",
       "src.consumer.confluent.monitoring.interceptor.ssl.key.password": "test1234",
       ....
     }
   }

.. _authentication-ssl-schema-registry:

|sr|
~~~~

.. include:: includes/intro_sr.rst

The following is an example subset of ``schema-registry.properties`` configuration parameters to
add for SSL encryption and authentication. The assumption here is that client authentication is
required by the brokers.

.. codewithvars:: bash

   kafkastore.bootstrap.servers=SSL://kafka1:9093
   kafkastore.security.protocol=SSL
   kafkastore.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
   kafkastore.ssl.truststore.password=test1234
   kafkastore.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
   kafkastore.ssl.keystore.password=test1234
   kafkastore.ssl.key.password=test1234
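After |sr| restarts with these settings, it must be able to reach the brokers over SSL before it
can serve requests, so a simple end-to-end check is to query its own REST API once it is up. The
following is a minimal sketch, assuming |sr| listens on the default ``http://localhost:8081`` and
that its HTTP interface itself has not been secured (securing the |sr| REST API is a separate
configuration).

.. codewithvars:: bash

   # List registered subjects; an empty list ([]) is a healthy response on a fresh installation.
   curl http://localhost:8081/subjects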
.. _authentication-ssl-rest-proxy:

REST Proxy
~~~~~~~~~~

Securing |crest-long| with SSL encryption and authentication requires that you configure security
between:

#. REST clients and the REST Proxy (HTTPS)
#. The REST Proxy and the |ak| cluster

Also refer to the complete list of `REST Proxy configuration options `_.

#. Configure HTTPS between REST clients and the REST Proxy. The following is an example subset of
   ``kafka-rest.properties`` configuration parameters to configure HTTPS.

   .. codewithvars:: bash

      ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
      ssl.truststore.password=test1234
      ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
      ssl.keystore.password=test1234
      ssl.key.password=test1234

#. Configure SSL encryption and authentication between the REST Proxy and the |ak| cluster. The
   following is an example subset of ``kafka-rest.properties`` configuration parameters to add
   for SSL encryption and authentication. The assumption here is that client authentication is
   required by the brokers.

   .. codewithvars:: bash

      client.bootstrap.servers=kafka1:9093
      client.security.protocol=SSL
      client.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
      client.ssl.truststore.password=test1234
      client.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
      client.ssl.keystore.password=test1234
      client.ssl.key.password=test1234

.. include:: includes/ssl_logging.rst