REST Proxy Quick Start
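Start by running REST Proxy and the services it depends on: ZooKeeper, Kafka, and Schema Registry. The Confluent CLI confluent local commands let you do this with a single command.
The confluent local commands are intended for a single-node development environment and are not suitable for production. The data they produce is transient and temporary. For production-ready workflows, see "Install and Upgrade Confluent Platform".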
- Prerequisites
  - Confluent Platform
  - Confluent CLI (requires separate installation)
confluent local services kafka-rest start
bin/zookeeper-server-start ./etc/kafka/
bin/kafka-server-start ./etc/kafka/
bin/kafka-rest-start ./etc/kafka-rest/
# optional, if you want to use the Avro, JSON Schema, or Protobuf data format
bin/schema-registry-start ./etc/schema-registry/
For more information about starting these services, see "Quick Start for Confluent Platform".
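Before sending requests, it can help to confirm that REST Proxy is actually answering. The following is a minimal sketch, not part of the official quick start: it assumes REST Proxy listens on http://localhost:8082 (the address used by the examples in this guide), and wait_for_rest_proxy is a hypothetical helper name.

```shell
# Assumption: REST Proxy listens on localhost:8082, as in the examples in this guide.
REST_PROXY_URL="${REST_PROXY_URL:-http://localhost:8082}"

# Hypothetical helper: poll GET /topics once per second until it succeeds
# or the given number of attempts (default 30) is used up.
wait_for_rest_proxy() {
  attempts="${1:-30}"
  i=0
  while [ "$i" -lt "$attempts" ]; do
    # --fail makes curl exit non-zero on HTTP errors as well as connection errors.
    if curl --silent --fail --output /dev/null "$REST_PROXY_URL/topics"; then
      return 0
    fi
    i=$((i + 1))
    sleep 1
  done
  return 1
}
```

For example, run `wait_for_rest_proxy 30 && echo "REST Proxy is up"` after starting the services.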
Produce and Consume JSON Messages
# Produce a message using JSON with the value '{ "foo": "bar" }' to the topic jsontest
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
--data '{"records":[{"value":{"foo":"bar"}}]}' "http://localhost:8082/topics/jsontest"
# Expected output from preceding command
# Create a consumer for JSON data, starting at the beginning of the topic's
# log and subscribe to a topic. Then consume some data using the base URL in the first response.
# Finally, close the consumer with a DELETE to make it leave the group and clean up
# its resources.
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' \
# Expected output from preceding command
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["jsontest"]}' \
# No content in response
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
# Expected output from preceding command
curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
# No content in response
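The consumer steps above (create, subscribe, read, delete) all hang off the base URL returned by the first request. The sketch below chains them in one function under stated assumptions: the consumer group name my_json_consumer is hypothetical, the create response is assumed to be JSON with a base_uri field, and the subscription and records paths are assumed to follow the REST Proxy v2 consumer API. extract_base_uri and run_json_consumer are illustrative helper names.

```shell
# Assumption: REST Proxy listens on localhost:8082, as in the examples above.
REST_PROXY_URL="${REST_PROXY_URL:-http://localhost:8082}"
GROUP="my_json_consumer"   # hypothetical consumer group name

# Pull the "base_uri" value out of a create-consumer JSON response.
extract_base_uri() {
  printf '%s' "$1" |
    sed -n 's/.*"base_uri"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p'
}

# Create a consumer instance, subscribe to jsontest, read once, then delete it.
run_json_consumer() {
  created=$(curl --silent -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
    --data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' \
    "$REST_PROXY_URL/consumers/$GROUP") || return 1
  base_uri=$(extract_base_uri "$created")
  [ -n "$base_uri" ] || return 1

  curl --silent -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
    --data '{"topics":["jsontest"]}' "$base_uri/subscription"
  curl --silent -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
    "$base_uri/records"
  # Delete the instance so it leaves the group and frees its resources.
  curl --silent -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
    "$base_uri"
}
```

With the services running, call `run_json_consumer` after producing to jsontest. The first read may return an empty list while the consumer joins the group, so a second GET on the records URL is sometimes needed.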
Produce and Consume Avro Messages
# Produce a message using Avro embedded data, including the schema which will
# be registered with schema registry and used to validate and serialize
# before storing the data in Kafka
curl -X POST -H "Content-Type: application/vnd.kafka.avro.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}", "records": [{"value": {"name": "testUser"}}]}' \
# Expected output from preceding command:
# Produce a message with Avro key and value.
# Note that if you use Avro values you must also use Avro keys, but the schemas can differ
curl -X POST -H "Content-Type: application/vnd.kafka.avro.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"key_schema": "{\"name\":\"user_id\" ,\"type\": \"int\" }", "value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}", "records": [{"key" : 1 , "value": {"name": "testUser"}}]}' \
# Expected output from preceding command:
# Create a consumer for Avro data, starting at the beginning of the topic's
# log and subscribe to a topic. Then consume some data from a topic, which is decoded, translated to
# JSON, and included in the response. The schema used for deserialization is
# fetched automatically from schema registry. Finally, clean up.
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' \
# Expected output from preceding command:
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["avrotest"]}' \
# No content in response
curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \
# Expected output from preceding command:
curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
# No content in response
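In the produce requests above, value_schema carries the schema as a JSON string, so every double quote inside the schema is backslash-escaped. As a rough sketch, that escaping can be generated rather than typed by hand. json_escape is a hypothetical helper that handles only backslashes and double quotes, so it assumes a single-line schema with no control characters.

```shell
# Hypothetical helper: escape backslashes and double quotes so the text
# can sit inside a JSON string literal. Newlines are NOT handled.
json_escape() {
  printf '%s' "$1" | sed -e 's/\\/\\\\/g' -e 's/"/\\"/g'
}

# The Avro schema from the example above, written as ordinary JSON.
SCHEMA='{"type": "record", "name": "User", "fields": [{"name": "name", "type": "string"}]}'

# Embed the escaped schema in the request body.
PAYLOAD=$(printf '{"value_schema": "%s", "records": [{"value": {"name": "testUser"}}]}' \
  "$(json_escape "$SCHEMA")")

printf '%s\n' "$PAYLOAD"
```

The resulting PAYLOAD can be passed to curl with `--data "$PAYLOAD"` in place of the hand-escaped string.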
Produce and Consume Binary Messages
# Produce a message using binary embedded data with value "Kafka" to the topic binarytest
curl -X POST -H "Content-Type: application/vnd.kafka.binary.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"records":[{"value":"S2Fma2E="}]}' "http://localhost:8082/topics/binarytest"
# Expected output from preceding command:
# Create a consumer for binary data, starting at the beginning of the topic's
# log. Then consume some data from a topic using the base URL in the first response.
# Finally, close the consumer with a DELETE to make it leave the group and clean up
# its resources.
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "binary", "auto.offset.reset": "earliest"}' \
# Expected output from preceding command:
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["binarytest"]}' \
# No content in response
curl -X GET -H "Accept: application/vnd.kafka.binary.v2+json" \
# Expected output from preceding command:
curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
# No content in response
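The binary format carries values as base64 text, which is why the produce request above sends "S2Fma2E=" for the value "Kafka". A small sketch of encoding a value for the request body and decoding one as it would come back from the consumer; it assumes a base64 utility that accepts -d for decoding (GNU coreutils does; some older macOS versions use -D instead).

```shell
# Base64-encode the raw bytes. printf avoids the trailing newline
# that echo would add to the encoded data.
encoded=$(printf '%s' 'Kafka' | base64)

# Build the request body used by the produce example.
payload=$(printf '{"records":[{"value":"%s"}]}' "$encoded")

# Decode a value the way you would for a record returned by the consumer.
# Assumption: this base64 implementation accepts -d.
decoded=$(printf '%s' "$encoded" | base64 -d)

printf '%s\n%s\n' "$payload" "$decoded"
```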
Produce and Consume Protobuf Messages
# Produce a message using Protobuf embedded data, including the schema which will
# be registered with schema registry and used to validate and serialize
# before storing the data in Kafka
curl -X POST -H "Content-Type: application/vnd.kafka.protobuf.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"value_schema": "syntax=\"proto3\"; message User { string name = 1; }", "records": [{"value": {"name": "testUser"}}]}' \
# Expected output from preceding command:
# Create a consumer for Protobuf data, starting at the beginning of the topic's
# log and subscribe to a topic. Then consume some data from a topic, which is decoded, translated to
# JSON, and included in the response. The schema used for deserialization is
# fetched automatically from schema registry. Finally, clean up.
curl -X POST -H "Content-Type: application/vnd.kafka.protobuf.v2+json" \
--data '{"name": "my_consumer_instance", "format": "protobuf", "auto.offset.reset": "earliest"}' \
# Expected output from preceding command:
curl -X POST -H "Content-Type: application/vnd.kafka.protobuf.v2+json" --data '{"topics":["protobuftest"]}' \
# No content in response
curl -X GET -H "Accept: application/vnd.kafka.protobuf.v2+json" \
# Expected output from preceding command:
curl -X DELETE -H "Content-Type: application/vnd.kafka.protobuf.v2+json" \
# No content in response
Produce and Consume JSON Schema Messages
# Produce a message using JSON schema embedded data, including the schema which will
# be registered with schema registry and used to validate and serialize
# before storing the data in Kafka
curl -X POST -H "Content-Type: application/vnd.kafka.jsonschema.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"value_schema": "{\"type\":\"object\",\"properties\":{\"name\":{\"type\":\"string\"}}}", "records": [{"value": {"name": "testUser"}}]}' \
# Expected output from preceding command:
# Create a consumer for JSON schema data, starting at the beginning of the topic's
# log and subscribe to a topic. Then consume some data from a topic, which is decoded, translated to
# JSON, and included in the response. The schema used for deserialization is
# fetched automatically from schema registry. Finally, clean up.
curl -X POST -H "Content-Type: application/vnd.kafka.jsonschema.v2+json" \
--data '{"name": "my_consumer_instance", "format": "jsonschema", "auto.offset.reset": "earliest"}' \
# Expected output from preceding command:
curl -X POST -H "Content-Type: application/vnd.kafka.jsonschema.v2+json" --data '{"topics":["jsonschematest"]}' \
# No content in response
curl -X GET -H "Accept: application/vnd.kafka.jsonschema.v2+json" \
# Expected output from preceding command:
curl -X DELETE -H "Content-Type: application/vnd.kafka.jsonschema.v2+json" \
# No content in response
Inspect Topic Metadata
# Get a list of topics
curl "http://localhost:8082/topics"
# Expected output from preceding command:
# Get info about one topic
curl "http://localhost:8082/topics/avrotest"
# Expected output from preceding command:
# Get info about a topic's partitions
curl "http://localhost:8082/topics/avrotest/partitions"
# Expected output from preceding command:
- For a hands-on example that uses Confluent REST Proxy to produce data to and consume data from a Kafka cluster, see the Confluent REST Proxy tutorial.
- For an example that uses REST Proxy configured with security, see the Confluent Platform demo.