Kafka Connect のセキュリティの基礎¶
暗号化¶
Apache Kafka® クラスターで SSL 暗号化を有効にした場合は、Kafka Connect でもセキュリティが構成されていることを確認する必要があります。Kafka Connect で暗号化を構成する方法については、以下のセクションをクリックして参照してください。
認証¶
Kafka クラスターで認証を有効にした場合は、Kafka Connect でもセキュリティが構成されていることを確認する必要があります。Kafka Connect で認証を構成する方法については、以下のセクションをクリックして参照してください。
プリンシパルの分割¶
Connect ワーカーの構成で、producer.
および consumer.
のプレフィックスが付いているすべてのプロパティは、そのワーカー内で作成されたすべてのソースコネクターおよびシンクコネクターに適用されます。admin.
プレフィックスは、シンクコネクターのエラーレポートに使用します。これらのプレフィックスの使用方法は、以下のとおりです。
consumer.
プレフィックスでは、シンクコネクターに関するコンシューマーの動作を制御できます。producer.
プレフィックスでは、ソースコネクターに関するプロデューサーの動作を制御できます。producer.
とadmin.
のプレフィックスはどちらも、シンクコネクターのエラーレポートに関するプロデューサーおよびクライアントの動作を制御します。
個々のコネクターでこれらのプロパティをオーバーライドするには、producer.override.
、consumer.override.
、admin.override.
のプレフィックスを使用します。これには、ワーカーのサービスプリンシパルの構成のオーバーライドも含まれ、コネクターごとに個別のサービスプリンシパルが作成されます。オーバーライドは、デフォルトでは無効です。有効にするには、connector.client.config.override.policy
ワーカープロパティを使用します。このプロパティでは、ワーカーで許可されるコネクターごとのオーバーライドを設定します。オーバーライドポリシーの初期設定(OOTB)のオプションは次のとおりです。
connector.client.config.override.policy=None
- デフォルトです。構成のオーバーライドを許可しません。
connector.client.config.override.policy=Principal
producer.override.
、consumer.override
、admin.override
プレフィックスを使用したsecurity.protocol
、sasl.jaas.config
、sasl.mechanism
構成プロパティのオーバーライドを許可します。
connector.client.config.override.policy=All
producer.override.
、consumer.override
、admin.override
プレフィックスを使用したすべての構成プロパティのオーバーライドを許可します。
ちなみに
OOTB のポリシーではニーズが満たされない場合は、ConnectorClientConfigOverridePolicy クラスを独自に記述して実装することができます。
Kafka ブローカーで SSL を使用したクライアント認証がサポートされている場合は、ワーカーとコネクターで別々のプリンシパルを構成できます。この場合、それぞれについて、 別々の証明書を生成 し、その証明書を別々のキーストアにインストールする必要があります。
Connect の構成の主な違いは以下のとおりです。固有のパスワード、キーストアの場所、キーストアのパスワードが使用されることに注意してください。
# Authentication settings for Connect workers
ssl.keystore.location=/var/private/ssl/kafka.worker.keystore.jks
ssl.keystore.password=worker1234
ssl.key.password=worker1234
Connect ワーカーでは、ソースコネクターで使用されるプロデューサーおよびシンクコネクターで使用されるコンシューマーを管理します。つまり、セキュリティを活用するコネクターでは、ワーカーが使用するプロデューサーやコンシューマーのデフォルト構成をオーバーライドする必要もあります。
# Authentication settings for Connect producers used with source connectors
producer.ssl.keystore.location=/var/private/ssl/kafka.source.keystore.jks
producer.ssl.keystore.password=connector1234
producer.ssl.key.password=connector1234
# Authentication settings for Connect consumers used with sink connectors
consumer.ssl.keystore.location=/var/private/ssl/kafka.sink.keystore.jks
consumer.ssl.keystore.password=connector1234
consumer.ssl.key.password=connector1234
ACL に関する考慮事項¶
コネクターごとに個別のプリンシパルを使用することにより、きめ細かな アクセス制御リスト (ACL)を定義できます。たとえば、この機能を使用して、Connect クラスターで使用される内部トピックにコネクター自体が書き込むことを禁止することができます。また、ソースコネクターとシンクコネクターで別々のキーストアを使用して、ソースコネクターにはトピックに対する書き込みの権限のみを付与し、ソースコネクターには同じトピックに対する読み取りの権限のみを付与することができます。
ワーカーの ACL 要件¶
ワーカーには、クラスター内のすべてのワーカーが参加する共通のグループへのアクセス権限、および Connect で必須の内部トピック すべてへのアクセス権限を付与する必要があります。内部トピックへの読み取りと書き込みのアクセス権限は必ず必要になりますが、作成のアクセス権限は、内部トピックがまだ存在せず、Kafka Connect で自動的に作成する必要がある場合にのみ必要になります。以下の表は、必須のアクセス許可と、その値の指定に使用する構成設定をまとめたものです。
操作 | リソース | 構成する項目 |
---|---|---|
作成 | クラスター | config.storage.topic |
作成 | クラスター | config.storage.replication.factor |
作成 | クラスター | offset.storage.topic |
作成 | クラスター | offset.storage.partitions |
作成 | クラスター | offset.storage.replication.factor |
作成 | クラスター | status.storage.topic |
作成 | クラスター | status.storage.partitions |
作成 | クラスター | status.storage.replication.factor |
読み取り/書き込み | トピック | config.storage.topic |
読み取り/書き込み | トピック | offsets.storage.topic |
読み取り/書き込み | トピック | status.storage.topic |
読み取り | グループ | group.id |
コマンドラインから新しい ACL を作成する方法の詳細については、「ACL の追加」を参照してください。
コネクターの ACL 要件¶
ソースコネクターには、そのコネクターが書き込む必要があるすべてのトピックに対する WRITE
(書き込み)のアクセス許可を付与する必要があります。同様に、シンクコネクターには、そのコネクターが読み取るすべてのトピックに対する READ
(読み取り)のアクセス許可が必要です。また、シンクタスクは内部的にコンシューマーグループに依存するため、グループの READ
(読み取り)のアクセス許可も必要です。Connect はコンシューマーの group.id
をシンクコネクターごとに規則に従って connect-{name}
と定義します。この {name}
はコネクターの名前に置き換えられます。たとえば、シンクコネクターの名前が "hdfs-logs" で、読み取り元のトピックの名前が "logs" の場合、次のコマンドを使用して ACL を追加することができます。
bin/kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \
--add --allow-principal User:<Sink Connector Principal> \
--consumer --topic logs --group connect-hdfs-logs
Connect Reporter¶
Kafka Connect Reporter は、シンク操作の結果をレポータートピックに送信します。レコードのシンクに成功した後、またはエラー条件に従って、結果レポートを提出するために Connect Reporter が呼び出されます。このレポートは、シンクイベントに関する追加情報とともに、元のレコードの処理方法に関する詳細が含められるように構成されています。それらのレコードは、消費できるように、構成可能な成功トピックおよびエラートピックに書き込まれます。シンクコネクター構成に追加されている基本的な Connect Reporter 構成プロパティの例を以下に示します。
reporter.bootstrap.servers=localhost:9092
reporter.result.topic.name=success-responses
reporter.result.topic.replication.factor=1
reporter.error.topic.name=error-responses
reporter.error.topic.replication.factor=1
Connect Reporter を完全に無効にするには、「Connect Reporter の無効化」を参照してください。
セキュアな環境の場合は、管理クライアントとプロデューサーの両方に対して構成ブロックを使用します。プロデューサーは、レポータートピックにレコードを送信するように構築されています。管理クライアントは、トピックを作成します。認証情報は、セキュアな環境で追加する必要があります。管理プロパティとプロデューサープロパティの例を以下に示します。
reporter.admin.bootstrap.servers=localhost:9092
reporter.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="<username>" password="<password>";
reporter.admin.security.protocol=SASL_SSL
reporter.admin.sasl.mechanism=PLAIN
reporter.producer.bootstrap.servers=localhost:9092
reporter.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="<username>" password="<password>";
reporter.producer.security.protocol=SASL_SSL
reporter.producer.sasl.mechanism=PLAIN
該当する Kafka Connect シンクコネクターの各ドキュメントには、追加の Reporter 構成プロパティの例が記載されています。例については、HTTP Sink connecter の 「Reporter」のプロパティ を参照してください。
Reporter と Kerberos のセキュリティ¶
Reporter と Kerberos セキュリティに必要なすべての構成プロパティを持つシンクコネクター構成の例を以下に示します。この例は、Prometheus Metrics Sink Connector を示していますが、対象のシンクコネクターに合わせて変更することができます。
{
"name" : "prometheus-connector",
"config" : {
"topics":"prediction-metrics",
"connector.class" : "io.confluent.connect.prometheus.PrometheusMetricsSinkConnector",
"tasks.max" : "1",
"confluent.topic.bootstrap.servers":"localhost:9092",
"confluent.topic.ssl.truststore.location":"/etc/pki/hadoop/kafkalab.jks",
"confluent.topic.ssl.truststore.password":"xxxx",
"confluent.topic.ssl.keystore.location":"/etc/pki/hadoop/kafkalab.jks",
"confluent.topic.ssl.keystore.password":"xxxx",
"confluent.topic.ssl.key.password":"xxxx",
"confluent.topic.security.protocol":"SASL_SSL",
"confluent.topic.replication.factor": "3",
"confluent.topic.sasl.kerberos.service.name":"kafka",
"confluent.topic.sasl.jaas.config":"com.sun.security.auth.module.Krb5LoginModule required \nuseKeyTab=true \nstoreKey=true \nkeyTab=\"/etc/security/keytabs/svc.kfkconnect.lab.keytab\" \nprincipal=\"svc.kfkconnect.lab@DS.DTVENG.NET\";",
"prometheus.scrape.url": "http://localhost:8889/metrics",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"behavior.on.error": "LOG",
"reporter.result.topic.replication.factor": "3",
"reporter.error.topic.replication.factor": "3",
"reporter.bootstrap.servers":"localhost:9092",
"reporter.producer.ssl.truststore.location":"/etc/pki/hadoop/kafkalab.jks",
"reporter.producer.ssl.truststore.password":"xxxx",
"reporter.producer.ssl.keystore.location":"/etc/pki/hadoop/kafkalab.jks",
"reporter.producer.ssl.keystore.password":"xxxx",
"reporter.producer.ssl.key.password":"xxxx",
"reporter.producer.security.protocol":"SASL_SSL",
"reporter.producer.sasl.kerberos.service.name":"kafka",
"reporter.producer.sasl.jaas.config":"com.sun.security.auth.module.Krb5LoginModule required \nuseKeyTab=true \nstoreKey=true \nkeyTab=\"/etc/security/keytabs/svc.kfkconnect.lab.keytab\" \nprincipal=\"svc.kfkconnect.lab@DS.DTVENG.NET\";",
"reporter.admin.ssl.truststore.location":"/etc/pki/hadoop/kafkalab.jks",
"reporter.admin.ssl.truststore.password":"xxxx",
"reporter.admin.ssl.keystore.location":"/etc/pki/hadoop/kafkalab.jks",
"reporter.admin.ssl.keystore.password":"xxxx",
"reporter.admin.ssl.key.password":"xxxx",
"reporter.admin.security.protocol":"SASL_SSL",
"reporter.admin.sasl.kerberos.service.name":"kafka",
"reporter.admin.sasl.jaas.config":"com.sun.security.auth.module.Krb5LoginModule required \nuseKeyTab=true \nstoreKey=true \nkeyTab=\"/etc/security/keytabs/svc.kfkconnect.lab.keytab\" \nprincipal=\"svc.kfkconnect.lab@DS.DTVENG.NET\";",
"confluent.license":"eyJ0eXAiOiJK ...omitted"
}
ロールベースアクセス制御¶
組織で ロールベースアクセス制御(RBAC) が有効になっている場合は、Kafka Connect または Apache Kafka® クラスターの操作を行う前に、ユーザープリンシパル、RBAC ロール、および RBAC ロールのアクセス許可を確認する必要があります。Kafka Connect で Kafka クラスターを保護するための RBAC の構成方法については、「Kafka Connect と RBAC」を参照してください。
シークレットの外部化¶
ConfigProvider クラスインターフェイスを使用することにより、コネクターの構成でシークレットがクリアテキストで表示されることを防止できます。
ConfigProvider クラスのインターフェイスでは、起動時に動的に解決されるワーカー構成の変数を使用することができます。また、コネクターが(再)起動されたときに動的に解決されるコネクター構成の変数を使用することもできます。構成プロパティ値の中の変数をシークレットの代わりに使用したり、実行時に動的に解決する必要がある情報の代わりに使用したりすることができます。
注釈
コネクターの構成は、Connect REST API の変数で保持され、共有されます。コネクターの起動時にのみ、メモリー内の変数の一時的な解決や置き換えが行われます。シークレットは、コネクター構成、ログ、または REST API のリクエストや応答の中には 保持されません。
Connect ワーカーは、変数の解決に、ワーカー構成で定義されている名前付きの ConfigProviders に依存します。各変数では、使用するべき ConfigProvider の名前と、ConfigProvider が変数を置換文字列に解決するために使用する情報を指定します。
ConfigProvider
の実装はすべて、標準的な Java の ServiceLoader
メカニズムを使用して検出されます。ConfigProvider
のカスタム実装を作成するには、ConfigProvider インターフェイスを実装します。実装クラス、および ConfigProvider
実装クラスの完全修飾名を含めた META-INF/services/org.apache.kafka.common.config.provider.ConfigProvider
という名前のファイルを JAR ファイルにパッケージ化します。JAR ファイルでは、Connect フレームワークで提供されているもの以外のサードパーティのライブラリを使用できますが、以下に示すように JAR ファイルと一緒にインストールする必要があることに注意してください。
ConfigProvider
のカスタム実装をインストールするには、JAR ファイルを含む新しいサブディレクトリを Connect の plugin.path
にあるディレクトリに追加し、Connect ワーカーを(再)起動します。Connect ワーカーが起動すると、ワーカー構成で指定された ConfigProvider
のすべての実装がインスタンス化されます。config.providers.[provider].param.
のプレフィックスが付加されているプロパティはすべて、ConfigProvider
の configure()
メソッドに渡されます。Connect ワーカーがシャットダウンされると、ConfigProvider
の close()
メソッドが呼び出されます。
重要
Connect クラスター内のすべてのワーカーは、ワーカー構成内のすべての変数と、すべてのコネクター構成で使用されるすべての変数を解決できなければなりません。そのためには、以下の要件を満たしている必要があります。
- Connect クラスター内のすべてのワーカーに、同じ一連の名前付き構成プロバイダーが必要です。
- 各ワーカーの各プロバイダーには、ワーカー構成またはコネクター構成で使用される変数を解決するために必要なリソースへのアクセス権限が付与されている必要があります。
これを実現するため、分散ワーカーの構成には以下の構成プロパティが追加されています。
config.providers
: プロバイダーの名前のコンマ区切りリスト。config.providers.{name}.class
: プロバイダーの Java クラス名。config.providers.{name}.param.{param-name}
: 初期化時に上記の Java クラスに渡すパラメーター。
FileConfigProvider¶
Kafka は、変数の参照を各ワーカーのローカルファイルの値に置き換えることができる、FileConfigProvider
と呼ばれる ConfigProvider
の実装を提供しています。たとえば、構成プロパティにシークレットを設定するのではなく、ローカルファイルにシークレットを保存して、コネクター構成の変数を使用することができます。コネクターが起動されると、Connect は、ファイル構成プロバイダーを使用して変数を解決し、実際のシークレットに置き換えます。このようにすることで、コネクターの構成が Connect REST API で保持され、共有される場合に、コネクターの構成にシークレットを含めないようにすることができます。
重要
Connect クラスター内の すべてのワーカー は、構成プロバイダーを参照するすべての変数によって参照されるファイルにアクセスできる必要があります。
FileConfigProvider
を名前で参照する変数は、${provider:[path:]key}
という形式を取る必要があります。path は、各 Connect ワーカーのプロパティファイルの完全修飾パス、key はそのプロパティファイル内のキーの名前です。FileConfigProvider
では、各変数にパス(およびそのファイルのプロパティキー)が指定されているすべてのファイルの読み取りがサポートされています。Connect は、このような変数を解決するとき、プロパティファイルを読み取り、対応するキーの値を抽出して、変数全体をその値に置き換えます。
データベースの URL、ユーザー名、パスワードを含む JDBC コネクター構成を以下に示します。
connection.url=jdbc:oracle:thin:@myhost:1521:orcl
connection.user=scott
connection.password=<my-secret-password>
これらの詳細をコネクター構成で公開する代わりに、FileConfigProvider
を使用して、Connect の各ワーカーがアクセスできるファイルに保管することで、他の OS ユーザーから保護することができます。以下の例では、/opt/connect-secrets.properties
という名前の別のファイルを示します。/opt/connect-secrets.properties
に追加されているプロパティは次のとおりです。
productsdb-url=jdbc:oracle:thin:@myhost:1521:orcl
productsdb-username=scott
productsdb-password=my-secret-password
other-connector-url=jdbc:oracle:thin:@myhost:1521:orcl
other-connector-username=customers
other-connector-password=superSecret!
各 Connect ワーカーが FileConfigProvider
を使用するように構成することができます。ワーカーの構成には、以下のプロパティが含まれます。
# Additional properties added to the worker configuration
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
JDBC コネクターの構成で、シークレットの代わりに "変数" を使用できるようになりました。
# Additional properties added to the connector configuration
connection.url=${file:/opt/connect-secrets.properties:productsdb-url}
connection.user=${file:/opt/connect-secrets.properties:productsdb-username}
connection.password=${file:/opt/connect-secrets.properties:productsdb-password}
別のコネクターの構成では、別のファイルの変数を使用することや、同じファイル内の別のプロパティを使用する変数を使用することができます。
# Additional properties added to another connector configuration
connection.url=${file:/opt/connect-secrets.properties:other-connector-url}
connection.user=${file:/opt/connect-secrets.properties:other-connector-username}
connection.password=${file:/opt/connect-secrets.properties:other-connector-password}
InternalSecretConfigProvider¶
Confluent Platform には、ConfigProvider
の実装がもう 1 つ用意されており、InternalSecretConfigProvider
という名前で、Connect の シークレットレジストリ で使用されます。シークレットレジストリはシークレットを提供するレイヤーであり、これにより、Connect が暗号化された Connect 認証情報を、REST API で公開されるトピックに保管できるようにします。これにより、実際のコネクターの構成に、暗号化されていない認証情報が含まれないようにすることができます。以下の例は、ワーカーの構成ファイルで InternalSecretConfigProvider
を構成する方法を示しています。
### Secret Provider
config.providers=secret
config.providers.secret.class=io.confluent.connect.secretregistry.rbac.config.provider.InternalSecretConfigProvider
config.providers.secret.param.master.encryption.key=<encryption key>
config.providers.secret.param.kafkastore.bootstrap.servers=SASL_PLAINTEXT://<Kafka broker URLs>
config.providers.secret.param.kafkastore.security.protocol=SASL_PLAINTEXT
config.providers.secret.param.kafkastore.sasl.mechanism=OAUTHBEARER
config.providers.secret.param.kafkastore.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler
config.providers.secret.param.kafkastore.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
username="<service-principal-username>" \
password="<service-principal-password>" \
metadataServerUrls="<metadata server URLs>";
HTTP または HTTPS を使用するための Connect REST API の構成¶
Kafka Connect では、デフォルトで HTTP を使用して REST API の呼び出しができます。HTTP と HTTPS のどちらか、または両方を使用できるように、Connect を構成することもできます。
リスナーの構成パラメーターで、Kafka Connect で使用されるプロトコルを指定します。この構成には、リスナーのリストを protocol://host:port,protocol2://host2:port2
の形式で含める必要があります。以下に例を示します。
listeners=http://localhost:8080,https://localhost:8443
デフォルトでは、リスナーが指定されていない場合、REST サーバーは HTTP プロトコルを使用して、ポート 8083 で実行されます。HTTPS を使用する場合は、構成に SSL の構成を含める必要があります。デフォルトでは、ssl.*
の設定が使用されます。listeners.https
プレフィックスを使用することにより、REST API では、Kafka ブローカーとは異なる構成を使用できます。listeners.https
プレフィックスを使用する場合、ssl.*
オプションは無視されます。
REST API で HTTPS を構成するには、以下のフィールドを使用できます。
ssl.keystore.location
ssl.keystore.password
ssl.keystore.type
ssl.key.password
ssl.truststore.location
ssl.truststore.password
ssl.truststore.type
ssl.enabled.protocols
ssl.provider
ssl.protocol
ssl.cipher.suites
ssl.keymanager.algorithm
ssl.secure.random.implementation
ssl.trustmanager.algorithm
ssl.endpoint.identification.algorithm
ssl.client.auth
詳細については、「分散ワーカーの構成」を参照してください。
REST API は、Kafka Connect のモニタリングと管理、および Kafka Connect のクラスター間通信に使用されます。フォロワーノードの REST API で受信されたリクエストは、リーダーノードの REST API に転送されます。URI ホストが、リッスンしている URI と異なる場合は、rest.advertised.host.name
、rest.advertised.port
、rest.advertised.listener
構成オプションを使用して URI を変更できます。この URI は、フォロワーノードがリーダーと接続するために使用されます。
HTTP リスナーと HTTPS リスナーの両方を使用する場合は、rest.advertised.listener
オプションを使用して、クラスター間通信に使用するリスナーを指定できます。ノード間の通信に HTTPS を使用する場合は、同じ ssl.*
または listeners.https
オプションを使用して HTTPS クライアントを構成できます。
現在サポートされている REST API のエンドポイントは以下のとおりです。
GET /connectors
- アクティブなコネクターのリストを返します。POST /connectors
- 新しいコネクターを作成します。リクエスト本文は、string 型の名前フィールドと、コネクターの構成パラメーターが指定されたオブジェクト構成フィールドを含む JSON オブジェクトである必要があります。GET /connectors/{name}
- 指定したコネクターの情報を取得します。GET /connectors/{name}/config
- 指定したコネクターの構成パラメーターを取得します。PUT /connectors/{name}/config
- 指定したコネクターの構成パラメーターを更新します。GET /connectors/{name}/status
- コネクターの現在のステータスを取得します。このステータスには、実行中、エラー発生または一時停止中に加え、どのワーカーに割り当てられているか、エラーが発生していた場合はエラー情報、そのコネクターのすべてのタスクの状態が含まれます。GET /connectors/{name}/tasks
- コネクターで現在実行中のタスクのリストを取得します。GET /connectors/{name}/tasks/{taskid}/status
- タスクの現在のステータスを取得します。このステータスには、実行中、エラー発生または一時停止中に加え、どのワーカーに割り当てられているか、エラーが発生していた場合はエラー情報が含まれます。PUT /connectors/{name}/pause
- コネクターおよびそのコネクターのタスクを一時停止します。コネクターが再開されるまで、メッセージ処理は停止します。PUT /connectors/{name}/resume
- 一時停止しているコネクターを再開させます。コネクターが一時停止していない場合は、何もしません。POST /connectors/{name}/restart
- コネクターを再起動します。通常は、コネクターで障害が発生した場合に使用します。POST /connectors/{name}/tasks/{taskId}/restart
- 個々のタスクを再開します。通常は、コネクターで障害が発生した場合に使用します。DELETE /connectors/{name}
- コネクターを削除します。すべてのタスクを中止し、構成を削除します。
また、Kafka Connect REST API を使用して、コネクタープラグインの情報を取得することもできます。
GET /connector-plugins
- Kafka Connect クラスターにインストールされているコネクタープラグインのリストを返します。この API では、リクエストを処理するワーカー上のコネクターのみがチェックされます。そのため、新しいコネクター JAR を追加した場合のローリングアップグレード中などには、結果に不整合が生じることがあります。PUT /connector-plugins/{connector-type}/config/validate
- 指定された構成値を、構成の定義と比較して検証します。この API は、構成の項目ごとに検証を行い、推奨値と、検証中のエラーメッセージを返します。
詳細については、「REST API」を参照してください。
HTTPS エンドポイントを使用するように Kafka Connect を構成し、Confluent Control Center で接続する場合のデモについては、「Confluent Platform デモ」を参照してください。