Tutorial: Group-Based Authorization Using LDAP¶
To use LDAP for group-based authorization, you must install or migrate to Confluent Server and configure the Confluent Server Authorizer. Refer to Migrate Confluent Platform to Confluent Server for installation instructions.
Tutorial - Starting a Kafka cluster with group-based authorization¶
Follow these instructions to start a Kafka cluster with Confluent Server (see Configure Confluent Server Authorizer in Confluent Platform) that uses group-based authorization with groups obtained from your LDAP server. The instructions below use SASL_PLAINTEXT as the security protocol for the Confluent Server broker and Kafka clients, with SASL/SCRAM-SHA-256 as the SASL mechanism. Instructions for using SASL/GSSAPI to enable both authentication and group-based authorization with a Kerberos server (for example, Active Directory or Apache Directory Server) are also provided.
Prerequisites¶
An LDAP server (for example, Microsoft Entra ID/Active Directory) must be set up before starting the Kafka cluster. The example below assumes that you have an LDAP server at LDAPSERVER.EXAMPLE.COM:3268 that is reachable by DNS lookup from the host where the broker runs. The example expects a Kerberos-enabled LDAP server, and the LDAP authorizer configuration uses GSSAPI for authentication. These security settings and the other configuration options must match your LDAP server configuration.
The example uses the following host, realm, and port, which should be updated to point to your LDAP server.
- LDAP server host: LDAPSERVER.EXAMPLE.COM
- LDAP realm: EXAMPLE.COM
- LDAP port: 3268 (global catalog port used in the example)
At least one group containing one or more users must be created. The example assumes that your LDAP server contains a group named Kafka Developers and a user named alice who is a member of the Kafka Developers group. Update the user principal and group to match the user and group from your LDAP server that you want to use for the tests.
The users in the example are:
- kafka: for brokers (the example does not use groups to authorize brokers, but broker authorization can also be configured using groups if required)
- alice: member of the group Kafka Developers
If your LDAP server authenticates clients using Kerberos, the LDAP authorizer requires a keytab file. Update the keytab file and principal in the authorizer JAAS configuration option ldap.sasl.jaas.config.
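Before configuring the broker, it can help to confirm that the group and its members are visible over LDAP. The following is a minimal sketch, assuming the MIT Kerberos kinit and OpenLDAP ldapsearch client tools are installed on the broker host (neither is part of Confluent Platform) and that groups live under CN=Users as in the example. The function name check_ldap_group is hypothetical.

```shell
# Sketch only: defines a helper that looks up a group's members over LDAP.
# Assumes MIT Kerberos (kinit) and OpenLDAP client tools (ldapsearch) are installed.
check_ldap_group() {
  group_name="$1"   # for example: "Kafka Developers"

  # Obtain a Kerberos ticket for the principal the LDAP authorizer will use.
  kinit -k -t /tmp/keytabs/ldap.keytab ldap@EXAMPLE.COM || return 1

  # Search the assumed group base for the group and print its member attribute.
  ldapsearch -LLL -Y GSSAPI \
    -H ldap://LDAPSERVER.EXAMPLE.COM:3268 \
    -b "CN=Users,DC=EXAMPLE,DC=COM" \
    "(&(objectClass=group)(sAMAccountName=${group_name}))" member
}
```

Calling check_ldap_group "Kafka Developers" against a directory laid out like the example should list one member value per user in the group. Adjust the search base, object class, and attribute names if your directory differs.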
Start Kafka Cluster with Confluent Server Authorizer¶
The instructions below are based on the assumption that you are running scripts from the installation directory after installing Confluent Platform from an archive. The directories of the scripts and configuration files should be adjusted to match your installation.
Start ZooKeeper¶
Start ZooKeeper using the default configuration.
zookeeper-server-start etc/kafka/zookeeper.properties > /tmp/zookeeper.log 2>&1 &
Create users¶
Create the user kafka for brokers and alice for clients. No broker is running at this point, so the commands below write the SCRAM credentials directly to ZooKeeper.
kafka-configs --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=kafka-secret]' --entity-type users --entity-name kafka
kafka-configs --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret]' --entity-type users --entity-name alice
Configure listeners for broker¶
Append the following lines to your broker configuration file (for example, etc/kafka/server.properties).
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://localhost:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
listener.name.sasl_plaintext.scram-sha-256.sasl.jaas.config= \
org.apache.kafka.common.security.scram.ScramLoginModule required \
username="kafka" \
password="kafka-secret";
Configure Confluent Server Authorizer¶
For Kerberos (GSSAPI) configurations (ldap.java.naming.security.authentication=GSSAPI), append the following lines to your broker configuration file (for example, etc/kafka/server.properties) and update them to match the configuration of your LDAP server.
# Configure authorizer
authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer
# Set Kafka broker user as super user (alternatively, set ACLs before starting brokers)
super.users=User:kafka
# LDAP provider URL
ldap.java.naming.provider.url=ldap://LDAPSERVER.EXAMPLE.COM:3268/DC=EXAMPLE,DC=COM
# Refresh interval for LDAP cache. If set to zero, persistent search is used.
ldap.refresh.interval.ms=60000
# Security authentication protocol for LDAP context
ldap.java.naming.security.authentication=GSSAPI
# JAAS configuration for the LDAP authorizer, update keytab path and the principal for user performing LDAP search
ldap.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
keyTab="/tmp/keytabs/ldap.keytab" \
principal="ldap@EXAMPLE.COM" \
debug="true" \
storeKey="true" \
useKeyTab="true";
# Search base for group-based search
ldap.group.search.base=CN=Users
# Object class for groups
ldap.group.object.class=group
# Name of the attribute from which group name used in ACLs is obtained
ldap.group.name.attribute=sAMAccountName
# Regex pattern to obtain group name used in ACLs from the attribute `ldap.group.name.attribute`
ldap.group.name.attribute.pattern=
# Name of the attribute from which group members (user principals) are obtained
ldap.group.member.attribute=member
# Regex pattern to obtain user principal from group member attribute
ldap.group.member.attribute.pattern=CN=(.*),CN=Users,DC=EXAMPLE,DC=COM
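To see what ldap.group.member.attribute.pattern does, the following sketch applies the same regular expression to sample member values with sed. It assumes that the authorizer matches the whole attribute value and takes the first capture group as the user principal, and that member values look like the hypothetical distinguished names shown. Note that sed -E uses POSIX extended regular expressions rather than Java regular expressions; the two behave the same for this simple pattern.

```shell
# Sketch: emulate the member pattern with sed (POSIX ERE, not Java regex).
# The sample DNs passed in are hypothetical; use values returned by your LDAP server.
member_pattern='CN=(.*),CN=Users,DC=EXAMPLE,DC=COM'

extract_principal() {
  # Print the first capture group when the whole value matches, else print nothing.
  printf '%s\n' "$1" | sed -n -E "s/^${member_pattern}\$/\1/p"
}

extract_principal 'CN=alice,CN=Users,DC=EXAMPLE,DC=COM'    # prints: alice
extract_principal 'CN=bob,OU=Staff,DC=EXAMPLE,DC=COM'      # prints nothing (no match)
```

If a member of your group produces no output here, that user sits outside the container named in the pattern, and the pattern or search base likely needs adjusting.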
For SIMPLE authentication, the ldap.java.naming.security.authentication, ldap.java.naming.security.principal, and ldap.java.naming.security.credentials settings differ from the Kerberos example above, and your Confluent Server Authorizer configuration should look like the following:
# Activate the plugin
authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer
# Set the Kafka broker as a super user
super.users=User:kafka
# Provide the LDAP provider URL
ldap.java.naming.provider.url=ldap://somehost:389/DC=example,DC=com
# Specify the LDAP security authentication protocol
ldap.java.naming.security.authentication=SIMPLE
# Identify the principal for the LDAP context
ldap.java.naming.security.principal=cn=admin,dc=example,dc=com
ldap.java.naming.security.credentials=broker-secret
# Specify the search base for group-based search
ldap.group.search.base=ou=Groups,dc=example,dc=com
# Specify the object class for groups
ldap.group.object.class=groupOfNames
# Specify the attribute name from which the group name used in ACLs is obtained
ldap.group.name.attribute=cn
# Specify the regex pattern to obtain the group name used in ACLs from the attribute ldap.group.name.attribute
ldap.group.name.attribute.pattern=(Kafka.*)
# Specify the attribute name from which group members (user principals) are obtained
ldap.group.member.attribute=member
# Specify the regex pattern to obtain the user principal from the group member attribute
ldap.group.member.attribute.pattern=cn=(.*),ou=People,dc=example,dc=com
Start Confluent Server broker¶
Note
If there are other nodes in the cluster, repeat these steps for each node.
Authorize the broker user kafka for cluster operations. The example uses a user-principal-based ACL for brokers, but brokers can also be configured to use group-based ACLs.
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --cluster --operation=All --allow-principal=User:kafka
If no brokers are running, you cannot use the --bootstrap-server option. In that case, use the ZooKeeper option (--authorizer-properties zookeeper.connect=localhost:2181) to add the initial ACL, as in the command above.
Start your broker with Kerberos options that enable Confluent Server Authorizer to authenticate with your LDAP server.
export KAFKA_OPTS="-Djava.security.krb5.kdc=LDAPSERVER.EXAMPLE.COM -Djava.security.krb5.realm=EXAMPLE.COM"
kafka-server-start etc/kafka/server.properties > /tmp/kafka.log 2>&1 &
Test LDAP group-based authorization¶
Create a topic¶
kafka-topics --create --topic testtopic --partitions 10 --replication-factor 1 --bootstrap-server localhost:9092 --command-config adminclient-configs.conf
Configure producer and consumer¶
Add the following lines to your producer and consumer configuration files (for example, etc/kafka/producer.properties and etc/kafka/consumer.properties):
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="alice" password="alice-secret";
Run console producer without authorizing user¶
kafka-console-producer --broker-list localhost:9092 --topic testtopic --producer.config etc/kafka/producer.properties
Type in some messages. You should see authorization failures.
Authorize group and rerun producer¶
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --topic=testtopic --producer --allow-principal="Group:Kafka Developers"
kafka-console-producer --broker-list localhost:9092 --topic testtopic --producer.config etc/kafka/producer.properties
Type in some messages. Records are produced successfully using group-based authorization where the user-to-group mapping was obtained from your LDAP server.
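The kafka-acls command above reads client settings from adminclient-configs.conf, which this tutorial does not show. A minimal sketch of creating that file follows, assuming the admin client authenticates over SASL/SCRAM-SHA-256 as the super user kafka created earlier. Adjust the credentials if you use a different administrative principal.

```shell
# Sketch: write an admin client config that authenticates as the super user "kafka".
# Assumption: the file contents are not given in the tutorial; these mirror the
# producer and consumer settings, but with the broker user's SCRAM credentials.
cat > adminclient-configs.conf <<'EOF'
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-secret";
EOF
```

The file is written to the current directory, which matches the relative path used in the kafka-acls commands.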
Run console consumer without access to consumer group¶
kafka-console-consumer --bootstrap-server localhost:9092 --topic testtopic --from-beginning --consumer.config etc/kafka/consumer.properties
The consumer should fail authorization because neither the user alice nor the group Kafka Developers that alice belongs to is authorized to consume using the consumer group test-consumer-group.
Authorize group and rerun consumer¶
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --topic=testtopic --group test-consumer-group --allow-principal="Group:Kafka Developers"
kafka-console-consumer --bootstrap-server localhost:9092 --topic testtopic --from-beginning --consumer.config etc/kafka/consumer.properties
The consumer should now succeed using group-based authorization, where the user-to-group mapping was obtained from your LDAP server.
Authentication and group-based authorization using an LDAP server¶
A Kerberos-enabled LDAP server (for example, Active Directory or Apache Directory Server) can be used for authentication as well as group-based authorization if users and groups are managed by this server. The instructions below use SASL/GSSAPI for authentication against Active Directory or Apache Directory Server and obtain the group membership of the users from the same server.
The example is based on the assumption that you have the following three user principals and keytabs for these principals:
- kafka/localhost@EXAMPLE.COM: Service principal for brokers
- alice@EXAMPLE.COM: Client principal, member of the group Kafka Developers
- ldap@EXAMPLE.COM: Principal used by the LDAP authorizer
Note that the user principal used for authorization is, by default, the local name (for example, kafka or alice), and these short principals are used to determine group membership. Brokers can be configured with a custom principal.builder.class or sasl.kerberos.principal.to.local.rules to override this behavior. The attributes used for mapping users to groups can also be customized to match your LDAP server.
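As an illustration of the default mapping, the sketch below derives the short name from a Kerberos principal by dropping the realm and any host component. It mirrors only the default behavior described above for principals in the default realm and is not Kafka's implementation. The function name short_name is hypothetical.

```shell
# Sketch: derive the short name that is matched against LDAP group members.
short_name() {
  principal="${1%%@*}"              # drop the realm:          kafka/localhost
  printf '%s\n' "${principal%%/*}"  # drop the host component: kafka
}

short_name 'kafka/localhost@EXAMPLE.COM'   # prints: kafka
short_name 'alice@EXAMPLE.COM'             # prints: alice
```

With this mapping, alice@EXAMPLE.COM is authorized as User:alice, so the member pattern in the authorizer configuration must also yield alice for the group lookup to match.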
If you have already started the broker using SASL/SCRAM-SHA-256 by following the instructions above, stop the server first. The instructions below assume that you have already updated the configuration for brokers, producers, and consumers as described earlier.
Configure listeners to use GSSAPI by updating the following properties in your broker configuration file (for example, etc/kafka/server.properties).
sasl.enabled.mechanisms=GSSAPI
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.kerberos.service.name=kafka
listener.name.sasl_plaintext.gssapi.sasl.jaas.config= \
com.sun.security.auth.module.Krb5LoginModule required \
keyTab="/tmp/keytabs/kafka.keytab" \
principal="kafka/localhost@EXAMPLE.COM" \
debug="true" \
storeKey="true" \
useKeyTab="true";
Add or update the following properties in your producer and consumer configuration files (for example, etc/kafka/producer.properties and etc/kafka/consumer.properties):
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config= com.sun.security.auth.module.Krb5LoginModule required \
keyTab="/tmp/keytabs/alice.keytab" \
principal="alice@EXAMPLE.COM" \
debug="true" \
storeKey="true" \
useKeyTab="true";
Restart the broker, and run the producer and consumer as described earlier. Producers and consumers are now authenticated using your Kerberos server. Group information is also obtained from the same server using LDAP.