Kafka Connect のセキュリティの基礎¶
暗号化¶
Apache Kafka® クラスターで SSL 暗号化を有効にした場合は、Kafka Connect でもセキュリティが構成されていることを確認する必要があります。Kafka Connect で暗号化を構成する方法については、以下のセクションをクリックして参照してください。
認証¶
Kafka クラスターで認証を有効にした場合は、Kafka Connect でもセキュリティが構成されていることを確認する必要があります。Kafka Connect で認証を構成する方法については、以下のセクションをクリックして参照してください。
プリンシパルの分割¶
Connect ワーカーの構成で、producer.
および consumer.
のプレフィックスが付いているすべてのプロパティは、そのワーカー内で作成されたすべてのソースコネクターおよびシンクコネクターに適用されます。admin.
プレフィックスは、シンクコネクターのエラーレポートに使用します。これらのプレフィックスの使用方法は、以下のとおりです。
consumer.
プレフィックスでは、シンクコネクターに関するコンシューマーの動作を制御できます。producer.
プレフィックスでは、ソースコネクターに関するプロデューサーの動作を制御できます。producer.
とadmin.
のプレフィックスはどちらも、シンクコネクターのエラーレポートに関するプロデューサーおよびクライアントの動作を制御します。
個々のコネクターでこれらのプロパティをオーバーライドするには、producer.override.
、consumer.override.
、admin.override.
のプレフィックスを使用します。これには、ワーカーのサービスプリンシパルの構成のオーバーライドも含まれ、コネクターごとに個別のサービスプリンシパルが作成されます。オーバーライドは、デフォルトでは無効です。有効にするには、connector.client.config.override.policy
ワーカープロパティを使用します。このプロパティでは、ワーカーで許可されるコネクターごとのオーバーライドを設定します。オーバーライドポリシーの初期設定(OOTB)のオプションは次のとおりです。
connector.client.config.override.policy=None
- デフォルトです。構成のオーバーライドを許可しません。
connector.client.config.override.policy=Principal
producer.override.
、consumer.override
、admin.override
プレフィックスを使用したsecurity.protocol
、sasl.jaas.config
、sasl.mechanism
構成プロパティのオーバーライドを許可します。
connector.client.config.override.policy=All
producer.override.
、consumer.override
、admin.override
プレフィックスを使用したすべての構成プロパティのオーバーライドを許可します。
ちなみに
OOTB のポリシーではニーズが満たされない場合は、ConnectorClientConfigOverridePolicy クラスを独自に記述して実装することができます。
Kafka ブローカーで SSL を使用したクライアント認証がサポートされている場合は、ワーカーとコネクターで別々のプリンシパルを構成できます。この場合、それぞれについて、 別々の証明書を生成 し、その証明書を別々のキーストアにインストールする必要があります。
Connect の構成の主な違いは以下のとおりです。固有のパスワード、キーストアの場所、キーストアのパスワードが使用されることに注意してください。
# Authentication settings for Connect workers
ssl.keystore.location=/var/private/ssl/kafka.worker.keystore.jks
ssl.keystore.password=worker1234
ssl.key.password=worker1234
Connect ワーカーでは、ソースコネクターで使用されるプロデューサーおよびシンクコネクターで使用されるコンシューマーを管理します。つまり、セキュリティを活用するコネクターでは、ワーカーが使用するプロデューサーやコンシューマーのデフォルト構成をオーバーライドする必要もあります。
# Authentication settings for Connect producers used with source connectors
producer.ssl.keystore.location=/var/private/ssl/kafka.source.keystore.jks
producer.ssl.keystore.password=connector1234
producer.ssl.key.password=connector1234
# Authentication settings for Connect consumers used with sink connectors
consumer.ssl.keystore.location=/var/private/ssl/kafka.sink.keystore.jks
consumer.ssl.keystore.password=connector1234
consumer.ssl.key.password=connector1234
ACL に関する考慮事項¶
コネクターごとに個別のプリンシパルを使用することにより、きめ細かな アクセス制御リスト (ACL)を定義できます。たとえば、この機能を使用して、Connect クラスターで使用される内部トピックにコネクター自体が書き込むことを禁止することができます。また、ソースコネクターとシンクコネクターで別々のキーストアを使用して、ソースコネクターにはトピックに対する書き込みの権限のみを付与し、ソースコネクターには同じトピックに対する読み取りの権限のみを付与することができます。
ワーカーの ACL 要件¶
Workers must be given access to the common group that all workers in a cluster join, and to all the internal topics required by Connect. Read and write access to the internal topics are always required, but create access is only required if the internal topics don't yet exist and Kafka Connect is to automatically create them. The table below shows each required permission and the relevant configuration setting used to define its value.
操作 | リソース | 構成する項目 |
---|---|---|
作成 | クラスター | config.storage.topic |
作成 | クラスター | config.storage.replication.factor |
作成 | クラスター | offset.storage.topic |
作成 | クラスター | offset.storage.partitions |
作成 | クラスター | offset.storage.replication.factor |
作成 | クラスター | status.storage.topic |
作成 | クラスター | status.storage.partitions |
作成 | クラスター | status.storage.replication.factor |
読み取り/書き込み | トピック | config.storage.topic |
読み取り/書き込み | トピック | offsets.storage.topic |
読み取り/書き込み | トピック | status.storage.topic |
読み取り | グループ | group.id |
コマンドラインから新しい ACL を作成する方法の詳細については、「ACL の追加」を参照してください。
コネクターの ACL 要件¶
ソースコネクターには、そのコネクターが書き込む必要があるすべてのトピックに対する WRITE
(書き込み)のアクセス許可を付与する必要があります。同様に、シンクコネクターには、そのコネクターが読み取るすべてのトピックに対する READ
(読み取り)のアクセス許可が必要です。また、シンクタスクは内部的にコンシューマーグループに依存するため、グループの READ
(読み取り)のアクセス許可も必要です。Connect はコンシューマーの group.id
をシンクコネクターごとに規則に従って connect-{name}
と定義します。この {name}
はコネクターの名前に置き換えられます。たとえば、シンクコネクターの名前が "hdfs-logs" で、読み取り元のトピックの名前が "logs" の場合、次のコマンドを使用して ACL を追加することができます。
bin/kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \
--add --allow-principal User:<Sink Connector Principal> \
--consumer --topic logs --group connect-hdfs-logs
_confluent-command
トピックにアクセスするコネクターには、以下の ACL を構成する必要があります。
- コネクターでトピックの作成が必要な場合は、リソースクラスターでの CREATE および DESCRIBE。
_confluent-command
トピックでの DESCRIBE、READ、および WRITE。
_confluent-command
トピックの ACL の構成の詳細については、「ライセンストピックの ACL」を参照してください。
エンタープライズコネクターの ACL 要件¶
_confluent-command
トピックには、confluent.license
プロパティで指定されたライセンスキーに対応するライセンスが格納されます。このトピックはデフォルトで作成されます。_confluent-command
トピックにアクセスするコネクターには、以下の ACL を構成する必要があります。
- コネクターでトピックの作成が必要な場合は、リソースクラスターでの
CREATE
およびDESCRIBE
。 _confluent-command
トピックでのDESCRIBE
、READ
、およびWRITE
。
Connect Reporter¶
Kafka Connect Reporter は、シンク操作の結果をレポータートピックに送信します。レコードのシンクに成功した後、またはエラー条件に従って、結果レポートを提出するために Connect Reporter が呼び出されます。このレポートは、シンクイベントに関する追加情報とともに、元のレコードの処理方法に関する詳細が含められるように構成されています。それらのレコードは、消費できるように、構成可能な成功トピックおよびエラートピックに書き込まれます。シンクコネクター構成に追加されている基本的な Connect Reporter 構成プロパティの例を以下に示します。
reporter.bootstrap.servers=localhost:9092
reporter.result.topic.name=success-responses
reporter.result.topic.replication.factor=1
reporter.error.topic.name=error-responses
reporter.error.topic.replication.factor=1
To completely disable Connect Reporter, see Disabling Connect Reporter.
セキュアな環境の場合は、管理クライアントとプロデューサーの両方に対して構成ブロックを使用します。プロデューサーは、レポータートピックにレコードを送信するように構築されています。管理クライアントは、トピックを作成します。認証情報は、セキュアな環境で追加する必要があります。管理プロパティとプロデューサープロパティの例を以下に示します。
reporter.admin.bootstrap.servers=localhost:9092
reporter.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="<username>" password="<password>";
reporter.admin.security.protocol=SASL_SSL
reporter.admin.sasl.mechanism=PLAIN
reporter.producer.bootstrap.servers=localhost:9092
reporter.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="<username>" password="<password>";
reporter.producer.security.protocol=SASL_SSL
reporter.producer.sasl.mechanism=PLAIN
Additional Reporter configuration property examples are provided in each applicable Kafka Connect sink connector document. For an example, see the Reporter properties in the HTTP Sink connector.
Reporter と Kerberos のセキュリティ¶
The following configuration example shows a sink connector with all the necessary configuration properties for Reporter and Kerberos security. This example shows the Prometheus Metrics Sink connector, but can be modified for any applicable sink connector.
{
"name" : "prometheus-connector",
"config" : {
"topics":"prediction-metrics",
"connector.class" : "io.confluent.connect.prometheus.PrometheusMetricsSinkConnector",
"tasks.max" : "1",
"confluent.topic.bootstrap.servers":"localhost:9092",
"confluent.topic.ssl.truststore.location":"/etc/pki/hadoop/kafkalab.jks",
"confluent.topic.ssl.truststore.password":"xxxx",
"confluent.topic.ssl.keystore.location":"/etc/pki/hadoop/kafkalab.jks",
"confluent.topic.ssl.keystore.password":"xxxx",
"confluent.topic.ssl.key.password":"xxxx",
"confluent.topic.security.protocol":"SASL_SSL",
"confluent.topic.replication.factor": "3",
"confluent.topic.sasl.kerberos.service.name":"kafka",
"confluent.topic.sasl.jaas.config":"com.sun.security.auth.module.Krb5LoginModule required \nuseKeyTab=true \nstoreKey=true \nkeyTab=\"/etc/security/keytabs/svc.kfkconnect.lab.keytab\" \nprincipal=\"svc.kfkconnect.lab@DS.DTVENG.NET\";",
"prometheus.scrape.url": "http://localhost:8889/metrics",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"behavior.on.error": "LOG",
"reporter.result.topic.replication.factor": "3",
"reporter.error.topic.replication.factor": "3",
"reporter.bootstrap.servers":"localhost:9092",
"reporter.producer.ssl.truststore.location":"/etc/pki/hadoop/kafkalab.jks",
"reporter.producer.ssl.truststore.password":"xxxx",
"reporter.producer.ssl.keystore.location":"/etc/pki/hadoop/kafkalab.jks",
"reporter.producer.ssl.keystore.password":"xxxx",
"reporter.producer.ssl.key.password":"xxxx",
"reporter.producer.security.protocol":"SASL_SSL",
"reporter.producer.sasl.kerberos.service.name":"kafka",
"reporter.producer.sasl.jaas.config":"com.sun.security.auth.module.Krb5LoginModule required \nuseKeyTab=true \nstoreKey=true \nkeyTab=\"/etc/security/keytabs/svc.kfkconnect.lab.keytab\" \nprincipal=\"svc.kfkconnect.lab@DS.DTVENG.NET\";",
"reporter.admin.ssl.truststore.location":"/etc/pki/hadoop/kafkalab.jks",
"reporter.admin.ssl.truststore.password":"xxxx",
"reporter.admin.ssl.keystore.location":"/etc/pki/hadoop/kafkalab.jks",
"reporter.admin.ssl.keystore.password":"xxxx",
"reporter.admin.ssl.key.password":"xxxx",
"reporter.admin.security.protocol":"SASL_SSL",
"reporter.admin.sasl.kerberos.service.name":"kafka",
"reporter.admin.sasl.jaas.config":"com.sun.security.auth.module.Krb5LoginModule required \nuseKeyTab=true \nstoreKey=true \nkeyTab=\"/etc/security/keytabs/svc.kfkconnect.lab.keytab\" \nprincipal=\"svc.kfkconnect.lab@DS.DTVENG.NET\";",
"confluent.license":"eyJ0eXAiOiJK ...omitted"
}
ロールベースアクセス制御¶
組織で ロールベースアクセス制御(RBAC) が有効になっている場合は、Kafka Connect または Apache Kafka® クラスターの操作を行う前に、ユーザープリンシパル、RBAC ロール、および RBAC ロールのアクセス許可を確認する必要があります。Kafka Connect で Kafka クラスターを保護するための RBAC の構成方法については、「Kafka Connect と RBAC」を参照してください。
シークレットの外部化¶
ConfigProvider クラスインターフェイスを使用することにより、コネクターの構成でシークレットがクリアテキストで表示されることを防止できます。
The ConfigProvider class interface allows you to use variables in your worker configuration that are dynamically resolved upon startup. It also allows you to use variables in your connector configurations that are dynamically resolved when the connector is (re)started. You can use variables within configuration property values in place of secrets, or in place of any information that should be resolved dynamically at runtime.
注釈
コネクターの構成は、Connect REST API の変数で保持され、共有されます。コネクターの起動時にのみ、メモリー内の変数の一時的な解決や置き換えが行われます。シークレットは、コネクター構成、ログ、または REST API のリクエストや応答の中には 保持されません。
Connect ワーカーは、変数の解決に、ワーカー構成で定義されている名前付きの ConfigProviders に依存します。各変数では、使用するべき ConfigProvider の名前と、ConfigProvider が変数を置換文字列に解決するために使用する情報を指定します。
All ConfigProvider
implementations are discovered using the standard Java
ServiceLoader
mechanism. To create a custom implementation of
ConfigProvider
, implement the ConfigProvider
interface. Package the implementation class(es) and a file named
META-INF/services/org.apache.kafka.common.config.provider.ConfigProvider
containing the fully qualified name of the ConfigProvider
implementation
class into a JAR file. Note that the JAR file can use third-party libraries
other than those provided by the Connect framework, but they must be
installed with the JAR file as described below.
ConfigProvider
のカスタム実装をインストールするには、JAR ファイルを含む新しいサブディレクトリを Connect の plugin.path
にあるディレクトリに追加し、Connect ワーカーを(再)起動します。Connect ワーカーが起動すると、ワーカー構成で指定された ConfigProvider
のすべての実装がインスタンス化されます。config.providers.[provider].param.
のプレフィックスが付加されているプロパティはすべて、ConfigProvider
の configure()
メソッドに渡されます。Connect ワーカーがシャットダウンされると、ConfigProvider
の close()
メソッドが呼び出されます。
重要
Connect クラスター内のすべてのワーカーは、ワーカー構成内のすべての変数と、すべてのコネクター構成で使用されるすべての変数を解決できなければなりません。そのためには、以下の要件を満たしている必要があります。
- Connect クラスター内のすべてのワーカーに、同じ一連の名前付き構成プロバイダーが必要です。
- 各ワーカーの各プロバイダーには、ワーカー構成またはコネクター構成で使用される変数を解決するために必要なリソースへのアクセス権限が付与されている必要があります。
これを実現するため、分散ワーカーの構成には以下の構成プロパティが追加されています。
config.providers
: プロバイダーの名前のコンマ区切りリスト。config.providers.{name}.class
: プロバイダーの Java クラス名。config.providers.{name}.param.{param-name}
: 初期化時に上記の Java クラスに渡すパラメーター。
FileConfigProvider¶
Kafka は、変数の参照を各ワーカーのローカルファイルの値に置き換えることができる、FileConfigProvider
と呼ばれる ConfigProvider
の実装を提供しています。たとえば、構成プロパティにシークレットを設定するのではなく、ローカルファイルにシークレットを保存して、コネクター構成の変数を使用することができます。コネクターが起動されると、Connect は、ファイル構成プロバイダーを使用して変数を解決し、実際のシークレットに置き換えます。このようにすることで、コネクターの構成が Connect REST API で保持され、共有される場合に、コネクターの構成にシークレットを含めないようにすることができます。
重要
Connect クラスター内の すべてのワーカー は、構成プロバイダーを参照するすべての変数によって参照されるファイルにアクセスできる必要があります。
Variables that use FileConfigProvider
should be in the form
${provider:[path:]key}
. The path is the fully-qualified path of the
property file on a Connect worker; the key is the name of the key within
that property file. Note that the FileConfigProvider
supports reading any
file, where the path (and property key in that file) is specified in each
variable. When Connect resolves one of these variables, it will read the
properties file, extract the value for the corresponding key, and replace the
whole variable with that value.
For example:
The following shows a JDBC connector configuration with properties that you do not want exposed:
connection.url=jdbc:oracle:thin:@myhost:1521:orcl
connection.user=scott
connection.password=<my-secret-password>
Using FileConfigProvider
, you store these secrets in a separate file
accessible to each Connect worker. For this example, the separate file you
create is named /opt/connect-secrets.properties
. Example properties you
include in this file are shown below. Note the additional properties for another
connector (other-connector
).
connector-url=jdbc:oracle:thin:@myhost:1521:orcl
connector-username=<username>
connector-password=<jdbc-password>
other-connector-url=jdbc:oracle:thin:@myhost:1521:orcl
other-connector-username=<username>
other-connector-password=<other-password>
Then, you add the properties config.providers=file
and
FileConfigProvider
to all Connect workers with connectors that require
these secrets. The following example worker configuration shows these entries:
# Additional properties added to the worker configuration
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
You can now add variables in the JDBC connector configuration that point to
/opt/connect-secrets.properties
. For example:
# Additional properties added to the connector configuration
connection.url=${file:/opt/connect-secrets.properties:connector-url}
connection.user=${file:/opt/connect-secrets.properties:connector-username}
connection.password=${file:/opt/connect-secrets.properties:connector-password}
You can also add variables to the other connector configuration that point to the same /opt/connect-secrets.properties
file.
# Additional properties added to another connector configuration
connection.url=${file:/opt/connect-secrets.properties:other-connector-url}
connection.user=${file:/opt/connect-secrets.properties:other-connector-username}
connection.password=${file:/opt/connect-secrets.properties:other-connector-password}
InternalSecretConfigProvider¶
Confluent Platform provides another implementation of ConfigProvider
named
InternalSecretConfigProvider
which is used with the Connect Secret
Registry. The Secret Registry is a secret serving
layer that enables Connect to store encrypted Connect credentials in a
topic exposed through a REST API. This eliminates any unencrypted credentials
being located in the actual connector configuration. The following example shows how InternalSecretConfigProvider
is configured in the worker configuration file:
### Secret Provider
config.providers=secret
config.providers.secret.class=io.confluent.connect.secretregistry.rbac.config.provider.InternalSecretConfigProvider
config.providers.secret.param.master.encryption.key=<encryption key>
config.providers.secret.param.kafkastore.bootstrap.servers=SASL_PLAINTEXT://<Kafka broker URLs>
config.providers.secret.param.kafkastore.security.protocol=SASL_PLAINTEXT
config.providers.secret.param.kafkastore.sasl.mechanism=OAUTHBEARER
config.providers.secret.param.kafkastore.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler
config.providers.secret.param.kafkastore.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
username="<service-principal-username>" \
password="<service-principal-password>" \
metadataServerUrls="<metadata server URLs>";
Using Secrets with a JSON Connector Configuration¶
Use the following steps to use encrypted secrets for a connector JSON configuration.
Create the
security.properties
file to store the master encryption key. You can use an existingsecurity.properties
file if the Secrets feature is already enabled for the Confluent Platform environment. Note that secrets are managed using confluent secret commands.If you created a new
security.properties
file, create a directory to store the file. For example:mkdir /usr/secrets
Choose a location for the secrets file on the local host and not a location where Confluent Platform services run. The secrets file contains encrypted secrets for the master encryption key, data encryption key, configuration parameters, and metadata such as the cipher used for encryption.
Generate the master encryption key based on a passphrase. For example:
confluent secret master-key generate \ --local-secrets-file /usr/secrets/security.properties \ --passphrase @<passphrase.txt>
Save the master key. It cannot be retrieved later.
+------------+----------------------------------------------+ | Master Key | abc123def456ghi789JKLMNOP012346Qrst789123ab= | +------------+----------------------------------------------+
Export the master key using the following environment variable. You can also add the master key to a bash script. If the master key is not exported, any subsequent secret commands will fail.
export CONFLUENT_SECURITY_MASTER_KEY=abc123def456ghi789JKLMNOP012346Qrst789123ab=
Create a working properties file that contains only the properties you want to encrypt. The name of the file is not important. It's used to create the property key and encrypted secret added to the
security.properties
file. For example, createmy-jdbc-connector.properties
with the following secret:database.password=super-secret
Encrypt the properties in the working
my-jdbc-connector.properties
file.confluent secret file encrypt --config-file my-jdbc-connect.properties \ --local-secrets-file /usr/secrets/security.properties \ --remote-secrets-file /usr/secrets/security.properties \ --config "database.password"
This command updates the
/usr/secrets/security.properties
file with the encrypteddatabase.password
. For example:my-jdbc-connector.properties/database.password = ENC[AES/CBC/PKCS5Padding,data:CUpHh5lRDfIfqaL49V3iGw==,iv:vPBmPkctA+yYGVQuOFmQJw==,type:str]
You use the property key (that is,
my-jdbc-connector.properties/database.password
) in the connector configuration. The working properties filemy-jdbc-connector.properties
file can be discarded.Distribute the updated
security.properties
file to all Connect worker nodes where you want the connector to run. This assumes that Connect is configured and restarted using the master password.Add the variable in place of the secret to the JSON connector configuration file. The variable is in the form
{{$ {securepass:<path-to-secret-file>:<secret-key>} }}
, where<path-to-secret-file>
is the path to the sharedsecurity.properties
file containing the encrypted secret. For example:{ "name": "my-jdbc-connector", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:oracle:thin:@//xx.xx.xx.xx:xxxx/xe", "connection.user": "<username>", "connection.password": "${securepass:/usr/secrets/security.properties:my-jdbc-connector.properties/database.password}", "mode": "bulk", "query": "select * from Foo" } }
You can use any number of secrets in the connector configuration file. The variables are generally set to the entire configuration property value as shown in the example configuration above.
Enable externalized secrets in the distributed Connect worker properties.
### enable Externalized Secrets ### config.providers=securepass config.providers.securepass.class=io.confluent.kafka.security.config.provider.SecurePassConfigProvider
Deploy the connector using the JSON configuration file.
When Connect starts the connector, it resolves the variable by looking up the secret with a matching secret key in the secrets file, uses the master key to decrypt the secret, replaces the variable(s) with the decrypted secret(s), and passes this configuration to the connector.
HTTP または HTTPS を使用するための Connect REST API の構成¶
Kafka Connect では、デフォルトで HTTP を使用して REST API の呼び出しができます。HTTP と HTTPS のどちらか、または両方を使用できるように、Connect を構成することもできます。
リスナーの構成パラメーターで、Kafka Connect で使用されるプロトコルを指定します。この構成には、リスナーのリストを protocol://host:port,protocol2://host2:port2
の形式で含める必要があります。以下に例を示します。
listeners=http://localhost:8080,https://localhost:8443
デフォルトでは、リスナーが指定されていない場合、REST サーバーは HTTP プロトコルを使用して、ポート 8083 で実行されます。HTTPS を使用する場合は、構成に SSL の構成を含める必要があります。デフォルトでは、ssl.*
の設定が使用されます。listeners.https
プレフィックスを使用することにより、REST API では、Kafka ブローカーとは異なる構成を使用できます。listeners.https
プレフィックスを使用する場合、ssl.*
オプションは無視されます。
REST API で HTTPS を構成するには、以下のフィールドを使用できます。
ssl.keystore.location
ssl.keystore.password
ssl.keystore.type
ssl.key.password
ssl.truststore.location
ssl.truststore.password
ssl.truststore.type
ssl.enabled.protocols
ssl.provider
ssl.protocol
ssl.cipher.suites
ssl.keymanager.algorithm
ssl.secure.random.implementation
ssl.trustmanager.algorithm
ssl.endpoint.identification.algorithm
ssl.client.auth
詳細については、「分散ワーカーの構成」を参照してください。
REST API は、Kafka Connect のモニタリングと管理、および Kafka Connect のクラスター間通信に使用されます。フォロワーノードの REST API で受信されたリクエストは、リーダーノードの REST API に転送されます。URI ホストが、リッスンしている URI と異なる場合は、rest.advertised.host.name
、rest.advertised.port
、rest.advertised.listener
構成オプションを使用して URI を変更できます。この URI は、フォロワーノードがリーダーと接続するために使用されます。
HTTP リスナーと HTTPS リスナーの両方を使用する場合は、rest.advertised.listener
オプションを使用して、クラスター間通信に使用するリスナーを指定できます。ノード間の通信に HTTPS を使用する場合は、同じ ssl.*
または listeners.https
オプションを使用して HTTPS クライアントを構成できます。
現在サポートされている REST API のエンドポイントは以下のとおりです。
GET /connectors
- アクティブなコネクターのリストを返します。POST /connectors
- 新しいコネクターを作成します。リクエスト本文は、string 型の名前フィールドと、コネクターの構成パラメーターが指定されたオブジェクト構成フィールドを含む JSON オブジェクトである必要があります。GET /connectors/{name}
- 指定したコネクターの情報を取得します。GET /connectors/{name}/config
- 指定したコネクターの構成パラメーターを取得します。PUT /connectors/{name}/config
- 指定したコネクターの構成パラメーターを更新します。GET /connectors/{name}/status
- コネクターの現在のステータスを取得します。このステータスには、実行中、エラー発生または一時停止中に加え、どのワーカーに割り当てられているか、エラーが発生していた場合はエラー情報、そのコネクターのすべてのタスクの状態が含まれます。GET /connectors/{name}/tasks
- コネクターで現在実行中のタスクのリストを取得します。GET /connectors/{name}/tasks/{taskid}/status
- タスクの現在のステータスを取得します。このステータスには、実行中、エラー発生または一時停止中に加え、どのワーカーに割り当てられているか、エラーが発生していた場合はエラー情報が含まれます。PUT /connectors/{name}/pause
- コネクターおよびそのコネクターのタスクを一時停止します。コネクターが再開されるまで、メッセージ処理は停止します。PUT /connectors/{name}/resume
- 一時停止しているコネクターを再開させます。コネクターが一時停止していない場合は、何もしません。POST /connectors/{name}/restart
- コネクターを再起動します。通常は、コネクターで障害が発生した場合に使用します。POST /connectors/{name}/tasks/{taskId}/restart
- 個々のタスクを再開します。通常は、コネクターで障害が発生した場合に使用します。DELETE /connectors/{name}
- コネクターを削除します。すべてのタスクを中止し、構成を削除します。
また、Kafka Connect REST API を使用して、コネクタープラグインの情報を取得することもできます。
GET /connector-plugins
- Kafka Connect クラスターにインストールされているコネクタープラグインのリストを返します。この API では、リクエストを処理するワーカー上のコネクターのみがチェックされます。そのため、新しいコネクター JAR を追加した場合のローリングアップグレード中などには、結果に不整合が生じることがあります。PUT /connector-plugins/{connector-type}/config/validate
- 指定された構成値を、構成の定義と比較して検証します。この API は、構成の項目ごとに検証を行い、推奨値と、検証中のエラーメッセージを返します。
詳細については、「REST API」を参照してください。
HTTPS エンドポイントを使用するように Kafka Connect を構成し、Confluent Control Center で接続する場合のデモについては、「Confluent Platform デモ」を参照してください。