Kafka Streams のセキュリティ

Kafka Streams では、Apache Kafka® の セキュリティ機能 がネイティブに統合され、Kafka のクライアント側セキュリティ機能がすべてサポートされます。Kafka Streams では、Java のプロデューサーおよびコンシューマー API が利用されます。

ストリーム処理アプリケーションのセキュリティを確保するには、対応する Kafka プロデューサーおよびコンシューマークライアントでセキュリティ設定を構成してから、Kafka Streams アプリケーションでの対応する構成設定を指定します。

Kafka は、認証されたクライアントと認証されていないクライアント、および暗号化されたクライアントと暗号化されていないクライアントの混在を含む、クラスターの暗号化と認証をサポートします。セキュリティの使用はオプションです。

以下に、関連するいくつかのクライアント側セキュリティ機能を示します。

アプリケーションと Kafka ブローカー間を伝送するデータの暗号化
アプリケーションと Kafka ブローカー間でクライアント/サーバー通信の暗号化を有効にすることができます。たとえば、Kafka との間でデータを読み書きするときに、常に暗号化を使用するようにアプリケーションを構成できます。これは、内部ネットワーク、パブリックインターネット、パートナーネットワークなど、セキュリティドメイン間でデータの読み書きを行う場合にきわめて重要です。
クライアント認証
アプリケーションから Kafka ブローカーへの接続でクライアント認証を有効にすることができます。たとえば、特定のアプリケーションにのみ Kafka クラスターへの接続が許可されるように定義できます。
クライアント認可
アプリケーションが行う読み取りおよび書き込み操作について、クライアント認可を有効にすることができます。たとえば、特定のアプリケーションにのみ Kafka トピックの読み書きが許可されるように定義できます。データの汚染や不正行為を防ぐために、Kafka トピックへの書き込みアクセスを制限することもできます。

Kafka のセキュリティ機能の詳細については、「Kafka のセキュリティ」およびブログの投稿「Apache Kafka Security 101」を参照してください。

セキュアな Kafka クラスターに必要な ACL 設定

Kafka クラスターでは、ACL を使用してリソースへのアクセス(トピックの作成などの機能)を制御できます。このようなクラスターでは、Kafka Streams を含む各クライアントに適切なアクセスを認可するために、クライアントを特定のユーザーとして認証することが求められます。特に、Kafka Streams アプリケーションがセキュアな Kafka クラスターに対して実行される場合は、アプリケーションを実行するプリンシパルの ACL が、 内部トピック を作成するアクセス許可をアプリケーションに与えるように設定されている必要があります。

このアクセス許可をアプリケーションに付与しない場合は、必要な内部トピックを手動で作成します。内部トピックが存在していれば、Kafka Streams で再作成が試みられることはありません。

注釈

内部の再パーティショントピックと changelog トピックは、適切な数のパーティションを持つように作成する必要があります。そうでない場合、Kafka Streams の起動時にエラーが発生します。これらのトピックには、入力トピックと同じ数のパーティションが必要です。複数のトピックがある場合は、すべての入力トピックの中で最大のパーティション数を使用する必要があります。

また、changelog トピックはログの圧縮を有効にして作成する必要があります。そうでない場合、アプリケーションでデータの損失が発生することがあります。ウィンドウ化された KTable の changelog トピックでは、"delete,compact" を適用し、対応するストアに基づいて保持時間を設定します。未完了の状態での削除を避けるために、ストアの保持時間に猶予時間を追加してください。デフォルトでは、Kafka Streams はストアの保持時間に 24 時間を追加します。

Topology#describe() を使用すると、必要な内部トピックの名前を詳しく確認できます。すべての内部トピックは、<application.id>-<operatorName>-<suffix> という名前付けパターンに従います。 suffixrepartitionchangelog のどちらかになります。

注釈

この名前付けパターンはパブリック API の一部ではないため、将来のリリースでも変わらないという保証はありません。

最適なセキュリティを確保するには、最小限の ACL 操作を設定します。Kafka Streams のプリンシパルには以下の操作だけを許可します。

  • トピックリソース(内部トピック): READ、DELETE、WRITE、CREATE
  • コンシューマーグループリソース: READ、DESCRIBE
  • トピックリソース(入力トピック): READ
  • トピックリソース(出力トピック): WRITE

たとえば、以下のような設定の Kafka Streams アプリケーションがあるとします。

  • application.id 構成の値は team1-streams-app1 です。
  • Kafka クラスターに team1 ユーザーとして認証します。
  • アプリケーションにコーディングされたトポロジーは、入力トピック input-topic1input-topic2 から読み取ります。
  • アプリケーションのトポロジーは、出力トピック output-topic1output-topic2 に書き込みます。
  • アプリケーションは厳密に 1 回の処理の保証が有効になっています(processing.guarantee=exactly_once)。

以下のコマンドは、このアプリケーションを機能させるために必要な ACL を Kafka クラスターに作成します。

# Allow Streams to read the input topics:
bin/kafka-acls ... --add --allow-principal User:team1 --operation READ --topic input-topic1 --topic input-topic2

# Allow Streams to write to the output topics:
bin/kafka-acls ... --add --allow-principal User:team1 --operation WRITE --topic output-topic1 --topic output-topic2

# Allow Streams to manage its own internal topics:
bin/kafka-acls ... --add --allow-principal User:team1 --operation READ --operation DELETE --operation WRITE --operation CREATE --resource-pattern-type prefixed --topic team1-streams-app1

# Allow Streams to manage its own consumer groups:
bin/kafka-acls ... --add --allow-principal User:team1 --operation READ --operation DESCRIBE --resource-pattern-type prefixed --group team1-streams-app1

# Allow Streams EOS:
bin/kafka-acls ... --add --allow-principal User:team1 --operation WRITE --operation DESCRIBE --transactional-id team1-streams-app1 --resource-pattern-type prefixed

Kafka Streams 内のトピックを再パーティション化する予定がある場合は、cleanup.policy=delete を指定して DELETE 操作を許可してください。DELETE 操作は、再パーティション化の後、クリーンアップで古いレコードをログから確実に削除するために必要です。DELETE 操作を許可しないと、ファイルディスクリプターの使用量が増える可能性があります。

RBAC ロールバインディング

Kafka Streams では、Kafka クラスターのリソースに対するアクセスを制御するためのロールベースアクセス制御(RBAC)がサポートされています。

次の表は、クラスターリソースにアクセスするために必要な RBAC ロールの一覧です。

リソース ロール コマンド 注意事項
入力トピック DeveloperRead
confluent iam rbac role-binding create \
  --principal User:<interactive_user_name> \
  --role DeveloperRead \
  --resource Topic:<kafka_topic_name> \
  --kafka-cluster-id <kafka_cluster_id>
 
出力トピック DeveloperWrite
confluent iam rbac role-binding create \
  --principal User:<interactive_user_name> \
  --role DeveloperWrite \
  --resource Topic:<kafka_topic_name> \
  --kafka-cluster-id <kafka_cluster_id>
 
内部トピック ResourceOwner
confluent iam rbac role-binding create \
  --principal User:<interactive_user_name> \
  --role ResourceOwner \
  --prefix \
  --resource Topic:<application_id> \
  --kafka-cluster-id <kafka_cluster_id>
内部の削除呼び出しなど、内部トピック管理のためにすべての内部トピックで必要です。
べき等性を持つプロデューサー DeveloperWrite
confluent iam rbac role-binding create \
  --principal User:<interactive_user_name> \
  --role DeveloperWrite \
  --resource Cluster:<kafka_cluster_id>  \
  --kafka-cluster-id <kafka_cluster_id>
トピックが含まれていないため、クラスターにロールバインディングがあります。
トランザクションプロデューサー DeveloperWrite
confluent iam rbac role-binding create \
  --principal User:<interactive_user_name> \
  --role DeveloperWrite \
  --prefix \
  --resource TransactionalId:<application_id> \
  --kafka-cluster-id <kafka_cluster_id>
processing.guaranteeexactly_once または exactly_once_v2 が設定されている場合。
Consumer group DeveloperRead
confluent iam rbac role-binding create \
  --principal User:<interactive_user_name> \
  --role DeveloperRead \
  --prefix \
  --resource Group:<application_id> \
  --kafka-cluster-id <kafka_cluster_id>
 
入力/出力トピックのある Schema Registry DeveloperRead
confluent iam rbac role-binding create \
  --principal User:<interactive_user_name> \
  --role DeveloperRead \
  --prefix \
  --resource Subject:<topic_prefix> \
  --kafka-cluster-id <kafka_cluster_id>
リソースが Subject:<record_name_prefix> の場合もあります。
内部トピックのある Schema Registry ResourceOwner
confluent iam rbac role-binding create \
  --principal User:<interactive_user_name> \
  --role ResourceOwner \
  --prefix \
  --resource Subject:<application_id> \
  --kafka-cluster-id <kafka_cluster_id>
内部トピックのスキーマ使用が有効な場合。

セキュリティの例

以下の例は、Confluent ブログの投稿「Apache Kafka Security 101」を基にしています。この例の目的は、クライアント認証を有効にして、Kafka クラスターとの通信時に伝送中のデータを暗号化するように Kafka Streams アプリケーションを構成することです。

ちなみに

完全なデモアプリケーションは、Confluent のサンプルリポジトリの SecureKafkaStreamsExample.java に用意されています。

この例では、クラスター内の Kafka ブローカーのセキュリティが既にセットアップされ、必要な SSL 証明書がアプリケーションのローカルファイルシステムの場所から使用可能になっていることを前提としています。たとえば、Docker を使用している場合は、これらの SSL 証明書を Docker イメージ内の適切な場所にも含める必要があります。

以下のスニペットは、クライアント認証を有効にして、Kafka Streams アプリケーションと、その読み書き先の Kafka クラスターとの間を伝送するデータを SSL で暗号化する設定を示しています。

# Essential security settings to enable client authentication and SSL encryption
bootstrap.servers=kafka.example.com:9093
security.protocol=SSL
ssl.truststore.location=/etc/security/tls/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/etc/security/tls/kafka.client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234

これらの設定を、アプリケーションで StreamsConfig インスタンスに対して構成します。これらの設定により、Kafka との間で読み書きされるすべての伝送中のデータが暗号化され、アプリケーションは自身を通信先の Kafka ブローカーに対して認証します。この例では、クライアント認可には対応していないことに注意してください。

// Code of your Java application that uses the Kafka Streams library
Properties settings = new Properties();
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "secure-kafka-streams-app");
// Where to find secure Kafka brokers.  Here, it's on port 9093.
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.example.com:9093");
//
// ...further non-security related settings may follow here...
//
// Security settings.
// 1. These settings must match the security settings of the secure Kafka cluster.
// 2. The SSL trust store and key store files must be locally accessible to the application.
settings.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
settings.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/etc/security/tls/kafka.client.truststore.jks");
settings.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "test1234");
settings.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/etc/security/tls/kafka.client.keystore.jks");
settings.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "test1234");
settings.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "test1234");

アプリケーションで構成したセキュリティ設定が正しくない場合は、実行時、通常は起動直後にエラーが発生します。たとえば、ssl.keystore.password 設定に入力したパスワードが間違っていると、以下のようなエラーメッセージが記録され、アプリケーションが終了します。

# Misconfigured ssl.keystore.password
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka producer
[...snip...]
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException:
   java.io.IOException: Keystore was tampered with, or password was incorrect
[...snip...]
Caused by: java.security.UnrecoverableKeyException: Password verification failed

Kafka Streams アプリケーションのログファイルをモニタリングしてこのようなエラーメッセージを検出すると、不適切なアプリケーション構成をすばやく特定できます。

注釈

このウェブサイトには、Apache License v2 の条件に基づいて Apache Software Foundation で開発されたコンテンツが含まれています。