TLS での暗号化と認証

TLS の概要

Confluent Cloud では、OpenSSL に基づく Transport Layer Security(TLS) の暗号化をサポートしています。OpenSSL は、Transport Layer Security(TLS)および Secure Socket Layer(SSL)プロトコルの実装を提供するオープンソースの暗号化ツールキットです。TLS 認証では、サーバーがクライアントを認証します("双方向の認証" とも呼ばれます)。

TLS 認証では TLS 暗号化が必須となるため、このページでは両者を同時に構成する方法を示します。この方法は、 SSL の暗号化 だけに必要な構成のスーパーセットとなります。

デフォルトでは、Apache Kafka® は PLAINTEXT で通信します。つまりすべてのデータはプレーンテキストで送信されます。通信を暗号化するには、デプロイのすべでの Confluent Platform コンポーネントで TLS/SSL 暗号化を使用するように構成する必要があります。

Confluent Cloud では、OpenSSL に基づく Transport Layer Security(TLS) の暗号化をサポートしています。OpenSSL は、Transport Layer Security(TLS)および Secure Socket Layer(SSL)プロトコルの実装を提供するオープンソースの暗号化ツールキットです。TLS 認証では、サーバーがクライアントを認証します("双方向の認証" とも呼ばれます)。

Secure Sockets Layer(SSL)は Transport Layer Security(TLS)の元になったもので、2015 年 6 月に非推奨になりました。過去の経緯から、構成やコードで SSLTLS の代わりに使用されています。

暗号化に対して TLS を構成できますが、認証に対しても TLS を構成できます。TLS 暗号化(デフォルトでは、TLS 暗号化には、サーバーの証明書認証が含まれる)のみを構成し、クライアント認証には別のメカニズム(TLSSASL など)を個別に選択することもできます。技術的には、TLS の暗号化の時点で、一方向の認証、つまりクライアントによるサーバー証明書の認証は、既に実行しています。このトピックにおいて、"TLS 認証" は双方向認証(ブローカーによるクライアント証明書の認証も実行される)を指しています。

TLS を有効にすると、暗号化オーバーヘッドのために、パフォーマンスに影響が出ることがあります。

TLS はプライベートキーと証明書のペアを使用します。これは TLS ハンドシェイクプロセスで使用されます。

  • 各ブローカーには個別のプライベートキーと証明書のペアが必要で、クライアントはこの証明書を使用して、対象のブローカーを認証します。
  • 各論理クライアントには、クライアント認証が有効な場合、プライベートキーと証明書のペアが必要で、ブローカーはこの証明書を使用して、対象クライアントを認証します。

各ブローカーと論理クライアントでトラストストアを構成できます。これは、信頼(認証)する対象の証明書(ブローカーまたは論理クライアントの ID)を判断するために使用されます。多くの方法でトラストストアを構成できます。次の 2 つの例を考えます。

  • トラストストアには 1 つまたは多数の証明書が含まれます。ブローカーまたは論理クライアントはこのトラストストアにリストされた証明書を信頼します。
  • トラストストアには認証機関(CA)が含まれます。ブローカーまたは論理クライアントは、トラストストアにある CA に署名された証明書を信頼します。

CA 方式を使用する方が便利です。新しいブローカーやクライアントを追加したときに、トラストストアの変更が必要ないためです。CA の方式の概要を、こちらの図 をご覧ください。

ただし、CA 方式では、このメカニズムで一度信頼した個別のブローカーやクライアントの認証を Kafka で簡単にブロックする方法をサポートしていません(証明書の失効は、一般に証明書失効リストや OCSP(オンライン証明書ステータスプロトコル)で実行します)。したがって、認可でアクセスをブロックする必要があります。

一方、1 つまたは多数の証明書を使用している場合、認証のブロックは、トラストストアからブローカーやクライアントの証明書を削除することにより、実行されます。

参考

この構成の動作例については、Confluent Platform デモ を参照してください。構成リファレンスについては、デモの docker-compose.yml ファイル を参照してください。

TLS キーと証明書の作成

詳細については、「セキュリティのチュートリアル」を参照してください。ここでは、TLS キーと証明書の作成 方法が説明されています。

Brokers

Kafka クラスター内のすべてのブローカーを、クライアントからのセキュアな接続を受け付けるように構成します。ブローカーに構成の変更内容が反映されるためには、 ローリング再起動 が必要です。

次のセクションに示すとおり、Kafka ブローカーのセキュリティを有効にします。さらに、Confluent Control Center や Auto Data Balancer を使用している場合、次のコンポーネントに対してブローカーを構成します。

注釈

ブローカーの必須の構成プロパティと省略可能な構成プロパティの詳細については、「Kafka ブローカーの構成」を参照してください。

  1. トラストストア、キーストア、およびパスワードを、すべてのブローカーの server.properties ファイルに構成します。ブローカーの構成ファイルにパスワードが直接保管されるため、ファイルシステムのアクセス許可を使用して、これらのファイルへのアクセスを制限することが重要です。

    ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
    ssl.truststore.password=test1234
    ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
    ssl.keystore.password=test1234
    ssl.key.password=test1234
    
    Copy

    ssl.truststore.password は技術的には省略可能ですが、使用することを強くお勧めします。パスワードが設定されていなくてもトラストストアにアクセスできますが、整合性チェックが無効になります。

  2. ブローカー間通信において TLS を有効にする場合は、次の行をブローカーのプロパティファイルに追加します(デフォルトは PLAINTEXT)。

    security.inter.broker.protocol=SSL
    
    Copy
  3. クライアントやブローカー間の TLS(SSL)接続をリッスンするために、Apache Kafka® ブローカーのポートを構成します。listeners を構成する必要があります。また、listeners と値が異なる場合は、必要に応じて advertised.listeners を構成する必要があります。

    listeners=SSL://kafka1:9093
    advertised.listeners=SSL://localhost:9093
    
    Copy
  4. 以下に該当する場合は、TLS(SSL)ポートと PLAINTEXT ポートを構成します。

    • TLS をブローカー間通信に対して有効にしない
    • クラスターに接続する一部のクライアントで TLS を使用しない
    listeners=PLAINTEXT://kafka1:9092,SSL://kafka1:9093
    advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
    
    Copy

    advertised.host.nameadvertised.port は、単一の PLAINTEXT ポート用で、セキュアなプロトコルと互換性がないことに注意してください。代わりに advertised.listeners を使用します。

  5. ブローカーでクライアントの認証を有効にする(双方向認証)には、クライアント認証をすべてのブローカーで構成する必要があります。このとき、requested ではなく required を使用するように構成します。これは、構成が正しくないクライアントの接続が成功し、セキュリティが確保されていると誤って認識してしまう可能性があるためです。

    ssl.client.auth=required
    
    Copy

    注釈

    SASL 認証メカニズムが対象のリスナーで有効になっている場合、ssl.client.auth=required を指定している場合でも、TLS クライアントの認証が無効になります。対象リスナーで SASL を使用する場合にのみ、ブローカーはクライアントを認証します。

オプション設定

ssl.endpoint.identification.algorithm
クライアントで使用してサーバーホスト名を検証するエンドポイント識別アルゴリズム。デフォルト値は https です。ブローカー間通信のためにブローカーによって行われるクライアント接続を含め、クライアントではブローカーのホスト名がその証明書のホスト名と一致することを検証します。サーバーホスト名の検証を無効にするには、ssl.endpoint.identification.algorithm を空の文字列に設定します。
  • 型: string
  • デフォルト: https
  • 重要度: 中
ssl.cipher.suites
暗号スイートは、認証、暗号化、MAC、およびキー交換アルゴリズムの名前付き組み合わせで、TLS ネットワークプロトコルを使用したネットワーク接続のセキュリティ設定をネゴシエートするために使用されます。
  • 型: リスト
  • デフォルト: null(デフォルトで、サポートするすべての暗号スイートが有効)
  • 重要度: 中
ssl.enabled.protocols
TLS 接続で有効なプロトコルのリスト。
  • 型: リスト
  • デフォルト: TLSv1.2、TLSv1.1、TLSv1
  • 重要度: 中
ssl.truststore.type
トラストストアファイルのファイルフォーマットです。
  • 型: string
  • デフォルト: JKS
  • 重要度: 中

一部の国における輸入規制のため、Oracle の実装では、デフォルトで暗号化アルゴリズムの強度が制限されています。強力なアルゴリズムが必要な場合(256 ビットキーの AES など)、JCE Unlimited Strength Jurisdiction ポリシーファイル を入手し、JDK/JRE にインストールする必要があります。詳細については、JCA プロバイダーのドキュメント を参照してください。

クライアント

新しいプロデューサーとコンシューマークライアントは、Kafka バージョン 0.9.0 以上でセキュリティをサポートします。

Kafka Streams API を使用している場合、同等の SSL パラメーターおよび SASL パラメーターを構成する方法を参照してください。

次の構成例では、クライアント認証がブローカーで要求されることを前提にしています。したがってクライアントプロパティファイル client-ssl.properties に認証情報を格納できます。ブローカーの構成ファイルにパスワードが直接保管されるため、ファイルシステムのアクセス許可を使用して、これらのファイルへのアクセスを制限することが重要です。

重要

以下の設定を Schema Registry または REST Proxy に対して構成する場合、各パラメーターのプレフィックスとして confluent.license を使用する必要があります。たとえば、security.protocolconfluent.license.security.protocol になります。

bootstrap.servers=kafka1:9093
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
Copy

ssl.truststore.password は技術的には省略可能ですが、使用することを強くお勧めします。パスワードが設定されていなくてもトラストストアにアクセスできますが、整合性チェックが無効になります。

次の例では、kafka-console-producerkafka-console-consumer を使用し、前に定義した client-ssl.properties を渡します。

bin/kafka-console-producer --broker-list kafka1:9093 --topic test --producer.config client-ssl.properties
bin/kafka-console-consumer --bootstrap-server kafka1:9093 --topic test --consumer.config client-ssl.properties --from-beginning
Copy

オプション設定

ssl.endpoint.identification.algorithm
クライアントで使用してサーバーホスト名を検証するエンドポイント識別アルゴリズム。デフォルト値は https です。ブローカー間通信のためにブローカーによって行われるクライアント接続を含め、クライアントではブローカーのホスト名がその証明書のホスト名と一致することを検証します。サーバーホスト名の検証を無効にするには、ssl.endpoint.identification.algorithm を空の文字列に設定します。
  • 型: string
  • デフォルト: https
  • 重要度: 中
ssl.provider
TLS 接続に使用するセキュリティプロバイダーの名前。デフォルト値は、Java 仮想マシンのデフォルトのセキュリティプロバイダーです。
  • 型: string
  • デフォルト: null
  • 重要度: 中
ssl.cipher.suites
暗号スイートは、認証、暗号化、MAC、およびキー交換アルゴリズムの名前付き組み合わせで、TLS ネットワークプロトコルを使用したネットワーク接続のセキュリティ設定をネゴシエートするために使用されます。
  • 型: リスト
  • デフォルト: null(デフォルトで、サポートするすべての暗号スイートが有効)
  • 重要度: 中
ssl.enabled.protocols
TLS 接続で有効なプロトコルのリスト。
  • 型: リスト
  • デフォルト: TLSv1.2、TLSv1.1、TLSv1
  • 重要度: 中
ssl.truststore.type
トラストストアファイルのファイルフォーマットです。
  • 型: string
  • デフォルト: JKS
  • 重要度: 中

ZooKeeper

Confluent Platform バージョン 5.5.0 以降、Kafka にバンドルされるバージョンの ZooKeeper で TLS をサポートしています。詳細については、「実行中クラスターへのセキュリティの追加」を参照してください。

Kafka Connect

このセクションでは、Kafka Connect のセキュリティを有効にする方法について説明します。Kafka Connect をセキュアにするには、次の対象でセキュリティを構成する必要があります。

  1. Kafka Connect ワーカー : Kafka Connect API の一部で、ワーカーは実際には内部的に機能する高度なクライアントです。
  2. Kafka Connect コネクター: コネクターにはプロデューサーとコンシューマーを組み込むことができます。そのため、ソースコネクターで使用される Connect プロデューサーやシンクコネクターで使用される Connect コンシューマーのデフォルトの構成をオーバーライドする必要があります。
  3. Kafka Connect REST: Kafka Connect では、 追加のプロパティ を使って SSL を使用するように構成できる REST API が公開されています。

次のセクションに示すとおり、Kafka Connect のセキュリティを構成します。さらに、Confluent Control Center のストリームモニタリングを Kafka Connect に対して使用している場合、次の対象でセキュリティを構成します。

次のプロパティを connect-distributed.properties に追加することで、Connect ワーカーが TLS を使用するトップレベルの設定を構成します。これらのトップレベルの設定は、Connect ワーカーでグループ調整のために使用されます。またこれらの設定を使用して、クラスターのステート(構成とオフセットなど)を追跡するために使われる内部トピックの読み取りと書き込みが実行されます。ここでは、クライアント認証がブローカーにより要求されていることが前提です。

bootstrap.servers=kafka1:9093
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
Copy

Connect ワーカーでは、ソースコネクターで使用されるプロデューサーおよびシンクコネクターで使用されるコンシューマーを管理します。したがって、セキュリティを活用するコネクターでは、ワーカーが使用するプロデューサーやコンシューマーのデフォルト構成をオーバーライドする必要もあります。ここでは、クライアント認証がブローカーにより要求されていることが前提です。

  • ソースコネクター : 同じプロパティに producer プレフィックスを追加して構成します。

    producer.bootstrap.servers=kafka1:9093
    producer.security.protocol=SSL
    producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
    producer.ssl.truststore.password=test1234
    producer.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
    producer.ssl.keystore.password=test1234
    producer.ssl.key.password=test1234
    
    Copy
  • シンクコネクター : 同じプロパティに consumer プレフィックスを追加して構成します。

    consumer.bootstrap.servers=kafka1:9093
    consumer.security.protocol=SSL
    consumer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
    consumer.ssl.truststore.password=test1234
    consumer.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
    consumer.ssl.keystore.password=test1234
    consumer.ssl.key.password=test1234
    
    Copy

ちなみに

詳細については「Kafka Connect のセキュリティの基礎」を参照してください。

Confluent Replicator

Confluent Replicator は、Kafka ソースコネクターの一種で、送信元 Kafka クラスターから送信先クラスターにデータを複製します。Replicator に組み込まれたコンシューマーは、送信元クラスターからデータを読み取り、Kafka Connect ワーカーに組み込まれたプロデューサーは送信先クラスターにデータを書き込みます。

Replicator バージョン 4.0 以下では、送信元および送信先の Kafka クラスターで ZooKeeper への接続が必要です。ZooKeeper で認証を有効にする場合、クライアントで ZooKeeper のセキュリティ認証情報を、Connect ワーカーのグローバル JAAS 構成設定 -Djava.security.auth.login.config に構成します。さらに送信元と送信先のクラスターで ZooKeeper のセキュリティ認証情報が一致する必要があります。

Confluent Replicator のセキュリティを構成するには、次に示すとおり Replicator コネクターを構成し、さらに次のコンポーネントを構成する必要があります。

TLS を Confluent Replicator の組み込みコンシューマーに追加するには、Replicator の JSON プロパティファイルを変更します。

これは TLS の暗号化と認証用に追加する構成プロパティのサブセットの一例です。ここでは、クライアント認証がブローカーにより要求されていることが前提です。

{
  "name":"replicator",
    "config":{
      ....
      "src.kafka.ssl.truststore.location":"/etc/kafka/secrets/kafka.connect.truststore.jks",
      "src.kafka.ssl.truststore.password":"confluent",
      "src.kafka.ssl.keystore.location":"/etc/kafka/secrets/kafka.connect.keystore.jks",
      "src.kafka.ssl.keystore.password":"confluent",
      "src.kafka.ssl.key.password":"confluent",
      "src.kafka.security.protocol":"SSL"
      ....
    }
  }
}
Copy

参考

Confluent Replicator の構成例については、SSL の送信元認証デモのスクリプト を参照してください。共通のセキュリティ構成のデモについては、Replicator のセキュリティデモ を参照してください。

TLS 認証を使用して送信先クラスター用に Confluent Replicator を構成するには、Replicator の JSON 構成を次の内容が含まれるように変更します。

{
  "name":"replicator",
    "config":{
      ....
      "dest.kafka.ssl.truststore.location":"/etc/kafka/secrets/kafka.connect.truststore.jks",
      "dest.kafka.ssl.truststore.password":"confluent",
      "dest.kafka.ssl.keystore.location":"/etc/kafka/secrets/kafka.connect.keystore.jks",
      "dest.kafka.ssl.keystore.password":"confluent",
      "dest.kafka.ssl.key.password":"confluent",
      "dest.kafka.security.protocol":"SSL"
      ....
    }
  }
}
Copy

さらに次のプロパティが Connect ワーカーに必要です。

security.protocol=SSL
ssl.truststore.location=/etc/kafka/secrets/kafka.connect.truststore.jks
ssl.truststore.password=confluent
ssl.keystore.location=/etc/kafka/secrets/kafka.connect.keystore.jks
ssl.keystore.password=confluent
ssl.key.password=confluent
producer.security.protocol=SSL
producer.ssl.truststore.location=/etc/kafka/secrets/kafka.connect.truststore.jks
producer.ssl.truststore.password=confluent
producer.ssl.keystore.location=/etc/kafka/secrets/kafka.connect.keystore.jks
producer.ssl.keystore.password=confluent
producer.ssl.key.password=confluent
Copy

詳細については、Connect ワーカーの全般的なセキュリティ構成 を確認してください。

参考

Confluent Replicator の構成例については、SSL の送信先認証デモのスクリプト を参照してください。共通のセキュリティ構成のデモについては、Replicator のセキュリティデモ を参照してください。

Confluent Control Center

Confluent Control Center は Kafka Streams をステートストアとして使用します。つまり Control Center が支援するクラスターにあるすべての Kafka ブローカーがセキュアである場合、Control Center アプリケーションもセキュアにする必要があります。

注釈

RBAC が有効である場合、Control Center を Kerberos とともに使用できません。Control Center では、OAUTHBEARER 以外の SASL メカニズムをサポートできないためです。

次のセクションに示すとおり、Control Center アプリケーションのセキュリティを有効にします。さらに、以下のコンポーネントのセキュリティを構成します。

Confluent Control Center の TLS を etc/confluent-control-center/control-center.properties ファイルで有効にします。ここでは、クライアント認証がブローカーにより要求されていることが前提です。

confluent.controlcenter.streams.security.protocol=SSL
confluent.controlcenter.streams.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
confluent.controlcenter.streams.ssl.truststore.password=test1234
confluent.controlcenter.streams.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
confluent.controlcenter.streams.ssl.keystore.password=test1234
confluent.controlcenter.streams.ssl.key.password=test1234
Copy

Confluent Metrics Reporter

このセクションでは、Confluent Control Center と Auto Data Balancer で使用される Confluent Metrics Reporter の TLS 暗号化と認証を有効にする方法について説明します。

Confluent Metrics Reporter の TLS を有効にするには、モニタリング対象の Kafka クラスターのブローカーにある server.properties に次の内容を追加します。ここでは、クライアント認証がブローカーにより要求されていることが前提です。

confluent.metrics.reporter.bootstrap.servers=kafka1:9093
confluent.metrics.reporter.security.protocol=SSL
confluent.metrics.reporter.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
confluent.metrics.reporter.ssl.truststore.password=test1234
confluent.metrics.reporter.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
confluent.metrics.reporter.ssl.keystore.password=test1234
confluent.metrics.reporter.ssl.key.password=test1234
Copy

Confluent モニタリングインターセプター

Confluent Monitoring Interceptor を Confluent Control Center のストリームモニタリングで使用します。このセクションでは、Confluent Monitoring Interceptor 向けのセキュリティを次の 3 か所で有効にする方法について説明します。

  1. 一般クライアント
  2. Kafka Connect
  3. Confluent Replicator

重要

Confluent Monitoring Interceptor の典型的なユースケースは、構成が一般に異なる、個別のモニタリングクラスターにモニタリングデータを渡すことです。インターセプターの構成では、モニタリングされているコンポーネントの構成を継承しません。モニタリングされているコンポーネントの構成を使用する場合は、適切なプレフィックスを追加する必要があります。たとえば、オプション confluent.monitoring.interceptor.security.protocol=SSL では、プロデューサーに使用する場合、producer. のプレフィックスを使用して producer.confluent.monitoring.interceptor.security.protocol=SSL とする必要があります。

一般クライアント用のインターセプター

Kafka クライアントで動作する Confluent Control Center のストリームモニタリングについては、各クライアントで Confluent Monitoring Interceptor の TLS 暗号化と認証を構成する必要があります。

  1. クライアントでインターセプターが構成されていることを検証します。
  • プロデューサーの場合

    interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
    
    Copy
  • コンシューマーの場合

    interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
    
    Copy

インターセプターに対して TLS 暗号化と認証を構成します。ここでは、クライアント認証がブローカーにより要求されていることが前提です。

confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
confluent.monitoring.interceptor.security.protocol=SSL
confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
confluent.monitoring.interceptor.ssl.truststore.password=test1234
confluent.monitoring.interceptor.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
confluent.monitoring.interceptor.ssl.keystore.password=test1234
confluent.monitoring.interceptor.ssl.key.password=test1234
Copy

Kafka Connect 用のインターセプター

Confluent Control Center のストリームモニタリングと Kafka Connect を連動させる場合、Kafka Connect で Confluent Monitoring Interceptor に対して SSL を構成する必要があります。ここでは、クライアント認証がブローカーにより要求されていることが前提です。コネクターがソースかシンクかに応じて、次のプロパティを connect-distributed.properties に追加することで、Connect ワーカーを構成します。

  • ソースコネクター : producer プレフィックスを使用して、TLS 暗号化と認証用に Confluent Monitoring Interceptor を構成します。

    producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
    producer.confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
    producer.confluent.monitoring.interceptor.security.protocol=SSL
    producer.confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
    producer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
    producer.confluent.monitoring.interceptor.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
    producer.confluent.monitoring.interceptor.ssl.keystore.password=test1234
    producer.confluent.monitoring.interceptor.ssl.key.password=test1234
    
    Copy
  • シンクコネクター: consumer プレフィックスを使用して、TLS 暗号化と認証用に Confluent Monitoring Interceptor を構成します。

    consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
    consumer.confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
    consumer.confluent.monitoring.interceptor.security.protocol=SSL
    consumer.confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
    consumer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
    consumer.confluent.monitoring.interceptor.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
    consumer.confluent.monitoring.interceptor.ssl.keystore.password=test1234
    consumer.confluent.monitoring.interceptor.ssl.key.password=test1234
    
    Copy

Replicator 用のインターセプター

Replicator で動作する Confluent Control Center のストリームモニタリングについては、Replicator の JSON 構成ファイルで Confluent Monitoring Interceptor の TLS を構成する必要があります。次に、TLS の暗号化と認証用に追加する構成プロパティのサブセットの一例を示します。

{
  "name":"replicator",
    "config":{
      ....
      "src.consumer.group.id": "replicator",
      "src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
      "src.consumer.confluent.monitoring.interceptor.security.protocol": "SSL",
      "src.consumer.confluent.monitoring.interceptor.bootstrap.servers": "kafka1:9093",
      "src.consumer.confluent.monitoring.interceptor.ssl.truststore.location": "/var/private/ssl/kafka.client.truststore.jks",
      "src.consumer.confluent.monitoring.interceptor.ssl.truststore.password": "test1234",
      "src.consumer.confluent.monitoring.interceptor.ssl.keystore.location": "/var/private/ssl/kafka.client.keystore.jks",
      "src.consumer.confluent.monitoring.interceptor.ssl.keystore.password": "test1234",
      "src.consumer.confluent.monitoring.interceptor.ssl.key.password": "test1234",
      ....
    }
  }
}
Copy

Self-Balancing クラスターでの TLS の有効化

Self-Balancing クラスターで TLS 暗号化を有効にするには、Kafka クラスターのブローカーにある server.properties ファイルに次の内容を追加します。

confluent.rebalancer.metrics.security.protocol=SSL
confluent.rebalancer.metrics.ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks
confluent.rebalancer.metrics.ssl.truststore.password=confluent
confluent.rebalancer.metrics.ssl.keystore.location=/etc/kafka/secrets/kafka.client.keystore.jks
confluent.rebalancer.metrics.ssl.keystore.password=confluent
confluent.rebalancer.metrics.ssl.key.password=confluent
Copy

Schema Registry の数

Schema Registry はスキーマを永続化するために Kafka を使用します。つまり、Kafka クラスターにデータを書き込むためのクライアントとして機能します。したがって、Kafka ブローカーでセキュリティが構成されている場合、Schema Registry もセキュリティを使用するように構成する必要があります。網羅的なリストについては、「Schema Registry の構成オプション」も参照してください。

TLS の暗号化と認証用に追加する schema-registry.properties 構成パラメーターのサブセットの一例を次に示します。ここでは、クライアント認証がブローカーにより要求されていることが前提です。

kafkastore.bootstrap.servers=SSL://kafka1:9093
kafkastore.security.protocol=SSL
kafkastore.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
kafkastore.ssl.truststore.password=test1234
kafkastore.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
kafkastore.ssl.keystore.password=test1234
kafkastore.ssl.key.password=test1234
Copy

REST Proxy

Confluent REST Proxy を TLS 暗号化と認証でセキュアにするには、以下の間でセキュリティを構成する必要があります。

  1. REST クライアントと REST Proxy(HTTPS)
  2. REST Proxy と Kafka クラスター

網羅的なリストについては、REST Proxy の構成オプション も参照してください。

  1. HTTPS を REST クライアントと REST Proxy 間で構成します。HTTPS を構成するための kafka-rest.properties 構成パラメーターのサブセットの一例を次に示します。

    ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
    ssl.truststore.password=test1234
    ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
    ssl.keystore.password=test1234
    ssl.key.password=test1234
    
    Copy
  2. REST Proxy と Kafka クラスター間で TLS 暗号化と認証を構成します。TLS の暗号化と認証用に追加する kafka-rest.properties 構成パラメーターのサブセットの一例を次に示します。ここでは、クライアント認証がブローカーにより要求されていることが前提です。

    client.bootstrap.servers=kafka1:9093
    client.security.protocol=SSL
    client.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
    client.ssl.truststore.password=test1234
    client.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
    client.ssl.keystore.password=test1234
    client.ssl.key.password=test1234
    
    Copy

TLS/SSL のログ記録

Kafka ブローカーまたはクライアントあるいはその両方を、javax.net.debug システムプロパティを指定して起動することで、TLS/SSL デバッグログを JVM レベルで有効にします。以下に例を示します。

export KAFKA_OPTS=-Djavax.net.debug=all
bin/kafka-server-start etc/kafka/server.properties
Copy

ちなみに

これらの手順では、ZIP または TAR アーカイブを使用して Confluent Platform をインストールしているという想定に基づいています。詳細については「オンプレミスのデプロイ」を参照してください。

ブローカーを起動したら、server.log に次のような内容を確認できます。

with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)
Copy

サーバーのキーストアとトラストストアが正しくセットアップされていることを検証するには、次のコマンドを実行します。

openssl s_client -debug -connect localhost:9093 -tls1
Copy

注 : TLSv1 が ssl.enabled.protocols に指定されている必要があります。

このコマンドの出力で、サーバーの証明書を確認します。

-----BEGIN CERTIFICATE-----
{variable sized random bytes}
-----END CERTIFICATE-----
subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Joe Smith
issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com
Copy

詳細については、Oracle ドキュメント の『 Debugging SSL/TLS connections 』を参照してください。

openssl コマンドで証明書が表示されない場合、または他のエラーメッセージがある場合、使用するキーや証明書が正しくセットアップされていません。構成を見直してください。