.. _security_tutorial:

Security Tutorial
=================

.. important::

   .. include:: ../includes/zk-deprecation.rst

This tutorial provides an example of how to enable security on a |zk|-based |cp|.

.. tip:: For RBAC enablement, see :ref:`rbac-mds-config`.

Overview
~~~~~~~~

This tutorial provides a step-by-step example to enable :ref:`TLS/SSL encryption`, :ref:`SASL authentication`, and :ref:`authorization` on |cp| with monitoring using |c3|. Follow the steps to walk through the configuration settings for securing |zk|, |ak-tm| brokers, |kconnect-long|, and |crep-full|, plus all the components required for monitoring, including the |cmetric-full| and Confluent Monitoring Interceptors.

.. note::

   * For simplicity, this tutorial uses :ref:`SASL/PLAIN (or PLAIN) `, a simple username/password authentication mechanism typically used with TLS encryption to implement secure authentication.
   * For production deployments of |cp|, :ref:`SASL/GSSAPI (Kerberos) ` or :ref:`SASL/SCRAM ` is recommended.
   * |ccloud| uses :ref:`SASL/PLAIN (or PLAIN) ` over TLS v1.2 encryption for authentication because it offers broad client support while providing a good level of security. The usernames and passwords used in the SASL exchange are API keys and secrets that should be securely managed using a secrets store and rotated periodically.

.. note::

   If you use a backslash character in property values, you must escape the backslash character when entering the value in a |cp| configuration file or when using |confluent-cli|. For example, if a password property value is ``1\o/-r@c6pD`` (which includes a backslash character), then you must enter it as ``1\\o/-r@c6pD`` in, for example, a ``server.properties`` file. Otherwise, the following error message appears:

   .. code-block:: text

      Caused by: java.io.IOException: Keystore was tampered with, or password was incorrect

   If you use |confluent-cli| for secrets protection, and include a backslash character (without escaping it) in a property value you are encrypting, then the following error message appears: ``Error: properties: Line xxx: invalid unicode literal``, where ``xxx`` is the line containing the backslash.

Prerequisites
^^^^^^^^^^^^^

You should understand why it is critical to secure |cp| and have a conceptual understanding of how encryption, authentication, and authorization work. Before proceeding with this tutorial:

* Use the :ref:`quickstart` to bring up |cp| *without* security enabled

.. note::

   Do not proceed with this tutorial until you have verified that you can run |cp| *without* security enabled. This lets you focus on just the security configuration additions and their validation.
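As a quick sanity check before you begin, you can confirm that the unsecured cluster responds. This is a minimal sketch, assuming the quickstart brokers are listening on the default unsecured port ``localhost:9092``:

.. codewithvars:: bash

   # List topics over the unsecured PLAINTEXT port; this should succeed now,
   # and will stop working once the PLAINTEXT port is removed later in this tutorial
   kafka-topics --bootstrap-server localhost:9092 --list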
.. _generating_keys_certs:

Creating TLS/SSL Keys and Certificates
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Each machine in the cluster has a public-private key pair, and a certificate to identify the machine. The certificate, however, is unsigned, which means that an attacker can create such a certificate to pretend to be any machine. Therefore, it is important to prevent forged certificates by signing them for each machine in the cluster.

A certificate authority (CA) is responsible for signing certificates. A CA works like a government that issues passports: the government validates the identity of the person applying for the passport and then provides a passport in a standard form that is difficult to forge. Other governments verify the form is valid to ensure the passport is authentic. Similarly, the CA signs the certificates, and the cryptography guarantees that a signed certificate is computationally difficult to forge. Thus, as long as the CA is a genuine and trusted authority, the clients have high assurance that they are connecting to the authentic machines.

The keystore stores each machine's own identity. The truststore stores all the certificates that the machine should trust. Importing a certificate into a machine's truststore also means trusting all certificates that are signed by that certificate. As in the analogy above, trusting the government (CA) also means trusting all passports (certificates) that it has issued. This attribute is called the chain of trust, and it is particularly useful when deploying TLS/SSL on a large |ak| cluster. You can sign all certificates in the cluster with a single CA, and have all machines share the same truststore that trusts the CA. That way all machines can authenticate all other machines.

.. important::

   TLS/SSL certificates may include an `Extended Key Usage `__ extension (``extendedKeyUsage``) to control the purpose for which the certificate public key can be used. If this field is empty, there are no restrictions on usage, but if any usage is specified, valid TLS/SSL implementations must enforce the restrictions. Extended key usages are relevant for client and server authentication. |ak| brokers require both client and server authentication for intracluster communication because every broker is both the client and the server for the other brokers. Some corporate CAs may have a signing profile for web servers that is used for |ak| as well and only include the ``serverAuth`` usage value, causing the TLS/SSL handshake to fail.

.. _broker-key-script:

To deploy TLS/SSL, the general steps are:

* Generate the keys and certificates
* Create your own Certificate Authority (CA)
* Sign the certificate

The steps to create keys and sign certificates are enumerated below. You may also adapt the ``kafka-generate-ssl`` script from `confluent-platform-security-tools.git `_.

The definitions of the parameters used in the steps are as follows:

#. keystore: the location of the keystore
#. ca-cert: the certificate of the CA
#. ca-key: the private key of the CA
#. ca-password: the passphrase of the CA
#. cert-file: the exported, unsigned certificate of the server
#. cert-signed: the signed certificate of the server

.. note::

   After a connection has been established, |ak| does not perform certificate renegotiations or revocations. In cases where a certificate is compromised, or you wish to revoke a certificate, use ACL blacklists (specifically, the ``--deny-principal`` or ``--deny-host`` options) to remove a specific client. For details about ACLs, see :ref:`kafka_authorization`.

Configuring Host Name Verification
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Host name verification of servers is enabled by default for client connections as well as interbroker connections to prevent man-in-the-middle attacks. Server host name verification may be disabled by setting ``ssl.endpoint.identification.algorithm`` to an empty string. For example:

.. codewithvars:: bash

   ssl.endpoint.identification.algorithm=

For dynamically configured broker listeners, host name verification may be disabled using ``kafka-configs``. For example:

.. codewithvars:: bash

   ./bin/kafka-configs --bootstrap-server localhost:9093 --entity-type brokers --entity-name 0 --alter \
     --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="
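If a TLS handshake fails with host name verification enabled, it can help to look at the certificate the broker actually presents. The following is a minimal sketch, assuming a hypothetical broker TLS listener at ``kafka1:9093`` and OpenSSL 1.1.1 or later; it prints the subject (CN) and SAN entries of the presented certificate so you can compare them against the FQDN the client uses:

.. codewithvars:: bash

   # Fetch the broker's certificate and show its subject and SAN entries
   openssl s_client -connect kafka1:9093 </dev/null 2>/dev/null \
     | openssl x509 -noout -subject -ext subjectAltName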
Configuring Host Name In Certificates
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

If host name verification is enabled, clients verify the server's fully qualified domain name (FQDN) against one of the following two fields:

* Common Name (CN)
* Subject Alternative Name (SAN)

Both fields are valid; however, `RFC-2818 `_ recommends the use of SAN. SAN is also more flexible, allowing for multiple DNS entries to be declared. Another advantage is that the CN can then be set to a more meaningful value for authorization purposes. To add a SAN field, append the argument ``-ext SAN=DNS:{FQDN}`` to the keytool command:

.. codewithvars:: bash

   keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -ext SAN=DNS:{FQDN}

Run the following command afterwards to verify the contents of the generated certificate:

.. codewithvars:: bash

   keytool -list -v -keystore server.keystore.jks

Generate the keys and certificates
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

You can use Java's ``keytool`` utility for this process. Consult the `Java documentation `_ for more information on the commands and arguments.

1. Generate the key and the certificate for each |ak| broker in the cluster. Generate the key into a keystore called ``kafka.server.keystore`` so that you can export and sign it later with the CA. The keystore file contains the private key of the certificate; therefore, it must be kept safe.

   .. codewithvars:: bash

      # With user prompts
      keytool -keystore kafka.server.keystore.jks -alias localhost -keyalg RSA -genkey

      # Without user prompts, pass command line arguments
      keytool -keystore kafka.server.keystore.jks -alias localhost -keyalg RSA -validity {validity} -genkey -storepass {keystore-pass} -keypass {key-pass} -dname {distinguished-name} -ext SAN=DNS:{hostname}

   Ensure that the common name (CN) exactly matches the fully qualified domain name (FQDN) of the server. The client compares the CN with the DNS domain name to ensure that it is indeed connecting to the desired server, not a malicious one. The hostname of the server can also be specified in the Subject Alternative Name (SAN). Because the distinguished name is used as the server principal when TLS/SSL is used as the interbroker security protocol, it is useful to have the hostname in a SAN rather than the CN.

Create your own Certificate Authority (CA)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

2. Generate a CA, which is simply a public-private key pair and certificate intended to sign other certificates.

   .. codewithvars:: bash

      openssl req -new -x509 -keyout ca-key -out ca-cert -days {validity}

3. Add the generated CA to the **clients' truststore** so that the clients can trust this CA:

   .. codewithvars:: bash

      keytool -keystore kafka.client.truststore.jks -alias CARoot -importcert -file ca-cert

4. Add the generated CA to the **brokers' truststore** so that the brokers can trust this CA:

   .. codewithvars:: bash

      keytool -keystore kafka.server.truststore.jks -alias CARoot -importcert -file ca-cert
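Before signing anything with the new CA, it can be worth a quick look at what you generated. This sketch, assuming the ``ca-cert`` file from step 2 is in the current directory, prints the CA certificate's subject and validity window; an expired or too-short ``-days`` value is a common source of later handshake failures:

.. codewithvars:: bash

   # Show the CA certificate's subject and validity period
   openssl x509 -in ca-cert -noout -subject -dates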
Sign the certificate
^^^^^^^^^^^^^^^^^^^^

To sign all certificates in the keystore with the CA that you generated:

5. Export the certificate from the keystore:

   .. codewithvars:: bash

      keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file

6. Sign it with the CA:

   .. codewithvars:: bash

      openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}

7. Import both the certificate of the CA and the signed certificate into the broker keystore:

   .. codewithvars:: bash

      keytool -keystore kafka.server.keystore.jks -alias CARoot -importcert -file ca-cert
      keytool -keystore kafka.server.keystore.jks -alias localhost -importcert -file cert-signed

Summary
^^^^^^^

Combining the steps described above, the script to create the CA and the broker and client truststores and keystores is as follows:

.. codewithvars:: bash

   keytool -keystore kafka.server.keystore.jks -alias localhost -keyalg RSA -validity {validity} -genkey
   openssl req -new -x509 -keyout ca-key -out ca-cert -days {validity}
   keytool -keystore kafka.client.truststore.jks -alias CARoot -importcert -file ca-cert
   keytool -keystore kafka.server.truststore.jks -alias CARoot -importcert -file ca-cert
   keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
   openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}
   keytool -keystore kafka.server.keystore.jks -alias CARoot -importcert -file ca-cert
   keytool -keystore kafka.server.keystore.jks -alias localhost -importcert -file cert-signed

.. note::

   In this tutorial, the client does not need a keystore because client authentication is done using SASL/PLAIN instead of mutual TLS (mTLS). However, if you use mTLS authentication, you create a client keystore and sign all certificates with the CA that you generated, similar to what was done for the brokers.
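As a final sanity check on the artifacts produced above, you can confirm that the signed broker certificate actually chains back to your CA before loading it into the keystore. A minimal sketch, assuming the ``ca-cert`` and ``cert-signed`` files from the previous steps are in the current directory:

.. codewithvars:: bash

   # Should print "cert-signed: OK" if the CA signature is valid
   openssl verify -CAfile ca-cert cert-signed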
Configure |zk|
~~~~~~~~~~~~~~

For broker to |zk| communication, you will configure optional `DIGEST-MD5 `__ authentication.

#. Set the authentication provider:

   .. codewithvars:: bash

      authProvider.sasl=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

#. Configure the ``zookeeper_jaas.conf`` file as follows. Note the two semicolons.

   .. codewithvars:: bash

      Server {
         org.apache.zookeeper.server.auth.DigestLoginModule required
         user_super="admin-secret"
         user_kafka="kafka-secret";
      };

#. When you start |zk|, pass the name of its JAAS file as a JVM parameter:

   .. code-block:: shell

      export KAFKA_OPTS="-Djava.security.auth.login.config=etc/kafka/zookeeper_jaas.conf"
      zookeeper-server-start etc/kafka/zookeeper.properties

.. include:: ../includes/installation-types-zip-tar.rst

.. _sec-tut-config-brokers:

Configure Brokers
~~~~~~~~~~~~~~~~~

Administrators can configure a mix of secured and unsecured clients. This tutorial ensures that all broker/client and interbroker network communication is encrypted in the following manner:

* All broker/client communication uses the ``SASL_SSL`` security protocol, which ensures that the communication is encrypted and authenticated using SASL/PLAIN
* All interbroker communication uses the ``SSL`` security protocol, which ensures that the communication is encrypted and authenticated using TLS/SSL
* The unsecured ``PLAINTEXT`` port is not enabled

The steps are as follows:

1. Enable the desired security protocols and ports in each broker's ``server.properties``. Notice that both ``SSL`` and ``SASL_SSL`` are enabled.

   .. codewithvars:: bash

      listeners=SSL://:9093,SASL_SSL://:9094

2. To enable the brokers to authenticate each other (mutual TLS (mTLS) authentication), you need to configure all the brokers for client authentication (in this case, the requesting broker is the "client"). We recommend setting ``ssl.client.auth=required``. We discourage configuring it as ``requested`` because misconfigured brokers will still connect successfully, which provides a false sense of security.

   .. codewithvars:: bash

      security.inter.broker.protocol=SSL
      ssl.client.auth=required

3. Define the TLS/SSL truststore, keystore, and passwords in the ``server.properties`` file of every broker. Because this stores passwords directly in the broker configuration file, it is important to restrict access to these files using file system permissions.

   .. codewithvars:: bash

      ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
      ssl.truststore.password=test1234
      ssl.keystore.location=/var/ssl/private/kafka.server.keystore.jks
      ssl.keystore.password=test1234
      ssl.key.password=test1234

4. Enable the SASL/PLAIN mechanism in the ``server.properties`` file of every broker.

   .. codewithvars:: bash

      sasl.enabled.mechanisms=PLAIN

5. Create the broker's JAAS configuration file in each |ak| broker's ``config`` directory; call it ``kafka_server_jaas.conf`` for this example.

   * Configure a ``KafkaServer`` section, used when the broker validates client connections, including those from other brokers. The broker properties ``username`` and ``password`` are used to initiate connections to other brokers, and in this example, ``kafkabroker`` is the user for interbroker communication. The ``user_{userName}`` properties define the passwords for all clients that connect to the broker. In this example, there are three users: ``kafkabroker``, ``kafka-broker-metric-reporter``, and ``client``.
   * Configure a ``Client`` section, used when the broker connects to |zk|. There is a single username and password, which must match the |zk| JAAS configuration.

   .. note:: Note the two semicolons at the end of each section.

   .. codewithvars:: bash

      KafkaServer {
         org.apache.kafka.common.security.plain.PlainLoginModule required
         username="kafkabroker"
         password="kafkabroker-secret"
         user_kafkabroker="kafkabroker-secret"
         user_kafka-broker-metric-reporter="kafka-broker-metric-reporter-secret"
         user_client="client-secret";
      };

      Client {
         org.apache.zookeeper.server.auth.DigestLoginModule required
         username="kafka"
         password="kafka-secret";
      };

.. _tutorial_metrics_reporter:

6. If you are using |c3| to monitor your deployment, and if the monitoring cluster backing |c3| is also configured with the same security protocols, you must configure the |cmetric-full| for security as well. Add these configurations to the ``server.properties`` file of each broker.

   .. codewithvars:: bash

      metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
      confluent.metrics.reporter.security.protocol=SASL_SSL
      confluent.metrics.reporter.ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
      confluent.metrics.reporter.ssl.truststore.password=test1234
      confluent.metrics.reporter.sasl.mechanism=PLAIN
      confluent.metrics.reporter.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
         username="kafka-broker-metric-reporter" \
         password="kafka-broker-metric-reporter-secret";

7. To enable ACLs, you need to configure an authorizer. |ak| provides a simple authorizer implementation; to use it, add the following to ``server.properties``:

   .. codewithvars:: shell

      authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
8. The default behavior is such that if a resource has no associated ACLs, then no one is allowed to access the resource, except super users. Setting broker principals as super users is a convenient way to give them the required access to perform interbroker operations. Because this tutorial configures the interbroker security protocol as SSL, set the super user names to the ``distinguished name`` configured in each broker's certificate. (See other :ref:`authorization configuration options`).

   .. codewithvars:: bash

      super.users=User:<broker1-distinguished-name>;User:<broker2-distinguished-name>;User:<broker3-distinguished-name>;User:kafka-broker-metric-reporter

Combining the configuration steps described above, the broker's ``server.properties`` file contains the following configuration settings:

.. codewithvars:: bash

   # Enable TLS/SSL security protocol for interbroker communication
   # Enable SASL_SSL security protocol for broker-client communication
   listeners=SSL://:9093,SASL_SSL://:9094
   security.inter.broker.protocol=SSL
   ssl.client.auth=required

   # Broker security settings
   ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
   ssl.truststore.password=test1234
   ssl.keystore.location=/var/ssl/private/kafka.server.keystore.jks
   ssl.keystore.password=test1234
   ssl.key.password=test1234
   sasl.enabled.mechanisms=PLAIN

   # Confluent Metrics Reporter for monitoring with Confluent Control Center
   metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
   confluent.metrics.reporter.security.protocol=SASL_SSL
   confluent.metrics.reporter.ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
   confluent.metrics.reporter.ssl.truststore.password=test1234
   confluent.metrics.reporter.sasl.mechanism=PLAIN
   confluent.metrics.reporter.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="kafka-broker-metric-reporter" \
      password="kafka-broker-metric-reporter-secret";

   # ACLs
   authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
   super.users=User:<broker1-distinguished-name>;User:<broker2-distinguished-name>;User:<broker3-distinguished-name>;User:kafka-broker-metric-reporter

.. note::

   This is not the full broker configuration. It shows only the additional configurations required to enable security on a known working |ak| cluster of brokers that is already successfully monitored via |c3|.

Start each |ak| broker, passing the name of the JAAS file as a JVM parameter:

.. code-block:: shell

   export KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
   kafka-server-start etc/kafka/server.properties

Alternatively, instead of using a separate JAAS file, you can embed the SASL/PLAIN credentials directly in the broker configuration file with a listener-prefixed ``sasl.jaas.config`` property:

.. codewithvars:: bash

   listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="admin" \
      password="admin-secret" \
      user_admin="admin-secret" \
      user_kafkabroker1="kafkabroker1-secret";
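Once a broker restarts with the new listeners, it is worth confirming that the ``SASL_SSL`` port accepts an authenticated client before moving on. A minimal sketch, assuming a broker listening on ``localhost:9094`` and a ``client_security.properties`` file like the one built in the next section:

.. codewithvars:: bash

   # Should list the broker's supported API versions if TLS and SASL/PLAIN work
   # end to end; a handshake or authentication failure here points at the
   # keystore, truststore, or JAAS settings.
   kafka-broker-api-versions --bootstrap-server localhost:9094 --command-config client_security.properties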
.. _kafka-security-config-clients:

Configure Clients
~~~~~~~~~~~~~~~~~

Common Configuration
^^^^^^^^^^^^^^^^^^^^

Any component that interacts with secured |ak| brokers is a *client* and must be configured for security as well. These clients include |kconnect-long| workers and certain connectors such as Replicator, |kstreams| API clients, |ksqldb| clients, non-Java clients, |c3|, |sr-long|, REST Proxy, and so on.

All clients share a general set of security configuration parameters required to interact with a secured |ak| cluster:

1. To encrypt via TLS/SSL and authenticate via SASL, configure the security protocol to use ``SASL_SSL``. (If you wanted TLS/SSL for both encryption and authentication without SASL, the security protocol would be ``SSL``.)

   .. codewithvars:: bash

      security.protocol=SASL_SSL

2. To configure TLS encryption, set the truststore configuration parameters. In this tutorial, the client does not need a keystore because authentication is done via SASL/PLAIN instead of mutual TLS (mTLS).

   .. codewithvars:: bash

      ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
      ssl.truststore.password=test1234

3. To configure SASL authentication, set the SASL mechanism, which in this tutorial is ``PLAIN``. Then set the JAAS configuration property, which describes how the client connects to the |ak| brokers. The properties ``username`` and ``password`` configure the user for connections.

   .. codewithvars:: bash

      sasl.mechanism=PLAIN
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
         username="client" \
         password="client-secret";

Combining the configuration steps above, the client's general pattern for enabling TLS/SSL encryption and SASL/PLAIN authentication is to add the following to the client's properties file:

.. include:: ../kafka/includes/tutorial_config.rst

What differs between the clients is the specific :ref:`configuration prefix` that precedes each configuration parameter, as described in the sections below.

.. _config-console-producer-consumer:

Configure Console Producer and Consumer
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The command line tools for the console producer and consumer are convenient ways to send and receive a small amount of data to the cluster. They are clients too, and thus need security configurations as well.

1. Create a ``client_security.properties`` file with the security configuration parameters described above, with no additional configuration prefix.

   .. include:: ../kafka/includes/tutorial_config.rst

2. Pass in the properties file when using the command line tools.

   .. codewithvars:: bash

      kafka-console-producer --broker-list kafka1:9094 --topic test-topic --producer.config client_security.properties
      kafka-console-consumer --bootstrap-server kafka1:9094 --topic test-topic --consumer.config client_security.properties
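If the commands above fail because ``test-topic`` does not exist and automatic topic creation is disabled on the brokers, create the topic first. This sketch assumes the same broker host and the ``client_security.properties`` file from step 1:

.. codewithvars:: bash

   kafka-topics --bootstrap-server kafka1:9094 --create --topic test-topic \
     --partitions 1 --replication-factor 1 --command-config client_security.properties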
Configure |ksqldb| and Stream Processing Clients
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Enabling |ksqldb| and stream processing clients for security is simply a matter of passing the security configurations to the relevant client constructor. Take the basic client security configuration:

.. include:: ../kafka/includes/tutorial_config.rst

And configure the application for the following:

* Top level, with no additional configuration prefix
* Confluent Monitoring Interceptor producer used for |c3| streams monitoring, with an additional configuration prefix ``producer.confluent.monitoring.interceptor.``
* Confluent Monitoring Interceptor consumer used for |c3| streams monitoring, with an additional configuration prefix ``consumer.confluent.monitoring.interceptor.``

Combining these configurations, the |ksqldb| configuration for TLS/SSL encryption and SASL/PLAIN authentication is the following. You may configure them by either loading the properties from a file, as shown below, or by setting the properties programmatically.

.. codewithvars:: bash

   # Top level
   security.protocol=SASL_SSL
   ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
   ssl.truststore.password=test1234
   sasl.mechanism=PLAIN
   sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="client" password="client-secret";

   # Embedded producer for streams monitoring with Confluent Control Center
   producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
   producer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
   producer.confluent.monitoring.interceptor.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
   producer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
   producer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
   producer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="client" password="client-secret";

   # Embedded consumer for streams monitoring with Confluent Control Center
   consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
   consumer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
   consumer.confluent.monitoring.interceptor.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
   consumer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
   consumer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
   consumer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="client" password="client-secret";

.. note::

   This is not the full configuration. It shows only the additional configurations required to enable security on a known working |ksqldb| or Java application that is monitored using |c3|.
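For example, when running ksqlDB Server, the same settings can be loaded from its properties file at startup. This is a sketch, assuming the security settings above were added to a hypothetical ``etc/ksqldb/ksql-server.properties``:

.. code-block:: shell

   ksql-server-start etc/ksqldb/ksql-server.properties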
Configure |kconnect-long|
~~~~~~~~~~~~~~~~~~~~~~~~~

From the perspective of the brokers, |kconnect-long| is another client, and this tutorial configures Connect for TLS/SSL encryption and SASL/PLAIN authentication. Enabling |kconnect-long| for security is simply a matter of passing the security configurations to the Connect workers, the producers used by source connectors, and the consumers used by sink connectors. Take the basic client security configuration:

.. include:: ../kafka/includes/tutorial_config.rst

And configure |kconnect-long| for the following:

* Top level for Connect workers, with no additional configuration prefix
* Embedded producer for source connectors, with an additional configuration prefix ``producer.``
* Embedded consumers for sink connectors, with an additional configuration prefix ``consumer.``
* Confluent Monitoring Interceptor producer for source connectors used for |c3| streams monitoring, with an additional configuration prefix ``producer.confluent.monitoring.interceptor.``
* Confluent Monitoring Interceptor consumer for sink connectors used for |c3| streams monitoring, with an additional configuration prefix ``consumer.confluent.monitoring.interceptor.``

Combining these configurations, a |kconnect-long| worker configuration for TLS/SSL encryption and SASL/PLAIN authentication is the following. You may configure these settings in the ``connect-distributed.properties`` file.

.. codewithvars:: bash

   # Connect worker
   security.protocol=SASL_SSL
   ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
   ssl.truststore.password=test1234
   sasl.mechanism=PLAIN
   sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="connect" \
      password="connect-secret";

   # Embedded producer for source connectors
   producer.security.protocol=SASL_SSL
   producer.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
   producer.ssl.truststore.password=test1234
   producer.sasl.mechanism=PLAIN
   producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="connect" \
      password="connect-secret";

   # Embedded consumer for sink connectors
   consumer.security.protocol=SASL_SSL
   consumer.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
   consumer.ssl.truststore.password=test1234
   consumer.sasl.mechanism=PLAIN
   consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="connect" \
      password="connect-secret";

   # Embedded producer for source connectors for streams monitoring with Confluent Control Center
   producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
   producer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
   producer.confluent.monitoring.interceptor.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
   producer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
   producer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
   producer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="connect" \
      password="connect-secret";

   # Embedded consumer for sink connectors for streams monitoring with Confluent Control Center
   consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
   consumer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
   consumer.confluent.monitoring.interceptor.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
   consumer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
   consumer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
   consumer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="connect" \
      password="connect-secret";

.. note::

   This is not the full Connect worker configuration. It shows only the additional configurations required to enable security on a known working |kconnect-long| cluster that is already successfully monitored via |c3|.

Pass in the properties file when starting each Connect worker:

.. code-block:: shell

   connect-distributed etc/kafka/connect-distributed.properties
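Once a worker is up, its REST interface is a quick way to confirm it started cleanly. This sketch assumes the worker's REST listener is on the default port ``8083`` of host ``connect``, matching the host used in the Replicator example below:

.. codewithvars:: bash

   # An empty JSON array means the worker is running with no connectors yet
   curl http://connect:8083/connectors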
.. _security-tutorial-replicator:

Replicator
~~~~~~~~~~

|crep-full| is a type of |ak| source connector that replicates data from a source to a destination |ak| cluster. An embedded consumer inside Replicator consumes data from the source cluster, and an embedded producer inside the |kconnect-long| worker produces data to the destination cluster. Take the basic client security configuration:

.. include:: ../kafka/includes/tutorial_config.rst

And configure Replicator for the following:

* Top-level Replicator consumer from the origin cluster, with an additional configuration prefix ``src.kafka.``
* Confluent Monitoring Interceptor consumer from the origin cluster used for |c3| streams monitoring, with an additional configuration prefix ``src.consumer.confluent.monitoring.interceptor.``

Combining the configuration steps described above, the Replicator JSON properties file contains the following configuration settings:

.. codewithvars:: bash

   {
     "name":"replicator",
     "config":{
       ....
       "src.kafka.security.protocol" : "SASL_SSL",
       "src.kafka.ssl.truststore.location" : "/var/ssl/private/kafka.client.truststore.jks",
       "src.kafka.ssl.truststore.password" : "test1234",
       "src.kafka.sasl.mechanism" : "PLAIN",
       "src.kafka.sasl.jaas.config" : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"replicator\" password=\"replicator-secret\";",
       "src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
       "src.consumer.confluent.monitoring.interceptor.security.protocol": "SASL_SSL",
       "src.consumer.confluent.monitoring.interceptor.ssl.truststore.location": "/var/ssl/private/kafka.client.truststore.jks",
       "src.consumer.confluent.monitoring.interceptor.ssl.truststore.password": "test1234",
       "src.consumer.confluent.monitoring.interceptor.sasl.mechanism": "PLAIN",
       "src.consumer.confluent.monitoring.interceptor.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"client\" password=\"client-secret\";",
       ....
     }
   }

.. note::

   This is not the full Replicator configuration. Rather, it shows the additional configurations required to enable security on a known working Replicator connector that is already successfully monitored using |c3|.

After |kconnect-long| is started, you can add the |crep-full| connector:

.. codewithvars:: bash

   curl -X POST -H "Content-Type: application/json" --data @replicator_properties.json http://connect:8083/connectors
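To confirm the connector was accepted and its tasks are running, you can query the |kconnect-long| REST API. This sketch assumes the connector name ``replicator`` from the JSON above:

.. codewithvars:: bash

   # Shows the connector state and the state of each of its tasks
   curl http://connect:8083/connectors/replicator/status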
|c3|
~~~~

|c3| uses |kstreams| for stream processing, so if all the |ak| brokers in the monitoring cluster backing Control Center are secured, then the Control Center application, as another client, also needs to be secured. Take the basic client security configuration and add the configuration prefix ``confluent.controlcenter.streams.``. Make all the following modifications in the ``etc/confluent-control-center/control-center.properties`` file:

.. codewithvars:: bash

   confluent.controlcenter.streams.security.protocol=SASL_SSL
   confluent.controlcenter.streams.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
   confluent.controlcenter.streams.ssl.truststore.password=test1234
   confluent.controlcenter.streams.sasl.mechanism=PLAIN
   confluent.controlcenter.streams.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="confluent" \
      password="confluent-secret";

.. note::

   This is not the full |c3| configuration. Rather, it shows the additional configurations required to enable security on a known working |c3| deployment.

Start |c3|:

.. code-block:: shell

   control-center-start etc/confluent-control-center/control-center.properties

|cmetric-full|
~~~~~~~~~~~~~~

If you are using |c3| to monitor your deployment, the |cmetric-full| is a client as well. If the monitoring cluster backing |c3| is also configured with the same security protocols, then configure the |cmetric-full| for security in each broker's ``server.properties`` file. The configuration prefix is ``confluent.metrics.reporter.`` and is described :ref:`above`.

Confluent Monitoring Interceptors
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Configure security for the components that use Confluent Monitoring Interceptors to report stream monitoring statistics to |c3|:

.. codewithvars:: bash

   # Embedded producer for streams monitoring with Confluent Control Center
   producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
   producer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
   producer.confluent.monitoring.interceptor.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
   producer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
   producer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
   producer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="client" password="client-secret";

   # Embedded consumer for streams monitoring with Confluent Control Center
   consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
   consumer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
   consumer.confluent.monitoring.interceptor.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
   consumer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
   consumer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
   consumer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="client" password="client-secret";

Authorization and ACLs
~~~~~~~~~~~~~~~~~~~~~~

Use :ref:`Centralized ACLs ` to add, remove, or list ACLs when using RBAC. The most common tasks for ACL management are adding or removing a principal as a producer or consumer. For example, to add a client called ``client-1`` as a producer and consumer of a topic called ``test-topic``, run the following:

.. highlight:: none

::

   confluent iam acl create --allow --principal User:client-1 --operation write --topic test-topic --kafka-cluster-id
   confluent iam acl create --allow --principal User:client-1 --operation read --topic test-topic --kafka-cluster-id

For additional CLI details, refer to the ACL subcommands in :confluent-cli:`confluent iam|command-reference/iam/index.html`.

If you are not running RBAC, refer to :ref:`kafka_authorization`.

Troubleshooting
~~~~~~~~~~~~~~~

In cases where the configuration does not work on the first attempt, debugging output is a helpful way to diagnose the cause of the problem:

1. Validate the keys and certificates in the keystores and truststores on the brokers and clients.

   .. codewithvars:: bash

      keytool -list -v -keystore /var/ssl/private/kafka.server.keystore.jks

2. Enable |ak| authorization logging by modifying the ``etc/kafka/log4j.properties`` file. Change the log level to ``DEBUG``, and then restart the brokers.

   .. codewithvars:: bash

      log4j.logger.kafka.authorizer.logger=DEBUG, authorizerAppender

3. Enable TLS/SSL debug output by using the ``javax.net.debug`` system property, which requires a restart of the JVM.

   .. codewithvars:: bash

      export KAFKA_OPTS=-Djavax.net.debug=all

4. Enable SASL debug output by using the ``sun.security.krb5.debug`` system property, which requires a restart of the JVM.

   .. codewithvars:: bash

      export KAFKA_OPTS=-Dsun.security.krb5.debug=true
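With authorization logging at ``DEBUG`` (step 2 above), denied requests show up in the authorizer log, which makes missing ACLs easy to spot. This is a sketch, assuming the log4j configuration writes the authorizer log to a hypothetical ``logs/kafka-authorizer.log``:

.. codewithvars:: bash

   # Denied operations name the principal, operation, and resource involved
   grep -i "denied" logs/kafka-authorizer.log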
Next Steps
~~~~~~~~~~

To see a fully secured multi-node cluster, check out the Docker-based :ref:`Confluent Platform demo`. It shows entire configurations, including security-related and non-security-related configuration parameters, on all components in |cp|, and the demo's playbook has a security section for further learning.

Read the :ref:`documentation ` for more details about security design and configuration on all components in |cp|.

While this tutorial uses the PLAIN mechanism for the SASL examples, Confluent also supports :ref:`GSSAPI (Kerberos)` and :ref:`SCRAM`, which are more suitable for production.

We welcome feedback in the `Confluent community `_ security channel in Slack!