セキュリティのチュートリアル

このチュートリアルでは、Confluent Platform でセキュリティを有効にする方法の例を示します。

ちなみに

RBAC の有効化については、「Metadata Service (MDS) の構成」を参照してください。

概要

このチュートリアルでは、Confluent Platform で Confluent Control Center を使用したモニタリングにより、SSL 暗号化SASL 認証、および 認可 を有効にする手順の例を示します。ZooKeeper、Apache Kafka® ブローカー、Kafka Connect、および Confluent Replicator に加え、モニタリングに必要なすべてのコンポーネント(Confluent Metrics Reporter および Confluent Monitoring Interceptor を含む)を保護するための構成設定について、手順を追って説明します。

注釈

  • 説明をわかりやすくするために、このチュートリアルでは SASL/PLAIN(または PLAIN) を使用します。これはユーザー名とパスワードによるシンプルな認証メカニズムで、セキュアな認証を実装するために、一般には暗号化に TLS を併用します。
  • Confluent Platform の本稼働環境のデプロイでは、SASL/GSSAPI(Kerberos) または SASL/SCRAM の使用をお勧めします。
  • Confluent Cloud では認証に SASL/PLAIN(または PLAIN) と TLS v1.2 暗号化を併用します。幅広いクライアントをサポートしつつ、それなりのセキュリティ水準を確保できるからです。SASL 交換で使用されるユーザー名とパスワードは、API キーおよびシークレットです。これらは、シークレットストアを使用して安全に管理するとともに、定期的にローテーションする必要があります。

注釈

プロパティ値にバックスラッシュ文字を使用する場合、Confluent Platform 構成ファイルに値を入力するときや Confluent CLI を使用するときは、バックスラッシュ文字をエスケープする必要があります。

たとえば、パスワードプロパティの値が 1\o/-r@c6pD で、有効なバックスラッシュ文字が含まれている場合、server.properties ファイルなどには 1\\o/-r@c6pD と入力する必要があります。このように入力しないと、次のエラーメッセージが表示されます。

Caused by: java.io.IOException: Keystore was tampered with, or password was incorrect

機密性の保持に Confluent CLI を使用する場合、暗号化するプロパティ値にバックスラッシュ文字が(エスケープされずに)含まれていると、Error: properties: Line xxx: invalid unicode literal というエラーメッセージが表示されます。xxx はバックスラッシュが使用されている行です。

前提条件

Confluent Platform の保護が重要である理由と、暗号化、認証、および認可のしくみを理解する必要があります。

このチュートリアルを進める前に:

注釈

このチュートリアルは、セキュリティを有効にせずに Confluent Platform を実行できることを確認してから続行してください。これにより、ユーザーはセキュリティ構成の追加と検証に専念できます。

SSL キーと証明書の作成

クラスター内の各マシンには、パブリックキー/プライベートキーのペアと、マシンを識別するための証明書があります。ただし証明書は署名されていないため、攻撃者が証明書を作成して、いずれかのマシンになりすますこともできます。

したがって、クラスター内のマシンごとに署名することにより、証明書の偽造を防ぐことが重要になります。証明書への署名は、認証機関(CA)が行います。CA は、パスポートを発行する公的機関のような役割を果たします。この公的機関では、パスポートを申請する人の身元を確認してから、偽造が困難な正規の形式でパスポートを提供します。他国の政府は、有効な形式かどうかを検証して、パスポートが本物であることを確認します。同様に、CA は証明書への署名を行い、暗号化技術によって、署名済み証明書の電子的な偽造が困難であることを保証します。このため、CA が本物の信頼できる機関である限り、クライアント側では認証済みのマシンに接続していることを確信できます。

キーストアには、各マシンの ID が保管されます。トラストストアには、マシンが信頼できるすべての証明書が保管されます。証明書をトラストストアにインポートすることは、その CA によって署名されたすべての証明書を信頼することを意味します。これは、政府(CA)を信頼することが、政府から発行されたすべてのパスポート(証明書)への信頼を意味することに似ています。この特性は信頼の連鎖と呼ばれ、大規模な Kafka クラスターで SSL デプロイする場合に特に便利です。クラスター内のすべての証明書に単一の CA で署名して、この CA を信頼する同じトラストストアをすべてのマシンで共有することができます。これにより、どのマシンでも、他のすべてのマシンを認証できます。

重要

証明書のパブリックキーを使用できる目的を制御するために、OpenSSL 証明書に Extended Key Usage 拡張機能(extendedKeyUsage)を含めることができます。このフィールドが空の場合、用途に制限はありませんが、なんらかの用途が指定された場合、有効な SSL 実装で制限を適用する必要があります。

Extended Key Usage 拡張機能は、クライアントとサーバーの認証に適しています。Kafka ブローカーでは、クラスター内通信にクライアントとサーバー両方の認証が必要です。これは、各ブローカーが他のブローカーに対してクライアントとサーバー両方の役割を果たすためです。企業の CA に Web サーバー用の署名プロファイルが存在する場合があります。これを Kafka にも使用する場合、用途の値として serverAuth しか含まれていないと、SSL ハンドシェイクが失敗します。

SSL をデプロイするための一般的な手順は、次のとおりです。

  • キーと証明書を生成する
  • 独自の認証機関(CA)を作成する
  • 証明書に署名する

キーを作成し証明書に署名する手順を以下に示します。confluent-platform-security-tools.git のスクリプトをカスタマイズして使用することもできます。

手順で使用されるパラメーターの定義は次のとおりです。

  1. keystore: キーストアの場所
  2. ca-cert: CA の証明書
  3. ca-key: CA のプライベートキー
  4. ca-password: CA のパスフレーズ
  5. cert-file: サーバーのエクスポートされた未署名の証明書
  6. cert-signed: サーバーの署名済み証明書

注釈

接続の確立後、Kafka では証明書の再ネゴシエーションや取り消しの処理は行われません。証明書が侵害された場合や証明書を取り消す場合は、ACL ブラックリスト(--deny-principal オプションまたは --deny-host オプション)を使用して、特定のクライアントを削除してください。ACL の詳細については、「ACL を使用した認可」を参照してください。

ホスト名検証の構成

サーバーホスト名の検証は、中間者攻撃を防ぐために、クライアント接続とブローカー間接続に対してデフォルトで有効になっています。サーバーホスト名の検証は、ssl.endpoint.identification.algorithm を空の文字列に設定することで無効にできます。以下に例を示します。

ssl.endpoint.identification.algorithm=

ダイナミックに構成されたブローカーリスナーの場合、ホスト名の検証は kafka-configs を使用して無効にできます。以下に例を示します。

./bin/kafka-configs --bootstrap-server localhost:9093 --entity-type brokers --entity-name 0 --alter \
--add-config "listener.name.internal.ssl.endpoint.identification.algorithm="

証明書でのホスト名の構成

ホスト名の検証が有効になっている場合、クライアントは次の 2 つのフィールドのいずれかに照らして、サーバーの完全修飾ドメイン名(FQDN)を検証します。

  • CN(Common Name)
  • SAN(Subject Alternative Name)

どちらのフィールドも有効ですが、RFC-2818 では SAN の使用が推奨されています。SAN の方が柔軟性に優れており、複数の DNS エントリを宣言できます。もう 1 つの利点は、認可の目的で CN をより意味のある値に設定できることです。SAN フィールドを追加するには、keytool コマンドの末尾に引数 -ext SAN=DNS:{FQDN} を追加します。

keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -ext SAN=DNS:{FQDN}

その後で次のコマンドを実行すると、生成された証明書の内容を確認できます。

keytool -list -v -keystore server.keystore.jks

キーと証明書を生成する

このプロセスには、Java の keytool ユーティリティを使用できます。コマンドと引数の詳細については、Java のドキュメント を参照してください。

  1. クラスター内の各 Kafka ブローカーについて、キーと証明書を生成します。後でエクスポートして CA で署名できるように、キーを kafka.server.keystore というキーストアに生成します。キーストアファイルには証明書のプライベートキーが含まれているため、キーストアファイルは安全に保管する必要があります。

    # With user prompts
    keytool -keystore kafka.server.keystore.jks -alias localhost -keyalg RSA -genkey
    
    # Without user prompts, pass command line arguments
    keytool -keystore kafka.server.keystore.jks -alias localhost -keyalg RSA -validity {validity} -genkey -storepass {keystore-pass} -keypass {key-pass} -dname {distinguished-name} -ext SAN=DNS:{hostname}
    

コモンネーム(CN)がサーバーの完全修飾ドメイン名(FQDN)と完全に一致していることを確認してください。クライアントは CN を DNS ドメイン名と比較して、悪意のあるサーバーではなく、目的のサーバーに接続していることを確認します。サーバーのホスト名は、SAN(Subject Alternative Name)で指定することもできます。SSL がブローカー間セキュリティプロトコルとして使用される場合、識別名がサーバープリンシパルとして使用されるため、ホスト名を CN ではなく SAN とすると便利です。

独自の認証機関(CA)を作成する

  1. CA を生成します。CA はパブリックキー/プライベートキーのペアと証明書であり、他の証明書に署名することを目的としています。

    openssl req -new -x509 -keyout ca-key -out ca-cert -days {validity}
    
  2. 生成された CA を クライアントのトラストストア に追加します。これにより、クライアントがこの CA を信頼できるようになります。

    keytool -keystore kafka.client.truststore.jks -alias CARoot -importcert -file ca-cert
    
  3. 生成された CA を ブローカーのトラストストア に追加します。これにより、ブローカーがこの CA を信頼できるようになります。

    keytool -keystore kafka.server.truststore.jks -alias CARoot -importcert -file ca-cert
    

証明書に署名する

生成した CA を使用してキーストア内のすべての証明書に署名するには、次の手順に従います。

  1. キーストアから証明書をエクスポートします。

    keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
    
  2. CA を指定して署名します。

    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}
    
  3. CA の証明書と署名された証明書の両方をブローカーのキーストアにインポートします。

    keytool -keystore kafka.server.keystore.jks -alias CARoot -importcert -file ca-cert
    keytool -keystore kafka.server.keystore.jks -alias localhost -importcert -file cert-signed
    

まとめ

前述の手順をまとめると、CA とブローカー、クライアントのトラストストアとキーストアを作成するスクリプトは次のようになります。

keytool -keystore kafka.server.keystore.jks -alias localhost -keyalg RSA -validity {validity} -genkey
openssl req -new -x509 -keyout ca-key -out ca-cert -days {validity}
keytool -keystore kafka.client.truststore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka.server.truststore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}
keytool -keystore kafka.server.keystore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -importcert -file cert-signed

注釈

このチュートリアルでは、双方向 SSL ではなく SASL/PLAIN を使用してクライアント認証が行われるため、クライアントにキーストアは必要ありません。ただし、双方向 SSL 認証を使用する場合は、ブローカーの場合と同様に、クライアントキーストアを作成し、生成した CA を使用してすべての証明書に署名します。

ZooKeeper の構成

ブローカーから ZooKeeper への通信用に、オプションの DIGEST-MD5 認証を構成します。

  1. 認証プロバイダーを設定します。

    authProvider.sasl=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    
  2. zookeeper_jaas.conf ファイルを次のように構成します。2 か所のセミコロンに注意してください。

    Server {
           org.apache.zookeeper.server.auth.DigestLoginModule required
           user_super="admin-secret"
           user_kafka="kafka-secret";
    };
    
  3. ZooKeeper を起動するとき、次のとおり、JAAS ファイルの名前を JVM パラメーターとして渡します。

    export KAFKA_OPTS="-Djava.security.auth.login.config=etc/kafka/zookeeper_jaas.conf"
    bin/zookeeper-server-start etc/kafka/zookeeper.properties
    

    ちなみに

    これらの手順では、ZIP または TAR アーカイブを使用して Confluent Platform をインストールしているという想定に基づいています。詳細については「オンプレミスのデプロイ」を参照してください。

ブローカーの構成

管理者は、セキュアなクライアントとセキュアでないクライアントを組み合わせて構成できます。このチュートリアルでは、すべてのブローカー/クライアントおよびブローカー間のネットワーク通信を次の方法で確実に暗号化します。

  • すべてのブローカー/クライアント通信で SASL_SSL セキュリティプロトコルを使用すること。これにより、SASL/PLAIN を使用して通信の暗号化と認証が行われます。
  • すべてのブローカー間通信で SSL セキュリティプロトコルを使用すること。これにより、SSL を使用して通信の暗号化と認証が行われます。
  • セキュアでない PLAINTEXT ポートを有効化しないこと。

手順は以下のとおりです。

  1. 各ブローカーの server.properties で、必要なセキュリティプロトコルとポートを有効にします。SSLSASL_SSL の両方を有効にしていることに注意してください。

    listeners=SSL://:9093,SASL_SSL://:9094
    
  2. ブローカーが相互に認証するには(双方向 SSL 認証)、すべてのブローカーをクライアント認証用に構成する必要があります(この場合、リクエスト元のブローカーは "client" です)。ssl.client.auth=required の設定をお勧めします。これを requested として構成することはお勧めしません。この設定では構成が正しくないブローカーも正常に接続され、セキュリティが確保されていると錯覚してしまうからです。

    security.inter.broker.protocol=SSL
    ssl.client.auth=required
    
  3. すべてのブローカーの server.properties ファイルで、SSL トラストストア、キーストア、およびパスワードを定義します。ブローカーの構成ファイルにパスワードが直接格納されるため、ファイルシステムのアクセス許可を使用して、これらのファイルへのアクセスを制限することが重要です。

    ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
    ssl.truststore.password=test1234
    ssl.keystore.location=/var/ssl/private/kafka.server.keystore.jks
    ssl.keystore.password=test1234
    ssl.key.password=test1234
    
  4. 各ブローカーの server.properties ファイルで SASL/PLAIN メカニズムを有効にします。

    sasl.enabled.mechanisms=PLAIN
    
  5. 各 Kafka ブローカーの構成ディレクトリに、ブローカーの JAAS の構成ファイル(この例では kafka_server_jaas.conf)を作成します。

    • KafkaServer セクションを構成します。このセクションは、ブローカーが(他のブローカーからの接続を含む)クライアント接続を検証するときに使用されます。プロパティ usernamepassword は、他のブローカーへの接続を開始するために、ブローカーで使用されます。この例の kafkabroker は、ブローカー間の通信に使用されるユーザーです。プロパティの集合 user_{userName} は、ブローカーに接続する他のすべてのクライアントのパスワードを定義します。この例では、2 つのユーザー kafkabrokerclient を使用します。
    • ブローカーが ZooKeeper に接続するときに使用される Client セクションを構成します。ここにある単一のユーザー名とパスワードは、ZooKeeper の jaas 構成と一致する必要があります。

    注釈

    各セクションの 2 か所のセミコロンに注意してください。

    KafkaServer {
       org.apache.kafka.common.security.plain.PlainLoginModule required
       username="kafkabroker"
       password="kafkabroker-secret"
       user_kafkabroker="kafkabroker-secret"
       user_kafka-broker-metric-reporter="kafkabroker-metric-reporter-secret"
       user_client="client-secret";
    };
    
    Client {
       org.apache.zookeeper.server.auth.DigestLoginModule required
       username="kafka"
       password="kafka-secret";
    };
    
  1. Confluent Control Center を使用してデプロイをモニタリングする場合に、Confluent Control Center を支えるモニタリングクラスターが同じセキュリティプロトコルで構成されている場合は、Confluent Metrics Reporter もセキュリティ用に構成する必要があります。これらの構成を各ブローカーの server.properties ファイルに追加します。

    metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
    confluent.metrics.reporter.security.protocol=SASL_SSL
    confluent.metrics.reporter.ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
    confluent.metrics.reporter.ssl.truststore.password=test1234
    confluent.metrics.reporter.sasl.mechanism=PLAIN
    confluent.metrics.reporter.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
       username="kafka-broker-metric-reporter" \
       password="kafka-broker-metric-reporter-secret";
    
  2. ACL を有効にするには、オーソライザーを構成する必要があります。Kafka は、単純なオーソライザーの実装を提供します。これを使用するには、以下を server.properties に追加します。

    authorizer.class.name=kafka.security.auth.AclAuthorizer
    
  3. デフォルトの動作では、リソースに ACL が関連付けられていない場合、スーパーユーザー以外のユーザーはリソースにアクセスできません。ブローカー間操作に必要なアクセス権限をブローカープリンシパルに付与するために便利な方法は、ブローカープリンシパルをスーパーユーザーとして設定することです。このチュートリアルでは、ブローカー間セキュリティプロトコルを SSL として構成しているため、スーパーユーザー名をブローカーの証明書で構成されている distinguished name に設定します(他の 認可構成オプション を参照してください)。

    super.users=User:<DN of broker1>;User:<DN of broker2>;User:<DN of broker3>;User:kafka-broker-metric-reporter
    

前述の構成手順をまとめると、ブローカーの server.properties ファイルに含まれる構成設定は次のようになります。

# Enable SSL security protocol for inter-broker communication
# Enable SASL_SSL security protocol for broker-client communication
listeners=SSL://:9093,SASL_SSL://:9094
security.inter.broker.protocol=SSL
ssl.client.auth=required

# Broker security settings
ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/ssl/private/kafka.server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
sasl.enabled.mechanisms=PLAIN

# Confluent Metrics Reporter for monitoring with Confluent Control Center
metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
confluent.metrics.reporter.security.protocol=SASL_SSL
confluent.metrics.reporter.ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
confluent.metrics.reporter.ssl.truststore.password=test1234
confluent.metrics.reporter.sasl.mechanism=PLAIN
confluent.metrics.reporter.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="kafka-broker-metric-reporter" \
   password="kafkabroker-metric-reporter-secret";

# ACLs
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:<DN of broker1>;User:<DN of broker2>;User:<DN of broker3>

注釈

これは完全なブローカー構成ではありません。Confluent Control Center を介して既に正常にモニタリングされているブローカーの Kafka クラスター(既知かつ動作中)でセキュリティを有効にするために必要な追加構成のみを抜粋したものです。

各 Kafka ブローカーを起動します。JAAS ファイルの名前を JVM パラメーターとして渡します。

export KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
bin/kafka-server-start etc/kafka/server.properties

代わりに、次のようにして構成ファイルを変更することもできます。

listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="admin" \
   password="admin-secret" \
   user_admin="admin-secret" \
   user_kafkabroker1="kafkabroker1-secret";

クライアントの構成

共通構成

保護された Kafka ブローカーとやり取りするコンポーネントはすべて「クライアント」であり、セキュリティ用に構成する必要があります。これらのクライアントには、Kafka Connect ワーカーと特定のコネクター(Replicator、Kafka Streams API クライアント、ksqlDB クライアント、Java 以外のクライアント、Confluent Control Center、Confluent Schema Registry、REST Proxy など)が含まれます。

セキュリティで保護された Kafka クラスターとのやり取りに必要なセキュリティ構成パラメーターの一般的なセットをすべてのクライアントで共有します。

  1. SSL 経由で暗号化し、SASL 経由で認証するには、SASL_SSL を使用するようにセキュリティプロトコルを構成します(SASL を使用せず、暗号化と認証の両方に SSL を使用する場合、セキュリティプロトコルは SSL になります)。

    security.protocol=SASL_SSL
    
  2. SSL 暗号化トラストストア設定を構成するには、トラストストア構成パラメーターを設定します。このチュートリアルでは、双方向 SSL ではなく SASL/PLAIN を介して認証が行われるため、クライアントにキーストアは必要ありません。

    ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
    ssl.truststore.password=test1234
    
  3. SASL 認証を構成するには、SASL メカニズム(このチュートリアルでは PLAIN)を設定します。次に、Kafka ブローカーに接続する方法を記述するために、JAAS 構成のプロパティを構成します。ユーザーによる接続を構成するには、プロパティ username および password を使用します。

    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
       username="client" \
       password="client-secret";
    

前述の構成手順をまとめると、クライアントで SSL 暗号化と SASL/PLAIN 認証を有効にするには、一般的な方法として、クライアントのプロパティファイルに以下を追加することになります。

security.protocol=SASL_SSL
ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
ssl.truststore.password=test1234
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="client" \
    password="client-secret";

以下のセクションで説明するように、各構成パラメーターの前に付く 構成プレフィックス は、クライアントによって異なります。

コンソールのプロデューサーとコンシューマーの構成

コンソールのプロデューサーとコンシューマー用のコマンドラインツールは、少量のデータをクラスターとの間で送受信する場合に便利な方法です。これらはクライアントであるため、セキュリティ構成も必要です。

  1. 前述のセキュリティ構成パラメーターを使用し、追加の構成プレフィックスなしで client_security.properties ファイルを作成します。

    security.protocol=SASL_SSL
    ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
    ssl.truststore.password=test1234
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="client" \
        password="client-secret";
    
  2. このプロパティファイルは、コマンドラインツールを使用するときに渡します。

    kafka-console-producer --broker-list kafka1:9094 --topic test-topic --producer.config client_security.properties
    kafka-console-consumer --bootstrap-server kafka1:9094 --topic test-topic --consumer.config client_security.properties
    

ksqlDB およびストリーム処理クライアントの構成

ksqlDB とストリーム処理クライアントのセキュリティを有効にするには、関連するクライアントコンストラクターにセキュリティ構成を渡すだけです。

基本的なクライアントセキュリティ構成は次のとおりです。

security.protocol=SASL_SSL
ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
ssl.truststore.password=test1234
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="client" \
    password="client-secret";

さらに、アプリケーションに以下を構成します。

  • トップレベル(追加の構成プレフィックスなし)
  • Confluent Control Center ストリームモニタリングに使用する Confluent モニタリングインターセプタープロデューサー(追加の構成プレフィックス producer.confluent.monitoring.interceptor.
  • Confluent Control Center ストリームモニタリングに使用する Confluent モニタリングインターセプターコンシューマー(追加の構成プレフィックス consumer.confluent.monitoring.interceptor.

これらの構成をまとめると、SSL 暗号化と SASL/PLAIN 認証用の ksqlDB 構成は次のようになります。これらを構成するには、以下に示すようにファイルからプロパティを読み込む方法と、プログラムでプロパティを設定する方法があります。

# Top level
security.protocol=SASL_SSL
ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
ssl.truststore.password=test1234
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="client" password="client-secret";

# Embedded producer for streams monitoring with Confluent Control Center
producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
producer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
producer.confluent.monitoring.interceptor.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
producer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
producer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
producer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="client" password="client-secret";

# Embedded consumer for streams monitoring with Confluent Control Center
consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
consumer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
consumer.confluent.monitoring.interceptor.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
consumer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
consumer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
consumer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="client" password="client-secret";

注釈

これは完全な構成ではありません。Confluent Control Center を介してモニタリングされている ksqlDB または Java アプリケーション(既知かつ動作中)でセキュリティを有効にするために必要な追加構成のみを抜粋したものです。

Kafka Connect の構成

ブローカーから見ると、Kafka Connect は別のクライアントです。このチュートリアルでは、SSL 暗号化と SASL/PLAIN 認証用に Connect を構成します。Kafka Connect のセキュリティを有効にするには、Connect ワーカー、ソースコネクターで使用されるプロデューサー、およびシンクコネクターで使用されるコンシューマーにセキュリティ構成を渡すだけです。

基本的なクライアントセキュリティ構成は次のとおりです。

security.protocol=SASL_SSL
ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
ssl.truststore.password=test1234
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="client" \
    password="client-secret";

さらに、Kafka Connect に以下を構成します。

  • Connect ワーカー用のトップレベル(追加の構成プレフィックスなし)
  • ソースコネクター用の組み込みプロデューサー(追加の構成プレフィックス producer.
  • シンクコネクター用の組み込みコンシューマー(追加の構成プレフィックス consumer.
  • Confluent Control Center ストリームモニタリングに使用するソースコネクター用の Confluent モニタリングインターセプタープロデューサー(追加の構成プレフィックス producer.confluent.monitoring.interceptor.
  • Confluent Control Center ストリームモニタリングに使用するシンクコネクター用の Confluent モニタリングインターセプターコンシューマー(追加の構成プレフィックス consumer.confluent.monitoring.interceptor.

これらの構成をまとめると、SSL 暗号化と SASL/PLAIN 認証用の Kafka Connect ワーカー構成は次のようになります。これらの設定を connect-distributed.properties ファイルで構成することもできます。

# Connect worker
security.protocol=SASL_SSL
ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
ssl.truststore.password=test1234
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="connect" \
   password="connect-secret";

# Embedded producer for source connectors
producer.security.protocol=SASL_SSL
producer.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
producer.ssl.truststore.password=test1234
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="connect" \
  password="connect-secret";

# Embedded consumer for sink connectors
consumer.security.protocol=SASL_SSL
consumer.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
consumer.ssl.truststore.password=test1234
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="connect" \
  password="connect-secret";

# Embedded producer for source connectors for streams monitoring with Confluent Control Center
producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
producer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
producer.confluent.monitoring.interceptor.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
producer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
producer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
producer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="connect" \
  password="connect-secret";

# Embedded consumer for sink connectors for streams monitoring with Confluent Control Center
consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
consumer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
consumer.confluent.monitoring.interceptor.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
consumer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
consumer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
consumer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="connect" \
  password="connect-secret";

注釈

これは完全な Connect ワーカー構成ではありません。Confluent Control Center を介して既に正常にモニタリングされている Kafka Connect クラスター(既知かつ動作中)でセキュリティを有効にするために必要な追加構成のみを抜粋したものです。

このプロパティファイルは、各 Connect ワーカーを起動するときに渡します。

bin/connect-distributed etc/kafka/connect-distributed.properties

Replicator

Confluent Replicator は、Kafka ソースコネクターの一種で、送信元 Kafka クラスターから送信先クラスターにデータを複製します。Replicator に組み込まれたコンシューマーは、送信元クラスターからデータを読み取り、Kafka Connect ワーカーに組み込まれたプロデューサーは送信先クラスターにデータを書き込みます。

基本的なクライアントセキュリティ構成は次のとおりです。

security.protocol=SASL_SSL
ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
ssl.truststore.password=test1234
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="client" \
    password="client-secret";

さらに、Replicator に以下を構成します。

  • オリジンクラスターからのトップレベル Replicator コンシューマー(追加の構成プレフィックス src.kafka.
  • Confluent Control Center ストリームモニタリングに使用する、オリジンクラスターからの Confluent モニタリングインターセプターコンシューマー(追加の構成プレフィックス src.consumer.confluent.monitoring.interceptor.

前述の構成手順をまとめると、Replicator の JSON プロパティファイルに含まれる構成設定は次のようになります。

{
  "name":"replicator",
  "config":{
    ....
    "src.kafka.security.protocol" : "SASL_SSL",
    "src.kafka.ssl.truststore.location" : "var/private/ssl/kafka.server.truststore.jks",
    "src.kafka.ssl.truststore.password" : "test1234",
    "src.kafka.sasl.mechanism" : "PLAIN",
    "src.kafka.sasl.jaas.config" : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"replicator\" password=\"replicator-secret\";",
    "src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
    "src.consumer.confluent.monitoring.interceptor.security.protocol": "SASL_SSL",
    "src.consumer.confluent.monitoring.interceptor.ssl.truststore.location": "/var/ssl/private/kafka.client.truststore.jks",
    "src.consumer.confluent.monitoring.interceptor.ssl.truststore.password": "confluent",
    "src.consumer.confluent.monitoring.interceptor.sasl.mechanism": "PLAIN",
    "src.consumer.confluent.monitoring.interceptor.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username="client" password="client-secret";",
    ....
  }
}

注釈

これは完全な Replicator 構成ではありません。Confluent Control Center を使用して既に正常にモニタリングされている Replicator コネクター(既知かつ動作中)でセキュリティを有効にするために必要な追加構成のみを抜粋したものです。

Kafka Connect の起動後、Confluent Replicator を追加できます。

curl -X POST -H "Content-Type: application/json" --data @replicator_properties.json http://connect:8083/connectors

Confluent Control Center

Confluent Control Center ではストリーム処理に Kafka Streams を使用するため、Control Center を支えるモニタリングクラスター内のすべての Kafka ブローカーが保護されている場合は、別のクライアントである Control Center アプリケーションも保護する必要があります。

基本的なクライアントセキュリティ構成に、構成プレフィックス confluent.controlcenter.streams. を追加します。etc/confluent-control-center/control-center.properties ファイルで、以下のすべての修正を加えます。

confluent.controlcenter.streams.security.protocol=SASL_SSL
confluent.controlcenter.streams.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
confluent.controlcenter.streams.ssl.truststore.password=test1234
confluent.controlcenter.streams.sasl.mechanism=PLAIN
confluent.controlcenter.streams.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="confluent" \
  password="confluent-secret";

注釈

これは完全な Confluent Control Center 構成ではありません。Confluent Control Center のデプロイ(既知かつ動作中)でセキュリティを有効にするために必要な追加構成のみを抜粋したものです。

Confluent Control Center を起動します。

bin/control-center-start etc/confluent-control-center/control-center.properties

Confluent Metrics Reporter

Confluent Control Center を使用してデプロイをモニタリングしている場合は、Confluent Metrics Reporter もクライアントです。Confluent Control Center を支えるモニタリングクラスターが同じセキュリティプロトコルで構成されている場合は、各ブローカーの server.properties ファイルで Confluent Metrics Reporter をセキュリティ用に構成します。構成プレフィックスは confluent.metrics.reporter. です。前述の説明 を参照してください。

Confluent モニタリングインターセプター

Confluent モニタリングインターセプターを使用して Confluent Control Center でストリームモニタリング統計をレポートするコンポーネントのセキュリティを構成します。

# Embedded producer for streams monitoring with Confluent Control Center
producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
producer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
producer.confluent.monitoring.interceptor.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
producer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
producer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
producer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="client" password="client-secret";

# Embedded consumer for streams monitoring with Confluent Control Center
consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
consumer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
consumer.confluent.monitoring.interceptor.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
consumer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
consumer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
consumer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="client" password="client-secret";

認可と ACL

RBAC の使用時に ACL を追加、削除、またはリストするには、一元的な ACL を使用します。ACL 管理の最も一般的なタスクは、プロデューサーまたはコンシューマーとしてのプリンシパルの追加または削除です。たとえば、client-1 というクライアントを test-topic というトピックのプロデューサーおよびコンシューマーとして追加するには、以下を実行します。

confluent iam acl create --allow --principal User:client-1 --operation write --topic test-topic --kafka-cluster-id <kafka-cluster-id>
confluent iam acl create --allow --principal User:client-1 --operation read --topic test-topic --kafka-cluster-id <kafka-cluster-id>

CLI の詳細については、「confluent iam」で ACL サブコマンドを参照してください。RBAC を実行していない場合は、「ACL を使用した認可」を参照してください。

トラブルシューティング

最初の試行で構成が正しく動作しない場合は、出力をデバッグすると、問題の原因を診断するために役立ちます。

  1. ブローカーとクライアントで、キーストアとトラストストアのキーと証明書を検証します。

    keytool -list -v -keystore /var/ssl/private/kafka.server.keystore.jks
    
  2. etc/kafka/log4j.properties ファイルを変更して、Kafka 認可ログを有効にします。ログレベルを DEBUG に変更してから、ブローカーを再始動します。

    log4j.logger.kafka.authorizer.logger=DEBUG, authorizerAppender
    
  3. javax.net.debug システムプロパティを使用して SSL デバッグ出力を有効にします。これには、JVM の再起動が必要です。

    export KAFKA_OPTS=-Djavax.net.debug=all
    
  4. sun.security.krb5.debug システムプロパティを使用して、SASL デバッグ出力を有効にします。これには、JVM の再起動が必要です。

    export KAFKA_OPTS=-Dsun.security.krb5.debug=true
    

次のステップ

完全に保護されたマルチノードクラスターについては、Docker ベースの Confluent Platform のデモ を参照してください。Confluent Platform のすべてのコンポーネントについて、セキュリティ関連およびセキュリティ関連以外の構成パラメーターを含む構成全体を示しています。デモのプレイブックには、さらに詳しく学習できるセキュリティセクションがあります。

Confluent Platform のすべてのコンポーネントに関するセキュリティ設計と構成の詳細については、ドキュメント を参照してください。このチュートリアルでは SASL の例として PLAIN メカニズムを使用していますが、Confluent では、本稼働環境により適した GSSAPI(Kerberos)SCRAM もサポートしています。

Slack の Confluent Community セキュリティチャンネルから、フィードバックをお寄せください。