Important
You are viewing documentation for an older version of Confluent Platform.
REST Proxy Quick Start
Start by running the REST Proxy and the services it depends on: ZooKeeper, Kafka, and Schema Registry. You can do this in one command with the Confluent CLI confluent local commands.
Important
The confluent local commands are intended for a single-node development environment and are not suitable for production. The data they produce is transient. For production-ready workflows, see Install and Upgrade Confluent Platform.
- Prerequisites
- Confluent Platform
- Confluent CLI (requires separate installation)
confluent local start kafka-rest
Each service reads its configuration from its property files under etc.
Note
To manually start each service in its own terminal, run instead:
bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
bin/kafka-server-start ./etc/kafka/server.properties
bin/kafka-rest-start ./etc/kafka-rest/kafka-rest.properties
# optional, if you want to use the Avro, JSON Schema, or Protobuf data format
bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
See the Confluent Platform quickstart for a more detailed explanation of how to get these services up and running.
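The services take a moment to come up, so a script that starts them and immediately calls REST Proxy can fail. The following is a minimal sketch of a readiness check, not part of the Confluent tooling: wait_for is a hypothetical helper name, and using GET /topics on the default port 8082 as the probe is an assumption.

```shell
# Retry a command once per second until it succeeds or the attempts run out.
# Usage: wait_for ATTEMPTS COMMAND [ARGS...]
wait_for() {
  wf_remaining="$1"
  shift
  while [ "$wf_remaining" -gt 0 ]; do
    if "$@" > /dev/null 2>&1; then
      return 0
    fi
    wf_remaining=$((wf_remaining - 1))
    # Pause only if another attempt follows.
    if [ "$wf_remaining" -gt 0 ]; then sleep 1; fi
  done
  return 1
}

# Example (requires a running REST Proxy): allow up to 30 attempts for GET /topics to succeed.
# wait_for 30 curl -sf "http://localhost:8082/topics" && echo "REST Proxy is up"
```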
Produce and Consume JSON Messages
# Produce a message using JSON with the value '{ "foo": "bar" }' to the topic jsontest
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"records":[{"value":{"foo":"bar"}}]}' "http://localhost:8082/topics/jsontest"
# Expected output from preceding command
{
"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null
}
# Create a consumer for JSON data, starting at the beginning of the topic's
# log and subscribe to a topic. Then consume some data using the base URL in the first response.
# Finally, close the consumer with a DELETE to make it leave the group and clean up
# its resources.
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' \
http://localhost:8082/consumers/my_json_consumer
# Expected output from preceding command
{
"instance_id":"my_consumer_instance",
"base_uri":"http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance"
}
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["jsontest"]}' \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
# No content in response
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
# Expected output from preceding command
[
{"key":null,"value":{"foo":"bar"},"partition":0,"offset":0,"topic":"jsontest"}
]
curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance
# No content in response
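The consumer steps above (create, subscribe, read, delete) can be wrapped in one function so that the instance is deleted even when an earlier step fails. This is a sketch under assumptions: consumer_uri, consume_json_once, and the REST_PROXY_URL variable are hypothetical names, and the first read may return an empty array while the consumer is still joining its group, in which case the read would need to be repeated.

```shell
# Base URL of REST Proxy; override by exporting REST_PROXY_URL.
REST_PROXY_URL="${REST_PROXY_URL:-http://localhost:8082}"

# Build the base URI of a consumer instance from its group and instance name.
consumer_uri() {
  printf '%s/consumers/%s/instances/%s' "$REST_PROXY_URL" "$1" "$2"
}

# Create a JSON consumer, subscribe, read once, and always delete the instance.
# Arguments: group, instance name, topic.
consume_json_once() {
  cj_uri="$(consumer_uri "$1" "$2")"
  cj_status=0
  curl -sS -f -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
    --data "{\"name\": \"$2\", \"format\": \"json\", \"auto.offset.reset\": \"earliest\"}" \
    "$REST_PROXY_URL/consumers/$1" > /dev/null || return 1
  {
    curl -sS -f -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
      --data "{\"topics\":[\"$3\"]}" "$cj_uri/subscription" &&
    curl -sS -f -X GET -H "Accept: application/vnd.kafka.json.v2+json" "$cj_uri/records"
  } || cj_status=$?
  # Clean up even if subscribing or reading failed.
  curl -sS -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" "$cj_uri" > /dev/null
  return "$cj_status"
}

# Example (requires a running REST Proxy):
# consume_json_once my_json_consumer my_consumer_instance jsontest
```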
Produce and Consume Avro Messages
# Produce a message using Avro embedded data, including the schema which will
# be registered with schema registry and used to validate and serialize
# before storing the data in Kafka
curl -X POST -H "Content-Type: application/vnd.kafka.avro.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}", "records": [{"value": {"name": "testUser"}}]}' \
"http://localhost:8082/topics/avrotest"
# Expected output from preceding command:
{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":21}
# Produce a message with Avro key and value.
# Note that if you use Avro values you must also use Avro keys, but the schemas can differ
curl -X POST -H "Content-Type: application/vnd.kafka.avro.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"key_schema": "{\"name\":\"user_id\" ,\"type\": \"int\" }", "value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}", "records": [{"key" : 1 , "value": {"name": "testUser"}}]}' \
"http://localhost:8082/topics/avrokeytest2"
# Expected output from preceding command:
{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":2,"value_schema_id":1}
# Create a consumer for Avro data, starting at the beginning of the topic's
# log and subscribe to a topic. Then consume some data from a topic, which is decoded, translated to
# JSON, and included in the response. The schema used for deserialization is
# fetched automatically from schema registry. Finally, clean up.
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' \
http://localhost:8082/consumers/my_avro_consumer
# Expected output from preceding command:
{"instance_id":"my_consumer_instance","base_uri":"http://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance"}
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["avrotest"]}' \
http://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance/subscription
# No content in response
curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \
http://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance/records
# Expected output from preceding command:
[{"key":null,"value":{"name":"testUser"},"partition":0,"offset":0,"topic":"avrotest"}]
curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
http://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance
# No content in response
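In the produce requests above, the schema is passed as a JSON string, so every double quote inside it must be escaped. Escaping by hand is error-prone; the sketch below builds the same payload from an unescaped schema. json_escape is a hypothetical helper that handles only backslashes and double quotes, so it assumes a single-line schema with no control characters.

```shell
# Escape backslashes and double quotes so the text can sit inside a JSON string.
json_escape() {
  printf '%s' "$1" | sed -e 's/\\/\\\\/g' -e 's/"/\\"/g'
}

# The Avro schema in its natural, unescaped form.
schema='{"type": "record", "name": "User", "fields": [{"name": "name", "type": "string"}]}'

# Embed the escaped schema as the value_schema string of the request body.
payload="{\"value_schema\": \"$(json_escape "$schema")\", \"records\": [{\"value\": {\"name\": \"testUser\"}}]}"
printf '%s\n' "$payload"

# Example (requires a running REST Proxy and Schema Registry):
# curl -X POST -H "Content-Type: application/vnd.kafka.avro.v2+json" \
#   -H "Accept: application/vnd.kafka.v2+json" \
#   --data "$payload" "http://localhost:8082/topics/avrotest"
```

The same approach applies to the key_schema field and to the Protobuf and JSON Schema formats, which also carry their schemas as JSON strings.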
Produce and Consume Binary Messages
# Produce a message using binary embedded data with value "Kafka" to the topic binarytest
curl -X POST -H "Content-Type: application/vnd.kafka.binary.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"records":[{"value":"S2Fma2E="}]}' "http://localhost:8082/topics/binarytest"
# Expected output from preceding command:
{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}
# Create a consumer for binary data, starting at the beginning of the topic's
# log. Then consume some data from a topic using the base URL in the first response.
# Finally, close the consumer with a DELETE to make it leave the group and clean up
# its resources.
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "binary", "auto.offset.reset": "earliest"}' \
http://localhost:8082/consumers/my_binary_consumer
# Expected output from preceding command:
{"instance_id":"my_consumer_instance","base_uri":"http://localhost:8082/consumers/my_binary_consumer/instances/my_consumer_instance"}
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["binarytest"]}' \
http://localhost:8082/consumers/my_binary_consumer/instances/my_consumer_instance/subscription
# No content in response
curl -X GET -H "Accept: application/vnd.kafka.binary.v2+json" \
http://localhost:8082/consumers/my_binary_consumer/instances/my_consumer_instance/records
# Expected output from preceding command:
[{"key":null,"value":"S2Fma2E=","partition":0,"offset":0,"topic":"binarytest"}]
curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
http://localhost:8082/consumers/my_binary_consumer/instances/my_consumer_instance
# No content in response
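With the binary format, values travel as base64 text: "S2Fma2E=" above is the base64 encoding of "Kafka". The following sketch shows the round trip with the base64 utility; encode_value and decode_value are hypothetical helper names, and the --decode flag is assumed to be supported by the installed base64.

```shell
# Encode a raw value for the "value" field of a binary produce request.
# tr removes the line breaks that some base64 implementations insert in long output.
encode_value() {
  printf '%s' "$1" | base64 | tr -d '\n'
}

# Decode a value taken from a binary consume response.
decode_value() {
  printf '%s' "$1" | base64 --decode
}

encode_value 'Kafka'      # prints S2Fma2E=
echo
decode_value 'S2Fma2E='   # prints Kafka
echo
```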
Produce and Consume Protobuf Messages
# Produce a message using Protobuf embedded data, including the schema which will
# be registered with schema registry and used to validate and serialize
# before storing the data in Kafka
curl -X POST -H "Content-Type: application/vnd.kafka.protobuf.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"value_schema": "syntax=\"proto3\"; message User { string name = 1; }", "records": [{"value": {"name": "testUser"}}]}' \
"http://localhost:8082/topics/protobuftest"
# Expected output from preceding command:
{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":21}
# Create a consumer for Protobuf data, starting at the beginning of the topic's
# log and subscribe to a topic. Then consume some data from a topic, which is decoded, translated to
# JSON, and included in the response. The schema used for deserialization is
# fetched automatically from schema registry. Finally, clean up.
curl -X POST -H "Content-Type: application/vnd.kafka.protobuf.v2+json" \
--data '{"name": "my_consumer_instance", "format": "protobuf", "auto.offset.reset": "earliest"}' \
http://localhost:8082/consumers/my_protobuf_consumer
# Expected output from preceding command:
{"instance_id":"my_consumer_instance","base_uri":"http://localhost:8082/consumers/my_protobuf_consumer/instances/my_consumer_instance"}
curl -X POST -H "Content-Type: application/vnd.kafka.protobuf.v2+json" --data '{"topics":["protobuftest"]}' \
http://localhost:8082/consumers/my_protobuf_consumer/instances/my_consumer_instance/subscription
# No content in response
curl -X GET -H "Accept: application/vnd.kafka.protobuf.v2+json" \
http://localhost:8082/consumers/my_protobuf_consumer/instances/my_consumer_instance/records
# Expected output from preceding command:
[{"key":null,"value":{"name":"testUser"},"partition":0,"offset":0,"topic":"protobuftest"}]
curl -X DELETE -H "Content-Type: application/vnd.kafka.protobuf.v2+json" \
http://localhost:8082/consumers/my_protobuf_consumer/instances/my_consumer_instance
# No content in response
Produce and Consume JSON Schema Messages
# Produce a message using JSON schema embedded data, including the schema which will
# be registered with schema registry and used to validate and serialize
# before storing the data in Kafka
curl -X POST -H "Content-Type: application/vnd.kafka.jsonschema.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"value_schema": "{\"type\":\"object\",\"properties\":{\"name\":{\"type\":\"string\"}}}", "records": [{"value": {"name": "testUser"}}]}' \
"http://localhost:8082/topics/jsonschematest"
# Expected output from preceding command:
{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":21}
# Create a consumer for JSON schema data, starting at the beginning of the topic's
# log and subscribe to a topic. Then consume some data from a topic, which is decoded, translated to
# JSON, and included in the response. The schema used for deserialization is
# fetched automatically from schema registry. Finally, clean up.
curl -X POST -H "Content-Type: application/vnd.kafka.jsonschema.v2+json" \
--data '{"name": "my_consumer_instance", "format": "jsonschema", "auto.offset.reset": "earliest"}' \
http://localhost:8082/consumers/my_jsonschema_consumer
# Expected output from preceding command:
{"instance_id":"my_consumer_instance","base_uri":"http://localhost:8082/consumers/my_jsonschema_consumer/instances/my_consumer_instance"}
curl -X POST -H "Content-Type: application/vnd.kafka.jsonschema.v2+json" --data '{"topics":["jsonschematest"]}' \
http://localhost:8082/consumers/my_jsonschema_consumer/instances/my_consumer_instance/subscription
# No content in response
curl -X GET -H "Accept: application/vnd.kafka.jsonschema.v2+json" \
http://localhost:8082/consumers/my_jsonschema_consumer/instances/my_consumer_instance/records
# Expected output from preceding command:
[{"key":null,"value":{"name":"testUser"},"partition":0,"offset":0,"topic":"jsonschematest"}]
curl -X DELETE -H "Content-Type: application/vnd.kafka.jsonschema.v2+json" \
http://localhost:8082/consumers/my_jsonschema_consumer/instances/my_consumer_instance
# No content in response
Inspect Topic Metadata
# Get a list of topics
curl "http://localhost:8082/topics"
# Expected output from preceding command:
["__consumer_offsets","_schemas","avrotest","binarytest","jsontest"]
# Get info about one topic
curl "http://localhost:8082/topics/avrotest"
# Expected output from preceding command:
{"name":"avrotest","configs":{"message.downconversion.enable":"true","file.delete.delay.ms":"60000",\
"segment.ms":"604800000","min.compaction.lag.ms":"0","retention.bytes":"-1","segment.index.bytes":"10485760",\
"cleanup.policy":"delete","follower.replication.throttled.replicas":"",\
"message.timestamp.difference.max.ms":"9223372036854775807","segment.jitter.ms":"0","preallocate":"false",\
"message.timestamp.type":"CreateTime","message.format.version":"2.0-IV1","segment.bytes":"1073741824",\
"unclean.leader.election.enable":"false","max.message.bytes":"1000012","retention.ms":"604800000",\
"flush.ms":"9223372036854775807","delete.retention.ms":"86400000","leader.replication.throttled.replicas":"",\
"min.insync.replicas":"1","flush.messages":"9223372036854775807","compression.type":"producer",\
"index.interval.bytes":"4096","min.cleanable.dirty.ratio":"0.5"},"partitions":\
[{"partition":0,"leader":0,"replicas":[{"broker":0,"leader":true,"in_sync":true}]}]}
...
# Get info about a topic's partitions
curl "http://localhost:8082/topics/avrotest/partitions"
# Expected output from preceding command:
[{"partition":0,"leader":0,"replicas":[{"broker":0,"leader":true,"in_sync":true}]}]
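The topic list is a flat JSON array of strings, so a script can check whether a topic exists without a JSON parser. This is a sketch only: topics_to_lines and topic_in_list are hypothetical helpers, and stripping brackets, quotes, and commas is assumed to be safe because Kafka topic names contain none of those characters.

```shell
# Turn a JSON array of strings read from stdin into one name per line.
topics_to_lines() {
  tr -d '[]"' | tr ',' '\n'
}

# Succeed if the topic named in $1 appears in the JSON array read from stdin.
topic_in_list() {
  topics_to_lines | grep -Fxq -- "$1"
}

# Example (requires a running REST Proxy):
# curl -sS "http://localhost:8082/topics" | topic_in_list avrotest && echo "avrotest exists"
```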
More Examples
For an example that uses REST Proxy configured with security, see the Confluent Platform demo.