SSL での暗号化と認証¶
SSL の概要¶
SSL 認証では、サーバーがクライアントを認証します("双方向認証" とも呼ばれています)。SSL 認証では SSL 暗号化が必須となるため、このページでは両者を同時に構成する方法を示します。この方法は、 SSL の暗号化 だけに必要な構成のスーパーセットとなります。
デフォルトでは、Apache Kafka® は PLAINTEXT
で通信します。つまりすべてのデータはプレーンテキストで送信されます。通信を暗号化するには、デプロイのすべでの Confluent Platform コンポーネントで SSL 暗号化を使用するように構成する必要があります。
Secure Sockets Layer(SSL)は Transport Layer Security(TLS)の元になったもので、2015 年 6 月に非推奨になりました。ただし、歴史的な理由で、Kafka では Java と同様に、構成やコードで "SSL" という頭字語を "TLS" の代わりに使用します。このトピックでは頭字語 "SSL" だけを使用します。
暗号化または認証に対して SSL を構成できます。SSL 暗号化のみを構成できます(デフォルトでは、SSL 暗号化には、サーバーの証明書認証が含まれる)。ここでクライアント認証に別のメカニズム(たとえば、 SSL、 SASL)を独立して選択できます。技術的には、SSL の暗号化の時点で、1 方向の認証、つまりクライアントによるサーバー証明書の認証は、既に実行しています。このトピックで "SSL 認証" は、双方向認証、つまりブローカーによるクライアント証明書の認証も指しています。
SSL を有効にすると、暗号化オーバーヘッドのために、パフォーマンスに影響が出ることがあります。
SSL はプライベートキーと証明書のペアを使用します。これは SSL ハンドシェイクプロセスで使用されます。
- 各ブローカーには個別のプライベートキーと証明書のペアが必要で、クライアントはこの証明書を使用して、対象のブローカーを認証します。
- 各論理クライアントには、クライアント認証が有効な場合、プライベートキーと証明書のペアが必要で、ブローカーはこの証明書を使用して、対象クライアントを認証します。
各ブローカーと論理クライアントでトラストストアを構成できます。これは、信頼(認証)する対象の証明書(ブローカーまたは論理クライアントの ID)を判断するために使用されます。多くの方法でトラストストアを構成できます。次の 2 つの例を考えます。
- トラストストアには 1 つまたは多数の証明書が含まれます。ブローカーまたは論理クライアントはこのトラストストアにリストされた証明書を信頼します。
- トラストストアには認証機関(CA)が含まれます。ブローカーまたは論理クライアントは、トラストストアにある CA に署名された証明書を信頼します。
CA 方式を使用する方が便利です。新しいブローカーやクライアントを追加したときに、トラストストアの変更が必要ないためです。CA の方式の概要を、こちらの図 をご覧ください。
ただし、CA 方式では、このメカニズムで一度信頼した個別のブローカーやクライアントの認証を Kafka で簡単にブロックする方法をサポートしていません(証明書の失効は、一般に証明書失効リストや OCSP(オンライン証明書ステータスプロトコル)で実行します)。したがって、認可でアクセスをブロックする必要があります。
一方、1 つまたは多数の証明書を使用している場合、認証のブロックは、トラストストアからブローカーやクライアントの証明書を削除することにより、実行されます。
参考
この構成の動作例については、Confluent Platform デモ を参照してください。構成リファレンスについては、デモの docker-compose.yml ファイル を参照してください。
SSL キーと証明書の作成¶
詳細については、「セキュリティのチュートリアル」を参照してください。ここでは、SSL キーと証明書の作成 方法が説明されています。
ブローカー¶
Kafka クラスター内のすべてのブローカーを、クライアントからのセキュアな接続を受け付けるように構成します。ブローカーに構成の変更内容が反映されるためには、 ローリング再起動 が必要です。
次のセクションに示すとおり、Kafka ブローカーのセキュリティを有効にします。さらに、Confluent Control Center や Auto Data Balancer を使用している場合、次のコンポーネントに対してブローカーを構成します。
トラストストア、キーストア、およびパスワードを、すべてのブローカーの
server.properties
ファイルに構成します。ブローカーの構成ファイルにパスワードが直接保管されるため、ファイルシステムのアクセス許可を使用して、これらのファイルへのアクセスを制限することが重要です。ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks ssl.truststore.password=test1234 ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks ssl.keystore.password=test1234 ssl.key.password=test1234
ssl.truststore.password
は技術的には省略可能ですが、使用することを強くお勧めします。パスワードが設定されていなくてもトラストストアにアクセスできますが、整合性チェックが無効になります。ブローカー間通信で、SSL を有効にする場合は、次の行をブローカーのプロパティファイルに追加します(デフォルトは
PLAINTEXT
)。security.inter.broker.protocol=SSL
クライアントやブローカー間の
SSL
接続をリッスンするために、Apache Kafka® ブローカーのポートを構成します。listeners
を構成する必要があります。また、listeners
と値が異なる場合は、必要に応じてadvertised.listeners
を構成する必要があります。listeners=SSL://kafka1:9093 advertised.listeners=SSL://localhost:9093
SSL
ポートとPLAINTEXT
ポートを次の条件で構成します。- SSL をブローカー間のやりとりに対して有効にしない
- クラスターに接続する一部のクライアントで SSL を使用しない
listeners=PLAINTEXT://kafka1:9092,SSL://kafka1:9093 advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
advertised.host.name
とadvertised.port
は、単一のPLAINTEXT
ポート用で、セキュアなプロトコルと互換性がないことに注意してください。代わりにadvertised.listeners
を使用します。ブローカーでクライアントの認証(双方向認証)を有効にするには、クライアント認証をすべてのブローカーで構成する必要があります。このとき、
requested
ではなくrequired
を使用するように構成します。これは、構成が正しくないクライアントの接続が成功し、セキュリティが確保されていると誤って認識してしまう可能性があるためです。ssl.client.auth=required
注釈
SASL 認証メカニズムが対象のリスナーで有効になっている場合、
ssl.client.auth=required
を指定している場合でも、SSL クライアントの認証が無効になります。対象リスナーで SASL を使用する場合にのみ、ブローカーはクライアントを認証します。
オプション設定¶
ssl.endpoint.identification.algorithm
- クライアントで使用してサーバーホスト名を検証するエンドポイント識別アルゴリズム。デフォルト値は
https
です。ブローカー間通信のためにブローカーによって行われるクライアント接続を含め、クライアントではブローカーのホスト名がその証明書のホスト名と一致することを検証します。サーバーホスト名の検証を無効にするには、ssl.endpoint.identification.algorithm
を空の文字列に設定します。
- 型: string
- デフォルト: https
- 重要度: 中
ssl.cipher.suites
- 暗号スイートは、認証、暗号化、MAC、およびキー交換アルゴリズムの名前付き組み合わせで、TLS または SSL ネットワークプロトコルを使用したネットワーク接続のセキュリティ設定をネゴシエートするために使用されます。
- 型: List
- デフォルト: null(デフォルトで、サポートするすべての暗号スイートが有効)
- 重要度: 中
ssl.enabled.protocols
- SSL 接続で有効なプロトコルのリスト。
- 型: List
- デフォルト: TLSv1.2,TLSv1.1,TLSv1
- 重要度: 中
ssl.truststore.type
- トラストストアファイルのファイルフォーマットです。
- 型: string
- デフォルト: JKS
- 重要度: 中
一部の国における輸入規制のため、Oracle の実装では、デフォルトで暗号化アルゴリズムの強度が制限されています。強力なアルゴリズムが必要な場合(256 ビットキーの AES など)、JCE Unlimited Strength Jurisdiction ポリシーファイル を入手し、JDK/JRE にインストールする必要があります。詳細については、JCA プロバイダーのドキュメント を参照してください。
クライアント¶
新しいプロデューサーとコンシューマークライアントは、Kafka バージョン 0.9.0 以上でセキュリティをサポートします。
Kafka Streams API を使用している場合、同等の SSL パラメーターおよび SASL パラメーターを構成する方法を参照してください。
次の構成例では、クライアント認証がブローカーで要求されることを前提にしています。したがってクライアントプロパティファイル client-ssl.properties
に認証情報を格納できます。ブローカーの構成ファイルにパスワードが直接保管されるため、ファイルシステムのアクセス許可を使用して、これらのファイルへのアクセスを制限することが重要です。
重要
以下の設定を Schema Registry または REST Proxy に対して構成する場合、各パラメーターのプレフィックスとして confluent.license
を使用する必要があります。たとえば、security.protocol
は confluent.license.security.protocol
になります。
bootstrap.servers=kafka1:9093
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.password
は技術的には省略可能ですが、使用することを強くお勧めします。パスワードが設定されていなくてもトラストストアにアクセスできますが、整合性チェックが無効になります。
次の例では、kafka-console-producer
と kafka-console-consumer
を使用し、前に定義した client-ssl.properties
を渡します。
bin/kafka-console-producer --broker-list kafka1:9093 --topic test --producer.config client-ssl.properties
bin/kafka-console-consumer --bootstrap-server kafka1:9093 --topic test --consumer.config client-ssl.properties --from-beginning
オプション設定¶
ssl.endpoint.identification.algorithm
- クライアントで使用してサーバーホスト名を検証するエンドポイント識別アルゴリズム。デフォルト値は
https
です。ブローカー間通信のためにブローカーによって行われるクライアント接続を含め、クライアントではブローカーのホスト名がその証明書のホスト名と一致することを検証します。サーバーホスト名の検証を無効にするには、ssl.endpoint.identification.algorithm
を空の文字列に設定します。
- 型: string
- デフォルト: https
- 重要度: 中
ssl.provider
- SSL 接続に使用するセキュリティプロバイダーの名前。デフォルト値は、JVM のデフォルトのセキュリティプロバイダーです。
- 型: string
- デフォルト: null
- 重要度: 中
ssl.cipher.suites
- 暗号スイートは、認証、暗号化、MAC、およびキー交換アルゴリズムの名前付き組み合わせで、TLS または SSL ネットワークプロトコルを使用したネットワーク接続のセキュリティ設定をネゴシエートするために使用されます。
- 型: List
- デフォルト: null(デフォルトで、サポートするすべての暗号スイートが有効)
- 重要度: 中
ssl.enabled.protocols
- SSL 接続で有効なプロトコルのリスト。
- 型: List
- デフォルト: TLSv1.2,TLSv1.1,TLSv1
- 重要度: 中
ssl.truststore.type
- トラストストアファイルのファイルフォーマットです。
- 型: string
- デフォルト: JKS
- 重要度: 中
ZooKeeper¶
Confluent Platform バージョン 5.5.0 以上で、Kafka にバンドルされるバージョンの ZooKeeper で、TLS をサポートします。詳細については、「実行中クラスターへのセキュリティの追加」を参照してください。
Kafka Connect¶
このセクションでは、Kafka Connect のセキュリティを有効にする方法について説明します。Kafka Connect をセキュアにするには、次の対象でセキュリティを構成する必要があります。
- Kafka Connect ワーカー : Kafka Connect API の一部で、ワーカーは実際には内部的に機能する高度なクライアントです。
- Kafka Connect コネクター: コネクターにはプロデューサーとコンシューマーを組み込むことができます。そのため、ソースコネクターで使用される Connect プロデューサーやシンクコネクターで使用される Connect コンシューマーのデフォルトの構成をオーバーライドする必要があります。
- Kafka Connect REST: Kafka Connect では、 追加のプロパティ を使って SSL を使用するように構成できる REST API が公開されています。
次のセクションに示すとおり、Kafka Connect のセキュリティを構成します。さらに、Confluent Control Center のストリームモニタリングを Kafka Connect に対して使用している場合、次の対象でセキュリティを構成します。
次のプロパティを connect-distributed.properties
に追加することで、Connect ワーカーが SSL を使用するトップレベルの設定を構成します。これらのトップレベルの設定は、Connect ワーカーでグループ調整のために使用されます。またこれらの設定を使用して、クラスターのステート(構成とオフセットなど)を追跡するために使われる内部トピックの読み取りと書き込みが実行されます。ここでは、クライアント認証がブローカーにより要求されていることが前提です。
bootstrap.servers=kafka1:9093
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
Connect ワーカーでは、ソースコネクターで使用されるプロデューサーおよびシンクコネクターで使用されるコンシューマーを管理します。したがって、セキュリティを活用するコネクターでは、ワーカーが使用するプロデューサーやコンシューマーのデフォルト構成をオーバーライドする必要もあります。ここでは、クライアント認証がブローカーにより要求されていることが前提です。
ソースコネクター : 同じプロパティに
producer
プレフィックスを追加して構成します。producer.bootstrap.servers=kafka1:9093 producer.security.protocol=SSL producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks producer.ssl.truststore.password=test1234 producer.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks producer.ssl.keystore.password=test1234 producer.ssl.key.password=test1234
シンクコネクター : 同じプロパティに
consumer
プレフィックスを追加して構成します。consumer.bootstrap.servers=kafka1:9093 consumer.security.protocol=SSL consumer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks consumer.ssl.truststore.password=test1234 consumer.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks consumer.ssl.keystore.password=test1234 consumer.ssl.key.password=test1234
ちなみに
詳細については「Kafka Connect のセキュリティの基礎」を参照してください。
Confluent Replicator¶
Confluent Replicator は、Kafka ソースコネクターの一種で、送信元 Kafka クラスターから送信先クラスターにデータを複製します。Replicator に組み込まれたコンシューマーは、送信元クラスターからデータを読み取り、Kafka Connect ワーカーに組み込まれたプロデューサーは送信先クラスターにデータを書き込みます。
Replicator バージョン 4.0 以下では、送信元および送信先の Kafka クラスターで ZooKeeper への接続が必要です。ZooKeeper で認証を有効にする場合、クライアントで ZooKeeper のセキュリティ認証情報を、Connect ワーカーのグローバル JAAS 構成設定 -Djava.security.auth.login.config
に構成します。さらに送信元と送信先のクラスターで ZooKeeper のセキュリティ認証情報が一致する必要があります。
Confluent Replicator のセキュリティを構成するには、次に示すとおり Replicator コネクターを構成し、さらに次のコンポーネントを構成する必要があります。
SSL を Confluent Replicator の組み込みコンシューマーに追加するには、Replicator の JSON プロパティファイルを変更します。
これは SSL の暗号化と認証用に追加する構成プロパティのサブセットの一例です。ここでは、クライアント認証がブローカーにより要求されていることが前提です。
{
"name":"replicator",
"config":{
....
"src.kafka.ssl.truststore.location":"/etc/kafka/secrets/kafka.connect.truststore.jks",
"src.kafka.ssl.truststore.password":"confluent",
"src.kafka.ssl.keystore.location":"/etc/kafka/secrets/kafka.connect.keystore.jks",
"src.kafka.ssl.keystore.password":"confluent",
"src.kafka.ssl.key.password":"confluent",
"src.kafka.security.protocol":"SSL"
....
}
}
}
参考
Confluent Replicator の構成例については、SSL の送信元認証デモのスクリプト を参照してください。共通のセキュリティ構成のデモについては、Replicator のセキュリティデモ を参照してください。
SSL 認証を使用して送信先クラスター用に Confluent Replicator を構成するには、Replicator の JSON 構成を次の内容が含まれるように変更します。
{
"name":"replicator",
"config":{
....
"dest.kafka.ssl.truststore.location":"/etc/kafka/secrets/kafka.connect.truststore.jks",
"dest.kafka.ssl.truststore.password":"confluent",
"dest.kafka.ssl.keystore.location":"/etc/kafka/secrets/kafka.connect.keystore.jks",
"dest.kafka.ssl.keystore.password":"confluent",
"dest.kafka.ssl.key.password":"confluent",
"dest.kafka.security.protocol":"SSL"
....
}
}
}
さらに次のプロパティが Connect ワーカーに必要です。
security.protocol=SSL
ssl.truststore.location=/etc/kafka/secrets/kafka.connect.truststore.jks
ssl.truststore.password=confluent
ssl.keystore.location=/etc/kafka/secrets/kafka.connect.keystore.jks
ssl.keystore.password=confluent
ssl.key.password=confluent
producer.security.protocol=SSL
producer.ssl.truststore.location=/etc/kafka/secrets/kafka.connect.truststore.jks
producer.ssl.truststore.password=confluent
producer.ssl.keystore.location=/etc/kafka/secrets/kafka.connect.keystore.jks
producer.ssl.keystore.password=confluent
producer.ssl.key.password=confluent
詳細については、 こちら で Connect ワーカーの全般的なセキュリティ構成を確認してください。
参考
Confluent Replicator の構成例については、SSL の送信先認証デモのスクリプト を参照してください。共通のセキュリティ構成のデモについては、Replicator のセキュリティデモ を参照してください。
Confluent Control Center¶
Confluent Control Center は Kafka Streams をステートストアとして使用します。つまり Control Center が支援するクラスターにあるすべての Kafka ブローカーがセキュアである場合、Control Center アプリケーションもセキュアにする必要があります。
注釈
RBAC が有効である場合、Control Center を Kerberos とともに使用できません。Control Center では、OAUTHBEARER 以外の SASL メカニズムをサポートできないためです。
次のセクションに示すとおり、Control Center アプリケーションのセキュリティを有効にします。さらに、以下のコンポーネントのセキュリティを構成します。
- Confluent Metrics Reporter: モニタリング対象の本稼働環境のクラスターで必須
- Confluent Monitoring Interceptor: Control Center のストリームモニタリングを使用している場合に必要
Confluent Control Center の SSL を etc/confluent-control-center/control-center.properties
ファイルで有効にします。ここでは、クライアント認証がブローカーにより要求されていることが前提です。
confluent.controlcenter.streams.security.protocol=SSL
confluent.controlcenter.streams.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
confluent.controlcenter.streams.ssl.truststore.password=test1234
confluent.controlcenter.streams.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
confluent.controlcenter.streams.ssl.keystore.password=test1234
confluent.controlcenter.streams.ssl.key.password=test1234
Confluent Metrics Reporter¶
このセクションでは、Confluent Control Center と Auto Data Balancer で使用される Confluent Metrics Reporter の SSL 暗号化と認証を有効にする方法について説明します。
Confluent Metrics Reporter の SSL を有効にするには、モニタリング対象の Kafka クラスターのブローカーにある server.properties
に次の内容を追加します。ここでは、クライアント認証がブローカーにより要求されていることが前提です。
confluent.metrics.reporter.bootstrap.servers=kafka1:9093
confluent.metrics.reporter.security.protocol=SSL
confluent.metrics.reporter.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
confluent.metrics.reporter.ssl.truststore.password=test1234
confluent.metrics.reporter.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
confluent.metrics.reporter.ssl.keystore.password=test1234
confluent.metrics.reporter.ssl.key.password=test1234
Confluent モニタリングインターセプター¶
Confluent Monitoring Interceptor を Confluent Control Center のストリームモニタリングで使用します。このセクションでは、Confluent Monitoring Interceptor 向けのセキュリティを次の 3 か所で有効にする方法について説明します。
- 一般クライアント
- Kafka Connect
- Confluent Replicator
重要
Confluent Monitoring Interceptor の典型的なユースケースは、構成が一般に異なる、個別のモニタリングクラスターにモニタリングデータを渡すことです。インターセプターの構成では、モニタリングされているコンポーネントの構成を継承しません。モニタリングされているコンポーネントの構成を使用する場合は、適切なプレフィックスを追加する必要があります。たとえば、オプション confluent.monitoring.interceptor.security.protocol=SSL
では、プロデューサーに使用する場合、producer.
のプレフィックスを使用して producer.confluent.monitoring.interceptor.security.protocol=SSL
とする必要があります。
一般クライアント用のインターセプター¶
Kafka クライアントで動作する Confluent Control Center のストリームモニタリングについては、各クライアントで Confluent Monitoring Interceptor の SSL 暗号化と認証を構成する必要があります。
- クライアントでインターセプターが構成されていることを検証します。
プロデューサーの場合
interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
コンシューマーの場合
interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
インターセプターに対して SSL 暗号化と認証を構成します。ここでは、クライアント認証がブローカーにより要求されていることが前提です。
confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093
confluent.monitoring.interceptor.security.protocol=SSL
confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
confluent.monitoring.interceptor.ssl.truststore.password=test1234
confluent.monitoring.interceptor.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
confluent.monitoring.interceptor.ssl.keystore.password=test1234
confluent.monitoring.interceptor.ssl.key.password=test1234
Kafka Connect 用のインターセプター¶
Confluent Control Center のストリームモニタリングと Kafka Connect を連動させる場合、Kafka Connect で Confluent Monitoring Interceptor に対して SSL を構成する必要があります。ここでは、クライアント認証がブローカーにより要求されていることが前提です。コネクターがソースかシンクかに応じて、次のプロパティを connect-distributed.properties
に追加することで、Connect ワーカーを構成します。
ソースコネクター :
producer
プレフィックスを使用して、SSL 暗号化と認証用に Confluent Monitoring Interceptor を構成します。producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor producer.confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093 producer.confluent.monitoring.interceptor.security.protocol=SSL producer.confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks producer.confluent.monitoring.interceptor.ssl.truststore.password=test1234 producer.confluent.monitoring.interceptor.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks producer.confluent.monitoring.interceptor.ssl.keystore.password=test1234 producer.confluent.monitoring.interceptor.ssl.key.password=test1234
シンクコネクター:
consumer
プレフィックスを使用して、SSL 暗号化と認証用に Confluent Monitoring Interceptor を構成します。consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor consumer.confluent.monitoring.interceptor.bootstrap.servers=kafka1:9093 consumer.confluent.monitoring.interceptor.security.protocol=SSL consumer.confluent.monitoring.interceptor.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks consumer.confluent.monitoring.interceptor.ssl.truststore.password=test1234 consumer.confluent.monitoring.interceptor.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks consumer.confluent.monitoring.interceptor.ssl.keystore.password=test1234 consumer.confluent.monitoring.interceptor.ssl.key.password=test1234
Replicator 用のインターセプター¶
Confluent Control Center のストリームモニタリングと Replicator を連動させる場合、Replicator の JSON 構成ファイルで Confluent Monitoring Interceptor に対して SSL を構成する必要があります。次に、SSL の暗号化と認証用に追加する構成プロパティのサブセットの一例を示します。
{
"name":"replicator",
"config":{
....
"src.consumer.group.id": "replicator",
"src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
"src.consumer.confluent.monitoring.interceptor.security.protocol": "SSL",
"src.consumer.confluent.monitoring.interceptor.bootstrap.servers": "kafka1:9093",
"src.consumer.confluent.monitoring.interceptor.ssl.truststore.location": "/var/private/ssl/kafka.client.truststore.jks",
"src.consumer.confluent.monitoring.interceptor.ssl.truststore.password": "test1234",
"src.consumer.confluent.monitoring.interceptor.ssl.keystore.location": "/var/private/ssl/kafka.client.keystore.jks",
"src.consumer.confluent.monitoring.interceptor.ssl.keystore.password": "test1234",
"src.consumer.confluent.monitoring.interceptor.ssl.key.password": "test1234",
....
}
}
}
Schema Registry¶
Schema Registry はスキーマを永続化するために Kafka を使用します。つまり、Kafka クラスターにデータを書き込むためのクライアントとして機能します。したがって、Kafka ブローカーでセキュリティが構成されている場合、Schema Registry もセキュリティを使用するように構成する必要があります。網羅的なリストについては、「Schema Registry の構成オプション」も参照してください。
SSL の暗号化と認証用に追加する schema-registry.properties
構成パラメーターのサブセットの一例を次に示します。ここでは、クライアント認証がブローカーにより要求されていることが前提です。
kafkastore.bootstrap.servers=SSL://kafka1:9093
kafkastore.security.protocol=SSL
kafkastore.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
kafkastore.ssl.truststore.password=test1234
kafkastore.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
kafkastore.ssl.keystore.password=test1234
kafkastore.ssl.key.password=test1234
REST Proxy¶
Confluent REST Proxy を SSL 暗号化と認証でセキュアにするには、以下の間でセキュリティを構成する必要があります。
- REST クライアントと REST Proxy(HTTPS)
- REST Proxy と Kafka クラスター
網羅的なリストについては、 REST Proxy の構成オプション も参照してください。
HTTPS を REST クライアントと REST Proxy 間で構成します。HTTPS を構成するための
kafka-rest.properties
構成パラメーターのサブセットの一例を次に示します。ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks ssl.truststore.password=test1234 ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks ssl.keystore.password=test1234 ssl.key.password=test1234
REST Proxy と Kafka クラスター間で SSL 暗号化と認証を構成します。SSL の暗号化と認証用に追加する
kafka-rest.properties
構成パラメーターのサブセットの一例を次に示します。ここでは、クライアント認証がブローカーにより要求されていることが前提です。client.bootstrap.servers=kafka1:9093 client.security.protocol=SSL client.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks client.ssl.truststore.password=test1234 client.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks client.ssl.keystore.password=test1234 client.ssl.key.password=test1234
SSL のログ記録¶
Kafka ブローカーまたはクライアントあるいはその両方を、javax.net.debug
システムプロパティを指定して起動することで、SSL デバッグログを JVM レベルで有効にします。以下に例を示します。
export KAFKA_OPTS=-Djavax.net.debug=all
bin/kafka-server-start etc/kafka/server.properties
ちなみに
これらの手順では、ZIP または TAR アーカイブを使用して Confluent Platform をインストールしているという想定に基づいています。詳細については「オンプレミスのデプロイ」を参照してください。
ブローカーを起動したら、server.log
に次のような内容を確認できます。
with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)
サーバーのキーストアとトラストストアが正しくセットアップされていることを検証するには、次のコマンドを実行します。
openssl s_client -debug -connect localhost:9093 -tls1
注 : TLSv1 が ssl.enabled.protocols
に指定されている必要があります。
このコマンドの出力で、サーバーの証明書を確認します。
-----BEGIN CERTIFICATE-----
{variable sized random bytes}
-----END CERTIFICATE-----
subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Joe Smith
issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com
詳細については、Oracle ドキュメント の『 Debugging SSL/TLS connections 』を参照してください。
openssl
コマンドで証明書が表示されない場合、または他のエラーメッセージがある場合、使用するキーや証明書が正しくセットアップされていません。構成を見直してください。