.. title:: Secure Deployment for Kafka Streams in Confluent Platform

.. meta::
   :description: Set ACLs and assign RBAC roles in your Kafka Streams applications.

.. _streams_developer-guide_security:

Secure Deployment for |kstreams| in |cp|
****************************************

|kstreams| natively integrates with the |ak-tm| :ref:`security features ` and
supports all of the client-side security features in |ak|. |kstreams|
leverages the :ref:`Java Producer and Consumer API `.

To secure your stream processing applications, configure the security settings
in the corresponding |ak| producer and consumer clients, and then specify the
corresponding configuration settings in your |kstreams| application.

|ak| supports cluster encryption and authentication, including a mix of
authenticated and unauthenticated, and encrypted and non-encrypted clients.
Using security is optional. Here are a few relevant client-side security
features:

Encrypt data-in-transit between your applications and |ak| brokers
   You can enable encryption of the client-server communication between your
   applications and the |ak| brokers. For example, you can configure your
   applications to always use encryption when reading and writing data to and
   from |ak|. This is critical when reading and writing data across security
   domains, such as an internal network, the public internet, and partner
   networks.

Client authentication
   You can enable client authentication for connections from your application
   to |ak| brokers. For example, you can define that only specific
   applications are allowed to connect to your |ak| cluster.

Client authorization
   You can enable client authorization of read and write operations by your
   applications. For example, you can define that only specific applications
   are allowed to read from a |ak| topic. You can also restrict write access
   to |ak| topics to prevent data pollution or fraudulent activities.

For more information about the security features in |ak|, see :ref:`Kafka
Security ` and the blog post `Apache Kafka Security 101 `__.

.. _streams_developer-guide_security-acls:

Required ACL setting for secure |ak| clusters
---------------------------------------------

|ak| clusters can use ACLs to control access to resources, like the ability to
create topics, and for such clusters each client, including |kstreams|, is
required to authenticate as a particular user in order to be authorized with
appropriate access. In particular, when |kstreams| applications run against a
secured |ak| cluster, the principal running the application must have ACLs set
so that the application has permission to create
:ref:`internal topics `.

To avoid granting this permission to your application, you can create the
required internal topics manually. If the internal topics exist, |kstreams|
doesn't try to recreate them. The internal repartition and changelog topics
must be created with the correct number of partitions, or |kstreams| fails on
startup. The topics must be created with the same number of partitions as the
corresponding input topic, or, if there are multiple input topics, with the
maximum number of partitions across all input topics. Also, changelog topics
*must* be created with log compaction enabled, or your application might lose
data. For changelog topics of windowed KTables, apply the ``delete,compact``
cleanup policy and set the retention time based on the corresponding store's
retention time. To avoid premature deletion, add a delta to the store
retention time. By default, |kstreams| adds 24 hours to the store retention
time.
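If you choose to create the internal topics manually, as described above, the
following is a minimal sketch that uses the Kafka ``AdminClient``. The topic
names, partition count, replication factor, and store retention time are
hypothetical placeholders; the partition count must match your input topics,
and the broker address and any security settings are your own.

.. sourcecode:: java

   import java.util.List;
   import java.util.Map;
   import java.util.Properties;

   import org.apache.kafka.clients.admin.AdminClient;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.clients.admin.NewTopic;
   import org.apache.kafka.common.config.TopicConfig;

   public class CreateInternalTopicsManually {

       public static void main(final String[] args) throws Exception {
           final Properties props = new Properties();
           // Placeholder broker address; add your own security settings as needed.
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.example.com:9093");

           try (final AdminClient admin = AdminClient.create(props)) {
               // Changelog topic for a regular (non-windowed) store: log
               // compaction MUST be enabled. The partition count (4 here) must
               // match the corresponding input topic.
               final NewTopic changelog =
                   new NewTopic("team1-streams-app1-mystore-changelog", 4, (short) 3)
                       .configs(Map.of(
                           TopicConfig.CLEANUP_POLICY_CONFIG,
                           TopicConfig.CLEANUP_POLICY_COMPACT));

               // Changelog topic for a windowed KTable: "delete,compact" with a
               // retention time of store retention plus a delta (24 hours is the
               // default delta that Kafka Streams adds).
               final long storeRetentionMs = 7L * 24 * 60 * 60 * 1000; // assumed 7-day store retention
               final long deltaMs = 24L * 60 * 60 * 1000;              // 24-hour delta
               final NewTopic windowedChangelog =
                   new NewTopic("team1-streams-app1-mywindowedstore-changelog", 4, (short) 3)
                       .configs(Map.of(
                           TopicConfig.CLEANUP_POLICY_CONFIG, "delete,compact",
                           TopicConfig.RETENTION_MS_CONFIG,
                           String.valueOf(storeRetentionMs + deltaMs)));

               admin.createTopics(List.of(changelog, windowedChangelog)).all().get();
           }
       }
   }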
You can find out the names of the required internal topics by using
``Topology#describe()``. All internal topics follow the naming pattern
``<application.id>-<operatorName>-<suffix>``, where the ``suffix`` is either
``repartition`` or ``changelog``.

.. important:: There is no guarantee that this naming pattern will continue in
   future releases, because it's not part of the public API.
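As an illustration of ``Topology#describe()``, the following sketch builds a
small, hypothetical topology with a repartition step (``groupBy``) and a state
store (``count``), and prints the description. The output includes the
generated repartition topic names and the state store names from which
changelog topic names are derived.

.. sourcecode:: java

   import java.util.Arrays;

   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.Topology;
   import org.apache.kafka.streams.kstream.Grouped;

   public class PrintInternalTopicNames {

       public static void main(final String[] args) {
           final StreamsBuilder builder = new StreamsBuilder();

           // A hypothetical topology: groupBy triggers a repartition topic,
           // and count creates a state store with a changelog topic.
           builder.<String, String>stream("input-topic1")
                  .flatMapValues(v -> Arrays.asList(v.split(" ")))
                  .groupBy((k, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
                  .count();

           final Topology topology = builder.build();

           // Prints all source, processor, and sink nodes, including the
           // repartition topics and the state store names.
           System.out.println(topology.describe());
       }
   }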
For best security, set only the minimum ACL operations. Allow only the
following operations for the |kstreams| principal:

- Topic resource (for internal topics): READ, DELETE, WRITE, CREATE
- Consumer Group resource: READ, DESCRIBE
- Topic resource (for input topics): READ
- Topic resource (for output topics): WRITE

For example, given the following setup of your |kstreams| application:

- The ``application.id`` configuration value is ``team1-streams-app1``.
- The application authenticates with the |ak| cluster as a ``team1`` user.
- The application's topology reads from input topics ``input-topic1`` and
  ``input-topic2``.
- The application's topology writes to output topics ``output-topic1`` and
  ``output-topic2``.
- The application has the exactly-once processing guarantee enabled:
  ``processing.guarantee=exactly_once``.

The following commands create the necessary ACLs in the |ak| cluster to allow
your application to operate:

.. code:: bash

   # Allow Streams to read the input topics:
   bin/kafka-acls ... --add --allow-principal User:team1 --operation READ --topic input-topic1 --topic input-topic2

   # Allow Streams to write to the output topics:
   bin/kafka-acls ... --add --allow-principal User:team1 --operation WRITE --topic output-topic1 --topic output-topic2

   # Allow Streams to manage its own internal topics:
   bin/kafka-acls ... --add --allow-principal User:team1 --operation READ --operation DELETE --operation WRITE --operation CREATE --resource-pattern-type prefixed --topic team1-streams-app1

   # Allow Streams to manage its own consumer groups:
   bin/kafka-acls ... --add --allow-principal User:team1 --operation READ --operation DESCRIBE --group team1-streams-app1

   # Allow Streams EOS:
   bin/kafka-acls ... --add --allow-principal User:team1 --operation WRITE --operation DESCRIBE --transactional-id team1-streams-app1 --resource-pattern-type prefixed

If you plan to repartition topics in |kstreams|, be sure to specify
``cleanup.policy=delete`` and also allow DELETE operations. The DELETE
operation ensures that, after repartitioning, the cleanup removes old records
from the logs. If you don't allow DELETE operations, file descriptor usage is
likely to increase. The DELETE ACL on internal topics is necessary to enable
the automatic cleanup that occurs through calls to ``deleteRecords``.
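If you prefer to manage ACLs programmatically rather than with the
``kafka-acls`` tool, the same grants can be created with the Kafka
``AdminClient``. The following is a minimal sketch for a subset of the example
above (the input-topic READ ACLs and the prefixed ACLs on the internal
topics). The bootstrap server is a placeholder, and the admin client itself
must authenticate as a principal that is authorized to alter ACLs.

.. sourcecode:: java

   import java.util.List;
   import java.util.Properties;

   import org.apache.kafka.clients.admin.AdminClient;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.common.acl.AccessControlEntry;
   import org.apache.kafka.common.acl.AclBinding;
   import org.apache.kafka.common.acl.AclOperation;
   import org.apache.kafka.common.acl.AclPermissionType;
   import org.apache.kafka.common.resource.PatternType;
   import org.apache.kafka.common.resource.ResourcePattern;
   import org.apache.kafka.common.resource.ResourceType;

   public class CreateStreamsAcls {

       public static void main(final String[] args) throws Exception {
           final Properties props = new Properties();
           // Placeholder broker address; this client must be authorized to alter ACLs.
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.example.com:9093");

           try (final AdminClient admin = AdminClient.create(props)) {
               // Allow the team1 principal to READ the input topics.
               final AccessControlEntry allowRead =
                   new AccessControlEntry("User:team1", "*", AclOperation.READ, AclPermissionType.ALLOW);
               final List<AclBinding> inputAcls = List.of(
                   new AclBinding(new ResourcePattern(ResourceType.TOPIC, "input-topic1", PatternType.LITERAL), allowRead),
                   new AclBinding(new ResourcePattern(ResourceType.TOPIC, "input-topic2", PatternType.LITERAL), allowRead));

               // Allow READ, DELETE, WRITE, and CREATE on all internal topics,
               // using a prefixed pattern on the application.id.
               final ResourcePattern internalTopics =
                   new ResourcePattern(ResourceType.TOPIC, "team1-streams-app1", PatternType.PREFIXED);
               final List<AclBinding> internalAcls = List.of(
                   new AclBinding(internalTopics, new AccessControlEntry("User:team1", "*", AclOperation.READ, AclPermissionType.ALLOW)),
                   new AclBinding(internalTopics, new AccessControlEntry("User:team1", "*", AclOperation.DELETE, AclPermissionType.ALLOW)),
                   new AclBinding(internalTopics, new AccessControlEntry("User:team1", "*", AclOperation.WRITE, AclPermissionType.ALLOW)),
                   new AclBinding(internalTopics, new AccessControlEntry("User:team1", "*", AclOperation.CREATE, AclPermissionType.ALLOW)));

               admin.createAcls(inputAcls).all().get();
               admin.createAcls(internalAcls).all().get();
           }
       }
   }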
.. _streams_developer-guide_security-rbac:

RBAC role bindings
------------------

|kstreams| supports role-based access control (RBAC) for controlling access to
resources in your |ak| clusters. The following table shows the required RBAC
roles for access to cluster resources. The ``<principal>``, ``<topic>``,
``<application.id>``, and ``<kafka-cluster-id>`` values are placeholders for
your own principal, topic names, application ID, and cluster ID.

+-------------------------------+--------------------+-----------------------------------------------------+--------------------------------------------+
| Resource                      | Role               | Command                                             | Notes                                      |
+===============================+====================+=====================================================+============================================+
| Input topic                   | ``DeveloperRead``  | .. code:: bash                                      |                                            |
|                               |                    |                                                     |                                            |
|                               |                    |    confluent iam rbac role-binding create \         |                                            |
|                               |                    |      --principal User:<principal> \                 |                                            |
|                               |                    |      --role DeveloperRead \                         |                                            |
|                               |                    |      --resource Topic:<input-topic> \               |                                            |
|                               |                    |      --kafka-cluster-id <kafka-cluster-id>          |                                            |
+-------------------------------+--------------------+-----------------------------------------------------+--------------------------------------------+
| Output topic                  | ``DeveloperWrite`` | .. code:: bash                                      |                                            |
|                               |                    |                                                     |                                            |
|                               |                    |    confluent iam rbac role-binding create \         |                                            |
|                               |                    |      --principal User:<principal> \                 |                                            |
|                               |                    |      --role DeveloperWrite \                        |                                            |
|                               |                    |      --resource Topic:<output-topic> \              |                                            |
|                               |                    |      --kafka-cluster-id <kafka-cluster-id>          |                                            |
+-------------------------------+--------------------+-----------------------------------------------------+--------------------------------------------+
| Internal topic                | ``ResourceOwner``  | .. code:: bash                                      | Required on all internal topics for        |
|                               |                    |                                                     | internal topic management, for example,    |
|                               |                    |    confluent iam rbac role-binding create \         | internal delete calls.                     |
|                               |                    |      --principal User:<principal> \                 |                                            |
|                               |                    |      --role ResourceOwner \                         |                                            |
|                               |                    |      --prefix \                                     |                                            |
|                               |                    |      --resource Topic:<application.id> \            |                                            |
|                               |                    |      --kafka-cluster-id <kafka-cluster-id>          |                                            |
+-------------------------------+--------------------+-----------------------------------------------------+--------------------------------------------+
| Idempotent Producer           | ``DeveloperWrite`` | .. code:: bash                                      | The role binding is on the cluster,        |
|                               |                    |                                                     | because no topic is involved.              |
|                               |                    |    confluent iam rbac role-binding create \         |                                            |
|                               |                    |      --principal User:<principal> \                 |                                            |
|                               |                    |      --role DeveloperWrite \                        |                                            |
|                               |                    |      --resource Cluster:kafka-cluster \             |                                            |
|                               |                    |      --kafka-cluster-id <kafka-cluster-id>          |                                            |
+-------------------------------+--------------------+-----------------------------------------------------+--------------------------------------------+
| Transactional Producer        | ``DeveloperWrite`` | .. code:: bash                                      | When ``processing.guarantee`` is set to    |
|                               |                    |                                                     | ``exactly_once`` or ``exactly_once_v2``.   |
|                               |                    |    confluent iam rbac role-binding create \         |                                            |
|                               |                    |      --principal User:<principal> \                 |                                            |
|                               |                    |      --role DeveloperWrite \                        |                                            |
|                               |                    |      --prefix \                                     |                                            |
|                               |                    |      --resource Transactional-Id:<application.id> \ |                                            |
|                               |                    |      --kafka-cluster-id <kafka-cluster-id>          |                                            |
+-------------------------------+--------------------+-----------------------------------------------------+--------------------------------------------+
| Consumer group                | ``DeveloperRead``  | .. code:: bash                                      |                                            |
|                               |                    |                                                     |                                            |
|                               |                    |    confluent iam rbac role-binding create \         |                                            |
|                               |                    |      --principal User:<principal> \                 |                                            |
|                               |                    |      --role DeveloperRead \                         |                                            |
|                               |                    |      --prefix \                                     |                                            |
|                               |                    |      --resource Group:<application.id> \            |                                            |
|                               |                    |      --kafka-cluster-id <kafka-cluster-id>          |                                            |
+-------------------------------+--------------------+-----------------------------------------------------+--------------------------------------------+
| |sr| with input/output topics | ``DeveloperRead``  | .. code:: bash                                      | The resource also may be                   |
|                               |                    |                                                     | ``Subject:<topic>-key``.                   |
|                               |                    |    confluent iam rbac role-binding create \         |                                            |
|                               |                    |      --principal User:<principal> \                 |                                            |
|                               |                    |      --role DeveloperRead \                         |                                            |
|                               |                    |      --prefix \                                     |                                            |
|                               |                    |      --resource Subject:<topic>-value \             |                                            |
|                               |                    |      --kafka-cluster-id <kafka-cluster-id>          |                                            |
+-------------------------------+--------------------+-----------------------------------------------------+--------------------------------------------+
| |sr| with internal topics     | ``ResourceOwner``  | .. code:: bash                                      | If internal topic schema usage is          |
|                               |                    |                                                     | enabled.                                   |
|                               |                    |    confluent iam rbac role-binding create \         |                                            |
|                               |                    |      --principal User:<principal> \                 |                                            |
|                               |                    |      --role ResourceOwner \                         |                                            |
|                               |                    |      --prefix \                                     |                                            |
|                               |                    |      --resource Subject:<application.id> \          |                                            |
|                               |                    |      --kafka-cluster-id <kafka-cluster-id>          |                                            |
+-------------------------------+--------------------+-----------------------------------------------------+--------------------------------------------+

.. _streams_developer-guide_security-example:

Security example
----------------

This example is based on the Confluent blog post `Apache Kafka Security 101 `__.
The purpose is to configure a |kstreams| application to enable client
authentication and encrypt data-in-transit when communicating with its |ak|
cluster.

.. tip:: A complete demo application is available at
   :cp-examples:`SecureKafkaStreamsExample.java|src/main/java/io/confluent/examples/streams/SecureKafkaStreamsExample.java`
   in the Confluent Examples repository.

This example assumes that the |ak| brokers in the cluster already have
security set up and that the necessary TLS/SSL certificates are available to
the application in the expected local file system locations. For example, if
you are using Docker, you must include these TLS/SSL certificates in the
correct locations within the Docker image.

The snippet below shows the settings to enable client authentication and
TLS/SSL encryption for data-in-transit between your |kstreams| application and
the |ak| cluster it is reading from and writing to:

.. codewithvars:: bash

   # Essential security settings to enable client authentication and TLS/SSL encryption
   bootstrap.servers=kafka.example.com:9093
   security.protocol=SSL
   ssl.truststore.location=/etc/security/tls/kafka.client.truststore.jks
   ssl.truststore.password=test1234
   ssl.keystore.location=/etc/security/tls/kafka.client.keystore.jks
   ssl.keystore.password=test1234
   ssl.key.password=test1234

Configure these settings in the application for your ``StreamsConfig``
instance. These settings encrypt any data-in-transit that is read from or
written to |ak|, and your application authenticates itself against the |ak|
brokers that it is communicating with. Note that this example does not cover
client authorization.

.. sourcecode:: java

   // Code of your Java application that uses the Kafka Streams library
   import java.util.Properties;

   import org.apache.kafka.clients.CommonClientConfigs;
   import org.apache.kafka.common.config.SslConfigs;
   import org.apache.kafka.streams.StreamsConfig;

   // ...

   Properties settings = new Properties();
   settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "secure-kafka-streams-app");
   // Where to find secure Kafka brokers. Here, it's on port 9093.
   settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.example.com:9093");
   //
   // ...further non-security related settings may follow here...
   //
   // Security settings.
   // 1. These settings must match the security settings of the secure Kafka cluster.
   // 2. The TLS/SSL trust store and key store files must be locally accessible to the application.
   settings.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
   settings.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/etc/security/tls/kafka.client.truststore.jks");
   settings.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "test1234");
   settings.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/etc/security/tls/kafka.client.keystore.jks");
   settings.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "test1234");
   settings.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "test1234");
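Hardcoding keystore passwords in application code, as in the snippet above, is
convenient for examples but risky in production. One common alternative, shown
in the following minimal sketch (the file path is a hypothetical placeholder),
is to load the security settings from a properties file that is protected by
file system permissions:

.. sourcecode:: java

   import java.io.IOException;
   import java.io.InputStream;
   import java.nio.file.Files;
   import java.nio.file.Paths;
   import java.util.Properties;

   public class SecuritySettingsLoader {

       // Loads security settings from a properties file so that passwords
       // aren't hardcoded in the application source.
       public static Properties load(final String path) throws IOException {
           final Properties settings = new Properties();
           try (InputStream in = Files.newInputStream(Paths.get(path))) {
               settings.load(in);
           }
           return settings;
       }
   }

You can then load the file, for example with
``SecuritySettingsLoader.load("/etc/security/streams-app.properties")``, and
add the non-secret |kstreams| settings, such as ``application.id``, on top of
the loaded properties. Rotating a password then means updating the properties
file and restarting the application, without rebuilding the code.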
If you incorrectly configure a security setting in your application, it fails
at runtime, typically right after you start it. For example, if you enter an
incorrect password for the ``ssl.keystore.password`` setting, an error message
similar to the following is logged, and then the application terminates:

.. codewithvars:: bash

   # Misconfigured ssl.keystore.password
   Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka producer
   [...snip...]
   Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: java.io.IOException: Keystore was tampered with, or password was incorrect
   [...snip...]
   Caused by: java.security.UnrecoverableKeyException: Password verification failed

Monitor your |kstreams| application log files for such error messages to spot
misconfigured applications quickly.

.. include:: ../../.hidden/docs-common/home/includes/ak-share.rst