重要
このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。
Confluent Cloud API for Connect¶
フルマネージド型コネクター用の Confluent Cloud API により、Confluent Cloud API を使用してコネクターを操作することができます。これはクエリ可能な HTTP API です。たとえば、JSON で記述したクエリを POST すると、クエリで指定したコネクターの情報が返されます。Confluent Cloud API の使用方法について詳しくは、以下のサンプルを参考にしてください。
ちなみに
このドキュメントのサンプルコマンドでは、Confluent Cloud API のバージョン 1 が使用されています。API の最新バージョン、Connect API リクエストの網羅的なリスト、すべての応答コード、その他の詳細については、Confluent Cloud API 開発者ドキュメント を参照してください。
例¶
- 前提条件
Confluent Cloud へのアクセスを許可されていること。
Confluent CLI :ccloud-cli:` がインストールされ、構成されている|install.html` こと。コマンド例では Confluent CLI バージョン 2 を使用しています。詳細については、「Confluent CLI v2 への移行 <https://docs.confluent.io/confluent-cli/current/migrate.html#cli-migrate>`__」を参照してください。
このドキュメントの API リクエストのサンプルを使用するには、cURL と jq がインストールされていること。
Confluent Cloud API で認証を行うための Confluent Cloud API キー。すべての API リクエストに、base64 でエンコードされたリソースキーとシークレットが含まれている必要があります。キーを作成し、base64 でエンコードするには、以下の手順を実行します。
アカウントにログインします。
confluent login
cloud
リソース用の API キーとシークレットを作成します。confluent api-key create --resource cloud
重要
Confluent Cloud API を使用するには、
--resource cloud
用の Confluent Cloud API キーを作成する必要があります。Confluent Cloud クラスター用に作成した Kafka クラスター API キー(--resource <cluster-ID>
)を使用すると、API リクエストの実行時に認証エラーが発生します。Confluent Cloud API では、基本アクセス認証 が使用されます。API キーとシークレットを使用するには、API とシークレットを
Authorization: Basic <Base64-credentials>
の形式でヘッダーとして送信します。この形式は curl のサンプルで示されています。base64 でエンコードされた API キーペアを表示するには、次のコマンドを入力します。echo -n "<api-key>:<secret>" | base64
以下に例を示します。
echo -n "ABCDEFGPZROVP:z6yyH3LEEWdrAAamfue9mIeTAyocCMjO/oSKzg0UMoXA0x3CXjVglPJHYC/" | base64 HIJKLMNOPYRlBaUk9TNjVWUDp6Nnl5SDNMRUVXQmtQN1lkckFBYW1mdWU5bUllVEF5b2NDTWpPL29TS3pnMFVNb1hBMHgzQ1hq
base64 でエンコードされたこの結果を以下のサンプルで使用します。
注釈
サンプルでは、curl コマンドを使用して Confluent Cloud API を操作する方法を示します。
コネクターのリストの取得¶
クラスター内のコネクターのリストを取得するには、次の API リクエストを使用します。実行が成功すると、コネクターのリストが返されます。
curl --request GET 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors' /
--header 'authorization: Basic <base64-encoded-key-and-secret>' | jq
以下に例を示します。
curl --request GET 'https://api.confluent.cloud/connect/v1/environments/env-*****/clusters/lkc-*****/connectors' \
--header 'authorization: Basic HIJKLMNOPYRlB9TNjVWUDp6Nnl5SDNMRUVXQmtQN1dWU5bUllVEF5b2NDTWpPL29TS3pnMFVNb1hBMHgzQ1hqVmdsUEpIWUMv' | jq
コネクターのリストが出力として表示されます。以下に例を示します。
[
"DatagenSourceConnector_0",
"S3_SINKConnector_0"
]
コネクターの作成¶
Confluent Cloud API を使用してコネクターを使用する場合、ペイロードとして使用するコネクター構成の JSON ファイルを作成するか、curl コマンドでコネクター構成の JSON を使用することができます。実行が成功すると、コネクター構成が返されます。
注釈
以下の点を検討してください。
- Confluent Cloud Kafka クラスター API キーとシークレットを JSON のコネクター構成で使用します。
--resource cloud
API キーとシークレットを API リクエストで使用します。 - 必須のコネクター構成プロパティをすべて指定する必要があります。各コネクターの必須のコネクタープロパティについては、個々の Cloud コネクターのドキュメント を参照してください。
以下の curl コマンドのサンプルは、ペイロードのコネクター構成を指定する 2 つの方法を示しています。
未加工の JSON のペイロードのサンプル
次のコマンドでは、curl コマンドでコネクター構成を使用しています。次のサンプルでは、Amazon S3 Sink Connector の構成を示しています。
curl --request POST 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "S3_SINKConnector_0",
"config": {
"topics": "pageviews",
"input.data.format": "AVRO",
"connector.class": "S3_SINK",
"name": "S3_SINKConnector_0",
"kafka.api.key": "<my-kafka-api-key>",
"kafka.api.secret" : "<my-kafka-api-secret>",
"aws.access.key.id" : "<my-aws-access-key>",
"aws.secret.access.key": "<my-aws-secret>",
"s3.bucket.name": "<my-s3-bucket-name>",
"output.data.format": "AVRO",
"time.interval": "HOURLY",
"flush.size": "1000",
"tasks.max": "1"
}
}' | jq
JSON ファイルのペイロードのサンプル
次のコマンドでは、JSON ファイルでコネクター構成をアップロードします。
curl --request POST 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' \
--header 'Content-Type: application/json' \
--data "@<my-connector-config>.json" | jq
注釈
それぞれのフルマネージド型コネクターのドキュメント内の CLI セクションに、ペイロードファイルで使用する正しい形式の JSON のサンプルがあります。たとえば、Amazon S3 Sink Connector ドキュメント に JSON のサンプルがあります。
次のサンプルコマンドでは、my-s3-connector.json
という名前の JSON ファイルがアップロードされます。このファイルは、Amazon S3 コネクターを作成するために使用されます。
curl --request POST 'https://api.confluent.cloud/connect/v1/environments/env-*****/clusters/lkc-*****/connectors' \
--header 'authorization: Basic RUVSTEVYRlBaUk9TNjVWUDp6Nnl51lkckFBYW1mdWU5bUllVEF5b2NDTWpPL29TS3pnMFVNb1hBMHgzQ1hqVmdsUEpIWUMv' \
--header 'Content-Type: application/json' \
--data "@s3-connector-config.json" | jq
コネクター構成が出力として表示されます。以下に例を示します。
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 1467 100 878 100 589 35 23 0:00:25 0:00:24 0:00:01 214
{
"name": "S3_SINKConnector_0",
"type": "sink",
"config": {
"aws.access.key.id": "****************",
"aws.secret.access.key": "****************",
"cloud.environment": "prod",
"cloud.provider": "aws",
"connector.class": "S3_SINK",
"flush.size": "1000",
"input.data.format": "AVRO",
"kafka.api.key": "****************",
"kafka.api.secret": "****************",
"kafka.dedicated": "false",
"kafka.endpoint": "SASL_SSL://pkc-*****.us-west-2.aws.confluent.cloud:9092",
"kafka.region": "us-west-2",
"name": "S3_SINKConnector_0",
"output.data.format": "AVRO",
"s3.bucket.name": "datagen-to-s3",
"schema.registry.url": "https://psrc-*****.us-east-2.aws.confluent.cloud",
"tasks.max": "1",
"time.interval": "HOURLY",
"topics": "pageviews"
},
"tasks": []
}
ちなみに
このドキュメントのサンプルコマンドでは、Confluent Cloud API のバージョン 1 が使用されています。API の最新バージョン、Connect API リクエストの網羅的なリスト、すべての応答コード、その他の詳細については、Confluent Cloud API 開発者ドキュメント を参照してください。
コネクター構成の読み取り¶
コネクター構成を読み取るには、次の API リクエストを使用します。実行が成功すると、コネクター構成が返されます。
curl --request GET 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors/<my-connector-name>/config' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' | jq
以下に例を示します。
curl --request GET 'https://api.confluent.cloud/connect/v1/environments/env-****/clusters/lkc-*****/connectors/S3_SINKConnector_0/config' \
--header 'authorization: Basic HIJKLMNOPYRlB9TNjVWUVXQmtQN1lkckFBYW1m5b2NDTWpPL29TS3pnMFVNb1hBMHgzQ1hqVmdsUEpIWUMv' | jq
コネクター構成が出力として表示されます。以下に例を示します。
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 814 100 814 0 0 1477 0 --:--:-- --:--:-- --:--:-- 1477
{
"aws.access.key.id": "****************",
"aws.secret.access.key": "****************",
"cloud.environment": "prod",
"cloud.provider": "aws",
"connector.class": "S3_SINK",
"flush.size": "1000",
"input.data.format": "AVRO",
"kafka.api.key": "****************",
"kafka.api.secret": "****************",
"kafka.dedicated": "false",
"kafka.endpoint": "SASL_SSL://pkc-****.us-west-2.aws.confluent.cloud:9092",
"kafka.region": "us-west-2",
"name": "S3_SINKConnector_0",
"output.data.format": "AVRO",
"s3.bucket.name": "datagen-to-s3",
"schema.registry.url": "https://psrc-*****.us-east-2.aws.confluent.cloud",
"tasks.max": "1",
"time.interval": "HOURLY",
"topics": "pageviews"
}
ちなみに
このドキュメントのサンプルコマンドでは、Confluent Cloud API のバージョン 1 が使用されています。API の最新バージョン、Connect API リクエストの網羅的なリスト、すべての応答コード、その他の詳細については、Confluent Cloud API 開発者ドキュメント を参照してください。
コネクター構成のアップデート¶
Confluent Cloud API を使用してコネクター構成をアップデートする場合、ペイロードとして使用するコネクター構成の JSON ファイルをアップデートするか、curl コマンドでアップデートされたコネクター構成の JSON を使用することができます。2 種類の curl コマンドを構成する方法を示すサンプルについては、「コネクターの作成」を参照してください。
注釈
以下の点を検討してください。
- Confluent Cloud Kafka クラスター API キーとシークレットを JSON のコネクター構成で使用します。
--resource cloud
API キーとシークレットを API リクエストで使用します。 - 必須のコネクター構成プロパティをすべて指定する必要があります。各コネクターの必須のコネクタープロパティについては、個々の Cloud コネクターのドキュメント を参照してください。
次の curl コマンドのサンプルは、コネクターのフラッシュ間隔のプロパティを HOURLY から DAILY にアップデートする方法を示しています。実行が成功すると、アップデートされたコネクター構成が返されます。
コネクターでは、すぐに新しい構成の使用が開始されます。
curl --request PUT 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors/<my-connector-name>/config' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' \
--header 'Content-Type: application/json' \
--data-raw '{
"topics": "pageviews",
"input.data.format": "AVRO",
"connector.class": "S3_SINK",
"name": "<my-connector-name>",
"kafka.api.key": "<my-kafka-api-key>",
"kafka.api.secret" : "<my-kafka-api-secret>",
"aws.access.key.id" : "<my-aws-access-key>",
"aws.secret.access.key": "<my-aws-secret>",
"s3.bucket.name": "<my-s3-bucket-name>",
"output.data.format": "AVRO",
"time.interval": "DAILY",
"flush.size": "1000",
"tasks.max": "1"
}' | jq
また、JSON ファイルを指定して次のコマンドを使用して、コネクター構成をアップデートすることもできます。
curl --request PUT https://api.confluent.cloud/connect/v1/environments/<my-environment>/clusters/<my-cluster>/connectors/<my-connector-name>/config \
--header 'authorization: Basic <base64-encoded-key-and-secret>' \
--header 'Content-Type: application/json' \
--data "@s3-sink-update.json" | jq
注釈
それぞれのフルマネージド型コネクターのドキュメント内の CLI セクションに、ペイロードファイルで使用する正しい形式の JSON のサンプルがあります。たとえば、Amazon S3 Sink Connector ドキュメント に JSON のサンプルがあります。
以下に例を示します。
curl --request PUT https://api.confluent.cloud/connect/v1/environments/env-*****/clusters/lkc-*****/connectors/S3_SINKConnector_5/config \
--header 'authorization: Basic ABCDEFGZaNjNPV0QzSlRNUjpHVll3UjmUwK3JRUk5HQi82YkdtZlRCb1lYMDZaSTJEMHNMSnBwalBocjNLN1JORWQ2VWo2' \
--header 'Content-Type: application/json' \
--data "@s3-sink-update.json" | jq
アップデートされたコネクター構成が出力として表示されます。以下に例を示します。
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 1415 100 877 100 538 36 22 0:00:24 0:00:23 0:00:01 205
{
"name": "S3_SINKConnector_5",
"type": "sink",
"config": {
"aws.access.key.id": "****************",
"aws.secret.access.key": "****************",
"cloud.environment": "prod",
"cloud.provider": "aws",
"connector.class": "S3_SINK",
"flush.size": "1000",
"input.data.format": "AVRO",
"kafka.api.key": "****************",
"kafka.api.secret": "****************",
"kafka.dedicated": "false",
"kafka.endpoint": "SASL_SSL://pkc-*****.us-west-2.aws.confluent.cloud:9092",
"kafka.region": "us-west-2",
"name": "S3_SINKConnector_1",
"output.data.format": "AVRO",
"s3.bucket.name": "datagen-to-s3",
"schema.registry.url": "https://psrc-*****.us-east-2.aws.confluent.cloud",
"tasks.max": "1",
"time.interval": "DAILY",
"topics": "pageviews"
},
"tasks": []
}
ちなみに
このドキュメントのサンプルコマンドでは、Confluent Cloud API のバージョン 1 が使用されています。API の最新バージョン、Connect API リクエストの網羅的なリスト、すべての応答コード、その他の詳細については、Confluent Cloud API 開発者ドキュメント を参照してください。
シンクコネクターにメトリクスを問い合わせるクエリ¶
シンクコネクターにメトリクスを問い合わせるには、以下の手順を実行します。
API リクエストのペイロードとして使用するため、
query-connector-metrics.json
という名前の JSON ファイルを作成します。次のサンプルをコピーして貼り付けると、指定の期間内にコネクターが受信したレコード数を取得できます。value
に適切なコネクターリソース ID を、intervals
に有効な時間間隔を入力してください。{ "aggregations": [ { "metric": "io.confluent.kafka.connect/received_records" } ], "filter": { "field": "resource.connector.id", "op": "EQ", "value": "lcc-k2q7v" }, "granularity": "PT1H", "intervals": [ "2021-03-02T00:00:00/2021-03-02T23:00:00" ] }
次の POST クエリコマンドを入力します。
curl --silent --request POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' \ --header 'authorization: Basic <base64-encoded-key-and-secret>' \ --header 'Content-Type: application/json' \ --data "<my-json-filename>.json" | jq
以下に例を示します。
curl --silent --request POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' \ --header 'authorization: Basic ABCDEFGZaNjNPV0QzSeEZCemUwK3JRUk5HQi82YkdtZlRCb1lYMDZaSTJEMHNMSnBwalBocjNLN1JORWQ2VWo2' \ --header 'Content-Type: application/json' \ --data "@query-metrics.json" | jq
このリクエストでは、指定した期間にコネクター
lcc-k2q7v
が受信したレコード数が返されます。その例を次に示します。{ "data": [ { "timestamp": "2021-03-02T18:00:00Z", "value": 44027, }, { "timestamp": "2021-03-02T19:00:00Z", "value": 7227, }, { "timestamp": "2021-03-02T20:00:00Z", "value": 7222, }, { "timestamp": "2021-03-02T21:00:00Z", "value": 7253, }, { "timestamp": "2021-03-02T22:00:00Z", "value": 7258, } ] }
Confluent Cloud Metrics API の詳細については、「Confluent Cloud のメトリクス」のドキュメントを参照してください。
コネクターの削除¶
コネクターを削除するには、次の API リクエストを使用します。
curl --request DELETE https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors/<my-connector-name> \
--header 'authorization: Basic <base64-encoded-key-and-secret>' | jq
以下に例を示します。
curl --request DELETE https://api.confluent.cloud/connect/v1/environments/env-*****/clusters/lkc-*****/connectors/S3_SINKConnector_1 \
--header 'authorization: Basic HIJKLMNOPQCSEFUNUJWNjdONjpOc3RyWE5kamlzZE05VTdOSk05T3FuSTcyQzlIb2ZRaWhURWtiOWlkVTFtdTB6' | jq
以下に示すのは、コネクターが正常に削除された場合の出力のサンプルです。
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 14 0 14 0 0 0 0 --:--:-- 0:00:15 --:--:-- 3
{
"error": null
}
次のステップ¶
API の最新バージョン、Connect API リクエストの網羅的なリスト、すべての応答コード、その他の詳細については、Confluent Cloud API 開発者ドキュメント を参照してください。
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。