チュートリアル: LDAP を使用したグループベースの認可

グループベースの認可に LDAP を使用するには、Confluent Server をインストールするか、これに移行して、Confluent Server Authorizer の構成 を構成する必要があります。インストール手順については、「Confluent Server への移行」を参照してください。

チュートリアル : グループベースの認可での Kafka クラスターの起動

以下の手順に従い、LDAP サーバーから取得したグループのグループベースの認可を使用して、Confluent Server(Confluent Server Authorizer の構成 を使用)でクラスターを起動します。

以下の手順に従い、LDAP サーバーから取得したグループを使用して、グループベースの認可で Kafka クラスターを起動します。以下の手順では、Kafka ブローカーおよび Kafka クライアントのセキュリティプロトコルとして SASL_PLAINTEXT を使用し、SASL メカニズムとして SASL/SCRAM-SHA-256 を使用します。SASL/GSSAPI を使用して、Kerberos サーバー(Active Directory や Apache Directory Service など)を使用した認証とグループベースの認可の両方を有効にする手順も記載されています。

前提条件

Kafka クラスターを起動する前に、LDAP サーバー(Active Directory など)をセットアップする必要があります。以下の例は、ブローカーが実行されているホストから DNS ルックアップを使用してアクセスできる LDAP サーバーが URL LDAPSERVER.EXAMPLE.COM:3268 にあることを前提としています。この例では Kerberos 対応の LDAP サーバーが想定されており、この LDAP Authorizer 構成では認証に GSSAPI が使用されます。これらのセキュリティ設定およびその他の構成オプションは、LDAP サーバーの構成と一致する必要があります。

この例では、以下のホスト、レルム、ポートを使用していますが、これらは実際の LDAP サーバーを指すように変更してください。

  • LDAP サーバーホスト : LDAPSERVER.EXAMPLE.COM
  • LDAP レルム : EXAMPLE.COM
  • LDAP ポート : 3268 (この例ではグローバルコンテキストポートを使用)

1 人以上のユーザーを含む 1 つ以上のグループを作成する必要があります。この例では、LDAP サーバーに Kafka Developers という名前のグループと、Kafka Developers グループのメンバーである alice という名前のユーザーが含まれていることを前提としています。ユーザープリンシパルとグループは、テストに使用する LDAP サーバーのユーザーおよびグループと一致するように変更してください。

この例では以下のユーザーを使用します。

  • kafka: ブローカー用(ブローカーの認可の例ではグループが使用されていませんが、必要に応じてグループを使用してブローカーの認可を構成することもできます)
  • alice: グループ Kafka Developers のメンバー

LDAP サーバーが Kerberos を使用してクライアントを認証する場合は、LDAP オーソライザー用にキータブファイルが必要になり、オーソライザーの JAAS 構成オプション ldap.sasl.jaas.config でキータブファイルとプリンシパルを更新する必要があります。

Confluent Server Authorizer での Kafka クラスターの起動

以下の手順は、アーカイブから Confluent Platform をインストールした後、インストールディレクトリからスクリプトを実行していることを前提としています。スクリプトと構成ファイルのディレクトリは、インストールに合わせて調整してください。

ZooKeeper の起動

デフォルトの構成を使用して ZooKeeper を起動します。

bin/zookeeper-server-start etc/kafka/zookeeper.properties > /tmp/zookeeper.log 2>&1 &

ユーザーの作成

ブローカー用のユーザーとして kafka、クライアント用のユーザーとして alice を作成します。

bin/kafka-configs --bootstrap-server localhost:9092 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=kafka-secret]' --entity-type users --entity-name kafka
bin/kafka-configs --bootstrap-server localhost:9092 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret]' --entity-type users --entity-name alice

ブローカー用のリスナーの構成

ブローカー構成ファイル( etc/kafka/server.properties など)の末尾に、以下をコピーします。

listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://localhost:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
listener.name.sasl_plaintext.scram-sha-256.sasl.jaas.config= \
  org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="kafka" \
  password="kafka-secret";

Confluent Server Authorizer の構成

Kerberos(GSSAPI)構成(ldap.java.naming.security.authentication=GSSAPI)の場合は、以下の行をブローカー構成ファイル(etc/kafka/server.properties など)の末尾にコピーし、LDAP サーバーの構成と一致するように構成をアップデートします。

# Configure authorizer
authorizer.class.name=`io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer`
# Set Kafka broker user as super user (alternatively, set ACLs before starting brokers)
super.users=User:kafka
# LDAP provider URL
ldap.java.naming.provider.url=ldap://LDAPSERVER.EXAMPLE.COM:3268/DC=EXAMPLE,DC=COM
# Refresh interval for LDAP cache. If set to zero, persistent search is used.
ldap.refresh.interval.ms=60000
# Security authentication protocol for LDAP context
ldap.java.naming.security.authentication=GSSAPI
# JAAS configuration for the LDAP authorizer, update keytab path and the principal for user performing LDAP search
ldap.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
keyTab="/tmp/keytabs/ldap.keytab" \
principal="ldap@EXAMPLE.COM" \
debug="true" \
storeKey="true" \
useKeyTab="true";

# Search base for group-based search
ldap.group.search.base=CN=Users
# Object class for groups
ldap.group.object.class=group
# Name of the attribute from which group name used in ACLs is obtained
ldap.group.name.attribute=sAMAccountName
# Regex pattern to obtain group name used in ACLs from the attribute `ldap.group.name.attribute`
ldap.group.name.attribute.pattern=
# Name of the attribute from which group members (user principals) are obtained
ldap.group.member.attribute=member
# Regex pattern to obtain user principal from group member attribute
ldap.group.member.attribute.pattern=CN=(.*),CN=Users,DC=EXAMPLE,DC=COM

SIMPLE 認証の場合、ldap.java.naming.security.authenticationldap.java.naming.security.principalldap.java.naming.security.credentials は、前述の Kerberos の例とは異なります。Confluent Server Authorizer 構成は次のようになります。

# Activate the plugin
authorizer.class.name=`io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer`
# Set the Kafka broker as a super user
super.users=User:kafka

# Provide the LDAP provider URL
ldap.java.naming.provider.url=ldap://somehost:389/DC=example,DC=com
# Specify the LDAP security authentication protocol
ldap.java.naming.security.authentication=SIMPLE
# Identify the principal for the LDAP context
ldap.java.naming.security.principal=cn=admin,dc=example,dc=com
ldap.java.naming.security.credentials=broker-secret
# Specify the search base for group-based search
ldap.group.search.base=ou=Groups,dc=example,dc=com
# Specify the object class for groups
ldap.group.object.class=groupOfNames
# Specify the attribute name from which the group name used in ACLs is obtained
ldap.group.name.attribute=cn
# Specify the regex pattern to obtain the group name used in ACLs from the attribute ldap.authorizer.group.name.attribute
ldap.group.name.attribute.pattern=(Kafka.*)
# Specify the attribute name from which group members (user principals) are obtained
ldap.group.member.attribute=member
# Specify the regex pattern to obtain the user principal from the group member attribute
ldap.group.member.attribute.pattern=cn=(.*),ou=People,dc=example,dc=com

Kafka ブローカーの起動

注釈

クラスター内に他のノードがある場合は、ノードごとにこれらの手順を繰り返します。

ブローカーユーザー kafka にクラスター操作を許可します。この例では、ユーザープリンシパルベースの ACL をブローカーに使用していますが、グループベースの ACL を使用するようにブローカーを構成することもできます。

クラスター内に他のノードがある場合は、ノードごとにこれらの手順を繰り返します。

bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --cluster --operation=All --allow-principal=User:kafka

ブローカーが稼働していない場合は、--bootstrap-server オプションを使用できません。その場合は、ZooKeeper オプションを使用して初期 ACL を追加する必要があります。

Confluent Server Authorizer を有効にして LDAP サーバーで認証するための Kerberos オプションを使用して、ブローカーを起動します。

export KAFKA_OPTS="-Djava.security.krb5.kdc=LDAPSERVER.EXAMPLE.COM -Djava.security.krb5.realm=EXAMPLE.COM"
bin/kafka-server-start etc/kafka/server.properties > /tmp/kafka.log 2>&1 &

LDAP でのグループベースの認可のテスト

トピックの作成

bin/kafka-topics --create --topic testtopic --partitions 10 --replication-factor 1 --bootstrap-server localhost:9092

プロデューサーとコンシューマーの構成

プロデューサーとコンシューマーの構成ファイル( etc/kafka/producer.propertiesetc/kafka/consumer.properties など)に、以下を追加します。

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="alice" password="alice-secret";

ユーザーを認可せずにコンソールプロデューサーを実行する

bin/kafka-console-producer --broker-list localhost:9092 --topic testtopic --producer.config etc/kafka/producer.properties

メッセージを入力します。認可の失敗が示されます。

グループの認可とプロデューサーの再実行

bin/kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --topic=testtopic --producer --allow-principal="Group:Kafka Developers"
bin/kafka-console-producer --broker-list localhost:9092 --topic testtopic --producer.config etc/kafka/producer.properties

メッセージを入力します。ユーザーからグループへのマッピングが LDAP サーバーから取得されていれば、グループベースの認可を使用してレコードが正常に生成されます。

コンシューマーグループへのアクセスなしでコンソールコンシューマーを実行

bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic testtopic --from-beginning --consumer.config etc/kafka/consumer.properties

ユーザー alice も、alice が属するグループ Kafka Developers も、グループ test-consumer-group の使用による消費権限を持っていないため、消費の認可は失敗になります。

グループの認可とコンシューマーの再実行

bin/kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --topic=testtopic --group test-consumer-group --allow-principal="Group:Kafka Developers"
bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic testtopic --from-beginning --consumer.config etc/kafka/consumer.properties

ユーザーからグループへのマッピングが LDAP サーバーから取得されていれば、グループベースの認可を使用して消費が正しく処理されます。

LDAP サーバーを使用した認証とグループベースの認可

Kerberos 対応の LDAP サーバー(Active Directory や Apache Directory Server など)でユーザーとグループが管理されている場合は、このサーバーを認証とグループベースの認可に使用できます。以下の手順では、AD または DS を使用した認証に SASL/GSSAPI を使用し、同じサーバーからユーザーのグループメンバーシップを取得します。

この例は、以下の 3 つのユーザープリンシパルとこれらのプリンシパルのキータブがあることを前提としています。

  • kafka/localhost@EXAMPLE.COM: ブローカーのサービスプリンシパル
  • alice@EXAMPLE.COM: クライアントプリンシパル、グループ Kafka Developers のメンバー
  • ldap@EXAMPLE.COM: LDAP Authorizer で使用されるプリンシパル

認可に使用されるユーザープリンシパルはデフォルトでローカル名( kafkaalice など)であり、グループメンバーシップの判断にはこれらの短いプリンシパルが使用されます。ブローカーでは、カスタムの principal.builder.class または sasl.kerberos.principal.to.local.rules を使用して構成し、この動作をオーバーライドできます。ユーザーをグループにマッピングするために使用される属性を LDAP サーバーに合わせてカスタマイズすることもできます。

上の手順に従い、SASL/SCRAM-SHA-256 を使用してブローカーを既に起動している場合は、まずサーバーを停止します。以下の手順は、上の説明のように、ブローカー、プロデューサー、およびコンシューマーの構成が既に更新されていることを前提としています。

ブローカー構成ファイル( etc/kafka/server.properties など)で以下のプロパティを更新することにより、GSSAPI を使用するようにリスナーを構成します。

sasl.enabled.mechanisms=GSSAPI
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.kerberos.service.name=kafka
listener.name.sasl_plaintext.gssapi.sasl.jaas.config= \
  com.sun.security.auth.module.Krb5LoginModule required \
  keyTab="/tmp/keytabs/kafka.keytab" \
  principal="kafka/localhost@EXAMPLE.COM" \
  debug="true" \
  storeKey="true" \
  useKeyTab="true";

プロデューサーとコンシューマーの構成ファイル( etc/kafka/producer.propertiesetc/kafka/consumer.properties など)で、以下のプロパティを追加または更新します。

sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config= com.sun.security.auth.module.Krb5LoginModule required \
  keyTab="/tmp/keytabs/alice.keytab" \
  principal="alice@EXAMPLE.COM" \
  debug="true" \
  storeKey="true" \
  useKeyTab="true";

ブローカーを再起動し、上の説明に従ってプロデューサーとコンシューマーを実行します。これで、プロデューサーとコンシューマーは Kerberos サーバーを使用して認証されます。同じサーバーから、LDAP を使用してグループ情報も取得されます。