REST Proxy Quick Start¶
Start by running the REST Proxy and the services it depends on: ZooKeeper, Kafka, and Schema Registry. You can do this in one command with the Confluent CLI confluent local commands.
Important
The confluent local commands are intended for a single-node development environment and are not suitable for a production environment. The data that are produced are transient and are intended to be temporary. For production-ready workflows, see Install and Upgrade Confluent Platform.
- Prerequisites
- Confluent Platform
- Confluent CLI (requires separate installation)
confluent local services kafka-rest start
Each service reads its configuration from its property files under etc
.
Note
To manually start each service in its own terminal, run instead:
bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
bin/kafka-server-start ./etc/kafka/server.properties
bin/kafka-rest-start ./etc/kafka-rest/kafka-rest.properties
# optional, if you want to use the Avro, JSON Schema, or Protobuf data format
bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
See the Confluent Platform quickstart for a more detailed explanation of how to get these services up and running.
Produce and Consume JSON Messages¶
# Produce a message using JSON with the value '{ "foo": "bar" }' to the topic jsontest
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
--data '{"records":[{"value":{"foo":"bar"}}]}' "http://localhost:8082/topics/jsontest"
# Expected output from preceding command
{
"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null
}
# Create a consumer for JSON data in "my_json_consumer_group" consumer group, starting at the beginning of the topic's
# log and subscribe to a topic. Then consume some data using the base URL in the first response.
# Finally, close the consumer with a DELETE to make it leave the group and clean up
# its resources.
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' \
http://localhost:8082/consumers/my_json_consumer_group
# Expected output from preceding command
{
"instance_id":"my_consumer_instance",
"base_uri":"http://localhost:8082/consumers/my_json_consumer_group/instances/my_consumer_instance"
}
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["jsontest"]}' \
http://localhost:8082/consumers/my_json_consumer_group/instances/my_consumer_instance/subscription
# No content in response
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
http://localhost:8082/consumers/my_json_consumer_group/instances/my_consumer_instance/records
# Expected output from preceding command
[
{"key":null,"value":{"foo":"bar"},"partition":0,"offset":0,"topic":"jsontest"}
]
curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
http://localhost:8082/consumers/my_json_consumer_group/instances/my_consumer_instance
# No content in response
Produce and Consume Avro Messages¶
# Produce a message using Avro embedded data, including the schema which will
# be registered with schema registry and used to validate and serialize
# before storing the data in Kafka
curl -X POST -H "Content-Type: application/vnd.kafka.avro.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}", "records": [{"value": {"name": "testUser"}}]}' \
"http://localhost:8082/topics/avrotest"
# Expected output from preceding command:
{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":21}
# Produce a message with Avro key and value.
# Note that if you use Avro values you must also use Avro keys, but the schemas can differ
curl -X POST -H "Content-Type: application/vnd.kafka.avro.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"key_schema": "{\"name\":\"user_id\" ,\"type\": \"int\" }", "value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}", "records": [{"key" : 1 , "value": {"name": "testUser"}}]}' \
"http://localhost:8082/topics/avrokeytest2"
# Expected output from preceding command:
{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":2,"value_schema_id":1}
# Create a consumer for Avro data in "my_avro_consumer_group" consumer group, starting at the beginning of the topic's
# log and subscribe to a topic. Then consume some data from a topic, which is decoded, translated to
# JSON, and included in the response. The schema used for deserialization is
# fetched automatically from schema registry. Finally, clean up.
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' \
http://localhost:8082/consumers/my_avro_consumer_group
# Expected output from preceding command:
{"instance_id":"my_consumer_instance","base_uri":"http://localhost:8082/consumers/my_avro_consumer_group/instances/my_consumer_instance"}
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["avrotest"]}' \
http://localhost:8082/consumers/my_avro_consumer_group/instances/my_consumer_instance/subscription
# No content in response
curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \
http://localhost:8082/consumers/my_avro_consumer_group/instances/my_consumer_instance/records
# Expected output from preceding command:
[{"key":null,"value":{"name":"testUser"},"partition":0,"offset":1,"topic":"avrotest"}]
curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
http://localhost:8082/consumers/my_avro_consumer_group/instances/my_consumer_instance
# No content in response
Produce and Consume Binary Messages¶
# Produce a message using binary embedded data with value "Kafka" to the topic binarytest
curl -X POST -H "Content-Type: application/vnd.kafka.binary.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"records":[{"value":"S2Fma2E="}]}' "http://localhost:8082/topics/binarytest"
# Expected output from preceding command:
{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}
# Create a consumer for binary data in "my_binary_consumer_group" consumer group, starting at the beginning of the topic's
# log. Then consume some data from a topic using the base URL in the first response.
# Finally, close the consumer with a DELETE to make it leave the group and clean up
# its resources.
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "binary", "auto.offset.reset": "earliest"}' \
http://localhost:8082/consumers/my_binary_consumer_group
# Expected output from preceding command:
{"instance_id":"my_consumer_instance","base_uri":"http://localhost:8082/consumers/my_binary_consumer_group/instances/my_consumer_instance"}
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["binarytest"]}' \
http://localhost:8082/consumers/my_binary_consumer_group/instances/my_consumer_instance/subscription
# No content in response
curl -X GET -H "Accept: application/vnd.kafka.binary.v2+json" \
http://localhost:8082/consumers/my_binary_consumer_group/instances/my_consumer_instance/records
# Expected output from preceding command:
[{"key":null,"value":"S2Fma2E=","partition":0,"offset":0,"topic":"binarytest"}]
curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
http://localhost:8082/consumers/my_binary_consumer_group/instances/my_consumer_instance
# No content in response
Produce and Consume Protobuf Messages¶
# Produce a message using Protobuf embedded data, including the schema which will
# be registered with schema registry and used to validate and serialize
# before storing the data in Kafka
curl -X POST -H "Content-Type: application/vnd.kafka.protobuf.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"value_schema": "syntax=\"proto3\"; message User { string name = 1; }", "records": [{"value": {"name": "testUser"}}]}' \
"http://localhost:8082/topics/protobuftest"
# Expected output from preceding command:
{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":21}
# Create a consumer for Protobuf data in "my_protobuf_consumer_group" consumer group, starting at the beginning of the topic's
# log and subscribe to a topic. Then consume some data from a topic, which is decoded, translated to
# JSON, and included in the response. The schema used for deserialization is
# fetched automatically from schema registry. Finally, clean up.
curl -X POST -H "Content-Type: application/vnd.kafka.protobuf.v2+json" \
--data '{"name": "my_consumer_instance", "format": "protobuf", "auto.offset.reset": "earliest"}' \
http://localhost:8082/consumers/my_protobuf_consumer_group
# Expected output from preceding command:
{"instance_id":"my_consumer_instance","base_uri":"http://localhost:8082/consumers/my_protobuf_consumer_group/instances/my_consumer_instance"}
curl -X POST -H "Content-Type: application/vnd.kafka.protobuf.v2+json" --data '{"topics":["protobuftest"]}' \
http://localhost:8082/consumers/my_protobuf_consumer_group/instances/my_consumer_instance/subscription
# No content in response
curl -X GET -H "Accept: application/vnd.kafka.protobuf.v2+json" \
http://localhost:8082/consumers/my_protobuf_consumer_group/instances/my_consumer_instance/records
# Expected output from preceding command:
[{"key":null,"value":{"name":"testUser"},"partition":0,"offset":1,"topic":"protobuftest"}]
curl -X DELETE -H "Content-Type: application/vnd.kafka.protobuf.v2+json" \
http://localhost:8082/consumers/my_protobuf_consumer_group/instances/my_consumer_instance
# No content in response
Produce and Consume JSON Schema Messages¶
# Produce a message using JSON schema embedded data, including the schema which will
# be registered with schema registry and used to validate and serialize
# before storing the data in Kafka
curl -X POST -H "Content-Type: application/vnd.kafka.jsonschema.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"value_schema": "{\"type\":\"object\",\"properties\":{\"name\":{\"type\":\"string\"}}}", "records": [{"value": {"name": "testUser"}}]}' \
"http://localhost:8082/topics/jsonschematest"
# Expected output from preceding command:
{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":21}
# Create a consumer for JSON schema data in "my_jsonschema_consumer_group" consumer group, starting at the beginning of the topic's
# log and subscribe to a topic. Then consume some data from a topic, which is decoded, translated to
# JSON, and included in the response. The schema used for deserialization is
# fetched automatically from schema registry. Finally, clean up.
curl -X POST -H "Content-Type: application/vnd.kafka.jsonschema.v2+json" \
--data '{"name": "my_consumer_instance", "format": "jsonschema", "auto.offset.reset": "earliest"}' \
http://localhost:8082/consumers/my_jsonschema_consumer_group
# Expected output from preceding command:
{"instance_id":"my_consumer_instance","base_uri":"http://localhost:8082/consumers/my_jsonschema_consumer_group/instances/my_consumer_instance"}
curl -X POST -H "Content-Type: application/vnd.kafka.jsonschema.v2+json" --data '{"topics":["jsonschematest"]}' \
http://localhost:8082/consumers/my_jsonschema_consumer_group/instances/my_consumer_instance/subscription
# No content in response
curl -X GET -H "Accept: application/vnd.kafka.jsonschema.v2+json" \
http://localhost:8082/consumers/my_jsonschema_consumer_group/instances/my_consumer_instance/records
# Expected output from preceding command:
[{"key":null,"value":{"name":"testUser"},"partition":0,"offset":1,"topic":"jsonschematest"}]
curl -X DELETE -H "Content-Type: application/vnd.kafka.jsonschema.v2+json" \
http://localhost:8082/consumers/my_jsonschema_consumer_group/instances/my_consumer_instance
# No content in response
Inspect Topic Metadata¶
# Get a list of topics
curl "http://localhost:8082/topics"
# Expected output from preceding command:
["__consumer_offsets","_schemas","avrotest","binarytest","jsontest"]
# Get info about one topic
curl "http://localhost:8082/topics/avrotest"
# Expected output from preceding command:
{"name":"avrotest","configs":{"message.downconversion.enable":"true","file.delete.delay.ms":"60000",\
"segment.ms":"604800000","min.compaction.lag.ms":"0","retention.bytes":"-1","segment.index.bytes":"10485760",\
"cleanup.policy":"delete","follower.replication.throttled.replicas":"",\
"message.timestamp.difference.max.ms":"9223372036854775807","segment.jitter.ms":"0","preallocate":"false",\
"message.timestamp.type":"CreateTime","message.format.version":"2.0-IV1","segment.bytes":"1073741824",\
"unclean.leader.election.enable":"false","max.message.bytes":"1000012","retention.ms":"604800000",\
"flush.ms":"9223372036854775807","delete.retention.ms":"86400000","leader.replication.throttled.replicas":"",\
"min.insync.replicas":"1","flush.messages":"9223372036854775807","compression.type":"producer",\
"index.interval.bytes":"4096","min.cleanable.dirty.ratio":"0.5"},"partitions":\
[{"partition":0,"leader":0,"replicas":[{"broker":0,"leader":true,"in_sync":true}]}]}
...
# Get info about a topic's partitions
curl "http://localhost:8082/topics/avrotest/partitions"
# Expected output from preceding command:
[{"partition":0,"leader":0,"replicas":[{"broker":0,"leader":true,"in_sync":true}]}]
More Examples¶
- For a hands-on example that uses Confluent REST Proxy to produce and consume data from a Kafka cluster, see the Confluent REST Proxy tutorial.
- For an example that uses REST Proxy configured with security, see the Confluent Platform demo.