REST Proxy: Apache Kafka® のサンプル¶
このチュートリアルでは、Confluent REST Proxy を使用して Apache Kafka® クラスターにメッセージを生成し、このクラスターからメッセージを消費します。
チュートリアルの実行後に、提供された ソースコード をリファレンスとして使用して、独自の Kafka クライアントアプリケーションを開発します。
前提条件¶
クライアント¶
- Docker バージョン 17.06.1-ce
- Docker Compose バージョン 1.25.4
wget
Kafka クラスター¶
- このチュートリアルは、どの環境の Kafka クラスターにも対応しています。
- Confluent Cloud 内
- ローカルホスト 上
- 任意のリモート Kafka クラスター
- Confluent Cloud で実行している場合は、API キーおよびシークレットで Confluent Cloud クラスターにアクセスできることが必要です。
- Confluent Cloud にサインアップし、プロモーションコード
C50INTEG
を使用した最初の 20 名のユーザーは、$50 相当を無料で使用できます(詳細)。 - Confluent Cloud で Kafka クラスター、資格情報、および ACL を自動的に作成する方法については、「Confluent Cloud 向け ccloud-stack ユーティリティ」を参照してください。
- Confluent Cloud にサインアップし、プロモーションコード
セットアップ¶
confluentinc/examples GitHub リポジトリのクローンを作成し、
6.0.6-post
ブランチをチェックアウトします。git clone https://github.com/confluentinc/examples cd examples git checkout 6.0.6-post
REST Proxy のサンプルのディレクトリに変更します。
cd clients/cloud/rest-proxy/
Kafka クラスターに接続するための構成パラメーターを含むローカルファイル(
$HOME/.confluent/java.config
など)を作成します。以下のテンプレートのいずれかをベースとして、クラスターへの接続情報でこのファイルをカスタマイズします。{{ BROKER_ENDPOINT }}
、{{CLUSTER_API_KEY }}
、および{{ CLUSTER_API_SECRET }}
に値を代入します(この値を手作業で見つける方法については、「Configure Confluent Cloud Clients」を参照してください。 または、Confluent Cloud 向け ccloud-stack ユーティリティ を使用して値を自動作成します)。Confluent Cloud のテンプレート構成ファイル
# Required connection configs for Kafka producer, consumer, and admin bootstrap.servers={{ BROKER_ENDPOINT }} security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}'; sasl.mechanism=PLAIN # Required for correctness in Apache Kafka clients prior to 2.6 client.dns.lookup=use_all_dns_ips # Best practice for Kafka producer to prevent data loss acks=all
ローカルホストのテンプレート構成ファイル
# Kafka bootstrap.servers=localhost:9092
ENV 変数のファイルを生成します。Docker は、これを使用してブートストラップサーバーとセキュリティ構成を設定します。
../../../ccloud/ccloud-generate-cp-configs.sh $HOME/.confluent/java.config
生成した
ENV
変数のファイルをソースにします。source ./delta_configs/env.delta
cp-all-in-one-cloud docker-compose.yml ファイルを入手します。このファイルは、ローカルホストのコンテナーで Confluent Platform を実行し、Confluent Cloud に接続するようにコンテナーを自動的に構成します。
wget -O docker-compose.yml https://raw.githubusercontent.com/confluentinc/cp-all-in-one/6.0.6/cp-all-in-one-cloud/docker-compose.yml
REST Proxy 構成全体については、前の手順でダウンロードした
docker-compose.yml
ファイルの REST Proxy セクションを参照してください。cat docker-compose.yml
次のコマンドを実行して REST Proxy Docker コンテナーを起動します。
docker-compose up -d rest-proxy
Docker の REST Proxy ログを表示し、REST Proxy が起動したことを確認するログメッセージ
Server started, listening for requests
が表示されるまで待ちます。docker-compose logs -f rest-proxy
基本プロデューサーおよびコンシューマー¶
このサンプルでは、プロデューサーアプリケーションが Kafka クラスターのトピックに Kafka データを書き込みます。Kafka クラスターにトピックがまだ存在しない場合、プロデューサーアプリケーションは Kafka Admin Client API を使用してトピックを作成します。Kafka に書き込まれる各レコードには、ユーザー名(alice
など)を表すキーと、json フォーマットのカウント値({"count": 0}
など)があります。コンシューマーアプリケーションは、同じ Kafka トピックを読み取り、各レコードの処理中にカウントのローリング合計を保持します。
レコードの生成¶
REST Proxy が接続されている Kafka のクラスター ID を入手します。
KAFKA_CLUSTER_ID=$(docker-compose exec rest-proxy curl -X GET \ "http://localhost:8082/v3/clusters/" | jq -r ".data[0].cluster_id")
パラメーター
KAFKA_CLUSTER_ID
の値が有効であることを確認します。このチュートリアルのサンプルの場合は、lkc-56ngz
と表示されますが、実際の出力では異なります。echo $KAFKA_CLUSTER_ID
REST Proxy API v3 の
AdminClient
機能を使用して Kafka トピックtest1
を作成します。REST Proxy が Confluent Cloud にバックアップされる場合は、レプリケーション係数を3
に構成します。docker-compose exec rest-proxy curl -X POST \ -H "Content-Type: application/json" \ -d "{\"topic_name\":\"test1\",\"partitions_count\":6,\"replication_factor\":3,\"configs\":[]}" \ "http://localhost:8082/v3/clusters/${KAFKA_CLUSTER_ID}/topics" | jq .
以下のように出力されることを確認します。
{ "kind": "KafkaTopic", "metadata": { "self": "http://rest-proxy:8082/v3/clusters/lkc-56ngz/topics/test1", "resource_name": "crn:///kafka=lkc-56ngz/topic=test1" }, "cluster_id": "lkc-56ngz", "topic_name": "test2", "is_internal": false, "replication_factor": 3, "partitions": { "related": "http://rest-proxy:8082/v3/clusters/lkc-56ngz/topics/test2/partitions" }, "configs": { "related": "http://rest-proxy:8082/v3/clusters/lkc-56ngz/topics/test2/configs" }, "partition_reassignments": { "related": "http://rest-proxy:8082/v3/clusters/lkc-56ngz/topics/test1/partitions/-/reassignment" } }
キー
alice
、および値{"count":0}
、{"count":1}
、{"count":2}
で、3 つの JSON メッセージをトピックに生成します。docker-compose exec rest-proxy curl -X POST \ -H "Content-Type: application/vnd.kafka.json.v2+json" \ -H "Accept: application/vnd.kafka.v2+json" \ --data '{"records":[{"key":"alice","value":{"count":0}},{"key":"alice","value":{"count":1}},{"key":"alice","value":{"count":2}}]}' \ "http://localhost:8082/topics/test1" | jq .
以下のように出力されることを確認します。
{ "offsets": [ { "partition": 0, "offset": 0, "error_code": null, "error": null }, { "partition": 0, "offset": 1, "error_code": null, "error": null }, { "partition": 0, "offset": 2, "error_code": null, "error": null } ], "key_schema_id": null, "value_schema_id": null }
プロデューサーコード を表示します。
レコードの消費¶
コンシューマーグループ
cg1
に属するコンシューマーci1
を作成します。トピックの先頭で開始されるように、auto.offset.reset
にearliest
を指定します。docker-compose exec rest-proxy curl -X POST \ -H "Content-Type: application/vnd.kafka.v2+json" \ --data '{"name": "ci1", "format": "json", "auto.offset.reset": "earliest"}' \ http://localhost:8082/consumers/cg1 | jq .
以下のように出力されることを確認します。
{ "instance_id": "ci1", "base_uri": "http://rest-proxy:8082/consumers/cg1/instances/ci1" }
コンシューマーをトピック
test1
にサブスクライブします。docker-compose exec rest-proxy curl -X POST \ -H "Content-Type: application/vnd.kafka.v2+json" \ --data '{"topics":["test1"]}' \ http://localhost:8082/consumers/cg1/instances/ci1/subscription | jq .
最初の応答にあるベース URL を使用してデータを消費します。間に 10 秒間のスリープを挟んで curl コマンドを 2 回発行します。これは、https://github.com/confluentinc/kafka-rest/issues/432 に記載された理由による、意図的な操作です。
docker-compose exec rest-proxy curl -X GET \ -H "Accept: application/vnd.kafka.json.v2+json" \ http://localhost:8082/consumers/cg1/instances/ci1/records | jq . sleep 10 docker-compose exec rest-proxy curl -X GET \ -H "Accept: application/vnd.kafka.json.v2+json" \ http://localhost:8082/consumers/cg1/instances/ci1/records | jq .
以下のように出力されることを確認します。
[] [ { "topic": "test1", "key": "alice", "value": { "count": 0 }, "partition": 0, "offset": 0 }, { "topic": "test1", "key": "alice", "value": { "count": 1 }, "partition": 0, "offset": 1 }, { "topic": "test1", "key": "alice", "value": { "count": 2 }, "partition": 0, "offset": 2 } ]
コンシューマーインスタンスを削除して、そのリソースをクリーンアップします。
docker-compose exec rest-proxy curl -X DELETE \ -H "Content-Type: application/vnd.kafka.v2+json" \ http://localhost:8082/consumers/cg1/instances/ci1 | jq .
コンシューマーコード を表示します。
Avro と Confluent Cloud Schema Registry¶
このサンプルは、前のサンプルと似ていますが、値は Avro フォーマットです。また、Confluent Cloud Schema Registry と統合されています。Confluent Cloud Schema Registry を使用する前に、その 可用性と制限 を確認してください。
Confluent Cloud GUI の「Quick Start for Schema Management on Confluent Cloud」にある説明に従って、Confluent Cloud Schema Registry を有効にし、API キーおよびシークレットを作成して接続します。
ご使用の VPC が Confluent Cloud Schema Registry 公衆インターネットエンドポイントに接続できることを確認します。
Schema Registry に接続するための構成パラメーターを含むローカル構成ファイル(
$HOME/.confluent/java.config
など)をアップデートします。Confluent Cloud のテンプレート構成ファイル
# Required connection configs for Kafka producer, consumer, and admin bootstrap.servers={{ BROKER_ENDPOINT }} security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}'; sasl.mechanism=PLAIN # Required for correctness in Apache Kafka clients prior to 2.6 client.dns.lookup=use_all_dns_ips # Best practice for Kafka producer to prevent data loss acks=all # Required connection configs for Confluent Cloud Schema Registry schema.registry.url=https://{{ SR_ENDPOINT }} basic.auth.credentials.source=USER_INFO basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
ローカルホストのテンプレート構成ファイル
# Kafka bootstrap.servers=localhost:9092 # Confluent Schema Registry schema.registry.url=http://localhost:8081
Schema Registry サブジェクトのリストを表示して Confluent Cloud Schema Registry の資格情報を確認します。次の例の
{{ SR_API_KEY }}
、{{ SR_API_SECRET }}
、および{{ SR_ENDPOINT }}
に値を代入します。curl -u {{ SR_API_KEY }}:{{ SR_API_SECRET }} https://{{ SR_ENDPOINT }}/subjects
Avro レコードの生成¶
REST Proxy が接続されている Kafka のクラスター ID を入手します。
KAFKA_CLUSTER_ID=$(docker-compose exec rest-proxy curl -X GET \ "http://localhost:8082/v3/clusters/" | jq -r ".data[0].cluster_id")
パラメーター
KAFKA_CLUSTER_ID
の値が有効であることを確認します。このチュートリアルのサンプルの場合は、lkc-56ngz
と表示されますが、実際の出力では異なります。REST Proxy API v3 の
AdminClient
機能を使用して Kafka トピックtest2
を作成します。REST Proxy が Confluent Cloud にバックアップされる場合は、レプリケーション係数を3
に構成します。docker-compose exec rest-proxy curl -X POST \ -H "Content-Type: application/json" \ -d "{\"topic_name\":\"test2\",\"partitions_count\":6,\"replication_factor\":3,\"configs\":[]}" \ "http://localhost:8082/v3/clusters/${KAFKA_CLUSTER_ID}/topics" | jq .
以下のように出力されることを確認します。
{ "kind": "KafkaTopic", "metadata": { "self": "http://rest-proxy:8082/v3/clusters/lkc-56ngz/topics/test2", "resource_name": "crn:///kafka=lkc-56ngz/topic=test2" }, "cluster_id": "lkc-56ngz", "topic_name": "test2", "is_internal": false, "replication_factor": 3, "partitions": { "related": "http://rest-proxy:8082/v3/clusters/lkc-56ngz/topics/test2/partitions" }, "configs": { "related": "http://rest-proxy:8082/v3/clusters/lkc-56ngz/topics/test2/configs" }, "partition_reassignments": { "related": "http://rest-proxy:8082/v3/clusters/lkc-56ngz/topics/test2/partitions/-/reassignment" } }
トピック
test2
の新しい Avro スキーマを Confluent Cloud Schema Registry に登録します。docker-compose exec rest-proxy curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{ "schema": "[ { \"type\":\"record\", \"name\":\"countInfo\", \"fields\": [ {\"name\":\"count\",\"type\":\"long\"}]} ]" }' -u "$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO" "$SCHEMA_REGISTRY_URL/subjects/test2-value/versions"
新しいスキーマ ID が出力に表示されることを確認します。
{"id":100001}
変数
schemaid
をスキーマ ID の値に設定します。schemaid=$(docker-compose exec rest-proxy curl -X GET -u "$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO" "$SCHEMA_REGISTRY_URL/subjects/test2-value/versions/latest" | jq '.id')
値
{"count":0}
、{"count":1}
、{"count":2}
で、3 つの Avro メッセージをトピックに生成します。リクエストの本体にスキーマ ID が含まれることに注意してください。docker-compose exec rest-proxy curl -X POST \ -H "Content-Type: application/vnd.kafka.avro.v2+json" \ -H "Accept: application/vnd.kafka.v2+json" \ --data '{"value_schema_id": '"$schemaid"', "records": [{"value": {"countInfo":{"count": 0}}},{"value": {"countInfo":{"count": 1}}},{"value": {"countInfo":{"count": 2}}}]}' \ "http://localhost:8082/topics/test2" | jq .
以下のように出力されることを確認します。
{ "offsets": [ { "partition": 4, "offset": 0, "error_code": null, "error": null }, { "partition": 4, "offset": 1, "error_code": null, "error": null }, { "partition": 4, "offset": 2, "error_code": null, "error": null } ], "key_schema_id": null, "value_schema_id": 100001 }
プロデューサー Avro コード を表示します。
Avro レコードの消費¶
コンシューマーグループ
cg2
に属するコンシューマーci2
を作成します。トピックの先頭で開始されるように、auto.offset.reset
にearliest
を指定します。docker-compose exec rest-proxy curl -X POST \ -H "Content-Type: application/vnd.kafka.v2+json" \ --data '{"name": "ci2", "format": "avro", "auto.offset.reset": "earliest"}' \ http://localhost:8082/consumers/cg2 | jq .
以下のように出力されることを確認します。
{ "instance_id": "ci2", "base_uri": "http://rest-proxy:8082/consumers/cg2/instances/ci2" }
コンシューマーをトピック
test2
にサブスクライブします。docker-compose exec rest-proxy curl -X POST \ -H "Content-Type: application/vnd.kafka.v2+json" \ --data '{"topics":["test2"]}' \ http://localhost:8082/consumers/cg2/instances/ci2/subscription | jq .
最初の応答にあるベース URL を使用してデータを消費します。間に 10 秒間のスリープを挟んで curl コマンドを 2 回発行します。これは、https://github.com/confluentinc/kafka-rest/issues/432 に記載された理由による、意図的な操作です。
docker-compose exec rest-proxy curl -X GET \ -H "Accept: application/vnd.kafka.avro.v2+json" \ http://localhost:8082/consumers/cg2/instances/ci2/records | jq . sleep 10 docker-compose exec rest-proxy curl -X GET \ -H "Accept: application/vnd.kafka.avro.v2+json" \ http://localhost:8082/consumers/cg2/instances/ci2/records | jq .
以下のように出力されることを確認します。
[] [ { "topic": "test2", "key": null, "value": { "count": 0 }, "partition": 0, "offset": 0 }, { "topic": "test2", "key": null, "value": { "count": 1 }, "partition": 0, "offset": 1 }, { "topic": "test2", "key": null, "value": { "count": 2 }, "partition": 0, "offset": 2 } ]
コンシューマーインスタンスを削除して、そのリソースをクリーンアップします。
docker-compose exec rest-proxy curl -X DELETE \ -H "Content-Type: application/vnd.kafka.v2+json" \ http://localhost:8082/consumers/cg2/instances/ci2 | jq .
コンシューマー Avro コード を表示します。
Confluent Cloud Schema Registry¶
Confluent Cloud Schema Registry に登録されたスキーマサブジェクトを表示します。次の出力の
<SR API KEY>
、<SR API SECRET>
、および<SR ENDPOINT>
に値を代入します。curl -u <SR API KEY>:<SR API SECRET> https://<SR ENDPOINT>/subjects
サブジェクト
test2-value
が存在することを確認します。["test2-value"]
サブジェクト test2-value のスキーマ情報を表示します。次の出力の
<SR API KEY>
、<SR API SECRET>
、および<SR ENDPOINT>
に値を代入します。curl -u <SR API KEY>:<SR API SECRET> https://<SR ENDPOINT>/subjects/test2-value/versions/1
サブジェクト
test2-value
のスキーマ情報を確認します。{"subject":"test2-value","version":1,"id":100001,"schema":"[{\"type\":\"record\",\"name\":\"countInfo\",\"fields\":[{\"name\":\"count\",\"type\":\"long\"}]}]"}
停止¶
次のコマンドを実行して Docker を停止します。
docker-compose down