.. _kafka_authorization: Authorization using ACLs ------------------------ |ak-tm| ships with a pluggable, out-of-box Authorizer implementation that uses |zk-full| to store all the ACLs. It is important to set ACLs because otherwise access to resources is limited to super users when an Authorizer is configured. The default behavior is that if a resource has no associated ACLs, then no one is allowed to access the resource, except super users. You can also use the |ak| :cp-javadoc:`AdminClient|clients/javadocs/org/apache/kafka/clients/admin/AdminClient.html` API to manage ACLs. .. include:: ../includes/cp-demo-tip.rst .. _kafka-auth-broker-config: Broker Configuration ~~~~~~~~~~~~~~~~~~~~ Authorizer ^^^^^^^^^^ To enable ACLs, you must configure an Authorizer. |ak| provides a simple authorizer implementation, and to use it, you can add the following to ``server.properties``: .. codewithvars:: bash authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer .. note:: You can also use the commercially licensed :ref:`LDAP authorizer` to enable LDAP-group based ACLs in addition to user-based ACLs. .. _kafka-auth-superuser: Super Users ^^^^^^^^^^^ By default, if no resource patterns match a specific resource, then the resource has no associated ACLs, and therefore no one other than super users are allowed to access the resource. If you want to change that behavior, you can include the following in ``server.properties``: .. codewithvars:: bash allow.everyone.if.no.acl.found=true You can also add super users in ``server.properties`` like the following (note that the delimiter is semicolon since SSL user names may contain comma): .. codewithvars:: bash super.users=User:Bob;User:Alice User Names ^^^^^^^^^^ By default, the SSL user name will be of the form ``CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown``. You can change the SSL user name by setting ``ssl.principal.mapping.rules`` to a customized rule in ``server.properties``. For details about customizing SSL user names, see :ref:`kafkarest_config_ssl_user_name`. For advanced use cases, you can customize the name by setting a customized PrincipalBuilder in ``server.properties``, such as: .. code:: principal.builder.class=CustomizedPrincipalBuilderClass By default, the SASL user name will be the primary part of the Kerberos principal. You can change that by setting ``sasl.kerberos.principal.to.local.rules`` to a customized rule in ``server.properties``. In the event that SSL is enabled but client authentication is not configured, clients will connect anonymously via the ``SSL`` port and will appear to the server with the user name ``ANONYMOUS``. Such a configuration provides encryption and server authentication, but clients will connect anonymously. The other case in which the server will see the ``ANONYMOUS`` user is if the ``PLAINTEXT`` security protocol is being used. By giving read/write permission to the ``ANONYMOUS`` user, you are allowing anyone to access the brokers without authentication. As such, you typically do not want to give access to ``ANONYMOUS`` users unless the intention is to give everyone the permission. .. _kafka_config_ssl_user_name: Configuration Options for Customizing SSL User Name """"""""""""""""""""""""""""""""""""""""""""""""""" .. include:: ../includes/kafka-ssl-user-dname-mapping-rule.rst Using ACLs ~~~~~~~~~~ The following examples use ``bin/kafka-acls`` (the |kacls-cli|) to add, remove or list ACLs. For detailed information on the supported options, run ``bin/kafka-acls --help``. Note that ACLs are stored in |zk| and they are propagated to the brokers asynchronously so there may be a delay before the change takes effect even after the command returns. .. tip:: - If you are using transactions (``--transactional-id``), the IdempotentWrite ACL is implied. - If you are not using transactions, you can use the ``--idempotent`` option to enable the IdempotentWrite ACL. .. _acl-format-operations-resources: ACL Format ^^^^^^^^^^ |ak| ACLs are defined in the general format of "Principal P is [Allowed/Denied] Operation O From Host H On Resource R matching ResourcePattern RP". The following table describes the relationship between operations, resources and APIs: ================ =============== ========================================= Operation Resource API ================ =============== ========================================= ALTER Cluster AlterReplicaLogDirs ALTER Cluster CreateAcls ALTER Cluster DeleteAcls ALTER Topic CreatePartitions ALTER_CONFIGS Cluster AlterConfigs ALTER_CONFIGS Topic AlterConfigs CLUSTER_ACTION Cluster Fetch (for replication only) CLUSTER_ACTION Cluster LeaderAndIsr CLUSTER_ACTION Cluster OffsetForLeaderEpoch CLUSTER_ACTION Cluster StopReplica CLUSTER_ACTION Cluster UpdateMetadata CLUSTER_ACTION Cluster ControlledShutdown CLUSTER_ACTION Cluster WriteTxnMarkers CREATE Cluster CreateTopics CREATE Cluster Metadata if ``auto.create.topics.enable`` CREATE Topic Metadata if ``auto.create.topics.enable`` CREATE Topic CreateTopics DELETE Group DeleteGroups DELETE Topic DeleteRecords DELETE Topic DeleteTopics DESCRIBE Cluster DescribeAcls DESCRIBE Cluster DescribeLogDirs DESCRIBE Cluster ListGroups DESCRIBE DelegationToken DescribeTokens DESCRIBE Group DescribeGroup DESCRIBE Group FindCoordinator DESCRIBE Group ListGroups DESCRIBE Topic ListOffsets DESCRIBE Topic Metadata DESCRIBE Topic OffsetFetch DESCRIBE Topic OffsetForLeaderEpoch DESCRIBE TransactionalId FindCoordinator DESCRIBE_CONFIGS Cluster DescribeConfigs DESCRIBE_CONFIGS Topic DescribeConfigs IDEMPOTENT_WRITE Cluster InitProducerId IDEMPOTENT_WRITE Cluster Produce READ Group AddOffsetsToTxn READ Group Heartbeat READ Group JoinGroup READ Group LeaveGroup READ Group OffsetCommit READ Group OffsetFetch READ Group SyncGroup READ Group TxnOffsetCommit READ Topic Fetch READ Topic OffsetCommit READ Topic TxnOffsetCommit WRITE Topic Produce WRITE TransactionalId Produce WRITE Topic AddPartitionsToTxn WRITE TransactionalId AddPartitionsToTxn WRITE TransactionalId AddOffsetsToTxn WRITE TransactionalId EndTxn WRITE TransactionalId InitProducerId WRITE TransactionalId TxnOffsetCommit ================ =============== ========================================= The operations in this table are both for clients (producers, consumers, admin) and inter-broker operations of a cluster. In a secure cluster, both client requests and inter-broker operations require authorization. The inter-broker operations are split into two classes: cluster and topic. Cluster operations refer to operations necessary for the management of the cluster, like updating broker and partition metadata, changing the leader and the set of in-sync replicas of a partition, and triggering a controlled shutdown. Because of the way replication of topic partitions work internally, it is also important to grant topic access to brokers. Brokers replicating a partition must be authorized for both READ and DESCRIBE on that topic. DESCRIBE is granted by default with the READ authorization. You can use these methods to automatically grant topic access to servers: * Make the server principal a :ref:`super user `. By configuring the cluster this way, servers can automatically access all resources, including the cluster resource. * Use the wildcard for topics so that you only have to set it once. You must set an ACL for the cluster resource separately. For example: .. code:: bash kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add \ --allow-principal User:Alice --operation All --topic '*' --cluster Producers and consumers need to be authorized to perform operations on topics, but they should be configured with different principals compared to the servers. The main operations that producers require authorization to execute are WRITE and READ. Admin users can execute command line tools and require authorization. Operations that an admin user might need authorization for are DELETE, CREATE, and ALTER. You can use wildcards for producers and consumers so that you only have to set it once. - Wildcards are any resource, including groups. - You can give topic and group wildcard access to users who have permission to access all topics and groups (e.g admin users). If you use this method, you don't have to create a separate rule for each topic and group for the user. For example, you can use this to give wildcard access to Alice: .. code:: bash kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal \ User:Alice --operation All --topic '*' --group '*' Common Cases ^^^^^^^^^^^^ * **create** a topic, the principal of the client will require the CREATE and DESCRIBE operations on the ``topic`` resource (via the `Metadata` API with ``auto.create.topics.enable``). * **produce** to a topic, the principal of the producer will require the WRITE operation on the ``topic`` resource. * **consume** from a topic, the principal of the consumer will require the READ operation on the ``topic`` and ``group`` resources. Note that to be able to create, produce, and consume, the servers need to be configured with the appropriate ACLs. The servers need authorization to update metadata (CLUSTER_ACTION) and to read from a topic (READ) for replication purposes. |kstreams| Use Case ^^^^^^^^^^^^^^^^^^^ If you are planning to repartition topics in |kstreams|, then be sure to specify ``cleanup.policy=delete`` and also allow DELETE operations. The DELETE operation ensures that, after repartitioning, the cleanup removes old records from the logs. If you do not allow DELETE operations, then there is a likelihood of increased file descriptor usage. For more information about the security features in |kstreams|, see :ref:`streams_developer-guide_security`. .. _kafka_adding_acls: Adding ACLs ^^^^^^^^^^^ Suppose you want to add an ACL "Principals User:Bob and User:Alice are allowed to perform Operation Read and Write on Topic test-topic from IP 198.51.100.0 and IP 198.51.100.1". You can do that by executing the following: .. codewithvars:: bash bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add \ --allow-principal User:Bob --allow-principal User:Alice \ --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic test-topic By default all principals that don't have an explicit ACL allowing an operation to access a resource are denied. In rare cases where an ACL that allows access to all but some principal is desired, you can use the ``--deny-principal`` and ``--deny-host`` options. For example, use the following command to allow all users to Read from ``test-topic`` but only deny ``User:BadBob`` from IP 198.51.100.3: .. codewithvars:: bash bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add \ --allow-principal User:'*' --allow-host '*' --deny-principal User:BadBob --deny-host 198.51.100.3 \ --operation Read --topic test-topic Note that ``--allow-host`` and ``deny-host`` only support IP addresses (hostnames are not supported). Also note that IPv6 addresses are supported, and that you can use them in ACLs. The examples above add ACLs to a topic by specifying ``--topic [topic-name]`` as the resource pattern option. Similarly, one can add ACLs to a cluster by specifying ``--cluster`` and to a group by specifying ``--group [group-name]``. In the event that you want to grant permission to all groups, you may do so by specifying ``--group='*'`` as shown in the following command: .. codewithvars:: bash bin/kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer \ --authorizer-properties zookeeper.connect=localhost:2181 --add \ --allow-principal User:'*' --operation read --topic test --group='*' You can add ACLs on prefixed resource patterns. For example, you can add an acl for user Jane to produce to any topic whose name starts with ``Test-``. You can do that by executing the CLI with following options: .. codewithvars:: bash bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal \ User:Jane --producer --topic Test- --resource-pattern-type prefixed Note that ``--resource-pattern-type`` defaults to ``literal``, which only affects resources with the exact same name or, in the case of the wildcard resource name ``'*'``, a resource with any name. Removing ACLs ^^^^^^^^^^^^^ Removing ACLs is similar, but the ``--remove`` option should be specified instead of ``--add``. To remove the ACLs added in the first example above you can execute the following: .. codewithvars:: bash bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --remove \ --allow-principal User:Bob --allow-principal User:Alice \ --allow-host 198.51.100.0 --allow-host 198.51.100.1 \ --operation Read --operation Write --topic test-topic If you want to remove the acl added to the prefixed resource pattern in the example we can execute the CLI with following options: .. codewithvars:: bash bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --remove \ --allow-principal User:Jane --producer --topic Test- --resource-pattern-type Prefixed Listing ACLs ^^^^^^^^^^^^ You can list the ACLs for a given resource by specifying the ``--list`` option and the resource. For example, to list all ACLs for ``test-topic`` you can execute the following: .. codewithvars:: bash bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \ --list --topic test-topic However, this will only return the ACLs that have been added to this exact resource pattern. Other ACLs can exist that affect access to the topic, e.g. any ACLs on the topic wildcard ``'*'``, or any ACLs on prefixed resource patterns. Acls on the wildcard resource pattern can be queried explicitly: .. codewithvars:: bash bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --list --topic '*' It is not necessarily possible to explicitly query for ACLs on prefixed resource patterns that match Test-topic as the name of such patterns may not be known. We can list all ACLs affecting Test-topic by using ``--resource-pattern-type match``. For example: .. codewithvars:: bash bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic --resource-pattern-type match This will list ACLs on all matching literal, wildcard and prefixed resource patterns. Adding or Removing a Principal as Producer or Consumer ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ The most common use cases for ACL management are adding/removing a principal as a producer or consumer. To add ``User:Bob`` as a producer of ``test-topic`` you can execute the following: .. codewithvars:: bash bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \ --add --allow-principal User:Bob \ --producer --topic test-topic To add Alice as a consumer of ``test-topic`` with group ``Group-1``, you can specify the ``--consumer`` and ``--group`` options: .. codewithvars:: bash bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \ --add --allow-principal User:Bob \ --consumer --topic test-topic --group Group-1 To remove a principal from a producer or consumer role, you can specify the ``--remove`` option. Enabling Authorization for Idempotent and Transactional APIs ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Producers may be configured with ``enable.idempotence=true`` to ensure that exactly one copy of each message is written to the stream. The principal used by idempotent producers must be authorized to perform ``IdempotentWrite`` on the cluster. To enable Bob to producer messages using an idempotent producer, you can execute the command: .. sourcecode:: bash bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \ --add --allow-principal User:Bob \ --producer --topic test-topic --idempotent Producers may also be configured with a non-empty ``transactional.id`` to enable transactional delivery with reliability semantics that span multiple producer sessions. The principal used by transactional producers must be authorized for ``Describe`` and ``Write`` operations on the configured ``transactional.id``. To enable Alice to producer messages using a transactional producer with ``transactional.id=test-txn``, you can execute the command: .. sourcecode:: bash bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \ --add --allow-principal User:Alice \ --producer --topic test-topic --transactional-id test-txn Note that idempotent write access is automatically granted for transactional producers configured with ACLs for the configured transactional id. Creating Non-Super User ACL Administrators ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ In the event that you want a non-super user to be able to create or delete ACLs, but not grant them the super user role, a current super user can grant another user (referred to here as the ACL administrator) the ``ALTER --cluster`` access control entry (ACE), which binds an operation, in this case, “alter” to a resource, “cluster”. After granting the ACL Administrator the ``ALTER --cluster`` ACE, that user can create and delete ACLs for a given resource in a cluster. .. sourcecode:: bash kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \ --add --allow-principal User: notSuper \ --operation ALTER --cluster .. note:: - If you wish to assign ``ALTER --cluster`` to a group, then ``Group:groupName`` is also valid; however, the Authorizer you are using must be able to handle/allow groups. - Exercise caution when assigning ``ALTER --cluster`` to users or groups because such users will be able to create and delete ACLs to control their own access to resources as well. Authorization in the REST Proxy and |sr| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ You may use |ak| ACLs to enforce authorization in the REST Proxy and |sr|. These require Confluent :ref:`security plugins `. Debugging ~~~~~~~~~ It's possible to run with authorizer logs in ``DEBUG`` mode by making some changes to the ``log4j.properties`` file. If you're using the default ``log4j.properties`` file in the Confluent Platform 0.9.0.0 or above, you simply need to change the following line to ``DEBUG`` mode instead of ``WARN``: .. codewithvars:: bash log4j.logger.kafka.authorizer.logger=WARN, authorizerAppender The ``log4j.properties`` file is located in the |ak| config directory at ``/etc/kafka/log4j.properties``. In the event that you're using an earlier version of the Confluent Platform, or if you're using your own ``log4j.properties`` file, you'll need to add the following lines to the config: .. codewithvars:: bash log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n log4j.logger.kafka.authorizer.logger=DEBUG, authorizerAppender log4j.additivity.kafka.authorizer.logger=false You'll need to restart the broker before it will take effect. This will log every request being authorized and its associated user name. The log is located in ``$kafka_logs_dir/kafka-authorizer.log``. The location of the logs depends on the packaging format - ``kafka_logs_dir`` will be in ``/var/log/kafka`` in rpm/debian and ``$base_dir/logs`` in the archive format. .. toctree:: :maxdepth: 1 :hidden: ../security/ldap-authorizer/introduction