.. _kafka_authorization: Authorization and ACLs ---------------------- Overview ~~~~~~~~ |ak-tm| ships with a pluggable Authorizer and an out-of-box authorizer implementation that uses |zk| to store all the ACLs. It is important to set ACLs because otherwise access to resources is limited to super users when an authorizer is configured. The default behavior is that if a resource has no associated ACLs, then no one is allowed to access the resource, except super users. .. _kafka-auth-broker-config: Broker Configuration ~~~~~~~~~~~~~~~~~~~~ Authorizer ^^^^^^^^^^ To enable ACLs, you must configure an authorizer. |ak| provides a simple authorizer implementation, and to use it, you can add the following to ``server.properties``: .. codewithvars:: bash authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer .. _kafka-auth-superuser: Super Users ^^^^^^^^^^^ By default, if no resource patterns match a specific resource, then the resource has no associated ACLs, and therefore no one other than super users are allowed to access the resource. If you want to change that behavior, you can include the following in ``server.properties``: .. codewithvars:: bash allow.everyone.if.no.acl.found=true You can also add super users in ``server.properties`` like the following (note that the delimiter is semicolon since SSL user names may contain comma): .. codewithvars:: bash super.users=User:Bob;User:Alice User Names ^^^^^^^^^^ By default, the SSL user name will be of the form ``CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown``. One can change that by setting a customized PrincipalBuilder in ``server.properties`` like the following: .. codewithvars:: bash principal.builder.class=CustomizedPrincipalBuilderClass By default, the SASL user name will be the primary part of the Kerberos principal. One can change that by setting ``sasl.kerberos.principal.to.local.rules`` to a customized rule in ``server.properties``. In the event that SSL is enabled but client authentication is not configured, clients will connect anonymously via the ``SSL`` port and will appear to the server with the user name ``ANONYMOUS``. Such a configuration provides encryption and server authentication, but clients will connect anonymously. The other case in which the server will see the ``ANONYMOUS`` user is if the ``PLAINTEXT`` security protocol is being used. By giving read/write permission to the ``ANONYMOUS`` user, you are allowing anyone to access the brokers without authentication. As such, you typically do not want to give access to ``ANONYMOUS`` users unless the intention is to give everyone the permission. Using ACLs ~~~~~~~~~~ The following examples use ``bin/kafka-acls`` (the |kacls-cli|) to add, remove or list ACLs. For detailed information on the supported options, run ``bin/kafka-acls --help``. Note that ACLs are stored in |zk| and they are propagated to the brokers asynchronously so there may be a delay before the change takes effect even after the command returns. .. tip:: - If you are using transactions (``--transactional-id``), the IdempotentWrite ACL is implied. - If you are not using transactions, you can use the ``--idempotent`` option to enable the IdempotentWrite ACL. ACL Format ^^^^^^^^^^ |ak| ACLs are defined in the general format of "Principal P is [Allowed/Denied] Operation O From Host H On Resource R matching ResourcePattern RP". The following table describes the relationship between operations, resources and APIs: ================ =============== ========================================= Operation Resource API ================ =============== ========================================= ALTER Cluster AlterReplicaLogDirs ALTER Cluster CreateAcls ALTER Cluster DeleteAcls ALTER Topic CreatePartitions ALTER_CONFIGS Cluster AlterConfigs ALTER_CONFIGS Topic AlterConfigs CLUSTER_ACTION Cluster Fetch (for replication only) CLUSTER_ACTION Cluster LeaderAndIsr CLUSTER_ACTION Cluster OffsetForLeaderEpoch CLUSTER_ACTION Cluster StopReplica CLUSTER_ACTION Cluster UpdateMetadata CLUSTER_ACTION Cluster ControlledShutdown CLUSTER_ACTION Cluster WriteTxnMarkers CREATE Cluster CreateTopics CREATE Cluster Metadata if ``auto.create.topics.enable`` CREATE Topic Metadata if ``auto.create.topics.enable`` CREATE Topic CreateTopics DELETE Group DeleteGroups DELETE Topic DeleteRecords DELETE Topic DeleteTopics DESCRIBE Cluster DescribeAcls DESCRIBE Cluster DescribeLogDirs DESCRIBE Cluster ListGroups DESCRIBE DelegationToken DescribeTokens DESCRIBE Group DescribeGroup DESCRIBE Group FindCoordinator DESCRIBE Group ListGroups DESCRIBE Topic ListOffsets DESCRIBE Topic Metadata DESCRIBE Topic OffsetFetch DESCRIBE Topic OffsetForLeaderEpoch DESCRIBE TxnId FindCoordinator DESCRIBE_CONFIGS Cluster DescribeConfigs DESCRIBE_CONFIGS Topic DescribeConfigs IDEMPOTENT_WRITE Cluster InitProducerId IDEMPOTENT_WRITE Cluster Produce READ Group AddOffsetsToTxn READ Group Heartbeat READ Group JoinGroup READ Group LeaveGroup READ Group OffsetCommit READ Group OffsetFetch READ Group SyncGroup READ Group TxnOffsetCommit READ Topic Fetch READ Topic OffsetCommit READ Topic TxnOffsetCommit WRITE Topic Produce WRITE TxnId Produce WRITE Topic AddPartitionsToTxn WRITE TxnId AddPartitionsToTxn WRITE TxnId AddOffsetsToTxn WRITE TxnId EndTxn WRITE TxnId InitProducerId WRITE TxnId TxnOffsetCommit ================ =============== ========================================= The operations in this table are both for clients (producers, consumers, admin) and inter-broker operations of a cluster. In a secure cluster, both client requests and inter-broker operations require authorization. The inter-broker operations are split into two classes: cluster and topic. Cluster operations refer to operations necessary for the management of the cluster, like updating broker and partition metadata, changing the leader and the set of in-sync replicas of a partition, and triggering a controlled shutdown. Because of the way replication of topic partitions work internally, it is also important to grant topic access to brokers. Brokers replicating a partition must be authorized for both READ and DESCRIBE on that topic. DESCRIBE is granted by default with the READ authorization. You can use these methods to automatically grant topic access to servers: * Make the server principal a :ref:`super user `. By configuring the cluster this way, servers can automatically access all resources, including the cluster resource. * Use the wildcard for topics so that you only have to set it once. You must set an ACL for the cluster resource separately. For example: .. code:: bash kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add \ --allow-principal User:Alice --operation All --topic '*' --cluster Producers and consumers need to be authorized to perform operations on topics, but they should be configured with different principals compared to the servers. The main operations that producers require authorization to execute are WRITE and READ. Admin users can execute command line tools and require authorization. Operations that an admin user might need authorization for are DELETE, CREATE, and ALTER. You can use wildcards for producers and consumers so that you only have to set it once. - Wildcards are any resource, including groups. - You can give topic and group wildcard access to users who have permission to access all topics and groups (e.g admin users). If you use this method, you don't have to create a separate rule for each topic and group for the user. For example, you can use this to give wildcard access to Alice: .. code:: bash kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal \ User:Alice --operation All --topic '*' --group '*' Common cases ++++++++++++ * **create** a topic, the principal of the client will require the CREATE and DESCRIBE operations on the ``topic`` resource (via the `Metadata` API with ``auto.create.topics.enable``). * **produce** to a topic, the principal of the producer will require the WRITE operation on the ``topic`` resource. * **consume** from a topic, the principal of the consumer will require the READ operation on the ``topic`` and ``group`` resources. Note that to be able to create, produce, and consume, the servers need to be configured with the appropriate ACLs. The servers need authorization to update metadata (CLUSTER_ACTION) and to read from a topic (READ) for replication purposes. |kstreams| use case +++++++++++++++++++ If you are planning to repartition topics in |kstreams|, then be sure to specify ``cleanup.policy=delete`` and also allow DELETE operations. The DELETE operation ensures that, after repartitioning, the cleanup removes old records from the logs. If you do not allow DELETE operations, then there is a likelihood of increased file descriptor usage. For more information about the security features in |kstreams|, see :ref:`streams_developer-guide_security`. .. _kafka_adding_acls: Adding ACLs ^^^^^^^^^^^ Suppose you want to add an ACL "Principals User:Bob and User:Alice are allowed to perform Operation Read and Write on Topic test-topic from IP 198.51.100.0 and IP 198.51.100.1". You can do that by executing the following: .. codewithvars:: bash bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add \ --allow-principal User:Bob --allow-principal User:Alice \ --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic test-topic By default all principals that don't have an explicit ACL allowing an operation to access a resource are denied. In rare cases where an ACL that allows access to all but some principal is desired, you can use the ``--deny-principal`` and ``--deny-host`` options. For example, use the following command to allow all users to Read from ``test-topic`` but only deny ``User:BadBob`` from IP 198.51.100.3: .. codewithvars:: bash bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add \ --allow-principal User:'*' --allow-host '*' --deny-principal User:BadBob --deny-host 198.51.100.3 \ --operation Read --topic test-topic Note that ``--allow-host`` and ``deny-host`` only support IP addresses (hostnames are not supported). The examples above add ACLs to a topic by specifying ``--topic [topic-name]`` as the resource pattern option. Similarly, one can add ACLs to a cluster by specifying ``--cluster`` and to a group by specifying ``--group [group-name]``. In the event that you want to grant permission to all groups, you may do so by specifying ``--group='*'`` as shown in the following command: .. codewithvars:: bash bin/kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer \ --authorizer-properties zookeeper.connect=localhost:2181 --add \ --allow-principal User:'*' --operation read --topic test --group='*' You can add ACLs on prefixed resource patterns. For example, you can add an acl for user Jane to produce to any topic whose name starts with ``Test-``. You can do that by executing the CLI with following options: .. codewithvars:: bash bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal \ User:Jane --producer --topic Test- --resource-pattern-type prefixed Note that ``--resource-pattern-type`` defaults to ``literal``, which only affects resources with the exact same name or, in the case of the wildcard resource name ``'*'``, a resource with any name. Removing ACLs ^^^^^^^^^^^^^ Removing ACLs is similar, but the ``--remove`` option should be specified instead of ``--add``. To remove the ACLs added in the first example above you can execute the following: .. codewithvars:: bash bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --remove \ --allow-principal User:Bob --allow-principal User:Alice \ --allow-host 198.51.100.0 --allow-host 198.51.100.1 \ --operation Read --operation Write --topic test-topic If you want to remove the acl added to the prefixed resource pattern in the example we can execute the CLI with following options: .. codewithvars:: bash bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --remove \ --allow-principal User:Jane --producer --topic Test- --resource-pattern-type Prefixed Listing ACLs ^^^^^^^^^^^^ You can list the ACLs for a given resource by specifying the ``--list`` option and the resource. For example, to list all ACLs for ``test-topic`` you can execute the following: .. codewithvars:: bash bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \ --list --topic test-topic However, this will only return the ACLs that have been added to this exact resource pattern. Other ACLs can exist that affect access to the topic, e.g. any ACLs on the topic wildcard ``'*'``, or any ACLs on prefixed resource patterns. Acls on the wildcard resource pattern can be queried explicitly: .. codewithvars:: bash bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --list --topic '*' It is not necessarily possible to explicitly query for ACLs on prefixed resource patterns that match Test-topic as the name of such patterns may not be known. We can list all ACLs affecting Test-topic by using ``--resource-pattern-type match``. For example: .. codewithvars:: bash bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic --resource-pattern-type match This will list ACLs on all matching literal, wildcard and prefixed resource patterns. Adding or Removing a Principal as Producer or Consumer ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ The most common use cases for ACL management are adding/removing a principal as a producer or consumer. To add ``User:Bob`` as a producer of ``test-topic`` you can execute the following: .. codewithvars:: bash bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \ --add --allow-principal User:Bob \ --producer --topic test-topic To add Alice as a consumer of ``test-topic`` with group ``Group-1``, you can specify the ``--consumer`` and ``--group`` options: .. codewithvars:: bash bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \ --add --allow-principal User:Bob \ --consumer --topic test-topic --group Group-1 To remove a principal from a producer or consumer role, you can specify the ``--remove`` option. Authorization in the REST Proxy and |sr| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ You may use |ak| ACLs to enforce authorization in the REST Proxy and |sr|. These require Confluent :ref:`security plugins `. Debugging ~~~~~~~~~ It's possible to run with authorizer logs in ``DEBUG`` mode by making some changes to the ``log4j.properties`` file. If you're using the default ``log4j.properties`` file in the Confluent Platform 0.9.0.0 or above, you simply need to change the following line to ``DEBUG`` mode instead of ``WARN``: .. codewithvars:: bash log4j.logger.kafka.authorizer.logger=WARN, authorizerAppender The ``log4j.properties`` file is located in the |ak| config directory at ``/etc/kafka/log4j.properties``. In the event that you're using an earlier version of the Confluent Platform, or if you're using your own ``log4j.properties`` file, you'll need to add the following lines to the config: .. codewithvars:: bash log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n log4j.logger.kafka.authorizer.logger=DEBUG, authorizerAppender log4j.additivity.kafka.authorizer.logger=false You'll need to restart the broker before it will take effect. This will log every request being authorized and its associated user name. The log is located in ``$kafka_logs_dir/kafka-authorizer.log``. The location of the logs depends on the packaging format - ``kafka_logs_dir`` will be in ``/var/log/kafka`` in rpm/debian and ``$base_dir/logs`` in the archive format.