Connect REST インターフェイス¶
Kafka Connect は、サービスとして実行できるように設計されており、コネクターを管理するための REST API もサポートされています。デフォルトでは、このサービスはポート 8083
で実行されます。分散モードで実行する場合、REST API はクラスターに対するプライマリインターフェイスになります。リクエストは、どのクラスターメンバーに対しても行うことができます。REST API は必要に応じてリクエストを自動的に転送します。
コマンドラインでコネクターを送信するだけでスタンドアロンモードを使用することができますが、それによって REST インターフェイスも実行されます。これは、処理を停止せずにステータス情報を取得したり、コネクターを追加したり削除したりするのに便利です。
現在、トップレベルのリソースは、connector
および connector-plugins
です。connector
のサブリソースでは、構成設定およびタスクの一覧が表示され、connector-plugins
のサブリソースでは、構成の検証と推奨が行われます。
connector
のリソースの変更、更新、または削除を行う場合、リクエストをリーダーに転送する必要が生じる可能性があります。ワーカーグループのバランス調整が行われている間は、そのバランス調整によりリーダーが変更される可能性があるため、Connect からステータスコード 409 が返されます。
注釈
For common activities that you can do using the REST API and curl, see Common REST examples.
コンテンツタイプ¶
現在、REST API では、リクエストと応答の両方のエンティティのコンテンツタイプとして application/json
のみがサポートされています。リクエストでは、HTTP の Accept
ヘッダーで、想定される応答のコンテンツタイプを指定する必要があります。
Accept: application/json
また、リクエストエンティティを含める場合は、Content-Type
ヘッダーでリクエストエンティティのコンテンツタイプを指定する必要があります。
Content-Type: application/json
ログレベル¶
Connect API エンドポイントを使用すると、ログレベルの確認やログレベルの変更ができます。詳細については、「Connect API を使用したログレベルの変更」を参照してください。
ステータスとエラー¶
REST API は、標準に準拠した HTTP ステータスを返します。クライアントでは、特に、応答エンティティを解析して使用する前に、HTTP ステータスを確認する必要があります。現在、API ではリダイレクト(300 番台のステータス)は使用されません。ただし、これらのコードの使用は、将来の使用に向けて予約されているため、クライアントではこれらのコードに対処する必要があります。
可能な場合は、すべてのエンドポイントですべてのエラー(ステータスコード 400 番台、500 番台)について標準のエラーメッセージフォーマットを使用します。たとえば、リクエストエンティティの必須フィールドに漏れがあると、次のような応答が生成されることがあります。
HTTP/1.1 422 Unprocessable Entity
Content-Type: application/json
{
"error_code": 422,
"message": "config may not be empty"
}
Connect クラスター¶
-
GET
/
¶ REST リクエストに対応する Connect ワーカーのバージョン、ソースコードの git コミット ID、ワーカーが接続されている Kafka クラスターの ID を取得するトップレベル(ルート)リクエスト。
レスポンスの JSON オブジェクト: - version (string) -- Connect ワーカーのバージョン
- commit ID (string) -- git コミット ID
- cluster ID (string) -- Kafka クラスター ID
リクエストの例:
GET / HTTP/1.1 Host: connect.example.com Accept: application/json
応答の例 :
HTTP/1.1 200 OK Content-Type: application/json { "version":"5.5.0", "commit":"e5741b90cde98052", "kafka_cluster_id":"I4ZmrWqfT2e-upky_4fdPA" }
コネクター¶
-
GET
/connectors
¶ アクティブなコネクターのリストを取得します。
レスポンスの JSON オブジェクト: - connectors (array) -- コネクター名のリスト
リクエストの例:
GET /connectors HTTP/1.1 Host: connect.example.com Accept: application/json
応答の例 :
HTTP/1.1 200 OK Content-Type: application/json ["my-jdbc-source", "my-hdfs-sink"]
-
POST
/connectors
¶ 新しいコネクターを作成し、作成が成功した場合は現在のコネクター情報を返します。バランス調整を実行中の場合は、
409 (Conflict)
を返します。リクエストの JSON オブジェクト: - name (string) -- 作成するコネクターの名前
- config (map) -- コネクターの構成パラメーター。すべての値を文字列にする必要があります。
レスポンスの JSON オブジェクト: - name (string) -- 作成されたコネクターの名前
- config (map) -- コネクターの構成パラメーター。
- tasks (array) -- コネクターによって生成されたアクティブなタスクのリスト
- tasks[i].connector (string) -- タスクが属するコネクターの名前
- tasks[i].task (int) -- コネクター内のタスク ID
リクエストの例:
POST /connectors HTTP/1.1 Host: connect.example.com Content-Type: application/json Accept: application/json { "name": "hdfs-sink-connector", "config": { "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", "tasks.max": "10", "topics": "test-topic", "hdfs.url": "hdfs://fakehost:9000", "hadoop.conf.dir": "/opt/hadoop/conf", "hadoop.home": "/opt/hadoop", "flush.size": "100", "rotate.interval.ms": "1000" } }
応答の例 :
HTTP/1.1 201 Created Content-Type: application/json { "name": "hdfs-sink-connector", "config": { "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", "tasks.max": "10", "topics": "test-topic", "hdfs.url": "hdfs://fakehost:9000", "hadoop.conf.dir": "/opt/hadoop/conf", "hadoop.home": "/opt/hadoop", "flush.size": "100", "rotate.interval.ms": "1000" }, "tasks": [ { "connector": "hdfs-sink-connector", "task": 1 }, { "connector": "hdfs-sink-connector", "task": 2 }, { "connector": "hdfs-sink-connector", "task": 3 } ] }
-
GET
/connectors/
(string: name)¶ コネクターの情報を取得します。
レスポンスの JSON オブジェクト: - name (string) -- 作成されたコネクターの名前
- config (map) -- コネクターの構成パラメーター。
- tasks (array) -- コネクターによって生成されたアクティブなタスクのリスト
- tasks[i].connector (string) -- タスクが属するコネクターの名前
- tasks[i].task (int) -- コネクター内のタスク ID
リクエストの例:
GET /connectors/hdfs-sink-connector HTTP/1.1 Host: connect.example.com Accept: application/json
応答の例 :
HTTP/1.1 200 OK Content-Type: application/json { "name": "hdfs-sink-connector", "config": { "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", "tasks.max": "10", "topics": "test-topic", "hdfs.url": "hdfs://fakehost:9000", "hadoop.conf.dir": "/opt/hadoop/conf", "hadoop.home": "/opt/hadoop", "flush.size": "100", "rotate.interval.ms": "1000" }, "tasks": [ { "connector": "hdfs-sink-connector", "task": 1 }, { "connector": "hdfs-sink-connector", "task": 2 }, { "connector": "hdfs-sink-connector", "task": 3 } ] }
-
GET
/connectors/
(string: name)/config
¶ コネクターの構成を取得します。
レスポンスの JSON オブジェクト: - config (map) -- コネクターの構成パラメーター。
リクエストの例:
GET /connectors/hdfs-sink-connector/config HTTP/1.1 Host: connect.example.com Accept: application/json
応答の例 :
HTTP/1.1 200 OK Content-Type: application/json { "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", "tasks.max": "10", "topics": "test-topic", "hdfs.url": "hdfs://fakehost:9000", "hadoop.conf.dir": "/opt/hadoop/conf", "hadoop.home": "/opt/hadoop", "flush.size": "100", "rotate.interval.ms": "1000" }
-
PUT
/connectors/
(string: name)/config
¶ 指定された構成を使用して新しいコネクターを作成するか、既存のコネクターの構成を更新します。変更後のコネクターの情報を返します。バランス調整を実行中の場合は、
409 (Conflict)
を返します。注釈
POST リクエストのようにペイロードを
{"config": {}}
で囲みません。構成は直接指定します。リクエストの JSON オブジェクト: - config (map) -- コネクターの構成パラメーター。すべての値を文字列にする必要があります。
レスポンスの JSON オブジェクト: - name (string) -- 作成されたコネクターの名前
- config (map) -- コネクターの構成パラメーター。
- tasks (array) -- コネクターによって生成されたアクティブなタスクのリスト
- tasks[i].connector (string) -- タスクが属するコネクターの名前
- tasks[i].task (int) -- コネクター内のタスク ID
リクエストの例:
PUT /connectors/hdfs-sink-connector/config HTTP/1.1 Host: connect.example.com Accept: application/json { "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", "tasks.max": "10", "topics": "test-topic", "hdfs.url": "hdfs://fakehost:9000", "hadoop.conf.dir": "/opt/hadoop/conf", "hadoop.home": "/opt/hadoop", "flush.size": "100", "rotate.interval.ms": "1000" }
応答の例 :
HTTP/1.1 201 Created Content-Type: application/json { "name": "hdfs-sink-connector", "config": { "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", "tasks.max": "10", "topics": "test-topic", "hdfs.url": "hdfs://fakehost:9000", "hadoop.conf.dir": "/opt/hadoop/conf", "hadoop.home": "/opt/hadoop", "flush.size": "100", "rotate.interval.ms": "1000" }, "tasks": [ { "connector": "hdfs-sink-connector", "task": 1 }, { "connector": "hdfs-sink-connector", "task": 2 }, { "connector": "hdfs-sink-connector", "task": 3 } ] }
この例では、
Created
のステータスが返され、コネクターが作成されたことがわかります。構成の更新の場合、ステータスは200 OK
になります。
-
GET
/connectors/
(string: name)/status
¶ コネクターの現在のステータスを取得します。このステータスには、実行中、エラー発生または一時停止中に加え、どのワーカーに割り当てられているか、エラーが発生していた場合はエラー情報、そのコネクターのすべてのタスクの状態が含まれます。
レスポンスの JSON オブジェクト: - name (string) -- コネクターの名前
- connector (map) -- コネクターのステータスを含むマップ
- tasks[i] (map) -- タスクのステータスを含むマップ
リクエストの例:
GET /connectors/hdfs-sink-connector/status HTTP/1.1 Host: connect.example.com
応答の例 :
HTTP/1.1 200 OK { "name": "hdfs-sink-connector", "connector": { "state": "RUNNING", "worker_id": "fakehost:8083" }, "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "fakehost:8083" }, { "id": 1, "state": "FAILED", "worker_id": "fakehost:8083", "trace": "org.apache.kafka.common.errors.RecordTooLargeException\n" } ] }
-
POST
/connectors/
(string: name)/restart
¶ コネクターを再起動します。バランス調整を実行中の場合は、
409 (Conflict)
を返します。リクエストの例:
POST /connectors/hdfs-sink-connector/restart HTTP/1.1 Host: connect.example.com
応答の例 :
HTTP/1.1 200 OK
重要
このエンドポイントに対して呼び出しを行っても、タスクは再開されません。タスクを再開する方法については、「タスクの再開」を参照してください。
-
PUT
/connectors/
(string: name)/pause
¶ コネクターおよびそのコネクターのタスクを一時停止します。コネクターが再開されるまで、メッセージ処理は停止します。この呼び出しは非同期であり、タスクはこれと同時に
PAUSED
ステートに移行しません。リクエストの例:
PUT /connectors/hdfs-sink-connector/pause HTTP/1.1 Host: connect.example.com
応答の例 :
HTTP/1.1 202 Accepted
-
PUT
/connectors/
(string: name)/resume
¶ 一時停止しているコネクターを再開させます。コネクターが一時停止していない場合は、何もしません。この呼び出しは非同期であり、タスクはこれと同時に
RUNNING
ステートに移行しません。リクエストの例:
PUT /connectors/hdfs-sink-connector/resume HTTP/1.1 Host: connect.example.com
応答の例 :
HTTP/1.1 202 Accepted
-
DELETE
/connectors/
(string: name)/
¶ コネクターを削除します。すべてのタスクを中止し、構成を削除します。バランス調整を実行中の場合は、
409 (Conflict)
を返します。リクエストの例:
DELETE /connectors/hdfs-sink-connector HTTP/1.1 Host: connect.example.com
応答の例 :
HTTP/1.1 204 No Content
タスク¶
-
GET
/connectors/
(string: name)/tasks
¶ コネクターで現在実行中のタスクのリストを取得します。
レスポンスの JSON オブジェクト: - tasks (array) -- コネクターによって作成されたアクティブなタスクの構成のリスト
- tasks[i].id (string) -- タスクの ID
- tasks[i].id.connector (string) -- タスクが属するコネクターの名前
- tasks[i].id.task (int) -- コネクター内のタスク ID
- tasks[i].config (map) -- タスクの構成パラメーター
リクエストの例:
GET /connectors/hdfs-sink-connector/tasks HTTP/1.1 Host: connect.example.com
応答の例 :
HTTP/1.1 200 OK [ { "id": { "connector": "hdfs-sink-connector", "task": 0 }, { "task.class": "io.confluent.connect.hdfs.HdfsSinkTask", "topics": "test-topic", "hdfs.url": "hdfs://fakehost:9000", "hadoop.conf.dir": "/opt/hadoop/conf", "hadoop.home": "/opt/hadoop", "flush.size": "100", "rotate.interval.ms": "1000" } ]
-
GET
/connectors/
(string: name)/tasks/
(int: taskid)/status
¶ タスクのステータスを取得します。
リクエストの例:
GET /connectors/hdfs-sink-connector/tasks/1/status HTTP/1.1 Host: connect.example.com
応答の例 :
HTTP/1.1 200 OK {"state":"RUNNING","id":1,"worker_id":"192.168.86.101:8083"}
-
POST
/connectors/
(string: name)/tasks/
(int: taskid)/restart
¶ 1 つのタスクを再開します。
リクエストの例:
POST /connectors/hdfs-sink-connector/tasks/1/restart HTTP/1.1 Host: connect.example.com
応答の例 :
HTTP/1.1 200 OK
トピック¶
-
GET
/connectors/
(string: name)/topics
¶ コネクターのトピック名のリストを返します。トピックが返される順番に決まりはありません。連続して呼び出しを行うと、同じトピック名が違う順番で返されることがあります。このリクエストは、コネクターの実行状況と関係なく処理が行われます。コネクターにアクティブなトピックが存在しない場合も、コネクター自体が存在しない場合も、トピックなしという結果が返されます。
レスポンスの JSON オブジェクト: - topics (array) -- コネクターの作成以降、またはコネクターのアクティブなトピックが最後にリセットされて以降、コネクターで使用されている一連のトピック名
リクエストの例:
GET /connectors/hdfs-sink-connector/topics HTTP/1.1 Host: connect.example.com
応答の例 :
HTTP/1.1 200 OK { "hdfs-sink-connector": { "topics": [ "test-topic-1", "test-topic-2", "test-topic-3", ] } }
-
PUT
/connectors/
(string: name)/topics/reset
¶ コネクターの作成以降、またはコネクターのアクティブなトピックが最後にリセットされて以降、コネクターで使用されている一連のトピック名をリセットします。
リクエストの例:
PUT /connectors/hdfs-sink-connector/topics/reset HTTP/1.1 Host: connect.example.com
応答の例 :
HTTP/1.1 200 OK
コネクタープラグイン¶
-
GET
/connector-plugins/
¶ Kafka Connect クラスターにインストールされているコネクタープラグインのリストを返します。この API では、リクエストを処理するワーカー上のコネクターのみがチェックされます。そのため、新しいコネクター JAR を追加した場合のローリングアップグレード中などには、結果に不整合が生じることがあります。
レスポンスの JSON オブジェクト: - class (string) -- コネクタークラスの名前
リクエストの例:
GET /connector-plugins/ HTTP/1.1 Host: connect.example.com
応答の例 :
HTTP/1.1 200 OK [ { "class": "io.confluent.connect.hdfs.HdfsSinkConnector" }, { "class": "io.confluent.connect.jdbc.JdbcSourceConnector" } ]
-
PUT
/connector-plugins/
(string: name)/config/validate
¶ 指定された構成値を、構成の定義と比較して検証します。この API は、構成の項目ごとに検証を行い、推奨値と、検証中のエラーメッセージを返します。
リクエストの JSON オブジェクト: - config (map) -- コネクターの構成パラメーター。すべての値を文字列にする必要があります。
レスポンスの JSON オブジェクト: - name (string) -- コネクタープラグインのクラス名
- error_count (int) -- 構成の検証中に発生したエラーの総数
- groups (array) -- 構成の定義で使用されているグループのリスト
- configs[i].definition (map) -- 名前、型、重要度など、コネクタープラグインの構成の定義
- configs[i].value (map) -- 名前、値、推奨値など、構成の現在の値
リクエストの例:
PUT /connector-plugins/FileStreamSinkConnector/config/validate/ HTTP/1.1 Host: connect.example.com Accept: application/json { "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max": "1", "topics": "test-topic" }
応答の例 :
HTTP/1.1 200 OK { "name": "FileStreamSinkConnector", "error_count": 1, "groups": [ "Common" ], "configs": [ { "definition": { "name": "topics", "type": "LIST", "required": false, "default_value": "", "importance": "HIGH", "documentation": "", "group": "Common", "width": "LONG", "display_name": "Topics", "dependents": [], "order": 4 }, "value": { "name": "topics", "value": "test-topic", "recommended_values": [], "errors": [], "visible": true } }, { "definition": { "name": "file", "type": "STRING", "required": true, "default_value": "", "importance": "HIGH", "documentation": "Destination filename.", "group": null, "width": "NONE", "display_name": "file", "dependents": [], "order": -1 }, "value": { "name": "file", "value": null, "recommended_values": [], "errors": [ "Missing required configuration \"file\" which has no default value." ], "visible": true } }, { "definition": { "name": "name", "type": "STRING", "required": true, "default_value": "", "importance": "HIGH", "documentation": "Globally unique name to use for this connector.", "group": "Common", "width": "MEDIUM", "display_name": "Connector name", "dependents": [], "order": 1 }, "value": { "name": "name", "value": "test", "recommended_values": [], "errors": [], "visible": true } }, { "definition": { "name": "tasks.max", "type": "INT", "required": false, "default_value": "1", "importance": "HIGH", "documentation": "Maximum number of tasks to use for this connector.", "group": "Common", "width": "SHORT", "display_name": "Tasks max", "dependents": [], "order": 3 }, "value": { "name": "tasks.max", "value": "1", "recommended_values": [], "errors": [], "visible": true } }, { "definition": { "name": "connector.class", "type": "STRING", "required": true, "default_value": "", "importance": "HIGH", "documentation": "Name or alias of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector. If the connector is org.apache.kafka.connect.file.FileStreamSinkConnector, you can either specify this full name, or use \"FileStreamSink\" or \"FileStreamSinkConnector\" to make the configuration a bit shorter", "group": "Common", "width": "LONG", "display_name": "Connector class", "dependents": [], "order": 2 }, "value": { "name": "connector.class", "value": "org.apache.kafka.connect.file.FileStreamSinkConnector", "recommended_values": [], "errors": [], "visible": true } } ] }