Security Tutorial

This tutorial provides an example of how to enable security on Confluent Platform.

Overview

This tutorial provides a step-by-step example to enable SSL encryption, SASL authentication, and authorization on the Confluent Platform with monitoring via Confluent Control Center. It walks through the configuration settings to secure ZooKeeper, Kafka brokers, Kafka Connect, and Confluent Replicator, plus all the components required for monitoring including the Confluent Metrics Reporter and Confluent Monitoring Interceptors.

Note

For simplicity, this tutorial uses the PLAIN mechanism for SASL. For production deployments we recommend SASL/GSSAPI (Kerberos) or SASL/SCRAM.

Prerequisites

Users should understand why it is critical to secure the Confluent Platform and have a conceptual understanding of how encryption, authentication, and authorization work.

Before proceeding with this tutorial:

  • Use the quickstart to bring up the Confluent Platform without security enabled
  • Verify that Confluent Control Center can monitor the cluster and that streams monitoring works without security enabled

Note

Do not proceed with this tutorial until you have verified that you can run the Confluent Platform without security enabled. This helps users focus on just the security configuration additions and validation.

Creating SSL Keys and Certificates

Each machine in the cluster has a public-private key pair, and a certificate to identify the machine. The certificate, however, is unsigned, which means that an attacker can create such a certificate to pretend to be any machine.

Therefore, it is important to prevent forged certificates by signing them for each machine in the cluster. A certificate authority (CA) is responsible for signing certificates. A CA works like a government that issues passports: the government validates the identity of the person applying for the passport and then provides a passport in a standard form that is difficult to forge. Other governments verify the form is valid to ensure the passport is authentic. Similarly, the CA signs the certificates, and the cryptography guarantees that a signed certificate is computationally difficult to forge. Thus, as long as the CA is a genuine and trusted authority, the clients have high assurance that they are connecting to the authentic machines.

The keystore stores each machine’s own identity. The truststore stores all the certificates that the machine should trust. Importing a certificate into one’s truststore also means trusting all certificates that are signed by that certificate. As in the analogy above, trusting the government (CA) also means trusting all passports (certificates) that it has issued. This attribute is called the chain of trust, and it is particularly useful when deploying SSL on a large Kafka cluster. You can sign all certificates in the cluster with a single CA, and have all machines share the same truststore that trusts the CA. That way all machines can authenticate all other machines.

To deploy SSL, the general steps are:

  • Generate the keys and certificates
  • Create your own Certificate Authority (CA)
  • Sign the certificate

The steps to create keys and sign certificates are elaborated below. You may also adapt a script from confluent-platform-security-tools.git.

The definitions of the parameters used in the steps are the following:

  1. keystore: the location of the keystore
  2. ca-cert: the certificate of the CA
  3. ca-key: the private key of the CA
  4. ca-password: the passphrase of the CA
  5. cert-file: the exported, unsigned certificate of the server
  6. cert-signed: the signed certificate of the server

Generate the keys and certificates

You can use Java’s keytool utility for this process. Please consult the Java documentation for more information on the commands and arguments.

  1. Generate the key and the certificate for each Kafka broker in the cluster. Generate the key into a keystore called kafka.server.keystore.jks so that you can export and sign it later with the CA. The keystore file contains the private key of the certificate, so it must be stored securely.

    # With user prompts
    keytool -keystore kafka.server.keystore.jks -alias localhost -genkey
    
    # Without user prompts, pass command line arguments
    keytool -keystore kafka.server.keystore.jks -alias localhost -validity {validity} -genkey -storepass {keystore-pass} -keypass {key-pass} -dname {distinguished-name} -ext SAN=DNS:{hostname}
    

Ensure that the common name (CN) exactly matches the fully qualified domain name (FQDN) of the server. The client compares the CN with the DNS domain name to ensure that it is indeed connecting to the desired server, not a malicious one. The hostname of the server can also be specified in the Subject Alternative Name (SAN). Since the distinguished name is used as the server principal when SSL is used as the inter-broker security protocol, it is useful to have hostname as a SAN rather than the CN.
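
The non-interactive command above can be wrapped in a small helper that fills in both the CN and the SAN from a single hostname, so the two cannot drift apart. The following is a minimal sketch, not part of the Confluent tooling: the function name gen_broker_keystore, the KEYTOOL, KEYSTORE_PASS, KEY_PASS, and VALIDITY_DAYS variables, and the explicit -keyalg RSA option are assumptions made for illustration.

```shell
# Hypothetical helper: generate a broker keystore whose CN and SAN both carry
# the broker's FQDN. KEYTOOL can be overridden (for example, with "echo") to
# preview the command without running it.
gen_broker_keystore() {
  host="$1"
  "${KEYTOOL:-keytool}" -keystore kafka.server.keystore.jks \
    -alias localhost \
    -validity "${VALIDITY_DAYS:-365}" \
    -genkey -keyalg RSA \
    -storepass "${KEYSTORE_PASS:?set KEYSTORE_PASS}" \
    -keypass "${KEY_PASS:-$KEYSTORE_PASS}" \
    -dname "CN=${host}" \
    -ext "SAN=DNS:${host}"
}
```

Running it on each broker with that broker's own FQDN, for example KEYSTORE_PASS=test1234 gen_broker_keystore kafka1.example.com (a placeholder hostname), would produce one keystore per machine.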

Create your own Certificate Authority (CA)

  1. Generate a CA, which is simply a public-private key pair and a certificate that is intended to sign other certificates.

    openssl req -new -x509 -keyout ca-key -out ca-cert -days {validity}
    
  2. Add the generated CA to the clients’ truststore so that the clients can trust this CA:

    keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert
    
  3. Add the generated CA to the brokers’ truststore so that the brokers can trust this CA.

    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
    

Sign the certificate

To sign all certificates in the keystore with the CA that you generated:

  1. Export the certificate from the keystore:

    keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
    
  2. Sign it with the CA:

    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}
    
  3. Import both the certificate of the CA and the signed certificate into the broker keystore:

    keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
    keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed
    

Summary

Combining the steps described above, the script to create the CA, the broker keystore, and the broker and client truststores is as follows:

keytool -keystore kafka.server.keystore.jks -alias localhost -validity {validity} -genkey
openssl req -new -x509 -keyout ca-key -out ca-cert -days {validity}
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed
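
Before distributing the keystores, it can be worth confirming that the signed certificate really chains to the CA. The following is a minimal sketch using openssl verify. The function name verify_signed_cert is hypothetical, and the file names follow the parameter definitions given earlier.

```shell
# Hypothetical helper: succeed only if the signed certificate validates
# against the CA certificate.
verify_signed_cert() {
  ca_cert="$1"      # for example: ca-cert
  signed_cert="$2"  # for example: cert-signed
  openssl verify -CAfile "$ca_cert" "$signed_cert"
}
```

For example, verify_signed_cert ca-cert cert-signed exits with status 0 when the signature checks out and with a non-zero status otherwise.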

Note

In this tutorial, the client does not need a keystore because client authentication is done via SASL/PLAIN instead of 2-way SSL. However, if you were doing 2-way SSL authentication, you would create a client keystore and sign all certificates with the CA that you generated, just as for the brokers.

ZooKeeper

For broker to ZooKeeper communication, we will configure optional DIGEST-MD5 authentication. The current version of ZooKeeper bundled with Apache Kafka does not support SSL.

  1. Set the authentication provider in zookeeper.properties:

    authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    
  2. Configure the zookeeper_jaas.conf file as follows. Note the two semicolons.

    Server {
           org.apache.zookeeper.server.auth.DigestLoginModule required
           user_super="admin-secret"
           user_kafka="kafka-secret";
    };
    
  3. When you start ZooKeeper, pass the name of its JAAS file as a JVM parameter:

    $ export KAFKA_OPTS="-Djava.security.auth.login.config=etc/kafka/zookeeper_jaas.conf"
    $ bin/zookeeper-server-start etc/kafka/zookeeper.properties
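
A misplaced or missing semicolon is an easy mistake to make in a JAAS file, so a quick count can catch it before startup. The following is a heuristic sketch only: the function name check_jaas_semicolons is hypothetical, and it assumes one login module per section and no semicolons inside quoted values.

```shell
# Heuristic check: a JAAS file with one login module per section should
# contain exactly two semicolons per section (one ends the module's option
# list, one follows the closing brace).
check_jaas_semicolons() {
  file="$1"
  # Count section headers such as "Server {" or "KafkaServer {".
  sections=$(grep -c '^[[:space:]]*[A-Za-z_][A-Za-z0-9_]*[[:space:]]*{' "$file" || true)
  # Count every semicolon in the file.
  semicolons=$(tr -cd ';' < "$file" | wc -c | tr -d ' ')
  [ "$sections" -gt 0 ] && [ "$semicolons" -eq $((sections * 2)) ]
}
```

For example, check_jaas_semicolons etc/kafka/zookeeper_jaas.conf returns a non-zero status if the count does not match.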
    

Brokers

Administrators can configure a mix of secure and unsecured clients. This tutorial ensures that all broker/client and inter-broker network communication is encrypted in the following manner:

  • All broker/client communication uses the SASL_SSL security protocol, which ensures that the communication is encrypted and authenticated using SASL/PLAIN
  • All inter-broker communication uses the SSL security protocol, which ensures that the communication is encrypted and authenticated using SSL
  • The unsecured PLAINTEXT port is not enabled

The steps are as follows:

  1. Enable the desired security protocols and ports in each broker’s server.properties. Notice that both SSL and SASL_SSL are enabled.

    listeners=SSL://:9093,SASL_SSL://:9094
    
  2. To enable the brokers to authenticate each other (2-way SSL authentication), you need to configure all the brokers for client authentication (in this case, the requesting broker is the “client”). We recommend setting ssl.client.auth=required – we discourage configuring it as requested because misconfigured brokers will still connect successfully and it provides a false sense of security.

    security.inter.broker.protocol=SSL
    ssl.client.auth=required
    
  3. Define the SSL truststore, keystore, and password in the server.properties file of every broker. Since this stores passwords directly in the broker configuration file, it is important to restrict access to these files via file system permissions.

    ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
    ssl.truststore.password=test1234
    ssl.keystore.location=/var/ssl/private/kafka.server.keystore.jks
    ssl.keystore.password=test1234
    ssl.key.password=test1234
    
  4. Enable SASL/PLAIN mechanism in the server.properties file of every broker.

    sasl.enabled.mechanisms=PLAIN
    
  5. Create the broker’s JAAS configuration file in each Kafka broker’s config directory. This example calls it kafka_server_jaas.conf.

    • Configure a KafkaServer section used when the broker validates client connections, including those from other brokers. The properties username and password are used by the broker to initiate connections to other brokers, and in this example, kafkabroker is the user for inter-broker communication. The set of properties user_{userName} defines the passwords for all other clients that connect to the broker. In this example, there are two users kafkabroker and client1.
    • Configure a Client section used when the broker connects to ZooKeeper. There is a single username and password that must match the ZooKeeper jaas configuration.

    Note

    Note the two semicolons in each section.

    KafkaServer {
       org.apache.kafka.common.security.plain.PlainLoginModule required
       username="kafkabroker"
       password="kafkabroker-secret"
       user_kafkabroker="kafkabroker-secret"
       user_client1="client1-secret";
    };
    
    Client {
       org.apache.zookeeper.server.auth.DigestLoginModule required
       username="kafka"
       password="kafka-secret";
    };
    
  6. If you are using Confluent Control Center to monitor your deployment, and if the monitoring cluster backing Confluent Control Center is also configured with the same security protocols, you must configure the Confluent Metrics Reporter for security as well. Add these configurations to the server.properties file of each broker.

    metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
    confluent.metrics.reporter.security.protocol=SASL_SSL
    confluent.metrics.reporter.ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
    confluent.metrics.reporter.ssl.truststore.password=test1234
    confluent.metrics.reporter.sasl.mechanism=PLAIN
    confluent.metrics.reporter.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
       username="kafkabroker" \
       password="kafkabroker-secret";
    
  7. To enable ACLs, we need to configure an authorizer. Kafka provides a simple authorizer implementation, and to use it, you can add the following to server.properties:

    authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
    
  8. The default behavior is such that if a resource has no associated ACLs, then no one is allowed to access the resource, except super users. Setting broker principals as super users is a convenient way to give them the required access to perform inter-broker operations. Since this tutorial configures the inter-broker security protocol as SSL, set the super user name to be the distinguished name configured in the broker’s certificate. (See other authorization configuration options).

    super.users=User:kafkabroker
    

Combining the configuration steps described above, the broker’s server.properties file contains the following configuration settings:

# Enable SSL security protocol for inter-broker communication
# Enable SASL_SSL security protocol for broker-client communication
listeners=SSL://:9093,SASL_SSL://:9094
security.inter.broker.protocol=SSL
ssl.client.auth=required

# Broker security settings
ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/ssl/private/kafka.server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
sasl.enabled.mechanisms=PLAIN

# Confluent Metrics Reporter for monitoring with Confluent Control Center
metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
confluent.metrics.reporter.security.protocol=SASL_SSL
confluent.metrics.reporter.ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
confluent.metrics.reporter.ssl.truststore.password=test1234
confluent.metrics.reporter.sasl.mechanism=PLAIN
confluent.metrics.reporter.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="kafkabroker" \
   password="kafkabroker-secret";

# ACLs
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:kafkabroker
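
Two properties of this file are easy to verify mechanically: no unencrypted listener is configured, and the file, which holds passwords, is not readable by group or others. The following is a minimal sketch of such a pre-flight check. The function name check_broker_config is hypothetical, and the sketch does not handle custom listener names mapped through listener.security.protocol.map.

```shell
# Hypothetical pre-flight check for a broker's server.properties.
# Fails if any listener uses PLAINTEXT or SASL_PLAINTEXT, or if the file is
# accessible to group or others.
check_broker_config() {
  file="$1"
  if grep -Eq '^(advertised\.)?listeners=.*PLAINTEXT://' "$file"; then
    echo "unencrypted listener configured in $file" >&2
    return 1
  fi
  # Characters 5-10 of the mode string are the group and other permission bits.
  perms=$(ls -ld "$file" | cut -c5-10)
  if [ "$perms" != "------" ]; then
    echo "$file is accessible to group or others; consider chmod 600" >&2
    return 1
  fi
}
```

For example, check_broker_config etc/kafka/server.properties returns a non-zero status and prints the reason if either check fails.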

Note

This is not the full broker configuration. These are only the additional settings required to enable security on a known working cluster of Kafka brokers that is already successfully monitored via Confluent Control Center.

Start each Kafka broker. Pass the name of the JAAS file as a JVM parameter:

$ export KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
$ bin/kafka-server-start etc/kafka/server.properties

Clients

Common Configuration

Any component that interacts with secured Kafka brokers is a client and must be configured for security as well. These clients include Kafka Connect workers and certain connectors such as Replicator, Kafka Streams API clients, KSQL clients, non-Java clients, Confluent Control Center, Confluent Schema Registry, REST Proxy, etc.

All clients share a general set of security configuration parameters required to interact with a secured Kafka cluster:

  1. To encrypt via SSL and authenticate via SASL, configure the security protocol to use SASL_SSL. (If you wanted SSL for both encryption and authentication without SASL, the security protocol would be SSL).

    security.protocol=SASL_SSL
    
  2. To configure SSL encryption truststore settings, set the truststore configuration parameters. In this tutorial, the client does not need the keystore because authentication is done via SASL/PLAIN instead of 2-way SSL.

    ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
    ssl.truststore.password=test1234
    
  3. To configure SASL authentication, set the SASL mechanism, which in this tutorial is PLAIN. Then configure the JAAS configuration property to describe how to connect to the Kafka brokers. The properties username and password are used to configure the user for connections.

    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
       username="client1" \
       password="client1-secret";
    

Combining the configuration steps above, the client’s general pattern for enabling SSL encryption and SASL/PLAIN authentication is to add the following to the client’s properties file.

security.protocol=SASL_SSL
ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
ssl.truststore.password=test1234
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="client1" \
    password="client1-secret";

What differs between the clients is the specific configuration prefix that precedes each configuration parameter, as described in the sections below.
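
Because only the prefix changes, the prefixed variants can be derived mechanically from the base client properties. The following is a minimal sketch, assuming the base settings shown above are stored in a properties file. The function name prefix_props is hypothetical.

```shell
# Hypothetical helper: read properties on stdin and write them to stdout with
# the given prefix added to every key. Comments and blank lines pass through
# unchanged, and continuation lines (those following a trailing backslash)
# are not prefixed.
prefix_props() {
  awk -v p="$1" '
    cont           { print; cont = ($0 ~ /\\$/); next }
    /^[ \t]*(#|$)/ { print; next }
                   { print p $0; cont = ($0 ~ /\\$/) }
  '
}
```

For example, piping the base file through prefix_props 'producer.' yields keys such as producer.security.protocol and producer.sasl.jaas.config, with the continued username and password lines left intact.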

Console Producer and Consumer

The command line tools for console producer and consumer are convenient ways to send and receive a small amount of data to the cluster. They are clients and thus need security configurations as well.

  1. Create a client_security.properties file with the security configuration parameters described above, with no additional configuration prefix.

    security.protocol=SASL_SSL
    ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
    ssl.truststore.password=test1234
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="client1" \
        password="client1-secret";
    
  2. Pass in the properties file when using the command line tools.

    $ kafka-console-producer --broker-list kafka1:9094 --topic test-topic --producer.config client_security.properties
    $ kafka-console-consumer --bootstrap-server kafka1:9094 --topic test-topic --consumer.config client_security.properties
    

KSQL and Stream Processing Clients

Enabling KSQL and stream processing clients for security is simply a matter of passing the security configurations to the relevant client constructor.

Take the basic client security configuration:

security.protocol=SASL_SSL
ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
ssl.truststore.password=test1234
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="client1" \
    password="client1-secret";

And configure the application for the following:

  • Top-level, with no additional configuration prefix
  • Confluent Monitoring Interceptor producer used for Confluent Control Center streams monitoring, with an additional configuration prefix producer.confluent.monitoring.interceptor.
  • Confluent Monitoring Interceptor consumer used for Confluent Control Center streams monitoring, with an additional configuration prefix consumer.confluent.monitoring.interceptor.

Combining these configurations, KSQL configuration for SSL encryption and SASL/PLAIN authentication is the following. You may configure them by either loading the properties from a file, as shown below, or by setting the properties programmatically.

# Top level
security.protocol=SASL_SSL
ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
ssl.truststore.password=test1234
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="client1" password="client1-secret";

# Embedded producer for streams monitoring with Confluent Control Center
producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
producer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
producer.confluent.monitoring.interceptor.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
producer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
producer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
producer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="client1" password="client1-secret";

# Embedded consumer for streams monitoring with Confluent Control Center
consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
consumer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
consumer.confluent.monitoring.interceptor.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
consumer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
consumer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
consumer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="client1" password="client1-secret";

Note

This is not the full configuration. These are only the additional settings required to enable security on a known working KSQL or Java application that is monitored via Confluent Control Center.

Kafka Connect

From the perspective of the brokers, Kafka Connect is another client, and this tutorial configures Connect for SSL encryption and SASL/PLAIN authentication. Enabling Kafka Connect for security is simply a matter of passing the security configurations to the Connect workers, the producers used by source connectors, and the consumers used by sink connectors.

Take the basic client security configuration:

security.protocol=SASL_SSL
ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
ssl.truststore.password=test1234
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="client1" \
    password="client1-secret";

And configure Kafka Connect for the following:

  • Top-level for Connect workers, with no additional configuration prefix
  • Embedded producer for source connectors, with an additional configuration prefix producer.
  • Embedded consumers for sink connectors, with an additional configuration prefix consumer.
  • Confluent Monitoring Interceptor producer for source connectors used for Confluent Control Center streams monitoring, with an additional configuration prefix producer.confluent.monitoring.interceptor.
  • Confluent Monitoring Interceptor consumer for sink connectors used for Confluent Control Center streams monitoring, with an additional configuration prefix consumer.confluent.monitoring.interceptor.

Combining these configurations, a Kafka Connect worker configuration for SSL encryption and SASL/PLAIN authentication is the following. You may configure these settings in the connect-distributed.properties file.

# Connect worker
security.protocol=SASL_SSL
ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
ssl.truststore.password=test1234
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="connect" \
   password="connect-secret";

# Embedded producer for source connectors
producer.security.protocol=SASL_SSL
producer.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
producer.ssl.truststore.password=test1234
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="connect" \
  password="connect-secret";

# Embedded consumer for sink connectors
consumer.security.protocol=SASL_SSL
consumer.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
consumer.ssl.truststore.password=test1234
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="connect" \
  password="connect-secret";

# Embedded producer for source connectors for streams monitoring with Confluent Control Center
producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
producer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
producer.confluent.monitoring.interceptor.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
producer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
producer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
producer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="connect" \
  password="connect-secret";

# Embedded consumer for sink connectors for streams monitoring with Confluent Control Center
consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
consumer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
consumer.confluent.monitoring.interceptor.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
consumer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
consumer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
consumer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="connect" \
  password="connect-secret";

Note

This is not the full Connect worker configuration. These are only the additional settings required to enable security on a known working Kafka Connect cluster that is already successfully monitored via Confluent Control Center.

Pass in the properties file when starting each Connect worker.

$ bin/connect-distributed etc/kafka/connect-distributed.properties

Replicator

Confluent Replicator is a type of Kafka source connector that replicates data from a source Kafka cluster to a destination Kafka cluster. An embedded consumer inside Replicator consumes data from the source cluster, and an embedded producer inside the Kafka Connect worker produces data to the destination cluster.

Take the basic client security configuration:

security.protocol=SASL_SSL
ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
ssl.truststore.password=test1234
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="client1" \
    password="client1-secret";

And configure Replicator for the following:

  • Top-level Replicator consumer from the origin cluster, with an additional configuration prefix src.kafka.
  • Confluent Monitoring Interceptor consumer from the origin cluster used for Confluent Control Center streams monitoring, with an additional configuration prefix src.consumer.confluent.monitoring.interceptor.

Combining the configuration steps described above, the Replicator JSON properties file contains the following configuration settings:

{
  "name":"replicator",
  "config":{
    ....
    "src.kafka.security.protocol" : "SASL_SSL",
    "src.kafka.ssl.truststore.location" : "/var/ssl/private/kafka.client.truststore.jks",
    "src.kafka.ssl.truststore.password" : "test1234",
    "src.kafka.sasl.mechanism" : "PLAIN",
    "src.kafka.sasl.jaas.config" : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"replicator\" password=\"replicator-secret\";",
    "src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
    "src.consumer.confluent.monitoring.interceptor.security.protocol": "SASL_SSL",
    "src.consumer.confluent.monitoring.interceptor.ssl.truststore.location": "/var/ssl/private/kafka.client.truststore.jks",
    "src.consumer.confluent.monitoring.interceptor.ssl.truststore.password": "test1234",
    "src.consumer.confluent.monitoring.interceptor.sasl.mechanism": "PLAIN",
    "src.consumer.confluent.monitoring.interceptor.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"client1\" password=\"client1-secret\";",
    ....
  }
}

Note

This is not the full Replicator configuration. These are only the additional settings required to enable security on a known working Replicator connector that is already successfully monitored via Confluent Control Center.

Once Kafka Connect is started, you can add the Confluent Replicator.

$ curl -X POST -H "Content-Type: application/json" --data @replicator_properties.json http://connect:8083/connectors
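
After submitting the connector, the Kafka Connect REST API can report whether it started. The following is a minimal sketch that queries the connector status endpoint. The function name connector_status and the CURL and CONNECT_URL variables are hypothetical, and the default URL mirrors the host and port used in the command above.

```shell
# Hypothetical helper: fetch the status of a connector from the Connect REST API.
# CURL can be overridden (for example, with "echo") to preview the request.
connector_status() {
  name="$1"
  "${CURL:-curl}" -s "${CONNECT_URL:-http://connect:8083}/connectors/${name}/status"
}
```

For example, connector_status replicator returns a JSON document describing the state of the connector and its tasks.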

Confluent Control Center

Confluent Control Center uses Kafka Streams for stream processing, so if all the Kafka brokers in the monitoring cluster backing Control Center are secured, then the Control Center application, another client, also needs to be secured.

Take the basic client security configuration and add the configuration prefix confluent.controlcenter.streams. Make all the following modifications in the etc/confluent-control-center/control-center.properties file.

confluent.controlcenter.streams.security.protocol=SASL_SSL
confluent.controlcenter.streams.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
confluent.controlcenter.streams.ssl.truststore.password=test1234
confluent.controlcenter.streams.sasl.mechanism=PLAIN
confluent.controlcenter.streams.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="confluent" \
  password="confluent-secret";

Note

This is not the full Confluent Control Center configuration. These are only the additional settings required to enable security on a known working Confluent Control Center deployment.

Start Confluent Control Center.

$ bin/control-center-start etc/confluent-control-center/control-center.properties

Confluent Metrics Reporter

If you are using Confluent Control Center to monitor your deployment, the Confluent Metrics Reporter is a client as well. If the monitoring cluster backing Confluent Control Center is also configured with the same security protocols, configure the Confluent Metrics Reporter for security in each broker’s server.properties file. The configuration prefix is confluent.metrics.reporter. and is described in the Brokers section above.

Confluent Monitoring Interceptors

Configure security for whichever components are using Confluent Monitoring Interceptors to report stream monitoring statistics in Confluent Control Center.

# Embedded producer for streams monitoring with Confluent Control Center
producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
producer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
producer.confluent.monitoring.interceptor.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
producer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
producer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
producer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="client1" password="client1-secret";

# Embedded consumer for streams monitoring with Confluent Control Center
consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
consumer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
consumer.confluent.monitoring.interceptor.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
consumer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
consumer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
consumer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="client1" password="client1-secret";

Authorization and ACLs

Use kafka-acls (the Kafka Authorizer CLI) to add, remove, or list ACLs. The most common use cases for ACL management are adding or removing a principal as a producer or consumer, and there are convenience options to handle these cases. To add a client called client1, which is part of a consumer group called test, as a producer and consumer of test-topic (the topic used with the console tools above), execute the following:

$ kafka-acls --authorizer-properties zookeeper.connect=zookeeper:2181 \
   --add --allow-principal User:client1 \
   --producer --topic test-topic

$ kafka-acls --authorizer-properties zookeeper.connect=zookeeper:2181 \
   --add --allow-principal User:client1 \
   --consumer --topic test-topic --group test
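
After adding ACLs, listing them confirms that the expected entries exist. The following is a minimal sketch built on the --list option of kafka-acls. The function name list_topic_acls and the KAFKA_ACLS and ZK_CONNECT variables are hypothetical, and the default ZooKeeper address mirrors the commands above.

```shell
# Hypothetical helper: list the ACLs currently attached to a topic.
# KAFKA_ACLS can be overridden (for example, with "echo") to preview the command.
list_topic_acls() {
  topic="$1"
  "${KAFKA_ACLS:-kafka-acls}" \
    --authorizer-properties "zookeeper.connect=${ZK_CONNECT:-zookeeper:2181}" \
    --list --topic "$topic"
}
```

Call it with the same topic name used in the commands above. Topic names are case-sensitive, so the name must match exactly.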

Read our documentation for more information on authorization and ACLs on the Confluent Platform.

Troubleshooting

Security configurations often do not work on the first attempt. Debugging output helps diagnose the cause of the problem.

  1. Validate the keys and certificates in the keystores and truststores in the brokers and clients.

    $ keytool -list -v -keystore /var/ssl/private/kafka.server.keystore.jks
    
  2. Enable Kafka authorization logging by modifying the etc/kafka/log4j.properties file. Change the log level to DEBUG, and then restart the brokers.

    log4j.logger.kafka.authorizer.logger=DEBUG, authorizerAppender
    
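  3. Enable SSL debug output via the javax.net.debug system property, which requires a restart of the JVM. Append to KAFKA_OPTS rather than replacing it, so that the JAAS setting configured earlier is preserved.

    $ export KAFKA_OPTS="$KAFKA_OPTS -Djavax.net.debug=all"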
    
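  4. If you use SASL/GSSAPI (Kerberos), enable Kerberos debug output via the sun.security.krb5.debug system property, which also requires a restart of the JVM. This property has no effect on the SASL/PLAIN setup used in this tutorial.

    $ export KAFKA_OPTS="$KAFKA_OPTS -Dsun.security.krb5.debug=true"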
    

Next Steps

To see a fully secured multi-node cluster, check out our Docker-based Confluent Platform demo. It shows entire configurations, including security-related and non-security-related configuration parameters, on all components in the Confluent Platform, and the demo’s playbook has a security section for further learning.

Read our documentation for more on security design and configuration on all components in the Confluent Platform. While this tutorial uses the PLAIN mechanism for the SASL examples, Confluent additionally supports GSSAPI (Kerberos) and SCRAM, which are more suitable for production.

We would love to hear your feedback in the Confluent community security channel in Slack!