TLS/SSL での暗号化と認証

TLS/SSL の概要

TLS/SSL 認証では、サーバーがクライアントを認証します("双方向認証" とも呼ばれています)。TLS 認証では TLS 暗号化が必須となるため、このページでは両者を同時に構成する方法を示します。この方法は、 SSL の暗号化 だけに必要な構成のスーパーセットとなります。

デフォルトでは、Apache Kafka® は PLAINTEXT で通信します。つまりすべてのデータはプレーンテキストで送信されます。通信を暗号化するには、デプロイのすべでの Confluent Platform コンポーネントで TLS/SSL 暗号化を使用するように構成する必要があります。

Secure Sockets Layer(SSL)は Transport Layer Security(TLS)の元になったもので、2015 年 6 月に非推奨になりました。ただし、歴史的な理由で、Kafka では Java と同様に、構成やコードで "SSL" という頭字語を "TLS" の代わりに使用します。

暗号化に対して TLS/SSL を構成できますが、認証に対しても TLS/SSL を構成できます。TLS/SSL 暗号化(デフォルトでは、TLS/SSL 暗号化には、サーバーの証明書認証が含まれる)のみを構成し、クライアント認証には別のメカニズム(SSLSASL など)を個別に選択することもできます。技術的には、TLS/SSL の暗号化の時点で、1 方向の認証、つまりクライアントによるサーバー証明書の認証は、既に実行しています。このトピックにおいて、"TLS/SSL 認証" は双方向認証(ブローカーによるクライアント証明書の認証も実行される)を指しています。

TLS/SSL を有効にすると、暗号化オーバーヘッドのために、パフォーマンスに影響が出ることがあります。

TLS/SSL はプライベートキーと証明書のペアを使用します。これは TLS/SSL ハンドシェイクプロセスで使用されます。

  • 各ブローカーには個別のプライベートキーと証明書のペアが必要で、クライアントはこの証明書を使用して、対象のブローカーを認証します。
  • 各論理クライアントには、クライアント認証が有効な場合、プライベートキーと証明書のペアが必要で、ブローカーはこの証明書を使用して、対象クライアントを認証します。

各ブローカーと論理クライアントでトラストストアを構成できます。これは、信頼(認証)する対象の証明書(ブローカーまたは論理クライアントの ID)を判断するために使用されます。多くの方法でトラストストアを構成できます。次の 2 つの例を考えます。

  • トラストストアには 1 つまたは多数の証明書が含まれます。ブローカーまたは論理クライアントはこのトラストストアにリストされた証明書を信頼します。
  • トラストストアには認証機関(CA)が含まれます。ブローカーまたは論理クライアントは、トラストストアにある CA に署名された証明書を信頼します。

CA 方式を使用する方が便利です。新しいブローカーやクライアントを追加したときに、トラストストアの変更が必要ないためです。CA の方式の概要を、こちらの図 をご覧ください。

ただし、CA 方式では、このメカニズムで一度信頼した個別のブローカーやクライアントの認証を Kafka で簡単にブロックする方法をサポートしていません(証明書の失効は、一般に証明書失効リストや OCSP(オンライン証明書ステータスプロトコル)で実行します)。したがって、認可でアクセスをブロックする必要があります。

一方、1 つまたは多数の証明書を使用している場合、認証のブロックは、トラストストアからブローカーやクライアントの証明書を削除することにより、実行されます。

参考

この構成の動作例については、Confluent Platform デモ を参照してください。構成リファレンスについては、デモの docker-compose.yml ファイル を参照してください。

TLS/SSL キーと証明書の作成

詳細については、「セキュリティのチュートリアル」を参照してください。ここでは、SSL キーと証明書の作成 方法が説明されています。

ブローカー

Kafka クラスター内のすべてのブローカーを、クライアントからのセキュアな接続を受け付けるように構成します。ブローカーに構成の変更内容が反映されるためには、 ローリング再起動 が必要です。

次のセクションに示すとおり、Kafka ブローカーのセキュリティを有効にします。さらに、Confluent Control Center や Auto Data Balancer を使用している場合、次のコンポーネントに対してブローカーを構成します。

注釈

ブローカーの必須の構成プロパティと省略可能な構成プロパティの詳細については、「Kafka ブローカーの構成」を参照してください。

  1. トラストストア、キーストア、およびパスワードを、すべてのブローカーの server.properties ファイルに構成します。ブローカーの構成ファイルにパスワードが直接保管されるため、ファイルシステムのアクセス許可を使用して、これらのファイルへのアクセスを制限することが重要です。

    ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
    ssl.truststore.password=test1234
    ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
    ssl.keystore.password=test1234
    ssl.key.password=test1234
    

    ssl.truststore.password は技術的には省略可能ですが、使用することを強くお勧めします。パスワードが設定されていなくてもトラストストアにアクセスできますが、整合性チェックが無効になります。

  2. ブローカー間通信で、TLS/SSL を有効にする場合は、次の行をブローカーのプロパティファイルに追加します(デフォルトは PLAINTEXT)。

    security.inter.broker.protocol=SSL
    
  3. クライアントやブローカー間の SSL 接続をリッスンするために、Apache Kafka® ブローカーのポートを構成します。listeners を構成する必要があります。また、listeners と値が異なる場合は、必要に応じて advertised.listeners を構成する必要があります。

    listeners=SSL://kafka1:9093
    advertised.listeners=SSL://localhost:9093
    
  4. SSL ポートと PLAINTEXT ポートを次の条件で構成します。

    • TLS/SSL をブローカー間通信に対して有効にしない。
    • クラスターに接続する一部のクライアントで TLS/SSL を使用しない。
    listeners=PLAINTEXT://kafka1:9092,SSL://kafka1:9093
    advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
    

    advertised.host.nameadvertised.port は、単一の PLAINTEXT ポート用で、セキュアなプロトコルと互換性がないことに注意してください。代わりに advertised.listeners を使用します。

  5. ブローカーでクライアントの認証(双方向認証)を有効にするには、クライアント認証をすべてのブローカーで構成する必要があります。このとき、requested ではなく required を使用するように構成します。これは、構成が正しくないクライアントの接続が成功し、セキュリティが確保されていると誤って認識してしまう可能性があるためです。

    ssl.client.auth=required
    

    注釈

    SASL 認証メカニズムが対象のリスナーで有効になっている場合、ssl.client.auth=required を指定している場合でも、SSL クライアントの認証が無効になります。対象リスナーで SASL を使用する場合にのみ、ブローカーはクライアントを認証します。

オプション設定

ssl.endpoint.identification.algorithm
クライアントで使用してサーバーホスト名を検証するエンドポイント識別アルゴリズム。デフォルト値は https です。ブローカー間通信のためにブローカーによって行われるクライアント接続を含め、クライアントではブローカーのホスト名がその証明書のホスト名と一致することを検証します。サーバーホスト名の検証を無効にするには、ssl.endpoint.identification.algorithm を空の文字列に設定します。
  • 型: string
  • デフォルト: https
  • 重要度: 中
ssl.cipher.suites
暗号スイートは、認証、暗号化、MAC、およびキー交換アルゴリズムの名前付き組み合わせで、TLS/SSL ネットワークプロトコルを使用したネットワーク接続のセキュリティ設定をネゴシエートするために使用されます。
  • 型: リスト
  • デフォルト: null(デフォルトで、サポートするすべての暗号スイートが有効)
  • 重要度: 中
ssl.enabled.protocols
SSL 接続で有効なプロトコルのリスト。
  • 型: リスト
  • デフォルト: TLSv1.2,TLSv1.1,TLSv1
  • 重要度: 中
ssl.truststore.type
トラストストアファイルのファイルフォーマットです。
  • 型: string
  • デフォルト: JKS
  • 重要度: 中

一部の国における輸入規制のため、Oracle の実装では、デフォルトで暗号化アルゴリズムの強度が制限されています。強力なアルゴリズムが必要な場合(256 ビットキーの AES など)、JCE Unlimited Strength Jurisdiction ポリシーファイル を入手し、JDK/JRE にインストールする必要があります。詳細については、JCA プロバイダーのドキュメント を参照してください。

クライアント

新しいプロデューサーとコンシューマークライアントは、Kafka バージョン 0.9.0 以上でセキュリティをサポートします。

Kafka Streams API を使用している場合、同等の SSL パラメーターおよび SASL パラメーターを構成する方法を参照してください。

次の構成例では、クライアント認証がブローカーで要求されることを前提にしています。したがってクライアントプロパティファイル client-ssl.properties に認証情報を格納できます。ブローカーの構成ファイルにパスワードが直接保管されるため、ファイルシステムのアクセス許可を使用して、これらのファイルへのアクセスを制限することが重要です。

重要

以下の設定を Schema Registry または REST Proxy に対して構成する場合、各パラメーターのプレフィックスとして confluent.license を使用する必要があります。たとえば、security.protocolconfluent.license.security.protocol になります。

bootstrap.servers=kafka1:9093
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234

ssl.truststore.password は技術的には省略可能ですが、使用することを強くお勧めします。パスワードが設定されていなくてもトラストストアにアクセスできますが、整合性チェックが無効になります。

次の例では、kafka-console-producerkafka-console-consumer を使用し、前に定義した client-ssl.properties を渡します。

bin/kafka-console-producer --broker-list kafka1:9093 --topic test --producer.config client-ssl.properties
bin/kafka-console-consumer --bootstrap-server kafka1:9093 --topic test --consumer.config client-ssl.properties --from-beginning

オプション設定

ssl.endpoint.identification.algorithm
クライアントで使用してサーバーホスト名を検証するエンドポイント識別アルゴリズム。デフォルト値は https です。ブローカー間通信のためにブローカーによって行われるクライアント接続を含め、クライアントではブローカーのホスト名がその証明書のホスト名と一致することを検証します。サーバーホスト名の検証を無効にするには、ssl.endpoint.identification.algorithm を空の文字列に設定します。
  • 型: string
  • デフォルト: https
  • 重要度: 中
ssl.provider
TLS/SSL 接続に使用するセキュリティプロバイダーの名前。デフォルト値は、Java 仮想マシンのデフォルトのセキュリティプロバイダーです。
  • 型: string
  • デフォルト: null
  • 重要度: 中
ssl.cipher.suites
暗号スイートは、認証、暗号化、MAC、およびキー交換アルゴリズムの名前付き組み合わせで、TLS/SSL ネットワークプロトコルを使用したネットワーク接続のセキュリティ設定をネゴシエートするために使用されます。
  • 型: リスト
  • デフォルト: null(デフォルトで、サポートするすべての暗号スイートが有効)
  • 重要度: 中
ssl.enabled.protocols
TLS/SSL 接続で有効なプロトコルのリストです。
  • 型: リスト
  • デフォルト: TLSv1.2,TLSv1.1,TLSv1
  • 重要度: 中
ssl.truststore.type
トラストストアファイルのファイルフォーマットです。
  • 型: string
  • デフォルト: JKS
  • 重要度: 中

ZooKeeper

Confluent Platform バージョン 5.5.0 以降、Kafka にバンドルされるバージョンの ZooKeeper で TLS/SSL をサポートしています。詳細については、「実行中クラスターへのセキュリティの追加」を参照してください。

Kafka Connect

このセクションでは、Kafka Connect のセキュリティを有効にする方法について説明します。Kafka Connect をセキュアにするには、次の対象でセキュリティを構成する必要があります。

  1. Kafka Connect ワーカー : Kafka Connect API の一部で、ワーカーは実際には内部的に機能する高度なクライアントです。
  2. Kafka Connect コネクター: コネクターにはプロデューサーとコンシューマーを組み込むことができます。そのため、ソースコネクターで使用される Connect プロデューサーやシンクコネクターで使用される Connect コンシューマーのデフォルトの構成をオーバーライドする必要があります。
  3. Kafka Connect REST: Kafka Connect では、 追加のプロパティ を使って SSL を使用するように構成できる REST API が公開されています。

次のセクションに示すとおり、Kafka Connect のセキュリティを構成します。さらに、Confluent Control Center のストリームモニタリングを Kafka Connect に対して使用している場合、次の対象でセキュリティを構成します。

次のプロパティを connect-distributed.properties に追加することで、Connect ワーカーが TLS/SSL を使用する最上位の設定を構成します。これらのトップレベルの設定は、Connect ワーカーでグループ調整のために使用されます。またこれらの設定を使用して、クラスターのステート(構成とオフセットなど)を追跡するために使われる内部トピックの読み取りと書き込みが実行されます。ここでは、クライアント認証がブローカーにより要求されていることが前提です。

bootstrap.servers=kafka1:9093
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234

Connect ワーカーでは、ソースコネクターで使用されるプロデューサーおよびシンクコネクターで使用されるコンシューマーを管理します。したがって、セキュリティを活用するコネクターでは、ワーカーが使用するプロデューサーやコンシューマーのデフォルト構成をオーバーライドする必要もあります。ここでは、クライアント認証がブローカーにより要求されていることが前提です。

  • ソースコネクター : 同じプロパティに producer プレフィックスを追加して構成します。

    producer.bootstrap.servers=kafka1:9093
    producer.security.protocol=SSL
    producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
    producer.ssl.truststore.password=test1234
    producer.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
    producer.ssl.keystore.password=test1234
    producer.ssl.key.password=test1234
    
  • シンクコネクター : 同じプロパティに consumer プレフィックスを追加して構成します。

    consumer.bootstrap.servers=kafka1:9093
    consumer.security.protocol=SSL
    consumer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
    consumer.ssl.truststore.password=test1234
    consumer.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
    consumer.ssl.keystore.password=test1234
    consumer.ssl.key.password=test1234
    

ちなみに

詳細については「Kafka Connect のセキュリティの基礎」を参照してください。

Confluent Replicator

Confluent Replicator は、Kafka ソースコネクターの一種で、送信元 Kafka クラスターから送信先クラスターにデータを複製します。Replicator に組み込まれたコンシューマーは、送信元クラスターからデータを読み取り、Kafka Connect ワーカーに組み込まれたプロデューサーは送信先クラスターにデータを書き込みます。

Replicator バージョン 4.0 以下では、送信元および送信先の Kafka クラスターで ZooKeeper への接続が必要です。ZooKeeper で認証を有効にする場合、クライアントで ZooKeeper のセキュリティ認証情報を、Connect ワーカーのグローバル JAAS 構成設定 -Djava.security.auth.login.config に構成します。さらに送信元と送信先のクラスターで ZooKeeper のセキュリティ認証情報が一致する必要があります。

Confluent Replicator のセキュリティを構成するには、次に示すとおり Replicator コネクターを構成し、さらに次のコンポーネントを構成する必要があります。

TLS/SSL を Confluent Replicator の組み込みコンシューマーに追加するには、Replicator の JSON プロパティファイルを変更します。

これは TLS/SSL の暗号化と認証用に追加する構成プロパティのサブセットの一例です。ここでは、クライアント認証がブローカーにより要求されていることが前提です。

{
  "name":"replicator",
    "config":{
      ....
      "src.kafka.ssl.truststore.location":"/etc/kafka/secrets/kafka.connect.truststore.jks",
      "src.kafka.ssl.truststore.password":"confluent",
      "src.kafka.ssl.keystore.location":"/etc/kafka/secrets/kafka.connect.keystore.jks",
      "src.kafka.ssl.keystore.password":"confluent",
      "src.kafka.ssl.key.password":"confluent",
      "src.kafka.security.protocol":"SSL"
      ....
    }
  }
}

参考

Confluent Replicator の構成例については、SSL の送信元認証デモのスクリプト を参照してください。共通のセキュリティ構成のデモについては、Replicator のセキュリティデモ を参照してください。

SSL 認証を使用して送信先クラスター用に Confluent Replicator を構成するには、Replicator の JSON 構成を次の内容が含まれるように変更します。

{
  "name":"replicator",
    "config":{
      ....
      "dest.kafka.ssl.truststore.location":"/etc/kafka/secrets/kafka.connect.truststore.jks",
      "dest.kafka.ssl.truststore.password":"confluent",
      "dest.kafka.ssl.keystore.location":"/etc/kafka/secrets/kafka.connect.keystore.jks",
      "dest.kafka.ssl.keystore.password":"confluent",
      "dest.kafka.ssl.key.password":"confluent",
      "dest.kafka.security.protocol":"SSL"
      ....
    }
  }
}

さらに次のプロパティが Connect ワーカーに必要です。

security.protocol=SSL
ssl.truststore.location=/etc/kafka/secrets/kafka.connect.truststore.jks
ssl.truststore.password=confluent
ssl.keystore.location=/etc/kafka/secrets/kafka.connect.keystore.jks
ssl.keystore.password=confluent
ssl.key.password=confluent
producer.security.protocol=SSL
producer.ssl.truststore.location=/etc/kafka/secrets/kafka.connect.truststore.jks
producer.ssl.truststore.password=confluent
producer.ssl.keystore.location=/etc/kafka/secrets/kafka.connect.keystore.jks
producer.ssl.keystore.password=confluent
producer.ssl.key.password=confluent

詳細については、Connect ワーカーの全般的なセキュリティ構成 を確認してください。

参考

Confluent Replicator の構成例については、SSL の送信先認証デモのスクリプト を参照してください。共通のセキュリティ構成のデモについては、Replicator のセキュリティデモ を参照してください。

Confluent Control Center

Confluent Control Center は Kafka Streams をステートストアとして使用します。つまり Control Center が支援するクラスターにあるすべての Kafka ブローカーがセキュアである場合、Control Center アプリケーションもセキュアにする必要があります。

注釈

RBAC が有効である場合、Control Center を Kerberos とともに使用できません。Control Center では、OAUTHBEARER 以外の SASL メカニズムをサポートできないためです。

次のセクションに示すとおり、Control Center アプリケーションのセキュリティを有効にします。さらに、以下のコンポーネントのセキュリティを構成します。

Confluent Control Center の TLS/SSL を etc/confluent-control-center/control-center.properties ファイルで有効にします。ここでは、クライアント認証がブローカーにより要求されていることが前提です。

confluent.controlcenter.streams.security.protocol=SSL
confluent.controlcenter.streams.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
confluent.controlcenter.streams.ssl.truststore.password=test1234
confluent.controlcenter.streams.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
confluent.controlcenter.streams.ssl.keystore.password=test1234
confluent.controlcenter.streams.ssl.key.password=test1234

Confluent Metrics Reporter

このセクションでは、Confluent Control Center と Auto Data Balancer で使用される Confluent Metrics Reporter の SSL 暗号化と認証を有効にする方法について説明します。

Confluent Metrics Reporter の TLS/SSL を有効にするには、モニタリング対象の Kafka クラスターのブローカーにある server.properties に次の内容を追加します。ここでは、クライアント認証がブローカーにより要求されていることが前提です。

confluent.metrics.reporter.bootstrap.servers=kafka1:9093
confluent.metrics.reporter.security.protocol=SSL
confluent.metrics.reporter.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
confluent.metrics.reporter.ssl.truststore.password=test1234
confluent.metrics.reporter.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
confluent.metrics.reporter.ssl.keystore.password=test1234
confluent.metrics.reporter.ssl.key.password=test1234

Confluent モニタリングインターセプター

Confluent Monitoring Interceptor を Confluent Control Center のストリームモニタリングで使用します。このセクションでは、Confluent Monitoring Interceptor 向けのセキュリティを次の 3 か所で有効にする方法について説明します。

  1. 一般クライアント
  2. Kafka Connect
  3. Confluent Replicator

重要

Confluent Monitoring Interceptor の典型的なユースケースは、構成が一般に異なる、個別のモニタリングクラスターにモニタリングデータを渡すことです。インターセプターの構成では、モニタリングされているコンポーネントの構成を継承しません。モニタリングされているコンポーネントの構成を使用する場合は、適切なプレフィックスを追加する必要があります。たとえば、オプション confluent.monitoring.interceptor.security.protocol=SSL では、プロデューサーに使用する場合、producer. のプレフィックスを使用して producer.confluent.monitoring.interceptor.security.protocol=SSL とする必要があります。

一般クライアント用のインターセプター

Confluent Control Center のストリームモニタリングを Kafka クライアントと連携させる場合、各クライアントで Confluent Monitoring Interceptor の TLS/SSL 暗号化と認証を構成する必要があります。

  1. クライアントでインターセプターが構成されていることを検証します。
  • プロデューサーの場合

    interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
    
  • コンシューマーの場合

    interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
    

インターセプターに対して TLS/SSL 暗号化と認証を構成します。ここでは、クライアント認証がブローカーにより要求されていることが前提です。

confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
confluent.monitoring.interceptor.security.protocol=SSL
confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
confluent.monitoring.interceptor.ssl.truststore.password=test1234
confluent.monitoring.interceptor.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
confluent.monitoring.interceptor.ssl.keystore.password=test1234
confluent.monitoring.interceptor.ssl.key.password=test1234

Kafka Connect 用のインターセプター

Confluent Control Center のストリームモニタリングと Kafka Connect を連動させる場合、Kafka Connect で Confluent Monitoring Interceptor に対して SSL を構成する必要があります。ここでは、クライアント認証がブローカーにより要求されていることが前提です。コネクターがソースかシンクかに応じて、次のプロパティを connect-distributed.properties に追加することで、Connect ワーカーを構成します。

  • ソースコネクター : producer プレフィックスを使用して、TLS/SSL 暗号化と認証用に Confluent Monitoring Interceptor を構成します。

    producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
    producer.confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
    producer.confluent.monitoring.interceptor.security.protocol=SSL
    producer.confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
    producer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
    producer.confluent.monitoring.interceptor.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
    producer.confluent.monitoring.interceptor.ssl.keystore.password=test1234
    producer.confluent.monitoring.interceptor.ssl.key.password=test1234
    
  • シンクコネクター: consumer プレフィックスを使用して、TLS/SSL 暗号化と認証用に Confluent Monitoring Interceptor を構成します。

    consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
    consumer.confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
    consumer.confluent.monitoring.interceptor.security.protocol=SSL
    consumer.confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
    consumer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
    consumer.confluent.monitoring.interceptor.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
    consumer.confluent.monitoring.interceptor.ssl.keystore.password=test1234
    consumer.confluent.monitoring.interceptor.ssl.key.password=test1234
    

Replicator 用のインターセプター

Confluent Control Center のストリームモニタリングを Replicator と連携させる場合、Replicator の JSON 構成ファイルで Confluent Monitoring Interceptor の TLS/SSL を構成する必要があります。次に、TLS/SSL の暗号化と認証用に追加する構成プロパティのサブセットの一例を示します。

{
  "name":"replicator",
    "config":{
      ....
      "src.consumer.group.id": "replicator",
      "src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
      "src.consumer.confluent.monitoring.interceptor.security.protocol": "SSL",
      "src.consumer.confluent.monitoring.interceptor.bootstrap.servers": "kafka1:9093",
      "src.consumer.confluent.monitoring.interceptor.ssl.truststore.location": "/var/private/ssl/kafka.client.truststore.jks",
      "src.consumer.confluent.monitoring.interceptor.ssl.truststore.password": "test1234",
      "src.consumer.confluent.monitoring.interceptor.ssl.keystore.location": "/var/private/ssl/kafka.client.keystore.jks",
      "src.consumer.confluent.monitoring.interceptor.ssl.keystore.password": "test1234",
      "src.consumer.confluent.monitoring.interceptor.ssl.key.password": "test1234",
      ....
    }
  }
}

Self-Balancing クラスターでの TLS/SSL の有効化

Self-Balancing クラスターで TLS/SSL 暗号化を有効にするには、Kafka クラスターのブローカーにある server.properties ファイルに次の内容を追加します。

confluent.rebalancer.metrics.security.protocol=SSL
confluent.rebalancer.metrics.ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks
confluent.rebalancer.metrics.ssl.truststore.password=confluent
confluent.rebalancer.metrics.ssl.keystore.location=/etc/kafka/secrets/kafka.client.keystore.jks
confluent.rebalancer.metrics.ssl.keystore.password=confluent
confluent.rebalancer.metrics.ssl.key.password=confluent

Schema Registry の数

Schema Registry はスキーマを永続化するために Kafka を使用します。つまり、Kafka クラスターにデータを書き込むためのクライアントとして機能します。したがって、Kafka ブローカーでセキュリティが構成されている場合、Schema Registry もセキュリティを使用するように構成する必要があります。網羅的なリストについては、「Schema Registry の構成オプション」も参照してください。

TLS/SSL の暗号化と認証用に追加する schema-registry.properties 構成パラメーターのサブセットの一例を次に示します。ここでは、クライアント認証がブローカーにより要求されていることが前提です。

kafkastore.bootstrap.servers=SSL://kafka1:9093
kafkastore.security.protocol=SSL
kafkastore.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
kafkastore.ssl.truststore.password=test1234
kafkastore.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
kafkastore.ssl.keystore.password=test1234
kafkastore.ssl.key.password=test1234

REST Proxy

Confluent REST Proxy を TLS/SSL 暗号化と認証でセキュアにするには、以下の間でセキュリティを構成する必要があります。

  1. REST クライアントと REST Proxy(HTTPS)
  2. REST Proxy と Kafka クラスター

網羅的なリストについては、REST Proxy の構成オプション も参照してください。

  1. HTTPS を REST クライアントと REST Proxy 間で構成します。HTTPS を構成するための kafka-rest.properties 構成パラメーターのサブセットの一例を次に示します。

    ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
    ssl.truststore.password=test1234
    ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
    ssl.keystore.password=test1234
    ssl.key.password=test1234
    
  2. REST Proxy と Kafka クラスター間で TLS/SSL 暗号化と認証を構成します。TLS/SSL の暗号化と認証用に追加する kafka-rest.properties 構成パラメーターのサブセットの一例を次に示します。ここでは、クライアント認証がブローカーにより要求されていることが前提です。

    client.bootstrap.servers=kafka1:9093
    client.security.protocol=SSL
    client.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
    client.ssl.truststore.password=test1234
    client.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
    client.ssl.keystore.password=test1234
    client.ssl.key.password=test1234
    

SSL のログ記録

Kafka ブローカーまたはクライアントあるいはその両方を、javax.net.debug システムプロパティを指定して起動することで、SSL デバッグログを JVM レベルで有効にします。以下に例を示します。

export KAFKA_OPTS=-Djavax.net.debug=all
bin/kafka-server-start etc/kafka/server.properties

ちなみに

これらの手順では、ZIP または TAR アーカイブを使用して Confluent Platform をインストールしているという想定に基づいています。詳細については「オンプレミスのデプロイ」を参照してください。

ブローカーを起動したら、server.log に次のような内容を確認できます。

with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)

サーバーのキーストアとトラストストアが正しくセットアップされていることを検証するには、次のコマンドを実行します。

openssl s_client -debug -connect localhost:9093 -tls1

注 : TLSv1 が ssl.enabled.protocols に指定されている必要があります。

このコマンドの出力で、サーバーの証明書を確認します。

-----BEGIN CERTIFICATE-----
{variable sized random bytes}
-----END CERTIFICATE-----
subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Joe Smith
issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com

詳細については、Oracle ドキュメント の『 Debugging SSL/TLS connections 』を参照してください。

openssl コマンドで証明書が表示されない場合、または他のエラーメッセージがある場合、使用するキーや証明書が正しくセットアップされていません。構成を見直してください。