REST Proxy: Apache Kafka® のサンプル¶
このチュートリアルでは、Confluent REST Proxy を使用して Apache Kafka® クラスターにメッセージを生成し、このクラスターからメッセージを消費します。
チュートリアルの実行後に、提供された ソースコード をリファレンスとして使用して、独自の Kafka クライアントアプリケーションを開発します。
前提条件¶
クライアント¶
- Docker バージョン 17.06.1-ce
- Docker Compose バージョン 1.25.4
wget
Kafka クラスター¶
- このチュートリアルは、どの環境の Kafka クラスターにも対応しています。
- Confluent Cloud 内
- ローカルホスト 上
- 任意のリモート Kafka クラスター
- Confluent Cloud で実行している場合は、API キーおよびシークレットで Confluent Cloud クラスターにアクセスできることが必要です。
- Confluent Cloud にサインアップし、プロモーションコード
C50INTEG
を使用した最初の 20 名のユーザーは、$50 相当を無料で使用できます(詳細)。 - Confluent Cloud で Kafka クラスター、資格情報、および ACL を自動的に作成する方法については、「Confluent Cloud 向け ccloud-stack ユーティリティ」を参照してください。
- Confluent Cloud にサインアップし、プロモーションコード
セットアップ¶
confluentinc/examples GitHub リポジトリのクローンを作成し、
6.1.5-post
ブランチをチェックアウトします。git clone https://github.com/confluentinc/examples cd examples git checkout 6.1.5-post
REST Proxy のサンプルのディレクトリに変更します。
cd clients/cloud/rest-proxy/
Kafka クラスターに接続するための構成パラメーターを含むローカルファイル(
$HOME/.confluent/java.config
など)を作成します。以下のテンプレートのいずれかをベースとして、クラスターへの接続情報でこのファイルをカスタマイズします。{{ BROKER_ENDPOINT }}
、{{CLUSTER_API_KEY }}
、および{{ CLUSTER_API_SECRET }}
に値を代入します(この値を手作業で見つける方法については、「Configure Confluent Cloud Clients」を参照してください。 または、Confluent Cloud 向け ccloud-stack ユーティリティ を使用して値を自動作成します)。Confluent Cloud のテンプレート構成ファイル
# Required connection configs for Kafka producer, consumer, and admin bootstrap.servers={{ BROKER_ENDPOINT }} security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}'; sasl.mechanism=PLAIN # Required for correctness in Apache Kafka clients prior to 2.6 client.dns.lookup=use_all_dns_ips # Best practice for Kafka producer to prevent data loss acks=all
ローカルホストのテンプレート構成ファイル
# Kafka bootstrap.servers=localhost:9092
ENV 変数のファイルを生成します。Docker は、これを使用してブートストラップサーバーとセキュリティ構成を設定します。
../../../ccloud/ccloud-generate-cp-configs.sh $HOME/.confluent/java.config
生成した
ENV
変数のファイルをソースにします。source ./delta_configs/env.delta
cp-all-in-one-cloud docker-compose.yml ファイルを入手します。このファイルは、ローカルホストのコンテナーで Confluent Platform を実行し、Confluent Cloud に接続するようにコンテナーを自動的に構成します。
wget -O docker-compose.yml https://raw.githubusercontent.com/confluentinc/cp-all-in-one/6.1.5-post/cp-all-in-one-cloud/docker-compose.yml
REST Proxy 構成全体については、前の手順でダウンロードした
docker-compose.yml
ファイルの REST Proxy セクションを参照してください。cat docker-compose.yml
基本プロデューサーおよびコンシューマー¶
このサンプルでは、プロデューサーアプリケーションが Kafka クラスターのトピックに Kafka データを書き込みます。Kafka クラスターにトピックがまだ存在しない場合、プロデューサーアプリケーションは Kafka Admin Client API を使用してトピックを作成します。Kafka に書き込まれる各レコードには、ユーザー名(alice
など)を表すキーと、json フォーマットのカウント値({"count": 0}
など)があります。コンシューマーアプリケーションは、同じ Kafka トピックを読み取り、各レコードの処理中にカウントのローリング合計を保持します。
レコードの生成¶
このセクションでは Schema Registry を使用しないため、
docker-compose.yml
ファイルの以下の行はコメントアウトしてください。#KAFKA_REST_SCHEMA_REGISTRY_URL: $SCHEMA_REGISTRY_URL #KAFKA_REST_CLIENT_BASIC_AUTH_CREDENTIALS_SOURCE: $BASIC_AUTH_CREDENTIALS_SOURCE #KAFKA_REST_CLIENT_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
次のコマンドを実行して REST Proxy Docker コンテナーを起動します。
docker-compose up -d rest-proxy
REST Proxy のログを表示し、サーバーが起動されたことを確認するログメッセージ
Server started, listening for requests
が表示されるまで待ちます。docker-compose logs -f rest-proxy
REST Proxy が接続されている Kafka のクラスター ID を入手します。
KAFKA_CLUSTER_ID=$(docker-compose exec rest-proxy curl -X GET \ "http://localhost:8082/v3/clusters/" | jq -r ".data[0].cluster_id")
パラメーター
KAFKA_CLUSTER_ID
の値が有効であることを確認します。このチュートリアルのサンプルの場合は、lkc-56ngz
と表示されますが、実際の出力では異なります。echo $KAFKA_CLUSTER_ID
REST Proxy API v3 の
AdminClient
機能を使用して Kafka トピックtest1
を作成します。docker-compose exec rest-proxy curl -X POST \ -H "Content-Type: application/json" \ -d "{\"topic_name\":\"test1\",\"partitions_count\":6,\"configs\":[]}" \ "http://localhost:8082/v3/clusters/${KAFKA_CLUSTER_ID}/topics" | jq .
以下のように出力されることを確認します。
{ "kind": "KafkaTopic", "metadata": { "self": "http://rest-proxy:8082/v3/clusters/lkc-56ngz/topics/test1", "resource_name": "crn:///kafka=lkc-56ngz/topic=test1" }, "cluster_id": "lkc-56ngz", "topic_name": "test2", "is_internal": false, "replication_factor": 0, "partitions": { "related": "http://rest-proxy:8082/v3/clusters/lkc-56ngz/topics/test2/partitions" }, "configs": { "related": "http://rest-proxy:8082/v3/clusters/lkc-56ngz/topics/test2/configs" }, "partition_reassignments": { "related": "http://rest-proxy:8082/v3/clusters/lkc-56ngz/topics/test1/partitions/-/reassignment" } }
キー
alice
、および値{"count":0}
、{"count":1}
、{"count":2}
で、3 つの JSON メッセージをトピックに生成します。docker-compose exec rest-proxy curl -X POST \ -H "Content-Type: application/vnd.kafka.json.v2+json" \ -H "Accept: application/vnd.kafka.v2+json" \ --data '{"records":[{"key":"alice","value":{"count":0}},{"key":"alice","value":{"count":1}},{"key":"alice","value":{"count":2}}]}' \ "http://localhost:8082/topics/test1" | jq .
以下のように出力されることを確認します。
{ "offsets": [ { "partition": 0, "offset": 0, "error_code": null, "error": null }, { "partition": 0, "offset": 1, "error_code": null, "error": null }, { "partition": 0, "offset": 2, "error_code": null, "error": null } ], "key_schema_id": null, "value_schema_id": null }
プロデューサーコード を表示します。
レコードの消費¶
コンシューマーグループ
cg1
に属するコンシューマーci1
を作成します。トピックの先頭で開始されるように、auto.offset.reset
にearliest
を指定します。docker-compose exec rest-proxy curl -X POST \ -H "Content-Type: application/vnd.kafka.v2+json" \ --data '{"name": "ci1", "format": "json", "auto.offset.reset": "earliest"}' \ http://localhost:8082/consumers/cg1 | jq .
以下のように出力されることを確認します。
{ "instance_id": "ci1", "base_uri": "http://rest-proxy:8082/consumers/cg1/instances/ci1" }
コンシューマーをトピック
test1
にサブスクライブします。docker-compose exec rest-proxy curl -X POST \ -H "Content-Type: application/vnd.kafka.v2+json" \ --data '{"topics":["test1"]}' \ http://localhost:8082/consumers/cg1/instances/ci1/subscription | jq .
最初の応答にあるベース URL を使用してデータを消費します。間に 10 秒間のスリープを挟んで curl コマンドを 2 回発行します。これは、https://github.com/confluentinc/kafka-rest/issues/432 に記載された理由による、意図的な操作です。
docker-compose exec rest-proxy curl -X GET \ -H "Accept: application/vnd.kafka.json.v2+json" \ http://localhost:8082/consumers/cg1/instances/ci1/records | jq . sleep 10 docker-compose exec rest-proxy curl -X GET \ -H "Accept: application/vnd.kafka.json.v2+json" \ http://localhost:8082/consumers/cg1/instances/ci1/records | jq .
以下のように出力されることを確認します。
[] [ { "topic": "test1", "key": "alice", "value": { "count": 0 }, "partition": 0, "offset": 0 }, { "topic": "test1", "key": "alice", "value": { "count": 1 }, "partition": 0, "offset": 1 }, { "topic": "test1", "key": "alice", "value": { "count": 2 }, "partition": 0, "offset": 2 } ]
コンシューマーインスタンスを削除して、そのリソースをクリーンアップします。
docker-compose exec rest-proxy curl -X DELETE \ -H "Content-Type: application/vnd.kafka.v2+json" \ http://localhost:8082/consumers/cg1/instances/ci1 | jq .
コンシューマーコード を表示します。
REST Proxy の停止¶
次のコマンドを実行して Docker を停止します。
docker-compose down
Avro と Confluent Cloud Schema Registry¶
このサンプルは、前のサンプルと似ていますが、値は Avro フォーマットです。また、Confluent Cloud Schema Registry と統合されています。Confluent Cloud Schema Registry を使用する前に、その 可用性と制限 を確認してください。
Confluent Cloud GUI の「Confluent Cloud におけるスキーマ管理のクイックスタート」にある説明に従って、Confluent Cloud Schema Registry を有効にし、API キーおよびシークレットを作成して接続します。
ご使用の VPC が Confluent Cloud Schema Registry 公衆インターネットエンドポイントに接続できることを確認します。
Schema Registry に接続するための構成パラメーターを含むローカル構成ファイル(
$HOME/.confluent/java.config
など)をアップデートします。Confluent Cloud のテンプレート構成ファイル
# Required connection configs for Kafka producer, consumer, and admin bootstrap.servers={{ BROKER_ENDPOINT }} security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}'; sasl.mechanism=PLAIN # Required for correctness in Apache Kafka clients prior to 2.6 client.dns.lookup=use_all_dns_ips # Best practice for Kafka producer to prevent data loss acks=all # Required connection configs for Confluent Cloud Schema Registry schema.registry.url=https://{{ SR_ENDPOINT }} basic.auth.credentials.source=USER_INFO basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
ローカルホストのテンプレート構成ファイル
# Kafka bootstrap.servers=localhost:9092 # Confluent Schema Registry schema.registry.url=http://localhost:8081
Schema Registry サブジェクトのリストを表示して Confluent Cloud Schema Registry の資格情報を確認します。次の例の
{{ SR_API_KEY }}
、{{ SR_API_SECRET }}
、および{{ SR_ENDPOINT }}
に値を代入します。curl -u {{ SR_API_KEY }}:{{ SR_API_SECRET }} https://{{ SR_ENDPOINT }}/subjects
ENV 変数のファイルを再生成します。Docker は、これを使用してブートストラップサーバーとセキュリティ構成を設定します。
../../../ccloud/ccloud-generate-cp-configs.sh $HOME/.confluent/java.config
source コマンドで、再生成した
ENV
変数のファイルの内容を反映させます。source ./delta_configs/env.delta
Avro レコードの生成¶
このセクションでは Schema Registry を使用するため、
docker-compose.yml
ファイルの以下の行のコメントを解除してください。KAFKA_REST_SCHEMA_REGISTRY_URL: $SCHEMA_REGISTRY_URL KAFKA_REST_CLIENT_BASIC_AUTH_CREDENTIALS_SOURCE: $BASIC_AUTH_CREDENTIALS_SOURCE KAFKA_REST_CLIENT_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
次のコマンドを実行して REST Proxy Docker コンテナーを起動します。
docker-compose up -d rest-proxy
REST Proxy のログを表示し、サーバーが起動されたことを確認するログメッセージ
Server started, listening for requests
が表示されるまで待ちます。docker-compose logs -f rest-proxy
REST Proxy が接続されている Kafka のクラスター ID を入手します。
KAFKA_CLUSTER_ID=$(docker-compose exec rest-proxy curl -X GET \ "http://localhost:8082/v3/clusters/" | jq -r ".data[0].cluster_id")
パラメーター
KAFKA_CLUSTER_ID
の値が有効であることを確認します。このチュートリアルのサンプルの場合は、lkc-56ngz
と表示されますが、実際の出力では異なります。REST Proxy API v3 の
AdminClient
機能を使用して Kafka トピックtest2
を作成します。docker-compose exec rest-proxy curl -X POST \ -H "Content-Type: application/json" \ -d "{\"topic_name\":\"test2\",\"partitions_count\":6,\"configs\":[]}" \ "http://localhost:8082/v3/clusters/${KAFKA_CLUSTER_ID}/topics" | jq .
以下のように出力されることを確認します。
{ "kind": "KafkaTopic", "metadata": { "self": "http://rest-proxy:8082/v3/clusters/lkc-56ngz/topics/test2", "resource_name": "crn:///kafka=lkc-56ngz/topic=test2" }, "cluster_id": "lkc-56ngz", "topic_name": "test2", "is_internal": false, "replication_factor": 0, "partitions": { "related": "http://rest-proxy:8082/v3/clusters/lkc-56ngz/topics/test2/partitions" }, "configs": { "related": "http://rest-proxy:8082/v3/clusters/lkc-56ngz/topics/test2/configs" }, "partition_reassignments": { "related": "http://rest-proxy:8082/v3/clusters/lkc-56ngz/topics/test2/partitions/-/reassignment" } }
トピック
test2
の新しい Avro スキーマを Confluent Cloud Schema Registry に登録します。docker-compose exec rest-proxy curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{ "schema": "[ { \"type\":\"record\", \"name\":\"countInfo\", \"fields\": [ {\"name\":\"count\",\"type\":\"long\"}]} ]" }' -u "$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO" "$SCHEMA_REGISTRY_URL/subjects/test2-value/versions"
新しいスキーマ ID が出力に表示されることを確認します。
{"id":100001}
変数
schemaid
をスキーマ ID の値に設定します。schemaid=$(docker-compose exec rest-proxy curl -X GET -u "$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO" "$SCHEMA_REGISTRY_URL/subjects/test2-value/versions/latest" | jq '.id')
値
{"count":0}
、{"count":1}
、{"count":2}
で、3 つの Avro メッセージをトピックに生成します。リクエストの本体にスキーマ ID が含まれることに注意してください。docker-compose exec rest-proxy curl -X POST \ -H "Content-Type: application/vnd.kafka.avro.v2+json" \ -H "Accept: application/vnd.kafka.v2+json" \ --data '{"value_schema_id": '"$schemaid"', "records": [{"value": {"countInfo":{"count": 0}}},{"value": {"countInfo":{"count": 1}}},{"value": {"countInfo":{"count": 2}}}]}' \ "http://localhost:8082/topics/test2" | jq .
以下のように出力されることを確認します。
{ "offsets": [ { "partition": 4, "offset": 0, "error_code": null, "error": null }, { "partition": 4, "offset": 1, "error_code": null, "error": null }, { "partition": 4, "offset": 2, "error_code": null, "error": null } ], "key_schema_id": null, "value_schema_id": 100001 }
プロデューサー Avro コード を表示します。
Avro レコードの消費¶
コンシューマーグループ
cg2
に属するコンシューマーci2
を作成します。トピックの先頭で開始されるように、auto.offset.reset
にearliest
を指定します。docker-compose exec rest-proxy curl -X POST \ -H "Content-Type: application/vnd.kafka.v2+json" \ --data '{"name": "ci2", "format": "avro", "auto.offset.reset": "earliest"}' \ http://localhost:8082/consumers/cg2 | jq .
以下のように出力されることを確認します。
{ "instance_id": "ci2", "base_uri": "http://rest-proxy:8082/consumers/cg2/instances/ci2" }
コンシューマーをトピック
test2
にサブスクライブします。docker-compose exec rest-proxy curl -X POST \ -H "Content-Type: application/vnd.kafka.v2+json" \ --data '{"topics":["test2"]}' \ http://localhost:8082/consumers/cg2/instances/ci2/subscription | jq .
最初の応答にあるベース URL を使用してデータを消費します。間に 10 秒間のスリープを挟んで curl コマンドを 2 回発行します。これは、https://github.com/confluentinc/kafka-rest/issues/432 に記載された理由による、意図的な操作です。
docker-compose exec rest-proxy curl -X GET \ -H "Accept: application/vnd.kafka.avro.v2+json" \ http://localhost:8082/consumers/cg2/instances/ci2/records | jq . sleep 10 docker-compose exec rest-proxy curl -X GET \ -H "Accept: application/vnd.kafka.avro.v2+json" \ http://localhost:8082/consumers/cg2/instances/ci2/records | jq .
以下のように出力されることを確認します。
[] [ { "topic": "test2", "key": null, "value": { "count": 0 }, "partition": 0, "offset": 0 }, { "topic": "test2", "key": null, "value": { "count": 1 }, "partition": 0, "offset": 1 }, { "topic": "test2", "key": null, "value": { "count": 2 }, "partition": 0, "offset": 2 } ]
コンシューマーインスタンスを削除して、そのリソースをクリーンアップします。
docker-compose exec rest-proxy curl -X DELETE \ -H "Content-Type: application/vnd.kafka.v2+json" \ http://localhost:8082/consumers/cg2/instances/ci2 | jq .
コンシューマー Avro コード を表示します。
Confluent Cloud Schema Registry¶
Confluent Cloud Schema Registry に登録されたスキーマサブジェクトを表示します。次の出力の
<SR API KEY>
、<SR API SECRET>
、および<SR ENDPOINT>
に値を代入します。curl -u <SR API KEY>:<SR API SECRET> https://<SR ENDPOINT>/subjects
サブジェクト
test2-value
が存在することを確認します。["test2-value"]
サブジェクト test2-value のスキーマ情報を表示します。次の出力の
<SR API KEY>
、<SR API SECRET>
、および<SR ENDPOINT>
に値を代入します。curl -u <SR API KEY>:<SR API SECRET> https://<SR ENDPOINT>/subjects/test2-value/versions/1
サブジェクト
test2-value
のスキーマ情報を確認します。{"subject":"test2-value","version":1,"id":100001,"schema":"[{\"type\":\"record\",\"name\":\"countInfo\",\"fields\":[{\"name\":\"count\",\"type\":\"long\"}]}]"}
REST Proxy の停止¶
次のコマンドを実行して Docker を停止します。
docker-compose down