Encryption with SSL

SSL Overview

SSL can be configured for encryption or authentication. You may configure just SSL encryption (by default, SSL encryption includes certificate authentication of the server) and independently choose a separate mechanism for client authentication, for example SSL or SASL. This section focuses on SSL encryption.

By default, Apache Kafka communicates in PLAINTEXT, which means that all data is sent in the clear. To encrypt communication, it is recommended to configure all the Confluent Platform components in your deployment to use SSL encryption.

Quick note on terminology: Secure Sockets Layer (SSL) is the predecessor of Transport Layer Security (TLS), and SSL has been deprecated since June 2015. However, for historical reasons, Kafka (like Java) uses the term SSL instead of TLS in configuration and code, which can be a bit confusing. This document uses only the term SSL.

Note that SSL encryption, technically speaking, already enables 1-way authentication, in which the client authenticates the server certificate. So when this document refers to SSL authentication, it really means 2-way authentication, in which the broker also authenticates the client certificate.

Note that enabling SSL may have a performance impact due to encryption overhead.

SSL uses private-key/certificate pairs during the SSL handshake process:

  • each broker needs its own private-key/certificate pair, and the client uses the certificate to authenticate the broker
  • each logical client needs a private-key/certificate pair if client authentication is enabled, and the broker uses the certificate to authenticate the client

Each broker and logical client can be configured with a truststore, which is used to determine which certificates (broker or logical client identities) to trust (authenticate). The truststore can be configured in many ways; consider the following two examples:

  1. the truststore contains one or many certificates: the broker or logical client will trust any certificate listed in the truststore
  2. the truststore contains a Certificate Authority (CA): the broker or logical client will trust any certificate that was signed by the CA in the truststore

Using the CA (2) is more convenient, because adding a new broker or client doesn’t require a change to the truststore.

However, with the CA case (2), Kafka does not conveniently support blocking authentication for individual brokers or clients that were previously trusted via this mechanism (certificate revocation is typically done via Certificate Revocation Lists or the Online Certificate Status Protocol), so one would have to rely on authorization to block access. In contrast, with case (1), blocking authentication would be achieved by removing the broker or client’s certificate from the truststore.
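
For example, in case (1), a previously trusted client can be blocked by deleting its entry from the broker’s truststore (the alias client1 below is hypothetical; use keytool -list to find the actual alias):

keytool -keystore kafka.server.truststore.jks -alias client1 -delete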

Creating SSL Keys and Certificates

The general steps are as follows:

  1. Generate the keys and certificates
  2. Create your own Certificate Authority (CA)
  3. Sign the certificate

Generate the keys and certificates

The first step of deploying SSL is to generate the key and the certificate for each Kafka broker in the cluster. You can use Java’s keytool utility to accomplish this task. You will generate the key into a temporary keystore initially so that you can export and sign it later with the CA.

keytool -keystore kafka.server.keystore.jks -alias localhost -validity {validity} -genkey

You must specify two parameters in the above command:

  1. keystore: the keystore file that stores the certificate. The keystore file contains the private key of the certificate; therefore, it needs to be kept safely.
  2. validity: how long the certificate is valid, in days.

Ensure that the common name (CN) exactly matches the fully qualified domain name (FQDN) of the server. The client compares the CN with the DNS domain name to ensure that it is indeed connecting to the desired server, not a malicious one. The hostname of the server can also be specified in the SubjectAlternativeName. Since the distinguished name is used as the server principal when SSL is used as the inter-broker security protocol, it is useful to have the hostname as a subject alternative name rather than the CN.
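
As a sketch, assuming a broker reachable at the example hostname kafka1.example.com, the key generation command can set the hostname in both the distinguished name and the SubjectAlternativeName:

keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA -dname "CN=kafka1.example.com" -ext SAN=DNS:kafka1.example.com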

Create your own Certificate Authority (CA)

After the first step, each machine in the cluster has a public-private key pair, and a certificate to identify the machine. The certificate, however, is unsigned, which means that an attacker can create such a certificate to pretend to be any machine.

Therefore, it is important to prevent forged certificates by signing them for each machine in the cluster. A certificate authority (CA) is responsible for signing certificates. A CA works like a government that issues passports: the government stamps (signs) each passport so that the passport becomes difficult to forge. Other governments verify the stamps to ensure the passport is authentic. Similarly, the CA signs the certificates, and the cryptography guarantees that a signed certificate is computationally difficult to forge. Thus, as long as the CA is a genuine and trusted authority, the clients have high assurance that they are connecting to the authentic machines.

openssl req -new -x509 -keyout ca-key -out ca-cert -days 365

The generated CA is simply a public-private key pair and certificate, and it is intended to sign other certificates.
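
If you prefer to avoid the interactive prompts, the same command can be run non-interactively; the subject name Kafka-Demo-CA below is only an example:

openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 -subj "/CN=Kafka-Demo-CA" -passout pass:{ca-password}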

The next step is to add the generated CA to the clients’ truststore so that the clients can trust this CA:

keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert

Note: If you configure the Kafka brokers to require client authentication by setting ssl.client.auth to requested or required in the broker configuration, then you must also provide a truststore for the Kafka brokers, and it should contain all the CA certificates used to sign client certificates.

keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert

In contrast to the keystore, which stores each machine’s own identity, the truststore of a client stores all the certificates that the client should trust. Importing a certificate into one’s truststore also means trusting all certificates that are signed by that certificate. As in the analogy above, trusting the government (CA) also means trusting all passports (certificates) that it has issued. This attribute is called the chain of trust, and it is particularly useful when deploying SSL on a large Kafka cluster. You can sign all certificates in the cluster with a single CA, and have all machines share the same truststore that trusts the CA. That way all machines can authenticate all other machines.

Sign the certificate

The next step is to sign all certificates in the keystore with the CA that you generated. First, you must export the certificate from the keystore:

keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file

Then sign it with the CA:

openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}

Finally, import both the certificate of the CA and the signed certificate into the keystore:

keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed

The definitions of the parameters are the following:

  1. keystore: the location of the keystore
  2. ca-cert: the certificate of the CA
  3. ca-key: the private key of the CA
  4. ca-password: the passphrase of the CA
  5. cert-file: the exported, unsigned certificate of the server
  6. cert-signed: the signed certificate of the server
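
To confirm that the CA certificate and the signed certificate were both imported, you can list the keystore contents; it should show the two entries, CARoot and localhost:

keytool -list -keystore kafka.server.keystore.jks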

The confluent-platform-security-tools.git repository contains a script that can generate truststores and keystores.

Brokers

Configure all brokers in the Kafka cluster to accept secure connections from clients. Any configuration changes made to the broker will require a rolling restart.

Enable security for Kafka brokers as described in the section below. Additionally, if you are using Confluent Control Center or Auto Data Balancer, configure your brokers for the Confluent Metrics Reporter as well (see the Confluent Metrics Reporter section later in this document).

  1. Configure the password, truststore, and keystore in the server.properties file of every broker. Since this stores passwords directly in the broker configuration file, it is important to restrict access to these files via file system permissions.
ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
  2. If you want to enable SSL for inter-broker communication, add the following to the broker properties file (it defaults to PLAINTEXT):
security.inter.broker.protocol=SSL
  3. Tell the Kafka brokers on which ports to listen for client and inter-broker SSL connections. You must configure listeners, and optionally advertised.listeners if the value is different from listeners.
listeners=SSL://0.0.0.0:9093
advertised.listeners=SSL://kafka1:9093
  4. Configure both SSL ports and PLAINTEXT ports if:
  • SSL is not enabled for inter-broker communication
  • Some clients connecting to the cluster do not use SSL
listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093
advertised.listeners=PLAINTEXT://kafka1:9092,SSL://kafka1:9093

Note that advertised.host.name and advertised.port configure a single PLAINTEXT port and are incompatible with secure protocols. Please use advertised.listeners instead.
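
Putting these steps together, a broker’s server.properties might contain something like the following sketch, with paths, passwords, and the kafka1 hostname carried over from the examples above:

listeners=SSL://0.0.0.0:9093
advertised.listeners=SSL://kafka1:9093
security.inter.broker.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234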

Optional settings

Here are some optional settings:

ssl.cipher.suites A cipher suite is a named combination of authentication, encryption, MAC, and key exchange algorithms used to negotiate the security settings for a network connection using the TLS or SSL protocol.

  • Type: list
  • Default: null (by default, all supported cipher suites are enabled)
  • Importance: medium

ssl.enabled.protocols The list of protocols enabled for SSL connections

  • Type: list
  • Default: TLSv1.2,TLSv1.1,TLSv1
  • Importance: medium

ssl.truststore.type The file format of the truststore file.

  • Type: string
  • Default: JKS
  • Importance: medium

Due to import regulations in some countries, the Oracle implementation limits the strength of cryptographic algorithms available by default. If stronger algorithms are needed (for example, AES with 256-bit keys), the JCE Unlimited Strength Jurisdiction Policy Files must be obtained and installed in the JDK/JRE. See the JCA Providers Documentation for more information.
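
As a quick check (assuming the jrunscript tool that ships with many JDKs is available), you can print the maximum allowed AES key length; a restricted JDK prints 128, while an unrestricted one prints 2147483647:

jrunscript -e 'print(javax.crypto.Cipher.getMaxAllowedKeyLength("AES"))'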

Clients

The new Producer and Consumer clients support security for Kafka versions 0.9.0 and higher.

If you are using the Kafka Streams API, refer to the Kafka Streams documentation for how to configure equivalent SSL and SASL parameters.

If client authentication is not required by the broker, the following is a minimal configuration example that you can store in a client properties file client-ssl.properties. Since this stores passwords directly in the client configuration file, it is important to restrict access to these files via file system permissions.

bootstrap.servers=kafka1:9093
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234

If client authentication via SSL is required, the client must provide the keystore as well. Please read the additional configurations required in SSL Authentication.
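
For reference, the additional client settings would look along these lines (a sketch; the keystore path and passwords are placeholders consistent with the examples above):

ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234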

Examples using kafka-console-producer and kafka-console-consumer, passing in the client-ssl.properties file with the properties defined above:

bin/kafka-console-producer --broker-list kafka1:9093 --topic test --producer.config client-ssl.properties
bin/kafka-console-consumer --bootstrap-server kafka1:9093 --topic test --new-consumer --consumer.config client-ssl.properties --from-beginning
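
If your Kafka version includes the kafka-broker-api-versions tool, a quick way to verify that a client can complete the SSL handshake is to point it at the SSL port with the same properties file:

bin/kafka-broker-api-versions --bootstrap-server kafka1:9093 --command-config client-ssl.properties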

Optional settings

Here are some optional settings:

ssl.provider The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.

  • Type: string
  • Default: null
  • Importance: medium

ssl.cipher.suites A cipher suite is a named combination of authentication, encryption, MAC, and key exchange algorithms used to negotiate the security settings for a network connection using the TLS or SSL protocol.

  • Type: list
  • Default: null (by default, all supported cipher suites are enabled)
  • Importance: medium

ssl.enabled.protocols The list of protocols enabled for SSL connections

  • Type: list
  • Default: TLSv1.2,TLSv1.1,TLSv1
  • Importance: medium

ssl.truststore.type The file format of the truststore file.

  • Type: string
  • Default: JKS
  • Importance: medium

ZooKeeper

The version of ZooKeeper that is bundled with Apache Kafka does not support SSL.

Kafka Connect

This section describes how to enable security for Kafka Connect. Securing Kafka Connect requires that you configure security for:

  1. Kafka Connect workers: part of the Kafka Connect API, a worker is really just an advanced client under the covers
  2. Kafka Connect connectors: connectors may have embedded producers or consumers, so you must override the default configurations for Connect producers used with source connectors and Connect consumers used with sink connectors

Configure security for Kafka Connect as described in the section below. Additionally, if you are using Confluent Control Center streams monitoring for Kafka Connect, configure security for the Confluent Monitoring Interceptors (see Interceptors for Kafka Connect below).

Configure the top-level settings in the Connect workers to use SSL by adding these properties in connect-distributed.properties. These top-level settings are used by the Connect worker for group coordination and to read and write to the internal topics which are used to track the cluster’s state (e.g. configs and offsets).

bootstrap.servers=kafka1:9093
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234

Connect workers manage the producers used by source connectors and the consumers used by sink connectors. So, for the connectors to leverage security, you also have to override the default producer/consumer configuration that the worker uses. Depending on whether the connector is a source or sink connector:

  • For source connectors: configure the same properties adding the producer prefix.
producer.bootstrap.servers=kafka1:9093
producer.security.protocol=SSL
producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
producer.ssl.truststore.password=test1234
  • For sink connectors: configure the same properties adding the consumer prefix.
consumer.bootstrap.servers=kafka1:9093
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
consumer.ssl.truststore.password=test1234

Confluent Replicator

Confluent Replicator is a type of Kafka source connector that replicates data from a source to destination Kafka cluster. An embedded consumer inside Replicator consumes data from the source cluster, and an embedded producer inside the Kafka Connect worker produces data to the destination cluster.

To configure Confluent Replicator security, you must configure the Replicator connector as shown below, and additionally configure security for the Kafka Connect worker that runs it (see Kafka Connect above).

To add SSL to the Confluent Replicator embedded consumer, modify the Replicator JSON properties file.

Here is an example subset of configuration properties to add for SSL encryption:

{
  "name":"replicator",
  "config":{
    ....
    "src.kafka.bootstrap.servers" : "kafka1:9093",
    "src.kafka.security.protocol" : "SSL",
    "src.kafka.ssl.truststore.location" : "/var/private/ssl/kafka.server.truststore.jks",
    "src.kafka.ssl.truststore.password" : "test1234",
    ....
  }
}

Confluent Control Center

Confluent Control Center uses Kafka Streams as a state store, so if all the Kafka brokers in the cluster backing Control Center are secured, then the Control Center application also needs to be secured.

Enable security for the Control Center application as described in the section below. Additionally, configure security for the Confluent Metrics Reporter and the Confluent Monitoring Interceptors, covered in the sections that follow.

Enable SSL for Control Center in the etc/confluent-control-center/control-center.properties file.

confluent.controlcenter.streams.security.protocol=SSL
confluent.controlcenter.streams.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
confluent.controlcenter.streams.ssl.truststore.password=test1234

Confluent Metrics Reporter

This section describes how to enable SSL encryption for Confluent Metrics Reporter, which is used for Confluent Control Center and Auto Data Balancer.

To add SSL for the Confluent Metrics Reporter, add the following to the server.properties file on the brokers in the Kafka cluster being monitored.

confluent.metrics.reporter.bootstrap.servers=kafka1:9093
confluent.metrics.reporter.security.protocol=SSL
confluent.metrics.reporter.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
confluent.metrics.reporter.ssl.truststore.password=test1234
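
These settings assume that the Metrics Reporter itself has already been enabled on the brokers; if it has not, it is enabled with a setting along the following lines:

metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter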

Confluent Monitoring Interceptors

Confluent Monitoring Interceptors are used for Confluent Control Center streams monitoring. This section describes how to enable security for Confluent Monitoring Interceptors in three places:

  1. General clients
  2. Kafka Connect
  3. Confluent Replicator

Interceptors for General Clients

For Confluent Control Center stream monitoring to work with Kafka clients, you must configure SSL encryption for the Confluent Monitoring Interceptors in each client.

  1. Verify that the client has configured interceptors.
  • Producer:

    interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
    
  • Consumer:

    interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
    
  2. Configure SSL encryption for the interceptor.
confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
confluent.monitoring.interceptor.security.protocol=SSL
confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
confluent.monitoring.interceptor.ssl.truststore.password=test1234

Interceptors for Kafka Connect

For Confluent Control Center stream monitoring to work with Kafka Connect, you must configure SSL for the Confluent Monitoring Interceptors in Kafka Connect. Configure the Connect workers by adding these properties in connect-distributed.properties, depending on whether the connectors are sources or sinks.

  • Source connector: configure the Confluent Monitoring Interceptors for SSL encryption with the producer prefix.

    producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
    producer.confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
    producer.confluent.monitoring.interceptor.security.protocol=SSL
    producer.confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
    producer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
    
  • Sink connector: configure the Confluent Monitoring Interceptors for SSL encryption with the consumer prefix.

    consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
    consumer.confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
    consumer.confluent.monitoring.interceptor.security.protocol=SSL
    consumer.confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
    consumer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
    

Interceptors for Replicator

For Confluent Control Center stream monitoring to work with Replicator, you must configure SSL for the Confluent Monitoring Interceptors in the Replicator JSON configuration file. Here is an example subset of configuration properties to add for SSL encryption.

{
  "name":"replicator",
  "config":{
    ....
    "src.consumer.group.id": "replicator",
    "src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
    "src.consumer.confluent.monitoring.interceptor.bootstrap.servers": "kafka1:9093",
    "src.consumer.confluent.monitoring.interceptor.security.protocol": "SSL",
    "src.consumer.confluent.monitoring.interceptor.ssl.truststore.location": "/var/private/ssl/kafka.client.truststore.jks",
    "src.consumer.confluent.monitoring.interceptor.ssl.truststore.password": "test1234",
    ....
  }
}

Schema Registry

The Schema Registry uses Kafka to persist schemas, and so it acts as a client to write data to the Kafka cluster. Therefore, if the Kafka brokers are configured for security, you should also configure Schema Registry to use security. You may also refer to the complete list of Schema Registry configuration options.

Here is an example subset of schema-registry.properties configuration parameters to add for SSL encryption:

kafkastore.bootstrap.servers=SSL://kafka1:9093
kafkastore.security.protocol=SSL
kafkastore.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
kafkastore.ssl.truststore.password=test1234

REST Proxy

Securing Confluent REST Proxy with SSL encryption requires that you configure security between:

  1. REST clients and the REST Proxy (HTTPS)
  2. REST proxy and the Kafka cluster

You may also refer to the complete list of REST Proxy configuration options.

First configure HTTPS between REST clients and the REST Proxy. Here is an example subset of kafka-rest.properties configuration parameters to configure HTTPS:

ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
ssl.truststore.password=test1234
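
Note that to serve HTTPS the REST Proxy must also present its own certificate. As a sketch, assuming a keystore at the path shown and an example HTTPS listener port, the additional parameters would look along these lines:

listeners=https://0.0.0.0:8082
ssl.keystore.location=/var/private/ssl/kafka.rest.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234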

Then, configure SSL encryption between REST proxy and the Kafka cluster. Here is an example subset of kafka-rest.properties configuration parameters to add for SSL encryption:

client.bootstrap.servers=kafka1:9093
client.security.protocol=SSL
client.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
client.ssl.truststore.password=test1234

SSL Logging

Enable SSL debug logging at the JVM level by starting the Kafka broker and/or clients with the javax.net.debug system property. For example:

$ export KAFKA_OPTS=-Djavax.net.debug=all
$ bin/kafka-server-start etc/kafka/server.properties
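
The all value is very verbose; if you only need to troubleshoot the handshake itself, a narrower scope such as ssl:handshake keeps the output manageable:

$ export KAFKA_OPTS=-Djavax.net.debug=ssl:handshake
$ bin/kafka-server-start etc/kafka/server.properties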

Once you start the broker, you should see the following in server.log:

with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)

To verify that the server’s keystore and truststore are set up correctly, you can run the following command:

openssl s_client -debug -connect localhost:9093 -tls1

Note: TLSv1 should be listed under ssl.enabled.protocols.
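
If TLSv1 is not among your enabled protocols, run the same check with a protocol that is, for example:

openssl s_client -debug -connect localhost:9093 -tls1_2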

In the output of this command you should see the server’s certificate:

-----BEGIN CERTIFICATE-----
{variable sized random bytes}
-----END CERTIFICATE-----
subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Joe Smith
issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com

You can find more details on this in the Oracle documentation on debugging SSL/TLS connections.

If the certificate does not show up with the openssl command, or if there are any other error messages, then your keys or certificates are not set up correctly. Review your configurations.