Encrypt with TLS

TLS overview

By default, Apache Kafka® communicates in PLAINTEXT, which means that all data is sent in the clear. To encrypt communication, you should configure all the Confluent Platform components in your deployment to use TLS/SSL encryption.

Confluent Platform supports Transport Layer Security (TLS) encryption based on OpenSSL, an open source cryptography toolkit that provides an implementation of the TLS and Secure Sockets Layer (SSL) protocols. With TLS authentication, the server authenticates the client (also called “two-way authentication”).

Secure Sockets Layer (SSL) was the predecessor of Transport Layer Security (TLS), and has been deprecated since June 2015. For historical reasons, SSL is used in configuration and code instead of TLS.

You can configure TLS for encryption, but you can also configure TLS for authentication. You can configure just TLS encryption (by default, TLS encryption includes certificate authentication of the server) and independently choose a separate mechanism for client authentication (for example, TLS, SASL). Technically speaking, TLS encryption already enables one-way authentication in which the client authenticates the server certificate. In this topic, “TLS authentication” refers to the two-way authentication, where the broker also authenticates the client certificate.

Enabling TLS can reduce throughput and increase CPU usage on brokers and clients because of encryption overhead.

TLS uses private-key/certificate pairs, which are used during the TLS handshake process.

  • Each broker needs its own private-key/certificate pair, and the client uses the certificate to authenticate the broker.
  • Each logical client needs a private-key/certificate pair if client authentication is enabled, and the broker uses the certificate to authenticate the client.

You can configure each broker and logical client with a truststore, which is used to determine which certificates (broker or logical client identities) to trust (authenticate). You can configure the truststore in many ways. Consider the following two examples:

  • The truststore contains one or many certificates: the broker or logical client will trust any certificate listed in the truststore.
  • The truststore contains a Certificate Authority (CA): the broker or logical client will trust any certificate that was signed by the CA in the truststore.

Using the CA method is more convenient, because adding a new broker or client doesn’t require a change to the truststore. The CA method is outlined in this diagram.

However, with the CA method, Kafka does not conveniently support blocking authentication for individual brokers or clients that were previously trusted using this mechanism. Certificate revocation is typically done using Certificate Revocation Lists or the Online Certificate Status Protocol, so with Kafka you would have to rely on authorization to block access.

In contrast, if you use one or many certificates, blocking authentication is achieved by removing the broker or client’s certificate from the truststore.
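
The difference between the two truststore approaches can be illustrated with a toy model. This is only a sketch: the `Cert` class, the subject names, and the `is_trusted` function are hypothetical, and real certificate validation verifies cryptographic signatures rather than comparing names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Cert:
    """Toy stand-in for an X.509 certificate: a subject and the issuer that signed it."""
    subject: str
    issuer: str


def is_trusted(cert: Cert, truststore: set) -> bool:
    """Trust a certificate if it is listed itself, or if its issuer is listed."""
    trusted_subjects = {entry.subject for entry in truststore}
    return cert.subject in trusted_subjects or cert.issuer in trusted_subjects


# Hypothetical identities used only for this illustration.
ca = Cert(subject="CN=example-ca", issuer="CN=example-ca")  # self-signed CA
kafka1 = Cert(subject="CN=kafka1", issuer="CN=example-ca")
kafka2 = Cert(subject="CN=kafka2", issuer="CN=example-ca")
kafka3 = Cert(subject="CN=kafka3", issuer="CN=example-ca")  # added later

listed_certs = {kafka1, kafka2}  # truststore lists individual certificates
ca_only = {ca}                   # truststore lists only the CA

# Blocking kafka2 when certificates are listed individually: remove its entry.
listed_after_block = listed_certs - {kafka2}
```

In this model, `kafka3` is trusted by `ca_only` without any truststore change, but there is no entry to remove that would block only `kafka2`. With `listed_certs`, the opposite holds.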

See also

For an example that shows this in action, see the Confluent Platform demo. Refer to the demo’s docker-compose.yml file for a configuration reference.

Create TLS keys and certificates

Refer to the Security Tutorial, which describes how to create TLS keys and certificates.

Brokers

Configure all brokers in the Kafka cluster to accept secure connections from clients. Any configuration changes made to the broker will require a rolling restart.

Enable security for Kafka brokers as described in the section below. Additionally, if you are using Confluent Control Center, Auto Data Balancer, or Self-Balancing, configure your brokers as described in Confluent Metrics Reporter and Enable TLS/SSL in a Self-Balancing cluster.

  1. Configure the password, truststore, and keystore in the server.properties file of every broker. Since this stores passwords directly in the broker configuration file, it is important to restrict access to these files using file system permissions.

    ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
    ssl.truststore.password=test1234
    ssl.keystore.location=/var/ssl/private/kafka.server.keystore.jks
    ssl.keystore.password=test1234
    ssl.key.password=test1234
    
  2. If you want to enable TLS for interbroker communication, add the following to the broker properties file (it defaults to PLAINTEXT):

    security.inter.broker.protocol=SSL
    
  3. Tell the Apache Kafka® brokers on which ports to listen for client and interbroker SSL connections. You must configure listeners, and optionally advertised.listeners if the value is different from listeners.

    listeners=SSL://kafka1:9093
    advertised.listeners=SSL://<localhost>:9093
    
  4. Configure both SSL ports and PLAINTEXT ports if:

    • TLS/SSL is not enabled for interbroker communication
    • Some clients connecting to the cluster do not use TLS/SSL

    listeners=PLAINTEXT://kafka1:9092,SSL://kafka1:9093
    advertised.listeners=PLAINTEXT://<localhost>:9092,SSL://<localhost>:9093
    

    Note that advertised.host.name and advertised.port configure a single PLAINTEXT port and are incompatible with secure protocols. Use advertised.listeners instead.
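
Before a rolling restart, it can help to sanity-check the broker settings from the steps above. The following is a minimal sketch, not a Confluent tool: `parse_listeners` and `check_broker_tls` are hypothetical helpers, the listener name is assumed to equal its security protocol (as in the examples above), and treating all five settings from step 1 as required is an assumption of the sketch.

```python
# Settings listed in step 1; treating all five as required is an assumption.
REQUIRED_FOR_SSL = [
    "ssl.truststore.location",
    "ssl.truststore.password",
    "ssl.keystore.location",
    "ssl.keystore.password",
    "ssl.key.password",
]


def parse_listeners(value: str) -> dict:
    """Parse 'PLAINTEXT://kafka1:9092,SSL://kafka1:9093' into {name: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue
        name, _, address = entry.partition("://")
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


def check_broker_tls(props: dict) -> list:
    """Return a list of problems found; an empty list means none were detected."""
    problems = []
    listeners = parse_listeners(props.get("listeners", ""))
    # security.inter.broker.protocol defaults to PLAINTEXT when it is not set.
    inter_broker = props.get("security.inter.broker.protocol", "PLAINTEXT")
    if inter_broker not in listeners:
        problems.append(f"no {inter_broker} listener for interbroker traffic")
    if "SSL" in listeners:
        for key in REQUIRED_FOR_SSL:
            if key not in props:
                problems.append(f"missing {key}")
    return problems
```

For example, a configuration with only `listeners=SSL://kafka1:9093` and no `security.inter.broker.protocol` is reported as lacking a PLAINTEXT listener, which corresponds to the first case in step 4.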

Optional settings

Here are some optional settings:

ssl.cipher.suites A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using the TLS/SSL network protocol.

  • Type: list
  • Default: null (by default, all supported cipher suites are enabled)
  • Importance: medium

ssl.enabled.protocols The list of protocols enabled for TLS/SSL connections.

  • Type: list
  • Default: TLSv1.2,TLSv1.3 when running with Java 11 or newer, TLSv1.2 otherwise
  • Importance: medium

ssl.truststore.type The file format of the truststore file.

  • Type: string
  • Default: JKS
  • Importance: medium

Due to import regulations in some countries, the Oracle implementation limits the strength of cryptographic algorithms available by default. If stronger algorithms are needed (for example, AES with 256-bit keys), the JCE Unlimited Strength Jurisdiction Policy Files must be obtained and installed in the JDK/JRE. See the JCA Providers Documentation for more information.

Clients

The new Producer and Consumer clients support security for Kafka versions 0.9.0 and higher.

If you are using the Kafka Streams API, see the Kafka Streams security documentation for how to configure the equivalent SSL and SASL parameters.

If client authentication is not required by the broker, the following is a minimal configuration example that you can store in a client properties file client-ssl.properties. Since this stores passwords directly in the client configuration file, it is important to restrict access to these files via file system permissions.

bootstrap.servers=kafka1:9093
security.protocol=SSL
ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
ssl.truststore.password=test1234
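
Because the file contains a password, one way to create it with restricted permissions is sketched below. This assumes a POSIX file system; `write_private_file` and `is_owner_only` are hypothetical helper names, and the temporary directory only stands in for wherever you keep client configuration.

```python
import os
import stat
import tempfile

CLIENT_PROPS = """\
bootstrap.servers=kafka1:9093
security.protocol=SSL
ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
ssl.truststore.password=test1234
"""


def write_private_file(path: str, content: str) -> None:
    """Write content to path so that only the owner can read or write it."""
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as handle:
        handle.write(content)
    # Also tighten permissions if the file already existed with a looser mode.
    os.chmod(path, 0o600)


def is_owner_only(path: str) -> bool:
    """Return True if neither group nor others have any permission bits set."""
    mode = stat.S_IMODE(os.stat(path).st_mode)
    return mode & (stat.S_IRWXG | stat.S_IRWXO) == 0


workdir = tempfile.mkdtemp()
props_path = os.path.join(workdir, "client-ssl.properties")
write_private_file(props_path, CLIENT_PROPS)
```

The same check can be applied to the truststore and keystore files that the properties point to.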

If client authentication using TLS/SSL is required, the client must provide the keystore as well. See TLS/SSL Authentication for the additional configuration required.

Examples using kafka-console-producer and kafka-console-consumer, passing in the client-ssl.properties file with the properties defined above:

bin/kafka-console-producer --broker-list kafka1:9093 --topic test --producer.config client-ssl.properties
bin/kafka-console-consumer --bootstrap-server kafka1:9093 --topic test --consumer.config client-ssl.properties --from-beginning

Optional settings

Here are some optional settings:

ssl.provider The name of the security provider used for TLS/SSL connections. Default value is the default security provider of the JVM.

  • Type: string
  • Default: null
  • Importance: medium

ssl.cipher.suites A cipher suite is a named combination of authentication, encryption, MAC, and key exchange algorithm used to negotiate the security settings for a network connection using the TLS/SSL network protocol.

  • Type: list
  • Default: null (by default, all supported cipher suites are enabled)
  • Importance: medium

ssl.enabled.protocols The list of protocols enabled for TLS/SSL connections. The default is ‘TLSv1.2,TLSv1.3’ when running with Java 11 or newer, and ‘TLSv1.2’ otherwise. With the default value for Java 11, clients and servers prefer TLSv1.3 if both support it. Otherwise, clients and servers fall back to TLSv1.2 (assuming both support at least TLSv1.2). This default should be fine for most cases.

  • Type: list
  • Default: TLSv1.2,TLSv1.3 when running with Java 11 or newer, TLSv1.2 otherwise
  • Importance: medium

ssl.truststore.type The file format of the truststore file.

  • Type: string
  • Default: JKS
  • Importance: medium
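
The effect of ssl.enabled.protocols on both sides of a connection can be pictured with a small model. This is a simplified sketch of the outcome only, not of the TLS handshake itself; `negotiate` and `KNOWN_PROTOCOLS` are hypothetical names.

```python
# Ordered from oldest to newest; a later position means a higher preference.
KNOWN_PROTOCOLS = ["TLSv1", "TLSv1.1", "TLSv1.2", "TLSv1.3"]


def negotiate(client_enabled: str, broker_enabled: str):
    """Return the newest protocol enabled on both sides, or None if none is shared."""
    client = {name.strip() for name in client_enabled.split(",")}
    broker = {name.strip() for name in broker_enabled.split(",")}
    shared = client & broker
    for protocol in reversed(KNOWN_PROTOCOLS):
        if protocol in shared:
            return protocol
    return None
```

For example, a client with `TLSv1.2,TLSv1.3` connecting to a broker that enables only `TLSv1.2` ends up on TLSv1.2, while a client restricted to `TLSv1.3` has no protocol in common with that broker and the connection fails.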

ZooKeeper

Starting in Confluent Platform version 5.5.0, the version of ZooKeeper that is bundled with Kafka supports TLS/SSL. For details, refer to Adding security to a running cluster.

Kafka Connect

This section describes how to enable security for Kafka Connect. Securing Kafka Connect requires that you configure security for:

  1. Kafka Connect workers: part of the Kafka Connect API; under the covers, a worker is an advanced client
  2. Kafka Connect connectors: connectors may have embedded producers or consumers, so you must override the default configurations for Connect producers used with source connectors and Connect consumers used with sink connectors
  3. Kafka Connect REST: Kafka Connect exposes a REST API that can be configured to use SSL using additional properties

Configure security for Kafka Connect as described in the section below. Additionally, if you are using Confluent Control Center streams monitoring for Kafka Connect, configure security for the Confluent Monitoring Interceptors as described in Interceptors for Kafka Connect.

Configure the top-level settings in the Connect workers to use TLS/SSL by adding these properties in connect-distributed.properties. These top-level settings are used by the Connect worker for group coordination and to read and write to the internal topics which are used to track the cluster’s state (for example, configs and offsets).

bootstrap.servers=kafka1:9093
security.protocol=SSL
ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
ssl.truststore.password=test1234

Connect workers manage the producers used by source connectors and the consumers used by sink connectors. So, for the connectors to leverage security, you also have to override the default producer/consumer configuration that the worker uses. Depending on whether the connector is a source or sink connector:

  • For source connectors: configure the same properties adding the producer prefix.

    producer.bootstrap.servers=kafka1:9093
    producer.security.protocol=SSL
    producer.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
    producer.ssl.truststore.password=test1234
    
  • For sink connectors: configure the same properties adding the consumer prefix.

    consumer.bootstrap.servers=kafka1:9093
    consumer.security.protocol=SSL
    consumer.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
    consumer.ssl.truststore.password=test1234
    
  • For TLS/SSL with the REST API, configure the following additional properties:

    listeners
      List of REST listeners in the format protocol://host:port,protocol2://host2:port, where the protocol is either http or https.

    rest.advertised.listener
      Configures the listener used for communication between workers. Valid values are either http or https. If the listeners property is not defined or if it contains an http listener, the default value for this field is http. When the listeners property is defined and contains only https listeners, the default value is https.

    ssl.client.auth
      Valid values are none, requested, and required. It controls whether:
        1. the client is required to do TLS/SSL client authentication (required)
        2. it can decide to skip the TLS/SSL client authentication (requested)
        3. the TLS/SSL client authentication will be disabled (none)

    listeners.https.ssl.*
      You can use the listeners.https. prefix with an SSL configuration parameter to override the default TLS/SSL configuration that is shared with the connections to the Kafka broker. If at least one parameter with this prefix exists, the implementation uses only the SSL parameters with this prefix and ignores all SSL parameters without this prefix. If no parameter with prefix listeners.https. exists, the parameters without a prefix are used.

Note that if the listeners.https.ssl.* properties are not defined then the ssl.* properties will be used. For a list of all REST API ssl.* properties, see REST Proxy Configuration Options.

Here is an example that sets the ssl.* properties to use TLS/SSL connections to the broker, and since listeners includes https these same settings are used to configure Connect’s TLS/SSL endpoint:

listeners=https://myhost:8443
rest.advertised.listener=https
rest.advertised.host.name=<localhost>
rest.advertised.port=8083
ssl.client.auth=requested
ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/ssl/private/kafka.server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234

To configure Connect’s SSL endpoint differently than the TLS/SSL connections to the broker, simply define the listeners.https.ssl.* properties with the correct settings. Note that as soon as any listeners.https.ssl.* properties are specified, none of the top level ssl.* properties will apply, so be sure to define all of the necessary listeners.https.ssl.* properties:

listeners=https://myhost:8443
rest.advertised.listener=https
rest.advertised.host.name=<localhost>
rest.advertised.port=8083
listeners.https.ssl.client.auth=requested
listeners.https.ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
listeners.https.ssl.truststore.password=test1234
listeners.https.ssl.keystore.location=/var/ssl/private/kafka.server.keystore.jks
listeners.https.ssl.keystore.password=test1234
listeners.https.ssl.key.password=test1234
listeners.https.ssl.endpoint.identification.algorithm=HTTPS
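
The all-or-nothing behavior of the listeners.https. prefix is easy to get wrong, so here is a small model of the rule as this section describes it. It is a sketch, not Connect's actual implementation, and `rest_ssl_settings` is a hypothetical name.

```python
PREFIX = "listeners.https."


def rest_ssl_settings(worker_props: dict) -> dict:
    """Return the ssl.* settings the REST endpoint would use under the rule above."""
    prefixed = {
        key[len(PREFIX):]: value
        for key, value in worker_props.items()
        if key.startswith(PREFIX)
    }
    if prefixed:
        # At least one prefixed parameter exists, so unprefixed ssl.* is ignored.
        return {k: v for k, v in prefixed.items() if k.startswith("ssl.")}
    # No prefixed parameter: fall back to the settings shared with the broker.
    return {k: v for k, v in worker_props.items() if k.startswith("ssl.")}
```

Under this model, adding a single `listeners.https.ssl.keystore.location` entry to a worker that previously relied on the top-level `ssl.*` settings leaves the REST endpoint without a truststore, which is why all of the necessary prefixed properties need to be defined together.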

Confluent Replicator

Confluent Replicator is a type of Kafka source connector that replicates data from a source Kafka cluster to a destination Kafka cluster. An embedded consumer inside Replicator consumes data from the source cluster, and an embedded producer inside the Kafka Connect worker produces data to the destination cluster.

Replicator version 4.0 and earlier requires a connection to ZooKeeper in the origin and destination Kafka clusters. If ZooKeeper is configured for authentication, the client configures the ZooKeeper security credentials via the global JAAS configuration setting -Djava.security.auth.login.config on the Connect workers, and the ZooKeeper security credentials in the origin and destination clusters must be the same.

To configure Confluent Replicator security, you must configure the Replicator connector as shown below, and additionally you must configure the Connect worker that runs it, as described in Kafka Connect.

To add TLS/SSL to the Confluent Replicator embedded consumer, modify the Replicator JSON properties file.

Here is an example subset of configuration properties to add for TLS/SSL encryption:

{
  "name":"replicator",
  "config":{
    ....
    "src.kafka.ssl.truststore.location":"/etc/kafka/secrets/kafka.connect.truststore.jks",
    "src.kafka.ssl.truststore.password":"confluent",
    "src.kafka.security.protocol":"SSL",
    ....
  }
}

See also

To see an example Confluent Replicator configuration, see the SSL source demo script. For demos of common security configurations see: Replicator security demos

To configure Confluent Replicator for a destination cluster with TLS/SSL encryption, modify the Replicator JSON configuration to include the following:

{
  "name":"replicator",
  "config":{
    ....
    "dest.kafka.ssl.truststore.location":"/etc/kafka/secrets/kafka.connect.truststore.jks",
    "dest.kafka.ssl.truststore.password":"confluent",
    "dest.kafka.security.protocol":"SSL",
    ....
  }
}

Additionally the following properties are required in the Connect worker:

security.protocol=SSL
ssl.truststore.location=/etc/kafka/secrets/kafka.connect.truststore.jks
ssl.truststore.password=confluent
producer.security.protocol=SSL
producer.ssl.truststore.location=/etc/kafka/secrets/kafka.connect.truststore.jks
producer.ssl.truststore.password=confluent

For more information see the general security configuration for Connect workers.

See also

To see an example Confluent Replicator configuration, see the SSL destination demo script. For demos of common security configurations see: Replicator security demos

Confluent Control Center

Confluent Control Center uses Kafka Streams as a state store, so if all the Kafka brokers in the cluster backing Control Center are secured, then the Control Center application also needs to be secured.

Note

When RBAC is enabled, Control Center cannot be used in conjunction with Kerberos because Control Center cannot support any SASL mechanism other than OAUTHBEARER.

Enable security for the Control Center application as described in the section below. Additionally, configure security for the Confluent Metrics Reporter and the Confluent Monitoring Interceptors, which are described later in this topic.

Enable SSL for Control Center in the etc/confluent-control-center/control-center.properties file.

confluent.controlcenter.streams.security.protocol=SSL
confluent.controlcenter.streams.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
confluent.controlcenter.streams.ssl.truststore.password=test1234

Confluent Metrics Reporter

This section describes how to enable TLS/SSL encryption for Confluent Metrics Reporter, which is used for Confluent Control Center and Auto Data Balancer.

To add TLS/SSL for the Confluent Metrics Reporter, add the following to the server.properties file on the brokers in the Kafka cluster being monitored.

confluent.metrics.reporter.bootstrap.servers=kafka1:9093
confluent.metrics.reporter.security.protocol=SSL
confluent.metrics.reporter.ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
confluent.metrics.reporter.ssl.truststore.password=test1234

Confluent Monitoring Interceptor

Confluent Monitoring Interceptors are used for Confluent Control Center streams monitoring. This section describes how to enable security for Confluent Monitoring Interceptors in three places:

  1. General clients
  2. Kafka Connect
  3. Confluent Replicator

Important

The typical use case for Confluent Monitoring Interceptors is to provide monitoring data to a separate monitoring cluster that most likely has different configurations. Interceptor configurations do not inherit configurations for the monitored component. If you wish to use configurations from the monitored component, you must add the appropriate prefix. For example, the option confluent.monitoring.interceptor.security.protocol=SSL, if being used for a producer, must be prefixed with producer. and would appear as producer.confluent.monitoring.interceptor.security.protocol=SSL.
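
The prefixing rule described above is mechanical, so it can be expressed as a short helper. This is an illustrative sketch: `with_prefix` and `to_properties` are hypothetical names, and the values are the example values used in this topic.

```python
def with_prefix(prefix: str, props: dict) -> dict:
    """Return a copy of props with 'prefix.' prepended to every key."""
    return {f"{prefix}.{key}": value for key, value in props.items()}


def to_properties(props: dict) -> str:
    """Render a dict as key=value lines, one setting per line."""
    return "\n".join(f"{key}={value}" for key, value in props.items())


interceptor_props = {
    "confluent.monitoring.interceptor.bootstrap.servers": "kafka1:9093",
    "confluent.monitoring.interceptor.security.protocol": "SSL",
}

# Settings for an interceptor attached to a producer carry the producer prefix.
producer_props = with_prefix("producer", interceptor_props)
```

Calling `to_properties(producer_props)` yields lines such as `producer.confluent.monitoring.interceptor.security.protocol=SSL`. The same helper with `"consumer"` covers interceptors attached to a consumer.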

Interceptors for General Clients

For Confluent Control Center stream monitoring to work with Kafka clients, you must configure TLS/SSL encryption for the Confluent Monitoring Interceptors in each client.

  1. Verify that the client has configured interceptors.

    • Producer:

      interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor

    • Consumer:

      interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

  2. Configure SSL encryption for the interceptor.

    confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
    confluent.monitoring.interceptor.security.protocol=SSL
    confluent.monitoring.interceptor.ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
    confluent.monitoring.interceptor.ssl.truststore.password=test1234
    

Interceptors for Kafka Connect

For Confluent Control Center stream monitoring to work with Kafka Connect, you must configure TLS/SSL for the Confluent Monitoring Interceptors in Kafka Connect. Configure the Connect workers by adding these properties in connect-distributed.properties, depending on whether the connectors are sources or sinks.

  • Source connector: configure the Confluent Monitoring Interceptors for TLS/SSL encryption with the producer prefix.

    producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
    producer.confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
    producer.confluent.monitoring.interceptor.security.protocol=SSL
    producer.confluent.monitoring.interceptor.ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
    producer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
    
  • Sink connector: configure the Confluent Monitoring Interceptors for TLS/SSL encryption with the consumer prefix.

    consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
    consumer.confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
    consumer.confluent.monitoring.interceptor.security.protocol=SSL
    consumer.confluent.monitoring.interceptor.ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
    consumer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
    

Interceptors for Replicator

For Confluent Control Center stream monitoring to work with Replicator, you must configure TLS/SSL for the Confluent Monitoring Interceptors in the Replicator JSON configuration file. Here is an example subset of configuration properties to add for SSL encryption.

{
  "name":"replicator",
  "config":{
    ....
    "src.consumer.group.id": "replicator",
    "src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
    "src.consumer.confluent.monitoring.interceptor.bootstrap.servers": "kafka1:9093",
    "src.consumer.confluent.monitoring.interceptor.security.protocol": "SSL",
    "src.consumer.confluent.monitoring.interceptor.ssl.truststore.location": "/var/ssl/private/kafka.client.truststore.jks",
    "src.consumer.confluent.monitoring.interceptor.ssl.truststore.password": "test1234",
    ....
  }
}

Enable TLS/SSL in a Self-Balancing cluster

To enable TLS/SSL encryption in a Self-Balancing cluster, add the following to the server.properties file on the brokers in the Kafka cluster.

confluent.rebalancer.metrics.security.protocol=SSL
confluent.rebalancer.metrics.ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks
confluent.rebalancer.metrics.ssl.truststore.password=confluent
confluent.rebalancer.metrics.ssl.keystore.location=/etc/kafka/secrets/kafka.client.keystore.jks
confluent.rebalancer.metrics.ssl.keystore.password=confluent
confluent.rebalancer.metrics.ssl.key.password=confluent

Schema Registry

Schema Registry uses Kafka to persist schemas, and so it acts as a client to write data to the Kafka cluster. Therefore, if the Kafka brokers are configured for security, you should also configure Schema Registry to use security. You may also refer to the complete list of Schema Registry configuration options.

Here is an example subset of schema-registry.properties configuration parameters to add for TLS/SSL encryption:

kafkastore.bootstrap.servers=SSL://kafka1:9093
kafkastore.security.protocol=SSL
kafkastore.ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
kafkastore.ssl.truststore.password=test1234

REST Proxy

Securing Confluent REST Proxy with TLS/SSL encryption requires that you configure security between:

  1. REST clients and the REST Proxy (HTTPS)
  2. REST Proxy and the Kafka cluster

You may also refer to the complete list of REST Proxy configuration options.

First, configure HTTPS between REST clients and the REST Proxy. Here is an example subset of kafka-rest.properties configuration parameters to configure HTTPS:

ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
ssl.truststore.password=test1234

Then, configure TLS/SSL encryption between REST Proxy and the Kafka cluster. Here is an example subset of kafka-rest.properties configuration parameters to add for TLS/SSL encryption:

client.bootstrap.servers=kafka1:9093
client.security.protocol=SSL
client.ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
client.ssl.truststore.password=test1234

TLS/SSL Logging

Enable TLS/SSL debug logging at the JVM level by starting the Kafka broker and/or clients with the javax.net.debug system property. For example:

export KAFKA_OPTS=-Djavax.net.debug=all
bin/kafka-server-start etc/kafka/server.properties

Tip

These instructions are based on the assumption that you are installing Confluent Platform by using ZIP or TAR archives. For more information, see Install Confluent Platform On-Premises.

After you start the broker, server.log should contain a line like the following:

with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)

To verify that the server’s keystore and truststore are set up correctly, run the following command:

openssl s_client -debug -connect localhost:9093 -tls1_2

Note: TLSv1.2 should be listed under ssl.enabled.protocols.

In the output of this command you should see the server’s certificate:

-----BEGIN CERTIFICATE-----
{variable sized random bytes}
-----END CERTIFICATE-----
subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Joe Smith
issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com

You can find more details on this in the Oracle documentation on debugging SSL/TLS connections.

If the certificate does not show up with the openssl command, or if there are any other error messages, then your keys or certificates are not set up correctly. Review your configurations.
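
If openssl is not available, a similar check can be sketched with the Python standard library. This is an assumption-laden alternative rather than a documented procedure: `fetch_broker_certificate` and `is_pem_certificate` are hypothetical helpers, the call does not validate the certificate against a truststore, and a broker that requires client authentication may reject the handshake.

```python
import ssl

PEM_HEADER = "-----BEGIN CERTIFICATE-----"
PEM_FOOTER = "-----END CERTIFICATE-----"


def is_pem_certificate(text: str) -> bool:
    """Return True if text looks like a single PEM-encoded certificate."""
    stripped = text.strip()
    return stripped.startswith(PEM_HEADER) and stripped.endswith(PEM_FOOTER)


def fetch_broker_certificate(host: str, port: int):
    """Return the certificate presented on host:port as PEM text, or None on failure."""
    try:
        # Retrieves the certificate without verifying it against any CA.
        return ssl.get_server_certificate((host, port))
    except OSError:
        # Covers refused connections as well as TLS handshake errors.
        return None


if __name__ == "__main__":
    pem = fetch_broker_certificate("localhost", 9093)
    if pem is None or not is_pem_certificate(pem):
        print("No certificate received; review the broker keystore and listeners.")
    else:
        print(pem)
```

A `None` result corresponds to the case above where the certificate does not show up: either nothing is listening on the TLS port, or the handshake failed.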