Kafka Connect のセキュリティの基礎

暗号化

Apache Kafka® クラスターで SSL 暗号化を有効にした場合は、Kafka Connect でもセキュリティが構成されていることを確認する必要があります。Kafka Connect で暗号化を構成する方法については、以下のセクションをクリックして参照してください。

SSL による暗号化

認証

Kafka クラスターで認証を有効にした場合は、Kafka Connect でもセキュリティが構成されていることを確認する必要があります。Kafka Connect で認証を構成する方法については、以下のセクションをクリックして参照してください。

プリンシパルの分割

Connect ワーカーの構成で、producer. および consumer. のプレフィックスが付いているすべてのプロパティは、そのワーカー内で作成されたすべてのソースコネクターおよびシンクコネクターに適用されます。admin. プレフィックスは、シンクコネクターのエラーレポートに使用します。これらのプレフィックスの使用方法は、以下のとおりです。

  • consumer. プレフィックスでは、シンクコネクターに関するコンシューマーの動作を制御できます。
  • producer. プレフィックスでは、ソースコネクターに関するプロデューサーの動作を制御できます。
  • producer.admin. のプレフィックスはどちらも、シンクコネクターのエラーレポートに関するプロデューサーおよびクライアントの動作を制御します。

個々のコネクターでこれらのプロパティをオーバーライドするには、producer.override.consumer.override.admin.override. のプレフィックスを使用します。これには、ワーカーのサービスプリンシパルの構成のオーバーライドも含まれ、コネクターごとに個別のサービスプリンシパルが作成されます。オーバーライドは、デフォルトでは無効です。有効にするには、connector.client.config.override.policy ワーカープロパティを使用します。このプロパティでは、ワーカーで許可されるコネクターごとのオーバーライドを設定します。オーバーライドポリシーの初期設定(OOTB)のオプションは次のとおりです。

  • connector.client.config.override.policy=None
    デフォルトです。構成のオーバーライドを許可しません。
  • connector.client.config.override.policy=Principal
    producer.override.consumer.overrideadmin.override プレフィックスを使用した security.protocolsasl.jaas.configsasl.mechanism 構成プロパティのオーバーライドを許可します。
  • connector.client.config.override.policy=All
    producer.override.consumer.overrideadmin.override プレフィックスを使用したすべての構成プロパティのオーバーライドを許可します。

ちなみに

OOTB のポリシーではニーズが満たされない場合は、ConnectorClientConfigOverridePolicy クラスを独自に記述して実装することができます。

Kafka ブローカーで SSL を使用したクライアント認証がサポートされている場合は、ワーカーとコネクターで別々のプリンシパルを構成できます。この場合、それぞれについて、 別々の証明書を生成 し、その証明書を別々のキーストアにインストールする必要があります。

Connect の構成の主な違いは以下のとおりです。固有のパスワード、キーストアの場所、キーストアのパスワードが使用されることに注意してください。

# Authentication settings for Connect workers
ssl.keystore.location=/var/private/ssl/kafka.worker.keystore.jks
ssl.keystore.password=worker1234
ssl.key.password=worker1234

Connect ワーカーでは、ソースコネクターで使用されるプロデューサーおよびシンクコネクターで使用されるコンシューマーを管理します。つまり、セキュリティを活用するコネクターでは、ワーカーが使用するプロデューサーやコンシューマーのデフォルト構成をオーバーライドする必要もあります。

# Authentication settings for Connect producers used with source connectors
producer.ssl.keystore.location=/var/private/ssl/kafka.source.keystore.jks
producer.ssl.keystore.password=connector1234
producer.ssl.key.password=connector1234

# Authentication settings for Connect consumers used with sink connectors
consumer.ssl.keystore.location=/var/private/ssl/kafka.sink.keystore.jks
consumer.ssl.keystore.password=connector1234
consumer.ssl.key.password=connector1234

ACL に関する考慮事項

コネクターごとに個別のプリンシパルを使用することにより、きめ細かな アクセス制御リスト (ACL)を定義できます。たとえば、この機能を使用して、Connect クラスターで使用される内部トピックにコネクター自体が書き込むことを禁止することができます。また、ソースコネクターとシンクコネクターで別々のキーストアを使用して、ソースコネクターにはトピックに対する書き込みの権限のみを付与し、ソースコネクターには同じトピックに対する読み取りの権限のみを付与することができます。

ワーカーの ACL 要件

Workers must be given access to the common group that all workers in a cluster join, and to all the internal topics required by Connect. Read and write access to the internal topics are always required, but create access is only required if the internal topics don't yet exist and Kafka Connect is to automatically create them. The table below shows each required permission and the relevant configuration setting used to define its value.

操作 リソース 構成する項目
作成 クラスター config.storage.topic
作成 クラスター config.storage.replication.factor
作成 クラスター offset.storage.topic
作成 クラスター offset.storage.partitions
作成 クラスター offset.storage.replication.factor
作成 クラスター status.storage.topic
作成 クラスター status.storage.partitions
作成 クラスター status.storage.replication.factor
読み取り/書き込み トピック config.storage.topic
読み取り/書き込み トピック offsets.storage.topic
読み取り/書き込み トピック status.storage.topic
読み取り グループ group.id

コマンドラインから新しい ACL を作成する方法の詳細については、「ACL の追加」を参照してください。

コネクターの ACL 要件

ソースコネクターには、そのコネクターが書き込む必要があるすべてのトピックに対する WRITE (書き込み)のアクセス許可を付与する必要があります。同様に、シンクコネクターには、そのコネクターが読み取るすべてのトピックに対する READ (読み取り)のアクセス許可が必要です。また、シンクタスクは内部的にコンシューマーグループに依存するため、グループの READ (読み取り)のアクセス許可も必要です。Connect はコンシューマーの group.id をシンクコネクターごとに規則に従って connect-{name} と定義します。この {name} はコネクターの名前に置き換えられます。たとえば、シンクコネクターの名前が "hdfs-logs" で、読み取り元のトピックの名前が "logs" の場合、次のコマンドを使用して ACL を追加することができます。

bin/kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \
 --add --allow-principal User:<Sink Connector Principal> \
 --consumer --topic logs --group connect-hdfs-logs

Connect Reporter

Kafka Connect Reporter は、シンク操作の結果をレポータートピックに送信します。レコードのシンクに成功した後、またはエラー条件に従って、結果レポートを提出するために Connect Reporter が呼び出されます。このレポートは、シンクイベントに関する追加情報とともに、元のレコードの処理方法に関する詳細が含められるように構成されています。それらのレコードは、消費できるように、構成可能な成功トピックおよびエラートピックに書き込まれます。シンクコネクター構成に追加されている基本的な Connect Reporter 構成プロパティの例を以下に示します。

reporter.bootstrap.servers=localhost:9092
reporter.result.topic.name=success-responses
reporter.result.topic.replication.factor=1
reporter.error.topic.name=error-responses
reporter.error.topic.replication.factor=1

To completely disable Connect Reporter, see Disabling Connect Reporter.

セキュアな環境の場合は、管理クライアントとプロデューサーの両方に対して構成ブロックを使用します。プロデューサーは、レポータートピックにレコードを送信するように構築されています。管理クライアントは、トピックを作成します。認証情報は、セキュアな環境で追加する必要があります。管理プロパティとプロデューサープロパティの例を以下に示します。

reporter.admin.bootstrap.servers=localhost:9092
reporter.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="<username>" password="<password>";
reporter.admin.security.protocol=SASL_SSL
reporter.admin.sasl.mechanism=PLAIN

reporter.producer.bootstrap.servers=localhost:9092
reporter.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="<username>" password="<password>";
reporter.producer.security.protocol=SASL_SSL
reporter.producer.sasl.mechanism=PLAIN

Additional Reporter configuration property examples are provided in each applicable Kafka Connect sink connector document. For an example, see the Reporter properties in the HTTP Sink connecter.

Reporter と Kerberos のセキュリティ

The following configuration example shows a sink connector with all the necessary configuration properties for Reporter and Kerberos security. This example shows the Prometheus Metrics Sink connector, but can be modified for any applicable sink connector.

{

  "name" : "prometheus-connector",
  "config" : {
    "topics":"prediction-metrics",
    "connector.class" : "io.confluent.connect.prometheus.PrometheusMetricsSinkConnector",
    "tasks.max" : "1",
    "confluent.topic.bootstrap.servers":"localhost:9092",
    "confluent.topic.ssl.truststore.location":"/etc/pki/hadoop/kafkalab.jks",
    "confluent.topic.ssl.truststore.password":"xxxx",
    "confluent.topic.ssl.keystore.location":"/etc/pki/hadoop/kafkalab.jks",
    "confluent.topic.ssl.keystore.password":"xxxx",
    "confluent.topic.ssl.key.password":"xxxx",
    "confluent.topic.security.protocol":"SASL_SSL",
    "confluent.topic.replication.factor": "3",
    "confluent.topic.sasl.kerberos.service.name":"kafka",
    "confluent.topic.sasl.jaas.config":"com.sun.security.auth.module.Krb5LoginModule required \nuseKeyTab=true \nstoreKey=true \nkeyTab=\"/etc/security/keytabs/svc.kfkconnect.lab.keytab\" \nprincipal=\"svc.kfkconnect.lab@DS.DTVENG.NET\";",
    "prometheus.scrape.url": "http://localhost:8889/metrics",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "behavior.on.error": "LOG",
    "reporter.result.topic.replication.factor": "3",
    "reporter.error.topic.replication.factor": "3",
    "reporter.bootstrap.servers":"localhost:9092",
    "reporter.producer.ssl.truststore.location":"/etc/pki/hadoop/kafkalab.jks",
    "reporter.producer.ssl.truststore.password":"xxxx",
    "reporter.producer.ssl.keystore.location":"/etc/pki/hadoop/kafkalab.jks",
    "reporter.producer.ssl.keystore.password":"xxxx",
    "reporter.producer.ssl.key.password":"xxxx",
    "reporter.producer.security.protocol":"SASL_SSL",
    "reporter.producer.sasl.kerberos.service.name":"kafka",
    "reporter.producer.sasl.jaas.config":"com.sun.security.auth.module.Krb5LoginModule required \nuseKeyTab=true \nstoreKey=true \nkeyTab=\"/etc/security/keytabs/svc.kfkconnect.lab.keytab\" \nprincipal=\"svc.kfkconnect.lab@DS.DTVENG.NET\";",
    "reporter.admin.ssl.truststore.location":"/etc/pki/hadoop/kafkalab.jks",
    "reporter.admin.ssl.truststore.password":"xxxx",
    "reporter.admin.ssl.keystore.location":"/etc/pki/hadoop/kafkalab.jks",
    "reporter.admin.ssl.keystore.password":"xxxx",
    "reporter.admin.ssl.key.password":"xxxx",
    "reporter.admin.security.protocol":"SASL_SSL",
    "reporter.admin.sasl.kerberos.service.name":"kafka",
    "reporter.admin.sasl.jaas.config":"com.sun.security.auth.module.Krb5LoginModule required \nuseKeyTab=true \nstoreKey=true \nkeyTab=\"/etc/security/keytabs/svc.kfkconnect.lab.keytab\" \nprincipal=\"svc.kfkconnect.lab@DS.DTVENG.NET\";",
    "confluent.license":"eyJ0eXAiOiJK ...omitted"

  }

ロールベースアクセス制御

組織で ロールベースアクセス制御(RBAC) が有効になっている場合は、Kafka Connect または Apache Kafka® クラスターの操作を行う前に、ユーザープリンシパル、RBAC ロール、および RBAC ロールのアクセス許可を確認する必要があります。Kafka Connect で Kafka クラスターを保護するための RBAC の構成方法については、「Kafka Connect と RBAC」を参照してください。

シークレットの外部化

ConfigProvider クラスインターフェイスを使用することにより、コネクターの構成でシークレットがクリアテキストで表示されることを防止できます。

The ConfigProvider class interface allows you to use variables in your worker configuration that are dynamically resolved upon startup. It also allows you to use variables in your connector configurations that are dynamically resolved when the connector is (re)started. You can use variables within configuration property values in place of secrets, or in place of any information that should be resolved dynamically at runtime.

注釈

コネクターの構成は、Connect REST API の変数で保持され、共有されます。コネクターの起動時にのみ、メモリー内の変数の一時的な解決や置き換えが行われます。シークレットは、コネクター構成、ログ、または REST API のリクエストや応答の中には 保持されません

Connect ワーカーは、変数の解決に、ワーカー構成で定義されている名前付きの ConfigProviders に依存します。各変数では、使用するべき ConfigProvider の名前と、ConfigProvider が変数を置換文字列に解決するために使用する情報を指定します。

All ConfigProvider implementations are discovered using the standard Java ServiceLoader mechanism. To create a custom implementation of ConfigProvider, implement the ConfigProvider interface. Package the implementation class(es) and a file named META-INF/services/org.apache.kafka.common.config.provider.ConfigProvider containing the fully qualified name of the ConfigProvider implementation class into a JAR file. Note that the JAR file can use third-party libraries other than those provided by the Connect framework, but they must be installed with the JAR file as described below.

ConfigProvider のカスタム実装をインストールするには、JAR ファイルを含む新しいサブディレクトリを Connect の plugin.path にあるディレクトリに追加し、Connect ワーカーを(再)起動します。Connect ワーカーが起動すると、ワーカー構成で指定された ConfigProvider のすべての実装がインスタンス化されます。config.providers.[provider].param. のプレフィックスが付加されているプロパティはすべて、ConfigProviderconfigure() メソッドに渡されます。Connect ワーカーがシャットダウンされると、ConfigProviderclose() メソッドが呼び出されます。

重要

Connect クラスター内のすべてのワーカーは、ワーカー構成内のすべての変数と、すべてのコネクター構成で使用されるすべての変数を解決できなければなりません。そのためには、以下の要件を満たしている必要があります。

  • Connect クラスター内のすべてのワーカーに、同じ一連の名前付き構成プロバイダーが必要です。
  • 各ワーカーの各プロバイダーには、ワーカー構成またはコネクター構成で使用される変数を解決するために必要なリソースへのアクセス権限が付与されている必要があります。

これを実現するため、分散ワーカーの構成には以下の構成プロパティが追加されています。

  • config.providers: プロバイダーの名前のコンマ区切りリスト。
  • config.providers.{name}.class: プロバイダーの Java クラス名。
  • config.providers.{name}.param.{param-name}: 初期化時に上記の Java クラスに渡すパラメーター。

FileConfigProvider

Kafka は、変数の参照を各ワーカーのローカルファイルの値に置き換えることができる、FileConfigProvider と呼ばれる ConfigProvider の実装を提供しています。たとえば、構成プロパティにシークレットを設定するのではなく、ローカルファイルにシークレットを保存して、コネクター構成の変数を使用することができます。コネクターが起動されると、Connect は、ファイル構成プロバイダーを使用して変数を解決し、実際のシークレットに置き換えます。このようにすることで、コネクターの構成が Connect REST API で保持され、共有される場合に、コネクターの構成にシークレットを含めないようにすることができます。

重要

Connect クラスター内の すべてのワーカー は、構成プロバイダーを参照するすべての変数によって参照されるファイルにアクセスできる必要があります。

FileConfigProvider を名前で参照する変数は、${provider:[path:]key} という形式を取る必要があります。path は、各 Connect ワーカーのプロパティファイルの完全修飾パス、key はそのプロパティファイル内のキーの名前です。FileConfigProvider では、各変数にパス(およびそのファイルのプロパティキー)が指定されているすべてのファイルの読み取りがサポートされています。Connect は、このような変数を解決するとき、プロパティファイルを読み取り、対応するキーの値を抽出して、変数全体をその値に置き換えます。

データベースの URL、ユーザー名、パスワードを含む JDBC コネクター構成を以下に示します。

connection.url=jdbc:oracle:thin:@myhost:1521:orcl
connection.user=scott
connection.password=<my-secret-password>

これらの詳細をコネクター構成で公開する代わりに、FileConfigProvider を使用して、Connect の各ワーカーがアクセスできるファイルに保管することで、他の OS ユーザーから保護することができます。以下の例では、/opt/connect-secrets.properties という名前の別のファイルを示します。/opt/connect-secrets.properties に追加されているプロパティは次のとおりです。

productsdb-url=jdbc:oracle:thin:@myhost:1521:orcl
productsdb-username=scott
productsdb-password=my-secret-password
other-connector-url=jdbc:oracle:thin:@myhost:1521:orcl
other-connector-username=customers
other-connector-password=superSecret!

各 Connect ワーカーが FileConfigProvider を使用するように構成することができます。ワーカーの構成には、以下のプロパティが含まれます。

# Additional properties added to the worker configuration

config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

JDBC コネクターの構成で、シークレットの代わりに "変数" を使用できるようになりました。

# Additional properties added to the connector configuration

connection.url=${file:/opt/connect-secrets.properties:productsdb-url}
connection.user=${file:/opt/connect-secrets.properties:productsdb-username}
connection.password=${file:/opt/connect-secrets.properties:productsdb-password}

別のコネクターの構成では、別のファイルの変数を使用することや、同じファイル内の別のプロパティを使用する変数を使用することができます。

# Additional properties added to another connector configuration

connection.url=${file:/opt/connect-secrets.properties:other-connector-url}
connection.user=${file:/opt/connect-secrets.properties:other-connector-username}
connection.password=${file:/opt/connect-secrets.properties:other-connector-password}

InternalSecretConfigProvider

Confluent Platform provides another implementation of ConfigProvider named InternalSecretConfigProvider which is used with the Connect Secret Registry. The Secret Registry is a secret serving layer that enables Connect to store encrypted Connect credentials in a topic exposed through a REST API. This eliminates any unencrypted credentials being located in the actual connector configuration. The following example shows how InternalSecretConfigProvider is configured in the worker configuration file:

### Secret Provider

config.providers=secret
config.providers.secret.class=io.confluent.connect.secretregistry.rbac.config.provider.InternalSecretConfigProvider

config.providers.secret.param.master.encryption.key=<encryption key>
config.providers.secret.param.kafkastore.bootstrap.servers=SASL_PLAINTEXT://<Kafka broker URLs>
config.providers.secret.param.kafkastore.security.protocol=SASL_PLAINTEXT
config.providers.secret.param.kafkastore.sasl.mechanism=OAUTHBEARER
config.providers.secret.param.kafkastore.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler
config.providers.secret.param.kafkastore.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  username="<service-principal-username>" \
  password="<service-principal-password>" \
  metadataServerUrls="<metadata server URLs>";

HTTP または HTTPS を使用するための Connect REST API の構成

Kafka Connect では、デフォルトで HTTP を使用して REST API の呼び出しができます。HTTP と HTTPS のどちらか、または両方を使用できるように、Connect を構成することもできます。

リスナーの構成パラメーターで、Kafka Connect で使用されるプロトコルを指定します。この構成には、リスナーのリストを protocol://host:port,protocol2://host2:port2 の形式で含める必要があります。以下に例を示します。

listeners=http://localhost:8080,https://localhost:8443

デフォルトでは、リスナーが指定されていない場合、REST サーバーは HTTP プロトコルを使用して、ポート 8083 で実行されます。HTTPS を使用する場合は、構成に SSL の構成を含める必要があります。デフォルトでは、ssl.* の設定が使用されます。listeners.https プレフィックスを使用することにより、REST API では、Kafka ブローカーとは異なる構成を使用できます。listeners.https プレフィックスを使用する場合、ssl.* オプションは無視されます。

REST API で HTTPS を構成するには、以下のフィールドを使用できます。

  • ssl.keystore.location
  • ssl.keystore.password
  • ssl.keystore.type
  • ssl.key.password
  • ssl.truststore.location
  • ssl.truststore.password
  • ssl.truststore.type
  • ssl.enabled.protocols
  • ssl.provider
  • ssl.protocol
  • ssl.cipher.suites
  • ssl.keymanager.algorithm
  • ssl.secure.random.implementation
  • ssl.trustmanager.algorithm
  • ssl.endpoint.identification.algorithm
  • ssl.client.auth

詳細については、「分散ワーカーの構成」を参照してください。

REST API は、Kafka Connect のモニタリングと管理、および Kafka Connect のクラスター間通信に使用されます。フォロワーノードの REST API で受信されたリクエストは、リーダーノードの REST API に転送されます。URI ホストが、リッスンしている URI と異なる場合は、rest.advertised.host.namerest.advertised.portrest.advertised.listener 構成オプションを使用して URI を変更できます。この URI は、フォロワーノードがリーダーと接続するために使用されます。

HTTP リスナーと HTTPS リスナーの両方を使用する場合は、rest.advertised.listener オプションを使用して、クラスター間通信に使用するリスナーを指定できます。ノード間の通信に HTTPS を使用する場合は、同じ ssl.* または listeners.https オプションを使用して HTTPS クライアントを構成できます。

現在サポートされている REST API のエンドポイントは以下のとおりです。

  • GET /connectors - アクティブなコネクターのリストを返します。
  • POST /connectors - 新しいコネクターを作成します。リクエスト本文は、string 型の名前フィールドと、コネクターの構成パラメーターが指定されたオブジェクト構成フィールドを含む JSON オブジェクトである必要があります。
  • GET /connectors/{name} - 指定したコネクターの情報を取得します。
  • GET /connectors/{name}/config - 指定したコネクターの構成パラメーターを取得します。
  • PUT /connectors/{name}/config - 指定したコネクターの構成パラメーターを更新します。
  • GET /connectors/{name}/status - コネクターの現在のステータスを取得します。このステータスには、実行中、エラー発生または一時停止中に加え、どのワーカーに割り当てられているか、エラーが発生していた場合はエラー情報、そのコネクターのすべてのタスクの状態が含まれます。
  • GET /connectors/{name}/tasks - コネクターで現在実行中のタスクのリストを取得します。
  • GET /connectors/{name}/tasks/{taskid}/status - タスクの現在のステータスを取得します。このステータスには、実行中、エラー発生または一時停止中に加え、どのワーカーに割り当てられているか、エラーが発生していた場合はエラー情報が含まれます。
  • PUT /connectors/{name}/pause - コネクターおよびそのコネクターのタスクを一時停止します。コネクターが再開されるまで、メッセージ処理は停止します。
  • PUT /connectors/{name}/resume - 一時停止しているコネクターを再開させます。コネクターが一時停止していない場合は、何もしません。
  • POST /connectors/{name}/restart - コネクターを再起動します。通常は、コネクターで障害が発生した場合に使用します。
  • POST /connectors/{name}/tasks/{taskId}/restart - 個々のタスクを再開します。通常は、コネクターで障害が発生した場合に使用します。
  • DELETE /connectors/{name} - コネクターを削除します。すべてのタスクを中止し、構成を削除します。

また、Kafka Connect REST API を使用して、コネクタープラグインの情報を取得することもできます。

  • GET /connector-plugins - Kafka Connect クラスターにインストールされているコネクタープラグインのリストを返します。この API では、リクエストを処理するワーカー上のコネクターのみがチェックされます。そのため、新しいコネクター JAR を追加した場合のローリングアップグレード中などには、結果に不整合が生じることがあります。
  • PUT /connector-plugins/{connector-type}/config/validate - 指定された構成値を、構成の定義と比較して検証します。この API は、構成の項目ごとに検証を行い、推奨値と、検証中のエラーメッセージを返します。

詳細については、「REST API」を参照してください。

HTTPS エンドポイントを使用するように Kafka Connect を構成し、Confluent Control Center で接続する場合のデモについては、「Confluent Platform デモ」を参照してください。