SSL による暗号化

SSL の概要

デフォルトでは、Apache Kafka® は PLAINTEXT で通信します。つまりすべてのデータはプレーンテキストで送信されます。通信を暗号化するには、デプロイのすべでの Confluent Platform コンポーネントで SSL 暗号化を使用するように構成する必要があります。

Secure Sockets Layer(SSL)は Transport Layer Security(TLS)の元になったもので、2015 年 6 月に非推奨になりました。ただし、歴史的な理由で、Kafka では Java と同様に、構成やコードで "SSL" という頭字語を "TLS" の代わりに使用します。このトピックでは頭字語 "SSL" だけを使用します。

暗号化または認証に対して SSL を構成できます。SSL 暗号化のみを構成できます(デフォルトでは、SSL 暗号化には、サーバーの証明書認証が含まれる)。ここでクライアント認証に別のメカニズム(たとえば、 SSLSASL)を独立して選択できます。技術的には、SSL の暗号化の時点で、1 方向の認証、つまりクライアントによるサーバー証明書の認証は、既に実行しています。このトピックで "SSL 認証" は、双方向認証、つまりブローカーによるクライアント証明書の認証も指しています。

SSL を有効にすると、暗号化オーバーヘッドのために、パフォーマンスに影響が出ることがあります。

SSL はプライベートキーと証明書のペアを使用します。これは SSL ハンドシェイクプロセスで使用されます。

  • 各ブローカーには個別のプライベートキーと証明書のペアが必要で、クライアントはこの証明書を使用して、対象のブローカーを認証します。
  • 各論理クライアントには、クライアント認証が有効な場合、プライベートキーと証明書のペアが必要で、ブローカーはこの証明書を使用して、対象クライアントを認証します。

各ブローカーと論理クライアントでトラストストアを構成できます。これは、信頼(認証)する対象の証明書(ブローカーまたは論理クライアントの ID)を判断するために使用されます。多くの方法でトラストストアを構成できます。次の 2 つの例を考えます。

  • トラストストアには 1 つまたは多数の証明書が含まれます。ブローカーまたは論理クライアントはこのトラストストアにリストされた証明書を信頼します。
  • トラストストアには認証機関(CA)が含まれます。ブローカーまたは論理クライアントは、トラストストアにある CA に署名された証明書を信頼します。

CA 方式を使用する方が便利です。新しいブローカーやクライアントを追加したときに、トラストストアの変更が必要ないためです。CA の方式の概要を、こちらの図 をご覧ください。

ただし、CA 方式では、このメカニズムで一度信頼した個別のブローカーやクライアントの認証を Kafka で簡単にブロックする方法をサポートしていません(証明書の失効は、一般に証明書失効リストや OCSP(オンライン証明書ステータスプロトコル)で実行します)。したがって、認可でアクセスをブロックする必要があります。

一方、1 つまたは多数の証明書を使用している場合、認証のブロックは、トラストストアからブローカーやクライアントの証明書を削除することにより、実行されます。

参考

この構成の動作例については、Confluent Platform デモ を参照してください。構成リファレンスについては、デモの docker-compose.yml ファイル を参照してください。

SSL キーと証明書の作成

方法については、「セキュリティチュートリアル」の SSL キーと証明書の作成 を参照してください。

ブローカー

Kafka クラスター内のすべてのブローカーを、クライアントからのセキュアな接続を受け付けるように構成します。ブローカーに構成の変更内容が反映されるためには、 ローリング再起動 が必要です。

次のセクションに示すとおり、Kafka ブローカーのセキュリティを有効にします。さらに、Confluent Control Center や Auto Data Balancer を使用している場合、次のコンポーネントに対してブローカーを構成します。

  1. パスワード、トラストストア、およびキーストアを、すべてのブローカーの server.properties ファイルに構成します。ブローカーの構成ファイルにパスワードが直接格納されるため、ファイルシステムのアクセス許可を使用して、これらのファイルへのアクセスを制限することが重要です。

    ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
    ssl.truststore.password=test1234
    ssl.keystore.location=/var/ssl/private/kafka.server.keystore.jks
    ssl.keystore.password=test1234
    ssl.key.password=test1234
    
  2. ブローカー間のやりとりで、SSL を有効にする場合は、次の行をブローカーのプロパティファイルに追加します(デフォルトは PLAINTEXT です)。

    security.inter.broker.protocol=SSL
    
  3. クライアントとの、およびブローカー間の SSL 接続をリッスンするポートを Apache Kafka® ブローカーに通知します。listeners を構成する必要があります。また、listeners と値が異なる場合は、必要に応じて advertised.listeners を構成する必要があります。

    listeners=SSL://kafka1:9093
    advertised.listeners=SSL://<localhost>:9093
    
  4. SSL ポートと PLAINTEXT ポートを次の条件で構成します。

    • SSL をブローカー間のやりとりに対して有効にしない
    • クラスターに接続する一部のクライアントで SSL を使用しない
    listeners=PLAINTEXT://kafka1:9092,SSL://kafka1:9093
    advertised.listeners=PLAINTEXT://<localhost>:9092,SSL://<localhost>:9093
    

    advertised.host.nameadvertised.port は、単一の PLAINTEXT ポート用で、セキュアなプロトコルと互換性がないことに注意してください。代わりに advertised.listeners を使用します。

オプション設定

いくつかのオプション設定を示します。

ssl.cipher.suites 暗号スイートは、認証、暗号化、MAC、およびキー交換アルゴリズムの名前付き組み合わせで、TLS または SSL ネットワークプロトコルを使用したネットワーク接続のセキュリティ設定をネゴシエートするために使用されます。

  • 型: list
  • デフォルト: null(デフォルトで、サポートするすべての暗号スイートが有効)
  • 重要度: 中

ssl.enabled.protocols SSL 接続で有効なプロトコルのリストです。

  • 型: list
  • デフォルト: TLSv1.2,TLSv1.1,TLSv1
  • 重要度: 中

ssl.truststore.type トラストストアファイルのファイルフォーマットです。

  • 型: string
  • デフォルト: JKS
  • 重要度: 中

一部の国における輸入規制のため、Oracle の実装では、デフォルトで暗号化アルゴリズムの強度が制限されています。強力なアルゴリズムが必要な場合(256 ビットキーの AES など)、JCE Unlimited Strength Jurisdiction ポリシーファイル を入手し、JDK/JRE にインストールする必要があります。詳細については、JCA プロバイダーのドキュメント を参照してください。

クライアント

新しいプロデューサーとコンシューマークライアントは、Kafka バージョン 0.9.0 以上でセキュリティをサポートします。

Kafka Streams API を使用している場合、同等の SSL パラメーターおよび SASL パラメーターを構成する方法を参照してください。

クライアント認証がブローカーで要求されない場合、クライアントプロパティファイル client-ssl.properties に認証情報を格納できる最小の構成例を次に示します。クライアントの構成ファイルにパスワードが直接格納されるため、ファイルシステムのアクセス許可を使用して、これらのファイルへのアクセスを制限することが重要です。

bootstrap.servers=kafka1:9093
security.protocol=SSL
ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
ssl.truststore.password=test1234

SSL のクライアント認証が要求される場合、クライアントでキーストアを用意することが必要です。必要な追加の構成については、 SSL 認証 を参照してください。

kafka-console-producerkafka-console-consumer を使用する例では、前に定義したプロパティを含む client-ssl.properties を渡します。

bin/kafka-console-producer --broker-list kafka1:9093 --topic test --producer.config client-ssl.properties
bin/kafka-console-consumer --bootstrap-server kafka1:9093 --topic test --consumer.config client-ssl.properties --from-beginning

オプション設定

いくつかのオプション設定を示します。

ssl.provider SSL 接続に使用するセキュリティプロバイダーの名前です。デフォルト値は、JVM のデフォルトのセキュリティプロバイダーです。

  • 型: string
  • デフォルト: null
  • 重要度: 中

ssl.cipher.suites 暗号スイートは、認証、暗号化、MAC、およびキー交換アルゴリズムの名前付き組み合わせで、TLS または SSL ネットワークプロトコルを使用したネットワーク接続のセキュリティ設定をネゴシエートするために使用されます。

  • 型: list
  • デフォルト: null(デフォルトで、サポートするすべての暗号スイートが有効)
  • 重要度: 中

ssl.enabled.protocols SSL 接続で有効なプロトコルのリストです。

  • 型: list
  • デフォルト: TLSv1.2,TLSv1.1,TLSv1
  • 重要度: 中

ssl.truststore.type トラストストアファイルのファイルフォーマットです。

  • 型: string
  • デフォルト: JKS
  • 重要度: 中

ZooKeeper

Confluent Platform バージョン 5.5.0 以上で、Kafka にバンドルされるバージョンの ZooKeeper で、TLS をサポートします。詳細については、「実行中クラスターへのセキュリティの追加」を参照してください。

Kafka Connect

このセクションでは、Kafka Connect のセキュリティを有効にする方法について説明します。Kafka Connect をセキュアにするには、次の対象でセキュリティを構成する必要があります。

  1. Kafka Connect ワーカー : Kafka Connect API の一部で、ワーカーは実際には内部的に機能する高度なクライアントです。
  2. Kafka Connect コネクター: コネクターにはプロデューサーとコンシューマーを組み込むことができます。そのため、ソースコネクターで使用される Connect プロデューサーやシンクコネクターで使用される Connect コンシューマーのデフォルトの構成をオーバーライドする必要があります。
  3. Kafka Connect REST: Kafka Connect では、 追加のプロパティ を使って SSL を使用するように構成できる REST API が公開されています。

次のセクションに示すとおり、Kafka Connect のセキュリティを構成します。さらに、Confluent Control Center のストリームモニタリングを Kafka Connect に対して使用している場合、次の対象でセキュリティを構成します。

次のプロパティを connect-distributed.properties に追加することで、Connect ワーカーが SSL を使用するトップレベルの設定を構成します。これらのトップレベルの設定は、Connect ワーカーでグループ調整のために使用され、クラスターのステート(構成とオフセットなど)を追跡するために使用される内部トピックへの読み書きに使用されます。

bootstrap.servers=kafka1:9093
security.protocol=SSL
ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
ssl.truststore.password=test1234

Connect ワーカーでは、ソースコネクターで使用されるプロデューサーおよびシンクコネクターで使用されるコンシューマーを管理します。つまり、セキュリティを活用するコネクターでは、ワーカーが使用するプロデューサーやコンシューマーのデフォルト構成をオーバーライドする必要もあります。ソースコネクターかシンクコネクターに応じて次のとおり実行します。

  • ソースコネクター : 同じプロパティに producer プレフィックスを追加して構成します。

    producer.bootstrap.servers=kafka1:9093
    producer.security.protocol=SSL
    producer.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
    producer.ssl.truststore.password=test1234
    
  • シンクコネクター : 同じプロパティに consumer プレフィックスを追加して構成します。

    consumer.bootstrap.servers=kafka1:9093
    consumer.security.protocol=SSL
    consumer.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
    consumer.ssl.truststore.password=test1234
    
  • SSL と REST API を使用する場合、次の追加プロパティがあります。
プロパティ 注意事項
listeners
protocol://host:port,protocol2://host2:port 形式の
REST リスナーのリストです。
ここでプロトコルは、http または https のいずれかです。このプロパティが
定義されるとき、プロパティ rest.host.namerest.port は無視されます。
rest.advertised.listener
ワーカー間の通信に使用されるリスナーを構成します。
指定可能な値は、http または https のいずれかです。
listeners プロパティが定義されていない場合、またはプロパティに http
リスナーが含まれている場合、このフィールドのデフォルト値は http となります。listeners
プロパティが定義されていて、https リスナーのみが含まれている場合、デフォルト値は
https となります。
ssl.client.auth
有効な値は nonerequested および required です。次の条件を制御します。
1.クライアントで SSL/TLS クライアント認証が要求される(required
2.SSL/TLS クライアント認証の省略を決定できる(requested
3.SSL/TLS クライアント認証を無効にする(none
listeners.https.ssl.*
listeners.https. プレフィックスと SSL 構成パラメーターを使用して
デフォルトの SSL 構成をオーバーライドできます。これは
Kafka ブローカーへの接続で共有される構成です。このプレフィックスが付いたパラメーターが 1 つでもある場合、
この実装では、プレフィックス付きの SSL パラメーターのみを使用し、
このプレフィックスがない、すべての SSL パラメーターを無視します。プレフィックス listeners.https. のパラメーターが存在しない場合、
プレフィックスのないパラメーターが使用されます。

listeners.https.ssl.* プロパティが定義されていない場合、ssl.* プロパティが使用されることに注意してください。REST API の ssl.* プロパティのリストについては、「REST Proxy の構成オプション」を参照してください。

次に ssl.* プロパティを、ブローカーへの SSL 接続を使用するために設定する例を示します。listenershttps が含まれるので、これらの同じ設定は、Connect の SSL エンドポイントを構成するために使用されます。

listeners=https://myhost:8443
rest.advertised.listener=https
rest.advertised.host.name=<localhost>
rest.advertised.host.port=8083
ssl.client.auth=requested
ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/ssl/private/kafka.server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234

Connect の SSL エンドポイントをブローカーへの SSL 接続と異なるように構成するには、単に listeners.https.ssl.* プロパティを適切な設定で定義します。注意が必要なのは、listeners.https.ssl.* プロパティが指定されると、トップレベルの ssl.* プロパティはいずれも適用されないことです。つまり必要な listeners.https.ssl.* プロパティをすべて必ず定義します。

listeners=https://myhost:8443
rest.advertised.listener=https
rest.advertised.host.name=<localhost>
rest.advertised.host.port=8083
listeners.https.ssl.client.auth=requested
listeners.https.ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
listeners.https.ssl.truststore.password=test1234
listeners.https.ssl.keystore.location=/var/ssl/private/kafka.server.keystore.jks
listeners.https.ssl.keystore.password=test1234
listeners.https.ssl.key.password=test1234

Confluent Replicator

Confluent Replicator は、Kafka ソースコネクターの一種で、送信元 Kafka クラスターから送信先クラスターにデータを複製します。Replicator に組み込まれたコンシューマーは、送信元クラスターからデータを読み取り、Kafka Connect ワーカーに組み込まれたプロデューサーは送信先クラスターにデータを書き込みます。

Replicator バージョン 4.0 以下では、送信元および送信先の Kafka クラスターで ZooKeeper への接続が必要です。ZooKeeper で認証を有効にする場合、クライアントで ZooKeeper のセキュリティ認証情報を、Connect ワーカーのグローバル JAAS 構成設定 -Djava.security.auth.login.config に構成します。さらに送信元と送信先のクラスターで ZooKeeper のセキュリティ認証情報が一致する必要があります。

Confluent Replicator のセキュリティを構成するには、次に示すとおり Replicator コネクターを構成し、さらに次のコンポーネントを構成する必要があります。

SSL を Confluent Replicator の組み込みコンシューマーに追加するには、Replicator の JSON プロパティファイルを変更します。

次に、SSL 暗号化に対して追加する構成プロパティのサブセットの一例を示します。

{
  "name":"replicator",
    "config":{
      ....
      "src.kafka.ssl.truststore.location":"/etc/kafka/secrets/kafka.connect.truststore.jks",
      "src.kafka.ssl.truststore.password":"confluent",
      "src.kafka.security.protocol":"SSL"
      ....
    }
  }
}

参考

Confluent Replicator の構成例については、SSL のソースデモのスクリプト を参照してください。共通のセキュリティ構成のデモについては、Replicator のセキュリティデモ を参照してください。

送信先クラスターの SSL 暗号化を Confluent Replicator で構成するには、Replicator の JSON 構成を次の内容が含まれるように変更します。

{
  "name":"replicator",
    "config":{
      ....
      "dest.kafka.ssl.truststore.location":"/etc/kafka/secrets/kafka.connect.truststore.jks",
      "dest.kafka.ssl.truststore.password":"confluent",
      "dest.kafka.security.protocol":"SSL"
      ....
    }
  }
}

さらに次のプロパティが Connect ワーカーに必要です。

security.protocol=SSL
ssl.truststore.location=/etc/kafka/secrets/kafka.connect.truststore.jks
ssl.truststore.password=confluent
producer.security.protocol=SSL
producer.ssl.truststore.location=/etc/kafka/secrets/kafka.connect.truststore.jks
producer.ssl.truststore.password=confluent

詳細については、Connect ワーカーの全般セキュリティ構成 を参照してください。

参考

Confluent Replicator の構成例については、SSL の送信先スクリプト を参照してください。共通のセキュリティ構成のデモについては、Replicator のセキュリティデモ を参照してください。

Confluent Control Center

Confluent Control Center は Kafka Streams をステートストアとして使用します。つまり Control Center が支援するクラスターにあるすべての Kafka ブローカーがセキュアである場合、Control Center アプリケーションもセキュアにする必要があります。

注釈

RBAC が有効である場合、Control Center を Kerberos とともに使用できません。Control Center では、OAUTHBEARER 以外の SASL メカニズムをサポートできないためです。

次のセクションに示すとおり、Control Center アプリケーションのセキュリティを有効にします。さらに、以下のコンポーネントのセキュリティを構成します。

Control Center に対して SSL を etc/confluent-control-center/control-center.properties ファイルで有効にします。

confluent.controlcenter.streams.security.protocol=SSL
confluent.controlcenter.streams.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
confluent.controlcenter.streams.ssl.truststore.password=test1234

Confluent Metrics Reporter

このセクションでは、Confluent Control Center と Auto Data Balancer で使用するために、Confluent Metrics Reporter で SSL 暗号化 を有効にする方法について説明します。

Confluent Metrics Reporter の SSL を有効にするには、モニタリング対象の Kafka クラスターのブローカーにある server.properties ファイルに次の内容を追加します。

confluent.metrics.reporter.bootstrap.servers=kafka1:9093
confluent.metrics.reporter.security.protocol=SSL
confluent.metrics.reporter.ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
confluent.metrics.reporter.ssl.truststore.password=test1234

Confluent Monitoring Interceptor

Confluent Monitoring Interceptor を Confluent Control Center のストリームモニタリングで使用します。このセクションでは、Confluent Monitoring Interceptor 向けのセキュリティを次の 3 か所で有効にする方法について説明します。

  1. 一般クライアント
  2. Kafka Connect
  3. Confluent Replicator

重要

Confluent Monitoring Interceptor の典型的なユースケースは、構成が一般に異なる、個別のモニタリングクラスターにモニタリングデータを渡すことです。インターセプターの構成では、モニタリングされているコンポーネントの構成を継承しません。モニタリングされているコンポーネントの構成を使用する場合は、適切なプレフィックスを追加する必要があります。たとえば、オプション confluent.monitoring.interceptor.security.protocol=SSL では、プロデューサーに使用する場合、producer. のプレフィックスを使用して producer.confluent.monitoring.interceptor.security.protocol=SSL とする必要があります。

一般クライアント用のインターセプター

Confluent Control Center のストリームモニタリングと Kafka クライアントを連動させる場合、各クライアントで Confluent Monitoring Interceptor に対して SSL 暗号化を構成する必要があります。

  1. クライアントでインターセプターが構成されていることを検証します。
  • プロデューサーの場合

    interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
    
  • コンシューマーの場合

    interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
    
  1. インターセプターの SSL 暗号化を構成します。

    confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
    confluent.monitoring.interceptor.security.protocol=SSL
    confluent.monitoring.interceptor.ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
    confluent.monitoring.interceptor.ssl.truststore.password=test1234
    

Kafka Connect 用のインターセプター

Confluent Control Center のストリームモニタリングと Kafka Connect を連動させる場合、Kafka Connect で Confluent Monitoring Interceptor に対して SSL を構成する必要があります。コネクターがソースかシンクかに応じて、次のプロパティを connect-distributed.properties に追加することで、Connect ワーカーを構成します。

  • ソースコネクター : Confluent Monitoring Interceptor で producer プレフィックスを使用して、SSL 暗号化を構成します。

    producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
    producer.confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
    producer.confluent.monitoring.interceptor.security.protocol=SSL
    producer.confluent.monitoring.interceptor.ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
    producer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
    
  • シンクコネクター: Confluent Monitoring Interceptor で consumer プレフィックスを使用して、SSL 暗号化を構成します。

    consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
    consumer.confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
    consumer.confluent.monitoring.interceptor.security.protocol=SSL
    consumer.confluent.monitoring.interceptor.ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
    consumer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
    

Replicator 用のインターセプター

Confluent Control Center のストリームモニタリングと Replicator を連動させる場合、Replicator の JSON 構成ファイルで Confluent Monitoring Interceptor に対して SSL を構成する必要があります。次に、SSL 暗号化に対して追加する構成プロパティのサブセットの一例を示します。

{
  "name":"replicator",
    "config":{
      ....
      "src.consumer.group.id": "replicator",
      "src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
      "src.consumer.confluent.monitoring.interceptor.bootstrap.servers": "kafka1:9093",
      "src.consumer.confluent.monitoring.interceptor.security.protocol": "SSL",
      "src.consumer.confluent.monitoring.interceptor.ssl.truststore.location": "/var/ssl/private/kafka.client.truststore.jks",
      "src.consumer.confluent.monitoring.interceptor.ssl.truststore.password": "test1234",
      ....
    }
  }
}

Schema Registry

Schema Registry はスキーマを永続化するために Kafka を使用します。つまり、Kafka クラスターにデータを書き込むためのクライアントとして機能します。したがって、Kafka ブローカーでセキュリティが構成されている場合、Schema Registry もセキュリティを使用するように構成する必要があります。網羅的なリストについては、「Schema Registry の構成オプション」も参照してください。

SSL 暗号化に対して追加する schema-registry.properties 構成パラメーターのサブセットの一例を次に示します。

kafkastore.bootstrap.servers=SSL://kafka1:9093
kafkastore.security.protocol=SSL
kafkastore.ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
kafkastore.ssl.truststore.password=test1234

REST Proxy

Confluent REST Proxy を SSL 暗号化でセキュアにするには、次の対象間でセキュリティを構成する必要があります。

  1. REST クライアントと REST Proxy(HTTPS)
  2. REST Proxy と Kafka クラスター

網羅的なリストについては、REST Proxy の構成オプション も参照してください。

まず HTTPS を REST クライアントと REST Proxy 間で構成します。HTTPS を構成するための kafka-rest.properties 構成パラメーターのサブセットの一例を次に示します。

ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
ssl.truststore.password=test1234

次に REST Proxy と Kafka クラスター間で SSL 暗号化を構成します。SSL 暗号化に対して追加する kafka-rest.properties 構成プロパティのサブセットの一例を次に示します。

client.bootstrap.servers=kafka1:9093
client.security.protocol=SSL
client.ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
client.ssl.truststore.password=test1234

SSL のログ記録

Kafka ブローカーまたはクライアントあるいはその両方を、javax.net.debug システムプロパティを指定して起動することで、SSL デバッグログを JVM レベルで有効にします。以下に例を示します。

export KAFKA_OPTS=-Djavax.net.debug=all
bin/kafka-server-start etc/kafka/server.properties

ちなみに

これらの手順では、ZIP または TAR アーカイブを使用して Confluent Platform をインストールしているという想定に基づいています。詳細については「オンプレミスのデプロイ」を参照してください。

ブローカーを起動したら、server.log に次のような内容を確認できます。

with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)

サーバーのキーストアとトラストストアが正しくセットアップされていることを検証するには、次のコマンドを実行します。

openssl s_client -debug -connect localhost:9093 -tls1

注 : TLSv1 が ssl.enabled.protocols に指定されている必要があります。

このコマンドの出力で、サーバーの証明書を確認します。

-----BEGIN CERTIFICATE-----
{variable sized random bytes}
-----END CERTIFICATE-----
subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Joe Smith
issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com

詳細については、Oracle ドキュメント の『 Debugging SSL/TLS connections 』を参照してください。

openssl コマンドで証明書が表示されない場合、または他のエラーメッセージがある場合、使用するキーや証明書が正しくセットアップされていません。構成を見直してください。