重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

Confluent Cloud API for Connect

フルマネージド型コネクター用の Confluent Cloud API により、Confluent Cloud API を使用してコネクターを操作することができます。これはクエリ可能な HTTP API です。たとえば、JSON で記述したクエリを POST すると、クエリで指定したコネクターの情報が返されます。Confluent Cloud API の使用方法について詳しくは、以下のサンプルを参考にしてください。

ちなみに

このドキュメントのサンプルコマンドでは、Confluent Cloud API のバージョン 1 が使用されています。API の最新バージョン、Connect API リクエストの網羅的なリスト、すべての応答コード、その他の詳細については、Confluent Cloud API 開発者ドキュメント を参照してください。

前提条件
  • Confluent Cloud へのアクセスを許可されていること。

  • Confluent CLI :ccloud-cli:` がインストールされ、構成されている|install.html` こと。コマンド例では Confluent CLI バージョン 2 を使用しています。詳細については、「Confluent CLI v2 への移行 <https://docs.confluent.io/confluent-cli/current/migrate.html#cli-migrate>`__」を参照してください。

  • このドキュメントの API リクエストのサンプルを使用するには、cURL と jq がインストールされていること。

  • Confluent Cloud API で認証を行うための Confluent Cloud API キー。すべての API リクエストに、base64 でエンコードされたリソースキーとシークレットが含まれている必要があります。キーを作成し、base64 でエンコードするには、以下の手順を実行します。

    1. アカウントにログインします。

      confluent login
      
    2. cloud リソース用の API キーとシークレットを作成します。

      confluent api-key create --resource cloud
      

      重要

      Confluent Cloud API を使用するには、--resource cloud 用の Confluent Cloud API キーを作成する必要があります。Confluent Cloud クラスター用に作成した Kafka クラスター API キー(--resource <cluster-ID>)を使用すると、API リクエストの実行時に認証エラーが発生します。

    3. Confluent Cloud API では、基本アクセス認証 が使用されます。API キーとシークレットを使用するには、API とシークレットを Authorization: Basic <Base64-credentials> の形式でヘッダーとして送信します。この形式は curl のサンプルで示されています。base64 でエンコードされた API キーペアを表示するには、次のコマンドを入力します。

      echo -n "<api-key>:<secret>" | base64
      

      以下に例を示します。

      echo -n "ABCDEFGPZROVP:z6yyH3LEEWdrAAamfue9mIeTAyocCMjO/oSKzg0UMoXA0x3CXjVglPJHYC/" | base64
      HIJKLMNOPYRlBaUk9TNjVWUDp6Nnl5SDNMRUVXQmtQN1lkckFBYW1mdWU5bUllVEF5b2NDTWpPL29TS3pnMFVNb1hBMHgzQ1hq
      

      base64 でエンコードされたこの結果を以下のサンプルで使用します。

注釈

サンプルでは、curl コマンドを使用して Confluent Cloud API を操作する方法を示します。

コネクターのリストの取得

クラスター内のコネクターのリストを取得するには、次の API リクエストを使用します。実行が成功すると、コネクターのリストが返されます。

curl --request GET 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors' /
--header 'authorization: Basic <base64-encoded-key-and-secret>' | jq

以下に例を示します。

curl --request GET 'https://api.confluent.cloud/connect/v1/environments/env-*****/clusters/lkc-*****/connectors' \
--header 'authorization: Basic HIJKLMNOPYRlB9TNjVWUDp6Nnl5SDNMRUVXQmtQN1dWU5bUllVEF5b2NDTWpPL29TS3pnMFVNb1hBMHgzQ1hqVmdsUEpIWUMv' | jq

コネクターのリストが出力として表示されます。以下に例を示します。

[
  "DatagenSourceConnector_0",
  "S3_SINKConnector_0"
]

コネクターの作成

Confluent Cloud API を使用してコネクターを使用する場合、ペイロードとして使用するコネクター構成の JSON ファイルを作成するか、curl コマンドでコネクター構成の JSON を使用することができます。実行が成功すると、コネクター構成が返されます。

注釈

以下の点を検討してください。

  • Confluent Cloud Kafka クラスター API キーとシークレットを JSON のコネクター構成で使用します。--resource cloud API キーとシークレットを API リクエストで使用します。
  • 必須のコネクター構成プロパティをすべて指定する必要があります。各コネクターの必須のコネクタープロパティについては、個々の Cloud コネクターのドキュメント を参照してください。

以下の curl コマンドのサンプルは、ペイロードのコネクター構成を指定する 2 つの方法を示しています。

未加工の JSON のペイロードのサンプル

次のコマンドでは、curl コマンドでコネクター構成を使用しています。次のサンプルでは、Amazon S3 Sink Connector の構成を示しています。

curl --request POST 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' \
--header 'Content-Type: application/json' \
--data-raw '{
  "name": "S3_SINKConnector_0",
  "config": {
    "topics": "pageviews",
    "input.data.format": "AVRO",
    "connector.class": "S3_SINK",
    "name": "S3_SINKConnector_0",
    "kafka.api.key": "<my-kafka-api-key>",
    "kafka.api.secret" : "<my-kafka-api-secret>",
    "aws.access.key.id" : "<my-aws-access-key>",
    "aws.secret.access.key": "<my-aws-secret>",
    "s3.bucket.name": "<my-s3-bucket-name>",
    "output.data.format": "AVRO",
    "time.interval": "HOURLY",
    "flush.size": "1000",
    "tasks.max": "1"
  }
}' | jq

JSON ファイルのペイロードのサンプル

次のコマンドでは、JSON ファイルでコネクター構成をアップロードします。

curl --request POST 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' \
--header 'Content-Type: application/json' \
--data "@<my-connector-config>.json" | jq

注釈

それぞれのフルマネージド型コネクターのドキュメント内の CLI セクションに、ペイロードファイルで使用する正しい形式の JSON のサンプルがあります。たとえば、Amazon S3 Sink Connector ドキュメント に JSON のサンプルがあります。

次のサンプルコマンドでは、my-s3-connector.json という名前の JSON ファイルがアップロードされます。このファイルは、Amazon S3 コネクターを作成するために使用されます。

curl --request POST 'https://api.confluent.cloud/connect/v1/environments/env-*****/clusters/lkc-*****/connectors' \
--header 'authorization: Basic RUVSTEVYRlBaUk9TNjVWUDp6Nnl51lkckFBYW1mdWU5bUllVEF5b2NDTWpPL29TS3pnMFVNb1hBMHgzQ1hqVmdsUEpIWUMv' \
--header 'Content-Type: application/json' \
--data "@s3-connector-config.json" | jq

コネクター構成が出力として表示されます。以下に例を示します。

% Total    % Received % Xferd    Average Speed   Time    Time     Time  Current
                                 Dload   Upload  Total   Spent    Left  Speed
100  1467  100   878  100   589     35     23  0:00:25  0:00:24  0:00:01  214
{
  "name": "S3_SINKConnector_0",
  "type": "sink",
  "config": {
    "aws.access.key.id": "****************",
    "aws.secret.access.key": "****************",
    "cloud.environment": "prod",
    "cloud.provider": "aws",
    "connector.class": "S3_SINK",
    "flush.size": "1000",
    "input.data.format": "AVRO",
    "kafka.api.key": "****************",
    "kafka.api.secret": "****************",
    "kafka.dedicated": "false",
    "kafka.endpoint": "SASL_SSL://pkc-*****.us-west-2.aws.confluent.cloud:9092",
    "kafka.region": "us-west-2",
    "name": "S3_SINKConnector_0",
    "output.data.format": "AVRO",
    "s3.bucket.name": "datagen-to-s3",
    "schema.registry.url": "https://psrc-*****.us-east-2.aws.confluent.cloud",
    "tasks.max": "1",
    "time.interval": "HOURLY",
    "topics": "pageviews"
  },
  "tasks": []
}

ちなみに

このドキュメントのサンプルコマンドでは、Confluent Cloud API のバージョン 1 が使用されています。API の最新バージョン、Connect API リクエストの網羅的なリスト、すべての応答コード、その他の詳細については、Confluent Cloud API 開発者ドキュメント を参照してください。

コネクター構成の読み取り

コネクター構成を読み取るには、次の API リクエストを使用します。実行が成功すると、コネクター構成が返されます。

curl --request GET 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors/<my-connector-name>/config' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' | jq

以下に例を示します。

curl --request GET 'https://api.confluent.cloud/connect/v1/environments/env-****/clusters/lkc-*****/connectors/S3_SINKConnector_0/config' \
--header 'authorization: Basic HIJKLMNOPYRlB9TNjVWUVXQmtQN1lkckFBYW1m5b2NDTWpPL29TS3pnMFVNb1hBMHgzQ1hqVmdsUEpIWUMv' | jq

コネクター構成が出力として表示されます。以下に例を示します。

 % Total    % Received % Xferd  Average Speed   Time    Time     Time Current
                                Dload  Upload   Total   Spent    Left  Speed
100   814  100   814    0     0   1477      0 --:--:-- --:--:-- --:--:-- 1477
{
  "aws.access.key.id": "****************",
  "aws.secret.access.key": "****************",
  "cloud.environment": "prod",
  "cloud.provider": "aws",
  "connector.class": "S3_SINK",
  "flush.size": "1000",
  "input.data.format": "AVRO",
  "kafka.api.key": "****************",
  "kafka.api.secret": "****************",
  "kafka.dedicated": "false",
  "kafka.endpoint": "SASL_SSL://pkc-****.us-west-2.aws.confluent.cloud:9092",
  "kafka.region": "us-west-2",
  "name": "S3_SINKConnector_0",
  "output.data.format": "AVRO",
  "s3.bucket.name": "datagen-to-s3",
  "schema.registry.url": "https://psrc-*****.us-east-2.aws.confluent.cloud",
  "tasks.max": "1",
  "time.interval": "HOURLY",
  "topics": "pageviews"
}

ちなみに

このドキュメントのサンプルコマンドでは、Confluent Cloud API のバージョン 1 が使用されています。API の最新バージョン、Connect API リクエストの網羅的なリスト、すべての応答コード、その他の詳細については、Confluent Cloud API 開発者ドキュメント を参照してください。

コネクター構成のアップデート

Confluent Cloud API を使用してコネクター構成をアップデートする場合、ペイロードとして使用するコネクター構成の JSON ファイルをアップデートするか、curl コマンドでアップデートされたコネクター構成の JSON を使用することができます。2 種類の curl コマンドを構成する方法を示すサンプルについては、「コネクターの作成」を参照してください。

注釈

以下の点を検討してください。

  • Confluent Cloud Kafka クラスター API キーとシークレットを JSON のコネクター構成で使用します。--resource cloud API キーとシークレットを API リクエストで使用します。
  • 必須のコネクター構成プロパティをすべて指定する必要があります。各コネクターの必須のコネクタープロパティについては、個々の Cloud コネクターのドキュメント を参照してください。

次の curl コマンドのサンプルは、コネクターのフラッシュ間隔のプロパティを HOURLY から DAILY にアップデートする方法を示しています。実行が成功すると、アップデートされたコネクター構成が返されます。

コネクターでは、すぐに新しい構成の使用が開始されます。

curl --request PUT 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors/<my-connector-name>/config' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' \
--header 'Content-Type: application/json' \
--data-raw '{
    "topics": "pageviews",
    "input.data.format": "AVRO",
    "connector.class": "S3_SINK",
    "name": "<my-connector-name>",
    "kafka.api.key": "<my-kafka-api-key>",
    "kafka.api.secret" : "<my-kafka-api-secret>",
    "aws.access.key.id" : "<my-aws-access-key>",
    "aws.secret.access.key": "<my-aws-secret>",
    "s3.bucket.name": "<my-s3-bucket-name>",
    "output.data.format": "AVRO",
    "time.interval": "DAILY",
    "flush.size": "1000",
    "tasks.max": "1"
}' | jq

また、JSON ファイルを指定して次のコマンドを使用して、コネクター構成をアップデートすることもできます。

curl --request PUT https://api.confluent.cloud/connect/v1/environments/<my-environment>/clusters/<my-cluster>/connectors/<my-connector-name>/config \
--header 'authorization: Basic <base64-encoded-key-and-secret>' \
--header 'Content-Type: application/json' \
--data "@s3-sink-update.json" | jq

注釈

それぞれのフルマネージド型コネクターのドキュメント内の CLI セクションに、ペイロードファイルで使用する正しい形式の JSON のサンプルがあります。たとえば、Amazon S3 Sink Connector ドキュメント に JSON のサンプルがあります。

以下に例を示します。

curl --request PUT https://api.confluent.cloud/connect/v1/environments/env-*****/clusters/lkc-*****/connectors/S3_SINKConnector_5/config \
--header 'authorization: Basic ABCDEFGZaNjNPV0QzSlRNUjpHVll3UjmUwK3JRUk5HQi82YkdtZlRCb1lYMDZaSTJEMHNMSnBwalBocjNLN1JORWQ2VWo2' \
--header 'Content-Type: application/json' \
--data "@s3-sink-update.json" | jq

アップデートされたコネクター構成が出力として表示されます。以下に例を示します。

% Total    % Received % Xferd  Average Speed   Time    Time     Time Current
                               Dload   Upload  Total   Spent    Left  Speed
100  1415  100   877  100   538     36  22  0:00:24  0:00:23  0:00:01   205
   {
     "name": "S3_SINKConnector_5",
     "type": "sink",
     "config": {
       "aws.access.key.id": "****************",
       "aws.secret.access.key": "****************",
       "cloud.environment": "prod",
       "cloud.provider": "aws",
       "connector.class": "S3_SINK",
       "flush.size": "1000",
       "input.data.format": "AVRO",
       "kafka.api.key": "****************",
       "kafka.api.secret": "****************",
       "kafka.dedicated": "false",
       "kafka.endpoint": "SASL_SSL://pkc-*****.us-west-2.aws.confluent.cloud:9092",
       "kafka.region": "us-west-2",
       "name": "S3_SINKConnector_1",
       "output.data.format": "AVRO",
       "s3.bucket.name": "datagen-to-s3",
       "schema.registry.url": "https://psrc-*****.us-east-2.aws.confluent.cloud",
       "tasks.max": "1",
       "time.interval": "DAILY",
       "topics": "pageviews"
     },
     "tasks": []
   }

ちなみに

このドキュメントのサンプルコマンドでは、Confluent Cloud API のバージョン 1 が使用されています。API の最新バージョン、Connect API リクエストの網羅的なリスト、すべての応答コード、その他の詳細については、Confluent Cloud API 開発者ドキュメント を参照してください。

シンクコネクターにメトリクスを問い合わせるクエリ

シンクコネクターにメトリクスを問い合わせるには、以下の手順を実行します。

  1. API リクエストのペイロードとして使用するため、query-connector-metrics.json という名前の JSON ファイルを作成します。次のサンプルをコピーして貼り付けると、指定の期間内にコネクターが受信したレコード数を取得できます。value に適切なコネクターリソース ID を、intervals に有効な時間間隔を入力してください。

    {
        "aggregations": [
            {
                "metric": "io.confluent.kafka.connect/received_records"
            }
        ],
        "filter":
        {
            "field": "resource.connector.id",
            "op": "EQ",
            "value": "lcc-k2q7v"
        },
        "granularity": "PT1H",
        "intervals": [
            "2021-03-02T00:00:00/2021-03-02T23:00:00"
        ]
    }
    
  2. 次の POST クエリコマンドを入力します。

    curl --silent --request POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' \
    --header 'authorization: Basic <base64-encoded-key-and-secret>' \
    --header 'Content-Type: application/json' \
    --data "<my-json-filename>.json" | jq
    

    以下に例を示します。

    curl --silent --request POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' \
    --header 'authorization: Basic ABCDEFGZaNjNPV0QzSeEZCemUwK3JRUk5HQi82YkdtZlRCb1lYMDZaSTJEMHNMSnBwalBocjNLN1JORWQ2VWo2' \
    --header 'Content-Type: application/json' \
    --data "@query-metrics.json" | jq
    

    このリクエストでは、指定した期間にコネクター lcc-k2q7v が受信したレコード数が返されます。その例を次に示します。

    {
      "data": [
        {
          "timestamp": "2021-03-02T18:00:00Z",
          "value": 44027,
        },
        {
          "timestamp": "2021-03-02T19:00:00Z",
          "value": 7227,
        },
        {
          "timestamp": "2021-03-02T20:00:00Z",
          "value": 7222,
        },
        {
          "timestamp": "2021-03-02T21:00:00Z",
          "value": 7253,
        },
        {
          "timestamp": "2021-03-02T22:00:00Z",
          "value": 7258,
        }
      ]
    }
    

Confluent Cloud Metrics API の詳細については、「Confluent Cloud のメトリクス」のドキュメントを参照してください。

コネクターの削除

コネクターを削除するには、次の API リクエストを使用します。

curl --request DELETE https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors/<my-connector-name> \
--header 'authorization: Basic <base64-encoded-key-and-secret>' | jq

以下に例を示します。

curl --request DELETE https://api.confluent.cloud/connect/v1/environments/env-*****/clusters/lkc-*****/connectors/S3_SINKConnector_1 \
--header 'authorization: Basic HIJKLMNOPQCSEFUNUJWNjdONjpOc3RyWE5kamlzZE05VTdOSk05T3FuSTcyQzlIb2ZRaWhURWtiOWlkVTFtdTB6' | jq

以下に示すのは、コネクターが正常に削除された場合の出力のサンプルです。

  % Total    % Received % Xferd  Average Speed   Time    Time     Time    Current
                                 Dload   Upload  Total   Spent    Left    Speed
100    14    0    14    0     0      0      0 --:--:--  0:00:15 --:--:--     3
{
  "error": null
}

次のステップ

API の最新バージョン、Connect API リクエストの網羅的なリスト、すべての応答コード、その他の詳細については、Confluent Cloud API 開発者ドキュメント を参照してください。

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png