Confluent Cloud 開発者向けの REST API のクイックスタート¶
このクイックスタートでは、Confluent Cloud REST API を使用した Apache Kafka® クラスターの管理を開始する方法について説明します。
クイックスタートの内容¶
このクイックスタートでは、REST 管理用エンドポイント を使用して以下の操作を行う方法について説明します。
- 既存のトピックの表示
- トピックの作成
- トピック構成のアップデート
- トピックのプロパティと値のリストの取得
- トピックに指定されているプロパティの値の取得
- トピックの削除
クイックスタートでは、使用可能な一部のエンドポイントのデモを示し、さまざまなタイプの呼び出しの使用方法を示します。これは入門レベルであるため、包括的ではありません。API ではもっと幅広い機能を使用できます。
前提条件¶
前提条件として、Confluent Cloud に Kafka クラスターが必要です。
既存のクラスターを使用するか、Confluent Cloud を使用した Apache Kafka のクイックスタートのステップ 1 の説明に従って新しいクラスターを作成できます。
例の使用上のヒント¶
クイックスタートでは、curl コマンドを使用して API 呼び出しをテストする方法を示します。
API のテストを行う場合、
curl
の--silent
フラグを使用して jq でコマンド全体をパイプしてみてください。きれいに書式設定された出力結果が得られます。以下に例を示します:curl --silent -H "Authorization: Basic <BASE64-encoded-key-and-secret>" \ --request GET --url 'https://<REST-endpoint>/kafka/v3/clusters/<cluster-id>/topics/<topic-name>/configs' | jq
出力をファイルに送信し、画面にも表示するには、
<command> | tee notes.txt
を使用して、新しいファイルを作成するか、同じ名前のファイルを上書きします。それ以降、既存のファイルに追加する場合は、tee
コマンドで-a
フラグを使用します。たとえば、<command> | tee -a notes.txt
のようにします。これをjq
パイプの後に連結できます。
ちなみに
一部の例では、複数行からなるコマンドに改行を入れるために、バックスラッシュ(\
)を使用しています。バックスラッシュを使用すると、シェルやターミナルで 1 つのコマンドを複数行に入力できます。ここでは、ドキュメントの表示を最適化するために使用しています。ただし、バックスラッシュを含む例をコピーアンドペーストした場合、デプロイに合わせた値を変更するコマンドを実行できないことがあります。これは、使用するシェルのタイプとその設定によって異なります。
ステップ 1: REST エンドポイントのアドレスとクラスター ID を見つける¶
Confluent CLI または Cloud Console UI から REST エンドポイントとクラスター ID を取得します。
confluent login
を使用して Confluent CLI にログオンします。confluent environment list
、confluent environment use
を使用して、必要な環境が表示されることを確認します。confluent kafka cluster list
コマンドでクラスターを一覧表示し、ID を取得します。- 次のコマンドを入力してクラスターの詳細を取得します。
confluent kafka cluster describe <cluster-id>
ちなみに
CLI のインストールまたはアップデートについては、CLI のインストールガイド を参照してください。
Confluent Cloud ウェブ UI にログオンします。
使用するクラスターに移動し、Cluster settings をクリックします。
REST エンドポイントをメモします。
これは、REST API がホストされているサーバーです。後述の例で、サーバー例の代わりにこのエンドポイント ID を使用します。
クラスター ID をメモします。
これは、REST API を使用して管理する Kafka クラスターの ID です。後述の例で、クラスター ID 例の代わりにこれを使用する必要があります。
ステップ 2: Kafka クラスターリソースにアクセスするための認証情報を作成する¶
Confluent Cloud のクラスタースコープの API キーとシークレットを作成します。
REST API とやり取りするには、Confluent Cloud の API キーと API シークレットが必要です。API キーペアを生成するには、CLI または Cloud Console を使用します。
confluent login
で Confluent CLI にログオンします(まだログオンしていない場合)。以下のコマンドを実行して、クラスターの API キーとシークレットを作成します。
confluent api-key create --resource <cluster-id> --description <key-description>
出力は以下のようになります。
It may take a couple of minutes for the API key to be ready. Save the API key and secret. The secret is not retrievable later. +---------+------------------------------------------------------------------+ | API Key | 1234WXYZ4321ZYXW | | Secret | 1aBcDEFG234Hy7CeGBoBDoBBsSttOMJ5oFUwwkhj7g7MlS3p01c99C6ao84pQb8X | +---------+------------------------------------------------------------------+
Confluent Cloud ウェブ UI で、使用するクラスターに移動し、左側のメニューから API access を選択します。
Add key をクリックし、画面の指示に従ってグローバル アクセス API キーを作成します。
詳細については、「API キーを使用したアクセス制御」を参照してください。
API キー ID と対応するシークレットを使用して、Confluent Cloud API への REST 呼び出しに含まれる認可ヘッダーで使用する、base64 でエンコードされた文字列を作成します。
詳細については、API ドキュメントの「認証」を参照してください。クラウドとクラスターの API キーと base64 エンコーディングについて記載されています。
たとえば、Mac OS で API キーから base64 ヘッダーを生成するには、以下の手順に従います。
echo -n "<api-key>:<api-secret>" | base64
上記のコマンドからの出力は、長い英数字の文字列(base64 でエンコードされたキーとシークレット)になります。これを以降の REST API 呼び出しで基本認証として使用します。これを次のステップで使用できるように保存します。
ステップ 3: Kafka クラスター上で既に使用可能なトピックをリストする¶
Kafka クラスターのトピックをリストします。
curl -H "Authorization: Basic <BASE64-encoded-key-and-secret>" --request GET --url 'https://<REST-endpoint>/kafka/v3/clusters/<cluster-id>/topics'
以下に例を示します。
curl -H "Authorization: Basic ABC123ABC" --request GET --url 'https://pkc-lzvrd.us-west4.gcp.confluent.cloud:443/kafka/v3/clusters/lkc-vo9pz/topics'
応答で既存のトピックがリストされます。
この例では、この Kafka クラスターに既存のトピックがありません。
{"kind":"KafkaTopicList",
"metadata":{
"self":"https://pkc-lzvrd.us-west4.gcp.confluent.cloud/kafka/v3/clusters/lkc-vo9pz/topics",
"next":null
},
"data":[]
}
ステップ 4: Kafka API のクラスター管理を使用してトピックを作成する¶
トピックを作成します。
curl -H "Authorization: Basic <BASE64-encoded-key-and-secret>" -H 'Content-Type: application/json' \
--request POST --url 'https://<REST-endpoint>/kafka/v3/clusters/<cluster-id>/topics' \
-d '{"topic_name": "<topic-name>", "partitions_count": <Partitions count>, "replication_factor": <Replication factor>}'
以下に例を示します。
curl -H "Authorization: Basic ABC123ABC" -H 'Content-Type: application/json' \
--request POST --url 'https://pkc-lzvrd.us-west4.gcp.confluent.cloud:443/kafka/v3/clusters/lkc-vo9pz/topics' \
-d '{"topic_name": "testTopic1", "partitions_count": 5, "replication_factor": 3}'
応答で新しいトピックに関する情報が示されます:
{
"kind": "KafkaTopic",
"metadata": {
"self": "https://pkc-lzvrd.us-west4.gcp.confluent.cloud:443/kafka/v3/clusters/lkc-vo9pz/topics/testTopic1”,
"resource_name": "crn:///kafka=lkc-vo9pz/topic=testTopic1"
},
"cluster_id": "lkc-vo9pz",
"topic_name": "testTopic1",
"is_internal": false,
"replication_factor": 3,
"partitions_count": 5,
"partitions": {
"related": "https://pkc-lzvrd.us-west4.gcp.confluent.cloud:443/kafka/v3/clusters/lkc-vo9pz/topics/testTopic1/partitions"
},
"configs": {
"related": "https://pkc-lzvrd.us-west4.gcp.confluent.cloud:443/kafka/v3/clusters/lkc-vo9pz/topics/testTopic1/configs"
},
"partition_reassignments": {
"related": "https://pkc-lzvrd.us-west4.gcp.confluent.cloud:443/kafka/v3/clusters/lkc-vo9pz/topics/testTopic1//partitions/-/reassignment"
}
}
ステップ 5: トピック構成を管理する¶
ステップ 4 で作成されたトピックのデフォルトの構成を表示します。
API 呼び出しでトピック構成のプロパティのリストを取得できます。
curl -H "Authorization: Basic <BASE64-encoded-key-and-secret>" \ --request GET --url 'https://<REST-endpoint>/kafka/v3/clusters/<cluster-id>/topics/<topic-name>/configs'
たとえば、以下のコマンドでは、
testTopic1
の構成のリストを取得し、さらに読みやすい出力結果を得るためにパイプで jq に連結します。そしてtee
を使用して出力を新しいファイル(testTopic1-configs.txt
)に送信し、同時に画面に表示します。出力をファイルに送信すると、元のすべての構成のブラウズや検索が可能になり、以降のステップで役に立つ場合があります。curl --silent -H "Authorization: Basic ABC123ABC" \ --request GET --url 'https://pkc-lzvrd.us-west4.gcp.confluent.cloud:443/kafka/v3/clusters/lkc-vo9pz/topics/testTopic1/configs' | jq | tee testTopic1-configs.txt
応答でトピック構成プロパティとその設定値のリストが表示されます:
{ "kind": "KafkaTopicConfigList", "metadata": { "self": "https://pkc-lzvrd.us-west4.gcp.confluent.cloud:443/kafka/v3/clusters/lkc-vo9pz/topics/testTopic1/configs", "next": null }, "data": [ { "kind": "KafkaTopicConfig", "metadata": { "self": "https://pkc-lzvrd.us-west4.gcp.confluent.cloud:443/kafka/v3/clusters/lkc-vo9pz/topics/testTopic1/configs/cleanup.policy", "resource_name": "crn:///kafka=lkc-vo9pz/topic=testTopic1/config=cleanup.policy" }, "cluster_id": "lkc-vo9pz", "name": "cleanup.policy", "value": "delete", "is_read_only": false, "is_sensitive": false, "source": "DYNAMIC_TOPIC_CONFIG", "synonyms": [ { "name": "cleanup.policy", "value": "delete", "source": "DYNAMIC_TOPIC_CONFIG" }, { "name": "log.cleanup.policy", "value": "delete", "source": "DEFAULT_CONFIG" } ], "topic_name": "testTopic1", "is_default": false }, ...
トピック構成を編集します。
構成プロパティのリストは、Confluent Platform ドキュメントの「トピック構成」にあります。
トピックプロパティを編集するには、URL の最後にプロパティ名を追加します。
curl -H "Authorization: Basic <BASE64-encoded-key-and-secret>" -H 'Content-Type: application/json' \ --request PUT --url 'https://<REST-endpoint>/kafka/v3/clusters/<cluster-id>/topics/‘<topic-name>/configs/<property-name> \ -d '{"value": “<New value>”}’
たとえば、retention.ms の値をデフォルト値の 604800000(7 日間)から 259200000(3 日間)に変更するには、以下の手順に従います。
curl -H "Authorization: Basic ABC123ABC" -H 'Content-Type: application/json' --request PUT \ --url 'https://pkc-lzvrd.us-west4.gcp.confluent.cloud:443/kafka/v3/clusters/lkc-vo9pz/topics/testTopic1/configs/retention.ms' \ -d '{"value": "259200000"}'
この呼び出しでは、応答ペイロードは返されません。
アップデートしたトピック構成を表示します。
単一のプロパティの値を表示するには、URL でプロパティ名を指定します:
curl -H "Authorization: Basic <BASE64-encoded-key-and-secret>" --request GET --url 'https://<REST-endpoint>/kafka/v3/clusters/<cluster-id>/topics/<topic-name>/configs/<property-name>’
以下に例を示します。
curl -H "Authorization: Basic ABC123ABC" --request GET --url 'https://pkc-lzvrd.us-west4.gcp.confluent.cloud:443/kafka/v3/clusters/lkc-vo9pz/topics/testTopic1/configs/retention.ms'
応答ペイロードでトピック構成プロパティ
retention.ms
の現在の値が示されます:{ "kind": "KafkaTopicConfig", "metadata": { "self": “https://pkc-lzvrd.us-west4.gcp.confluent.cloud:443/kafka/v3/clusters/lkc-vo9pz/topics/testTopic1/configs/retention.ms", "resource_name": "crn:///kafka=lkc-vo9pz/topic=testTopic1/config=retention.ms" }, "cluster_id": "lkc-vo9pz", "name": "retention.ms", "value": "259200000", "is_read_only": false, "is_sensitive": false, "source": "DYNAMIC_TOPIC_CONFIG", "synonyms": [ { "name": "retention.ms", "value": "259200000", "source": "DYNAMIC_TOPIC_CONFIG" } ], "topic_name": "testTopic1", "is_default": false }
トピック構成をバッチアップデートします。
複数の構成プロパティを単一の REST 呼び出しでアップデートするには、バッチアップデートを使用します:
curl -H "Authorization: Basic <BASE64-encoded-key-and-secret>" -H 'Content-Type: application/json' \ --request POST --url 'https://<REST-endpoint>/kafka/v3/clusters/<cluster-id>/topics/<topic-name>/configs:alter' \ -d '{"data": [{"name": “<property-name>”, "value": "<new-value>", {"name": “<property-name>”, "value": “<new-value>}…]}’
たとえば、以下の API 呼び出しでは、retention.ms を再びアップデートしますが、今度は 172800000(2 日間)にアップデートし、segment.bytes をデフォルト(1073741824)から 123456789 にアップデートします:
curl -H "Authorization: Basic ABC123ABC" -H 'Content-Type: application/json' --request POST --url 'https://pkc-lzvrd.us-west4.gcp.confluent.cloud:443/kafka/v3/clusters/lkc-vo9pz/topics/testTopic1/configs:alter' -d '{"data": [{"name": "retention.ms", "value": "172800000"}, {"name": "segment.bytes", "value": "123456789"}]}'
トピック構成のアップデートした値を表示します。
前の手順で示したように、
retention.ms
、次にsegment.bytes
を以下の API 呼び出しの<property-name>
の代わりに使用してアップデートを表示できます:curl -H "Authorization: Basic <BASE64-encoded-key-and-secret>" --request GET --url 'https://<REST-endpoint>/kafka/v3/clusters/<cluster-id>/topics/<topic-name>/configs/<property-name>’
または、Confluent Cloud ウェブ UI にログオンし、クラスター上の
testTopic1
に移動して Configuration タブをクリックしてから Show full config をクリックします。
ステップ 6: トピックの作成とプロパティの構成を同時に行う¶
トピック構成をトピックの作成の一部としてアップデートすることもできます。
これを試すには、新しいトピックを作成します。以下に例を示します。
curl --silent -H "Authorization: Basic TOKEN" -H 'Content-Type: application/json' --request POST --url \
'https://pkc-0wg55.us-central1.gcp.devel.cpdev.cloud:443/kafka/v3/clusters/lkc-nyk52d/topics' \
-d '{"topic_name": "testTopic2", "partitions_count": 4, "replication_factor": 3, "configs":[{"name": "retention.ms", "value": 98765},{"name": "segment.bytes", "value":"98765432"}]}' | jq
この例では、トピック testTopic2
を作成し、replication_factor
と partitions_count
(前の例と同様)を指定して、トピックの作成時に retention.ms
と segment.bytes
の新しい値も指定しています。
ステップ 7: トピックを削除する¶
クイックスタートデモを終了する準備が完了するか、トピックが不要になったら、サンプルトピックを削除できます。
トピックを削除するには、以下の API 呼び出しを使用します。
curl -H "Authorization: Basic <BASE64-encoded-key-and-secret>" -H 'Content-Type: application/json' --request DELETE --url 'https://<REST-endpoint>/kafka/v3/clusters/<cluster-id>/topics/<topic-name>'
以下に例を示します:
curl -H "Authorization: Basic ABC123ABC" -H 'Content-Type: application/json' --request DELETE --url 'https://pkc-lzvrd.us-west4.gcp.confluent.cloud:443/kafka/v3/clusters/lkc-vo9pz/topics/‘testTopic1'
curl -H "Authorization: Basic ABC123ABC" -H 'Content-Type: application/json' --request DELETE --url 'https://pkc-lzvrd.us-west4.gcp.confluent.cloud:443/kafka/v3/clusters/lkc-vo9pz/topics/‘testTopic2'
この呼び出しでは、応答ペイロードはありません。
クラスターのトピックをリストして、トピックが削除されたことを確認します。
curl -H "Authorization: Basic <BASE64-encoded-key-and-secret>" --request GET --url 'https://<REST-endpoint>/kafka/v3/clusters/<cluster-id>/topics'
以下に例を示します。
curl -H "Authorization: Basic ABC123ABC" --request GET --url 'https://pkc-lzvrd.us-west4.gcp.confluent.cloud:443/kafka/v3/clusters/lkc-vo9pz/topics'
応答では、クラスターにトピックがないことが示されます:
{ "kind": "KafkaTopicList", "metadata": { "self": "https://pkc-lzvrd.us-west4.gcp.confluent.cloud:443/kafka/v3/clusters/lkc-vo9pz/topics", "next": null }, "data": [] }