Using the Confluent LDAP Authorizer

Install

Important

This software is available under a Confluent enterprise license. You can use this software for a 30-day trial period without a license key. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

The LDAP authorizer plugin is included in the Confluent Platform commercial component confluent-server package. To use the LDAP authorizer, you must install or migrate to Confluent Server. The LDAP authorizer is not supported with confluent-kafka or Apache Kafka®. Refer to Migrate to Confluent Server for installation instructions.

Activate the Plugin

After the installation is complete, add the following configuration to the Apache Kafka® broker configuration file (for example, /etc/kafka/server.properties) to activate the plugin. Also add the URL for your LDAP server and the configuration options used to connect to your LDAP server and obtain the mapping of users to groups. In Confluent Platform 5.3.0 and later, configuration options for the LDAP Authorizer are prefixed with either ldap. or ldap.authorizer. (the ldap.authorizer. prefix is supported for backward compatibility).

authorizer.class.name=io.confluent.kafka.security.ldap.authorizer.LdapAuthorizer
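
For example, the cache refresh interval used later in this tutorial can be set with either prefix; the value shown is only illustrative.

ldap.refresh.interval.ms=60000
# Equivalent form using the backward-compatible prefix
ldap.authorizer.refresh.interval.ms=60000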

Tutorial: Starting a Kafka cluster with group-based authorization

Follow these instructions to start a Kafka cluster with group-based authorization using groups obtained from your LDAP server. The instructions below use SASL_PLAINTEXT as the security protocol for the Kafka broker and Kafka clients, with SASL/SCRAM-SHA-256 as the SASL mechanism. Instructions to use SASL/GSSAPI to enable both authentication and group-based authorization using a Kerberos server (for example, Active Directory or Apache Directory Server) are also provided.

Prerequisites

An LDAP server (for example, Active Directory) must be set up before starting the Kafka cluster. The example below assumes that you have an LDAP server at the URL LDAPSERVER.EXAMPLE.COM:3268 that is accessible using DNS lookup from the host where the broker runs. The example expects a Kerberos-enabled LDAP server, and the LDAP Authorizer configuration uses GSSAPI for authentication. These security settings and other configuration options must match your LDAP server configuration.

The example uses the following host, realm, and port, which should be updated to point to your LDAP server.

  • LDAP server host: LDAPSERVER.EXAMPLE.COM
  • LDAP realm: EXAMPLE.COM
  • LDAP port: 3268 (global context port used in the example)

At least one group containing one or more users must be created. The example assumes that your LDAP server contains a group named Kafka Developers and a user named alice who is a member of the Kafka Developers group. Update the user principal and group to match the user and group from the LDAP server that you want to use for the tests. A sketch of what such directory entries might look like is shown after the user list below.

The users in the example are:

  • kafka: for brokers (groups are not used in the example for authorization of brokers, but broker authorization could also be configured using groups if required)
  • alice: member of group Kafka Developers
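
For illustration only, the entries below sketch how such a user and group might appear in an Active Directory-style tree rooted at DC=EXAMPLE,DC=COM. The distinguished names and attributes are assumptions chosen to match the attribute and regex settings used later in this tutorial; adjust them to your directory.

# Hypothetical directory entries (adjust DNs and attributes to match your LDAP server)
dn: CN=alice,CN=Users,DC=EXAMPLE,DC=COM
objectClass: user
sAMAccountName: alice

dn: CN=Kafka Developers,CN=Users,DC=EXAMPLE,DC=COM
objectClass: group
sAMAccountName: Kafka Developers
member: CN=alice,CN=Users,DC=EXAMPLE,DC=COM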

If your LDAP server authenticates clients using Kerberos, a keytab file is required for the LDAP authorizer; update the keytab file and principal in the authorizer JAAS configuration option ldap.sasl.jaas.config.

Start Kafka Cluster with LDAP Authorizer

The instructions below are based on the assumption that you are running scripts from the installation directory after installing Confluent Platform from an archive. The directories of the scripts and configuration files should be adjusted to match your installation.

Start ZooKeeper

Start ZooKeeper using the default configuration.

bin/zookeeper-server-start etc/kafka/zookeeper.properties > /tmp/zookeeper.log 2>&1 &

Create users

Create the user kafka for brokers and alice for clients. Because no brokers are running yet, create the SCRAM credentials directly in ZooKeeper.

bin/kafka-configs --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=kafka-secret]' --entity-type users --entity-name kafka
bin/kafka-configs --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret]' --entity-type users --entity-name alice
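
Optionally, verify that the credentials were stored by describing one of the user entities:

bin/kafka-configs --zookeeper localhost:2181 --describe --entity-type users --entity-name alice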

Configure listeners for broker

Copy the following lines to the end of your broker configuration file (for example, etc/kafka/server.properties).

listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://localhost:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
listener.name.sasl_plaintext.scram-sha-256.sasl.jaas.config= \
  org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="kafka" \
  password="kafka-secret";

Configure LDAP Authorizer

Copy the following lines to the end of your broker configuration file (for example, etc/kafka/server.properties) and update the configurations to match the configuration of your LDAP server.

# Configure authorizer
authorizer.class.name=io.confluent.kafka.security.ldap.authorizer.LdapAuthorizer
# Set Kafka broker user as super user (alternatively, set ACLs before starting brokers)
super.users=User:kafka
# LDAP provider URL
ldap.java.naming.provider.url=ldap://LDAPSERVER.EXAMPLE.COM:3268/DC=EXAMPLE,DC=COM
# Refresh interval for LDAP cache. If set to zero, persistent search is used.
ldap.refresh.interval.ms=60000
# Security authentication protocol for LDAP context
ldap.java.naming.security.authentication=GSSAPI
# JAAS configuration for the LDAP authorizer, update keytab path and the principal for user performing LDAP search
ldap.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
keyTab="/tmp/keytabs/ldap.keytab" \
principal="ldap@EXAMPLE.COM" \
debug="true" \
storeKey="true" \
useKeyTab="true";

# Search base for group-based search
ldap.group.search.base=CN=Users
# Object class for groups
ldap.group.object.class=group
# Name of the attribute from which group name used in ACLs is obtained
ldap.group.name.attribute=sAMAccountName
# Regex pattern to obtain group name used in ACLs from the attribute `ldap.group.name.attribute`
ldap.group.name.attribute.pattern=
# Name of the attribute from which group members (user principals) are obtained
ldap.group.member.attribute=member
# Regex pattern to obtain user principal from group member attribute
ldap.group.member.attribute.pattern=CN=(.*),CN=Users,DC=EXAMPLE,DC=COM

Alternatively, to configure the LDAP Authorizer using simple authentication, your configuration should look like the following:

# Activate the plugin
authorizer.class.name=io.confluent.kafka.security.ldap.authorizer.LdapAuthorizer
# Set the Kafka broker as a super user
super.users=User:kafka

# Provide the LDAP provider URL
ldap.authorizer.java.naming.provider.url=ldap://somehost:389/DC=example,DC=com
# Specify the LDAP security authentication protocol
ldap.authorizer.java.naming.security.authentication=SIMPLE
# Identify the principal for the LDAP context
ldap.authorizer.java.naming.security.principal=cn=admin,dc=example,dc=com
ldap.authorizer.java.naming.security.credentials=broker-secret
# Specify the search base for group-based search
ldap.authorizer.group.search.base=ou=Groups,dc=example,dc=com
# Specify the object class for groups
ldap.authorizer.group.object.class=groupOfNames
# Specify the attribute name from which the group name used in ACLs is obtained
ldap.authorizer.group.name.attribute=cn
# Specify the regex pattern to obtain the group name used in ACLs from the attribute ldap.authorizer.group.name.attribute
ldap.authorizer.group.name.attribute.pattern=(Kafka.*)
# Specify the attribute name from which group members (user principals) are obtained
ldap.authorizer.group.member.attribute=member
# Specify the regex pattern to obtain the user principal from the group member attribute
ldap.authorizer.group.member.attribute.pattern=cn=(.*),ou=People,dc=example,dc=com

Start Kafka broker

Note

If there are other nodes in the cluster, repeat these steps for each node.

Authorize the broker user kafka for cluster operations. Note that the example uses a user-principal-based ACL for brokers, but brokers may also be configured to use group-based ACLs.

bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --cluster --operation=All --allow-principal=User:kafka

Note that if no brokers are running, you cannot use the --bootstrap-server option; in that case, you must use the ZooKeeper option to add the initial ACL.

Start your broker with Kerberos options that enable LDAP Authorizer to authenticate with your LDAP server.

export KAFKA_OPTS="-Djava.security.krb5.kdc=LDAPSERVER.EXAMPLE.COM -Djava.security.krb5.realm=EXAMPLE.COM"
bin/kafka-server-start etc/kafka/server.properties > /tmp/kafka.log 2>&1 &

Test LDAP group-based authorization

Create a topic

bin/kafka-topics --create --topic testtopic --partitions 10 --replication-factor 1 --bootstrap-server localhost:9092 --command-config adminclient-configs.conf
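
Because the broker only exposes a SASL_PLAINTEXT listener, administrative tools such as kafka-topics and kafka-acls need client security settings, passed here through a file named adminclient-configs.conf. A minimal sketch of that file, assuming the broker super user kafka created earlier, might look like the following:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-secret";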

Configure producer and consumer

Add the following lines to your producer and consumer configuration files (for example, etc/kafka/producer.properties and etc/kafka/consumer.properties)

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="alice" password="alice-secret";

Run console producer without authorizing user

bin/kafka-console-producer --broker-list localhost:9092 --topic testtopic --producer.config etc/kafka/producer.properties

Type in some messages. You should see authorization failures.
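
The exact output depends on the client version, but the failure typically surfaces as a topic authorization error similar to the following (illustrative):

org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [testtopic]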

Authorize group and rerun producer

bin/kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --topic=testtopic --producer --allow-principal="Group:Kafka Developers"
bin/kafka-console-producer --broker-list localhost:9092 --topic testtopic --producer.config etc/kafka/producer.properties

Type in some messages. Records are produced successfully using group-based authorization where the user-to-group mapping was obtained from your LDAP server.

Run console consumer without access to consumer group

bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic testtopic --from-beginning --consumer.config etc/kafka/consumer.properties

Consumption should fail authorization because neither the user alice nor the group Kafka Developers to which alice belongs is authorized to consume using the consumer group test-consumer-group.
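
As above, the exact output depends on the client version, but you can expect a group authorization error similar to the following (illustrative):

org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: test-consumer-group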

Authorize group and rerun consumer

bin/kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --topic=testtopic --group test-consumer-group --allow-principal="Group:Kafka Developers"
bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic testtopic --from-beginning --consumer.config etc/kafka/consumer.properties

Consumption should now succeed using group-based authorization, where the user-to-group mapping was obtained from your LDAP server.

Authentication and group-based authorization using an LDAP server

A Kerberos-enabled LDAP server (for example, Active Directory or Apache Directory Server) may be used for authentication as well as group-based authorization if users and groups are managed by this server. The instructions below use SASL/GSSAPI for authentication against this server and obtain group membership of the users from the same server.

The example is based on the assumption that you have the following three user principals and keytabs for these principals:

  • kafka/localhost@EXAMPLE.COM: Service principal for brokers
  • alice@EXAMPLE.COM: Client principal, member of group Kafka Developers
  • ldap@EXAMPLE.COM: Principal used by the LDAP Authorizer

Note that the user principal used for authorization is the local name (for example, kafka, alice) by default, and these short principals are used to determine group membership. Brokers may be configured with a custom principal.builder.class or with sasl.kerberos.principal.to.local.rules to override this behavior; a sketch of such a mapping rule is shown below. The attributes used for mapping users to groups may also be customized to match your LDAP server.
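
For example, a minimal sketch of such a mapping rule, assuming the EXAMPLE.COM realm used in this tutorial, strips the realm so that alice@EXAMPLE.COM is authorized as alice:

sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@EXAMPLE\.COM)s/@.*//,DEFAULT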

If you have already started the broker using SASL/SCRAM-SHA-256 following the instructions above, stop the server first. The instructions below are based on the assumption that you have already updated configuration for brokers, producers, and consumers as described earlier.

Configure listeners to use GSSAPI by updating the following properties in your broker configuration file (for example, etc/kafka/server.properties).

sasl.enabled.mechanisms=GSSAPI
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.kerberos.service.name=kafka
listener.name.sasl_plaintext.gssapi.sasl.jaas.config= \
  com.sun.security.auth.module.Krb5LoginModule required \
  keyTab="/tmp/keytabs/kafka.keytab" \
  principal="kafka/localhost@EXAMPLE.COM" \
  debug="true" \
  storeKey="true" \
  useKeyTab="true";

Add or update the following properties in your producer and consumer configuration files (for example, etc/kafka/producer.properties and etc/kafka/consumer.properties).

sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config= com.sun.security.auth.module.Krb5LoginModule required \
  keyTab="/tmp/keytabs/alice.keytab" \
  principal="alice@EXAMPLE.COM" \
  debug="true" \
  storeKey="true" \
  useKeyTab="true";

Restart the broker, and run the producer and consumer as described earlier; the commands are repeated below for convenience. Producers and consumers are now authenticated using your Kerberos server, and group information is obtained from the same server using LDAP.
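
As before, set the Kerberos system properties so that the LDAP authorizer can authenticate with your LDAP server; the commands below simply repeat the earlier steps with the GSSAPI configuration in place.

export KAFKA_OPTS="-Djava.security.krb5.kdc=LDAPSERVER.EXAMPLE.COM -Djava.security.krb5.realm=EXAMPLE.COM"
bin/kafka-server-start etc/kafka/server.properties > /tmp/kafka.log 2>&1 &
bin/kafka-console-producer --broker-list localhost:9092 --topic testtopic --producer.config etc/kafka/producer.properties
bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic testtopic --from-beginning --consumer.config etc/kafka/consumer.properties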