Quick Start for Confluent REST Proxy for Kafka¶
Use the following Quick Start instructions to get up and running with Confluent REST Proxy for Apache Kafka®.
- Prerequisites
- Confluent Platform
- Confluent CLI (requires separate installation)
Start by running the REST Proxy and the services it depends on: Kafka and Schema Registry. You can do this in one command with the Confluent CLI confluent local commands.
Important
The Confluent CLI confluent local commands are intended for a single-node development environment and are not suitable for a production environment. The data that are produced are transient and are intended to be temporary. For production-ready workflows, see Install and Upgrade Confluent Platform.
confluent local services kafka-rest start
Each service reads its configuration from its property files under etc
.
Note
To manually start each service in its own terminal, run instead:
bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
bin/kafka-server-start ./etc/kafka/server.properties
bin/kafka-rest-start ./etc/kafka-rest/kafka-rest.properties
# optional, if you want to use the Avro, JSON Schema, or Protobuf data format
bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
As of Confluent Platform 7.5, ZooKeeper is deprecated for new deployments. Confluent recommends KRaft mode for new deployments. For more information, see KRaft Overview.
See the Confluent Platform quickstart for a more detailed explanation of how to get these services up and running.
Produce and Consume JSON Messages¶
# Produce a message using JSON with the value '{ "foo": "bar" }' to the topic jsontest
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
--data '{"records":[{"value":{"foo":"bar"}}]}' "http://localhost:8082/topics/jsontest"
# Expected output from preceding command
{
"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null
}
# Create a consumer for JSON data in "my_json_consumer_group" consumer group, starting at the beginning of the topic's
# log and subscribe to a topic. Then consume some data using the base URL in the first response.
# Finally, close the consumer with a DELETE to make it leave the group and clean up
# its resources.
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' \
http://localhost:8082/consumers/my_json_consumer_group
# Expected output from preceding command
{
"instance_id":"my_consumer_instance",
"base_uri":"http://localhost:8082/consumers/my_json_consumer_group/instances/my_consumer_instance"
}
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["jsontest"]}' \
http://localhost:8082/consumers/my_json_consumer_group/instances/my_consumer_instance/subscription
# No content in response
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
http://localhost:8082/consumers/my_json_consumer_group/instances/my_consumer_instance/records
# Expected output from preceding command
[
{"key":null,"value":{"foo":"bar"},"partition":0,"offset":0,"topic":"jsontest"}
]
curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
http://localhost:8082/consumers/my_json_consumer_group/instances/my_consumer_instance
# No content in response
Produce and Consume Avro Messages¶
# Produce a message using Avro embedded data, including the schema which will
# be registered with schema registry and used to validate and serialize
# before storing the data in Kafka
curl -X POST -H "Content-Type: application/vnd.kafka.avro.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}", "records": [{"value": {"name": "testUser"}}]}' \
"http://localhost:8082/topics/avrotest"
# Expected output from preceding command:
{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":21}
# Produce a message with Avro key and value.
# Note that if you use Avro values you must also use Avro keys, but the schemas can differ
curl -X POST -H "Content-Type: application/vnd.kafka.avro.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"key_schema": "{\"name\":\"user_id\" ,\"type\": \"int\" }", "value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}", "records": [{"key" : 1 , "value": {"name": "testUser"}}]}' \
"http://localhost:8082/topics/avrokeytest2"
# Expected output from preceding command:
{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":2,"value_schema_id":1}
# Create a consumer for Avro data in "my_avro_consumer_group" consumer group, starting at the beginning of the topic's
# log and subscribe to a topic. Then consume some data from a topic, which is decoded, translated to
# JSON, and included in the response. The schema used for deserialization is
# fetched automatically from schema registry. Finally, clean up.
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' \
http://localhost:8082/consumers/my_avro_consumer_group
# Expected output from preceding command:
{"instance_id":"my_consumer_instance","base_uri":"http://localhost:8082/consumers/my_avro_consumer_group/instances/my_consumer_instance"}
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["avrotest"]}' \
http://localhost:8082/consumers/my_avro_consumer_group/instances/my_consumer_instance/subscription
# No content in response
curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \
http://localhost:8082/consumers/my_avro_consumer_group/instances/my_consumer_instance/records
# Expected output from preceding command:
[{"key":null,"value":{"name":"testUser"},"partition":0,"offset":1,"topic":"avrotest"}]
curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
http://localhost:8082/consumers/my_avro_consumer_group/instances/my_consumer_instance
# No content in response
Produce and Consume Binary Messages¶
# Produce a message using binary embedded data with value "Kafka" to the topic binarytest
curl -X POST -H "Content-Type: application/vnd.kafka.binary.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"records":[{"value":"S2Fma2E="}]}' "http://localhost:8082/topics/binarytest"
# Expected output from preceding command:
{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}
# Create a consumer for binary data in "my_binary_consumer_group" consumer group, starting at the beginning of the topic's
# log. Then consume some data from a topic using the base URL in the first response.
# Finally, close the consumer with a DELETE to make it leave the group and clean up
# its resources.
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "binary", "auto.offset.reset": "earliest"}' \
http://localhost:8082/consumers/my_binary_consumer_group
# Expected output from preceding command:
{"instance_id":"my_consumer_instance","base_uri":"http://localhost:8082/consumers/my_binary_consumer_group/instances/my_consumer_instance"}
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["binarytest"]}' \
http://localhost:8082/consumers/my_binary_consumer_group/instances/my_consumer_instance/subscription
# No content in response
curl -X GET -H "Accept: application/vnd.kafka.binary.v2+json" \
http://localhost:8082/consumers/my_binary_consumer_group/instances/my_consumer_instance/records
# Expected output from preceding command:
[{"key":null,"value":"S2Fma2E=","partition":0,"offset":0,"topic":"binarytest"}]
curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
http://localhost:8082/consumers/my_binary_consumer_group/instances/my_consumer_instance
# No content in response
Produce and Consume Protobuf Messages¶
# Produce a message using Protobuf embedded data, including the schema which will
# be registered with schema registry and used to validate and serialize
# before storing the data in Kafka
curl -X POST -H "Content-Type: application/vnd.kafka.protobuf.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"value_schema": "syntax=\"proto3\"; message User { string name = 1; }", "records": [{"value": {"name": "testUser"}}]}' \
"http://localhost:8082/topics/protobuftest"
# Expected output from preceding command:
{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":21}
# Create a consumer for Protobuf data in "my_protobuf_consumer_group" consumer group, starting at the beginning of the topic's
# log and subscribe to a topic. Then consume some data from a topic, which is decoded, translated to
# JSON, and included in the response. The schema used for deserialization is
# fetched automatically from schema registry. Finally, clean up.
curl -X POST -H "Content-Type: application/vnd.kafka.protobuf.v2+json" \
--data '{"name": "my_consumer_instance", "format": "protobuf", "auto.offset.reset": "earliest"}' \
http://localhost:8082/consumers/my_protobuf_consumer_group
# Expected output from preceding command:
{"instance_id":"my_consumer_instance","base_uri":"http://localhost:8082/consumers/my_protobuf_consumer_group/instances/my_consumer_instance"}
curl -X POST -H "Content-Type: application/vnd.kafka.protobuf.v2+json" --data '{"topics":["protobuftest"]}' \
http://localhost:8082/consumers/my_protobuf_consumer_group/instances/my_consumer_instance/subscription
# No content in response
curl -X GET -H "Accept: application/vnd.kafka.protobuf.v2+json" \
http://localhost:8082/consumers/my_protobuf_consumer_group/instances/my_consumer_instance/records
# Expected output from preceding command:
[{"key":null,"value":{"name":"testUser"},"partition":0,"offset":1,"topic":"protobuftest"}]
curl -X DELETE -H "Content-Type: application/vnd.kafka.protobuf.v2+json" \
http://localhost:8082/consumers/my_protobuf_consumer_group/instances/my_consumer_instance
# No content in response
Produce and Consume JSON Schema Messages¶
# Produce a message using JSON schema embedded data, including the schema which will
# be registered with schema registry and used to validate and serialize
# before storing the data in Kafka
curl -X POST -H "Content-Type: application/vnd.kafka.jsonschema.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"value_schema": "{\"type\":\"object\",\"properties\":{\"name\":{\"type\":\"string\"}}}", "records": [{"value": {"name": "testUser"}}]}' \
"http://localhost:8082/topics/jsonschematest"
# Expected output from preceding command:
{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":21}
# Create a consumer for JSON schema data in "my_jsonschema_consumer_group" consumer group, starting at the beginning of the topic's
# log and subscribe to a topic. Then consume some data from a topic, which is decoded, translated to
# JSON, and included in the response. The schema used for deserialization is
# fetched automatically from schema registry. Finally, clean up.
curl -X POST -H "Content-Type: application/vnd.kafka.jsonschema.v2+json" \
--data '{"name": "my_consumer_instance", "format": "jsonschema", "auto.offset.reset": "earliest"}' \
http://localhost:8082/consumers/my_jsonschema_consumer_group
# Expected output from preceding command:
{"instance_id":"my_consumer_instance","base_uri":"http://localhost:8082/consumers/my_jsonschema_consumer_group/instances/my_consumer_instance"}
curl -X POST -H "Content-Type: application/vnd.kafka.jsonschema.v2+json" --data '{"topics":["jsonschematest"]}' \
http://localhost:8082/consumers/my_jsonschema_consumer_group/instances/my_consumer_instance/subscription
# No content in response
curl -X GET -H "Accept: application/vnd.kafka.jsonschema.v2+json" \
http://localhost:8082/consumers/my_jsonschema_consumer_group/instances/my_consumer_instance/records
# Expected output from preceding command:
[{"key":null,"value":{"name":"testUser"},"partition":0,"offset":1,"topic":"jsonschematest"}]
curl -X DELETE -H "Content-Type: application/vnd.kafka.jsonschema.v2+json" \
http://localhost:8082/consumers/my_jsonschema_consumer_group/instances/my_consumer_instance
# No content in response
Inspect Topic Metadata¶
# Get a list of topics
curl "http://localhost:8082/topics"
# Expected output from preceding command:
["__consumer_offsets","_schemas","avrotest","binarytest","jsontest"]
# Get info about one topic
curl "http://localhost:8082/topics/avrotest"
# Expected output from preceding command:
{"name":"avrotest","configs":{"message.downconversion.enable":"true","file.delete.delay.ms":"60000",\
"segment.ms":"604800000","min.compaction.lag.ms":"0","retention.bytes":"-1","segment.index.bytes":"10485760",\
"cleanup.policy":"delete","follower.replication.throttled.replicas":"",\
"message.timestamp.difference.max.ms":"9223372036854775807","segment.jitter.ms":"0","preallocate":"false",\
"message.timestamp.type":"CreateTime","message.format.version":"2.0-IV1","segment.bytes":"1073741824",\
"unclean.leader.election.enable":"false","max.message.bytes":"1000012","retention.ms":"604800000",\
"flush.ms":"9223372036854775807","delete.retention.ms":"86400000","leader.replication.throttled.replicas":"",\
"min.insync.replicas":"1","flush.messages":"9223372036854775807","compression.type":"producer",\
"index.interval.bytes":"4096","min.cleanable.dirty.ratio":"0.5"},"partitions":\
[{"partition":0,"leader":0,"replicas":[{"broker":0,"leader":true,"in_sync":true}]}]}
...
# Get info about a topic's partitions
curl "http://localhost:8082/topics/avrotest/partitions"
# Expected output from preceding command:
[{"partition":0,"leader":0,"replicas":[{"broker":0,"leader":true,"in_sync":true}]}]
More Examples¶
- For a hands-on example that uses Confluent REST Proxy to produce and consume data from a Kafka cluster, see the Confluent REST Proxy tutorial.
- For an example that uses REST Proxy configured with security, see the Confluent Platform demo.