Quick Start for Confluent REST Proxy for Kafka

Use the following Quick Start instructions to get up and running with Confluent REST Proxy for Apache Kafka®.

Prerequisites

Start by running the REST Proxy and the services it depends on: Kafka and Schema Registry. You can do this in one command with the Confluent CLI confluent local commands.

Important

The Confluent CLI confluent local commands are intended for a single-node development environment and are not suitable for a production environment. The data that are produced are transient and are intended to be temporary. For production-ready workflows, see Install and Upgrade Confluent Platform.

confluent local services kafka-rest start

Each service reads its configuration from its property files under etc.

Note

To manually start each service in its own terminal, run instead:

bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
bin/kafka-server-start ./etc/kafka/server.properties
bin/kafka-rest-start ./etc/kafka-rest/kafka-rest.properties

# optional, if you want to use the Avro, JSON Schema, or Protobuf data format
bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

As of Confluent Platform 7.5, ZooKeeper is deprecated for new deployments. Confluent recommends KRaft mode for new deployments. For more information, see KRaft Overview.

See the Confluent Platform quickstart for a more detailed explanation of how to get these services up and running.

Produce and Consume JSON Messages

# Produce a message using JSON with the value '{ "foo": "bar" }' to the topic jsontest
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
      --data '{"records":[{"value":{"foo":"bar"}}]}' "http://localhost:8082/topics/jsontest"

# Expected output from preceding command
  {
   "offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null
  }

# Create a consumer for JSON data in "my_json_consumer_group" consumer group, starting at the beginning of the topic's
# log and subscribe to a topic. Then consume some data using the base URL in the first response.
# Finally, close the consumer with a DELETE to make it leave the group and clean up
# its resources.
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
      --data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' \
      http://localhost:8082/consumers/my_json_consumer_group

# Expected output from preceding command
 {
  "instance_id":"my_consumer_instance",
  "base_uri":"http://localhost:8082/consumers/my_json_consumer_group/instances/my_consumer_instance"
 }

curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["jsontest"]}' \
 http://localhost:8082/consumers/my_json_consumer_group/instances/my_consumer_instance/subscription
# No content in response

curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
      http://localhost:8082/consumers/my_json_consumer_group/instances/my_consumer_instance/records

# Expected output from preceding command
  [
   {"key":null,"value":{"foo":"bar"},"partition":0,"offset":0,"topic":"jsontest"}
  ]

curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
      http://localhost:8082/consumers/my_json_consumer_group/instances/my_consumer_instance
# No content in response

Produce and Consume Avro Messages

# Produce a message using Avro embedded data, including the schema which will
# be registered with schema registry and used to validate and serialize
# before storing the data in Kafka
curl -X POST -H "Content-Type: application/vnd.kafka.avro.v2+json" \
      -H "Accept: application/vnd.kafka.v2+json" \
      --data '{"value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}", "records": [{"value": {"name": "testUser"}}]}' \
      "http://localhost:8082/topics/avrotest"

# Expected output from preceding command:
  {"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":21}

# Produce a message with Avro key and value.
# Note that if you use Avro values you must also use Avro keys, but the schemas can differ

curl -X POST -H "Content-Type: application/vnd.kafka.avro.v2+json" \
      -H "Accept: application/vnd.kafka.v2+json" \
      --data '{"key_schema": "{\"name\":\"user_id\"  ,\"type\": \"int\"   }", "value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}", "records": [{"key" : 1 , "value": {"name": "testUser"}}]}' \
      "http://localhost:8082/topics/avrokeytest2"

# Expected output from preceding command:
  {"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":2,"value_schema_id":1}

# Create a consumer for Avro data in "my_avro_consumer_group" consumer group, starting at the beginning of the topic's
# log and subscribe to a topic. Then consume some data from a topic, which is decoded, translated to
# JSON, and included in the response. The schema used for deserialization is
# fetched automatically from schema registry. Finally, clean up.
curl -X POST  -H "Content-Type: application/vnd.kafka.v2+json" \
      --data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' \
      http://localhost:8082/consumers/my_avro_consumer_group

# Expected output from preceding command:
  {"instance_id":"my_consumer_instance","base_uri":"http://localhost:8082/consumers/my_avro_consumer_group/instances/my_consumer_instance"}

curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["avrotest"]}' \
      http://localhost:8082/consumers/my_avro_consumer_group/instances/my_consumer_instance/subscription
# No content in response

curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \
      http://localhost:8082/consumers/my_avro_consumer_group/instances/my_consumer_instance/records

# Expected output from preceding command:
  [{"key":null,"value":{"name":"testUser"},"partition":0,"offset":1,"topic":"avrotest"}]

curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
      http://localhost:8082/consumers/my_avro_consumer_group/instances/my_consumer_instance
# No content in response

Produce and Consume Binary Messages

# Produce a message using binary embedded data with value "Kafka" to the topic binarytest
curl -X POST -H "Content-Type: application/vnd.kafka.binary.v2+json" \
      -H "Accept: application/vnd.kafka.v2+json" \
      --data '{"records":[{"value":"S2Fma2E="}]}' "http://localhost:8082/topics/binarytest"

# Expected output from preceding command:
  {"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}

# Create a consumer for binary data in "my_binary_consumer_group" consumer group, starting at the beginning of the topic's
# log. Then consume some data from a topic using the base URL in the first response.
# Finally, close the consumer with a DELETE to make it leave the group and clean up
# its resources.
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
      --data '{"name": "my_consumer_instance", "format": "binary", "auto.offset.reset": "earliest"}' \
      http://localhost:8082/consumers/my_binary_consumer_group

# Expected output from preceding command:
  {"instance_id":"my_consumer_instance","base_uri":"http://localhost:8082/consumers/my_binary_consumer_group/instances/my_consumer_instance"}

curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["binarytest"]}' \
      http://localhost:8082/consumers/my_binary_consumer_group/instances/my_consumer_instance/subscription
# No content in response

curl -X GET -H "Accept: application/vnd.kafka.binary.v2+json" \
      http://localhost:8082/consumers/my_binary_consumer_group/instances/my_consumer_instance/records

# Expected output from preceding command:
 [{"key":null,"value":"S2Fma2E=","partition":0,"offset":0,"topic":"binarytest"}]

curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
      http://localhost:8082/consumers/my_binary_consumer_group/instances/my_consumer_instance
# No content in response

Produce and Consume Protobuf Messages

# Produce a message using Protobuf embedded data, including the schema which will
# be registered with schema registry and used to validate and serialize
# before storing the data in Kafka
curl -X POST -H "Content-Type: application/vnd.kafka.protobuf.v2+json" \
   -H "Accept: application/vnd.kafka.v2+json" \
   --data '{"value_schema": "syntax=\"proto3\"; message User { string name = 1; }", "records": [{"value": {"name": "testUser"}}]}' \
   "http://localhost:8082/topics/protobuftest"

# Expected output from preceding command:
  {"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":21}

# Create a consumer for Protobuf data in "my_protobuf_consumer_group" consumer group, starting at the beginning of the topic's
# log and subscribe to a topic. Then consume some data from a topic, which is decoded, translated to
# JSON, and included in the response. The schema used for deserialization is
# fetched automatically from schema registry. Finally, clean up.
curl -X POST  -H "Content-Type: application/vnd.kafka.protobuf.v2+json" \
      --data '{"name": "my_consumer_instance", "format": "protobuf", "auto.offset.reset": "earliest"}' \
      http://localhost:8082/consumers/my_protobuf_consumer_group

# Expected output from preceding command:
  {"instance_id":"my_consumer_instance","base_uri":"http://localhost:8082/consumers/my_protobuf_consumer_group/instances/my_consumer_instance"}

curl -X POST -H "Content-Type: application/vnd.kafka.protobuf.v2+json" --data '{"topics":["protobuftest"]}' \
      http://localhost:8082/consumers/my_protobuf_consumer_group/instances/my_consumer_instance/subscription
# No content in response

curl -X GET -H "Accept: application/vnd.kafka.protobuf.v2+json" \
      http://localhost:8082/consumers/my_protobuf_consumer_group/instances/my_consumer_instance/records

# Expected output from preceding command:
  [{"key":null,"value":{"name":"testUser"},"partition":0,"offset":1,"topic":"protobuftest"}]

curl -X DELETE -H "Content-Type: application/vnd.kafka.protobuf.v2+json" \
      http://localhost:8082/consumers/my_protobuf_consumer_group/instances/my_consumer_instance
# No content in response

Produce and Consume JSON Schema Messages

# Produce a message using JSON schema embedded data, including the schema which will
# be registered with schema registry and used to validate and serialize
# before storing the data in Kafka
curl -X POST -H "Content-Type: application/vnd.kafka.jsonschema.v2+json" \
   -H "Accept: application/vnd.kafka.v2+json" \
   --data '{"value_schema": "{\"type\":\"object\",\"properties\":{\"name\":{\"type\":\"string\"}}}", "records": [{"value": {"name": "testUser"}}]}' \
   "http://localhost:8082/topics/jsonschematest"

# Expected output from preceding command:
  {"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":21}

# Create a consumer for JSON schema data in "my_jsonschema_consumer_group" consumer group, starting at the beginning of the topic's
# log and subscribe to a topic. Then consume some data from a topic, which is decoded, translated to
# JSON, and included in the response. The schema used for deserialization is
# fetched automatically from schema registry. Finally, clean up.
curl -X POST  -H "Content-Type: application/vnd.kafka.jsonschema.v2+json" \
      --data '{"name": "my_consumer_instance", "format": "jsonschema", "auto.offset.reset": "earliest"}' \
      http://localhost:8082/consumers/my_jsonschema_consumer_group

# Expected output from preceding command:
  {"instance_id":"my_consumer_instance","base_uri":"http://localhost:8082/consumers/my_jsonschema_consumer_group/instances/my_consumer_instance"}

curl -X POST -H "Content-Type: application/vnd.kafka.jsonschema.v2+json" --data '{"topics":["jsonschematest"]}' \
      http://localhost:8082/consumers/my_jsonschema_consumer_group/instances/my_consumer_instance/subscription
# No content in response

curl -X GET -H "Accept: application/vnd.kafka.jsonschema.v2+json" \
      http://localhost:8082/consumers/my_jsonschema_consumer_group/instances/my_consumer_instance/records

# Expected output from preceding command:
  [{"key":null,"value":{"name":"testUser"},"partition":0,"offset":1,"topic":"jsonschematest"}]

curl -X DELETE -H "Content-Type: application/vnd.kafka.jsonschema.v2+json" \
      http://localhost:8082/consumers/my_jsonschema_consumer_group/instances/my_consumer_instance
# No content in response

Inspect Topic Metadata

# Get a list of topics
curl "http://localhost:8082/topics"

# Expected output from preceding command:
  ["__consumer_offsets","_schemas","avrotest","binarytest","jsontest"]

# Get info about one topic
curl "http://localhost:8082/topics/avrotest"

# Expected output from preceding command:
  {"name":"avrotest","configs":{"message.downconversion.enable":"true","file.delete.delay.ms":"60000",\
  "segment.ms":"604800000","min.compaction.lag.ms":"0","retention.bytes":"-1","segment.index.bytes":"10485760",\
  "cleanup.policy":"delete","follower.replication.throttled.replicas":"",\
  "message.timestamp.difference.max.ms":"9223372036854775807","segment.jitter.ms":"0","preallocate":"false",\
  "message.timestamp.type":"CreateTime","message.format.version":"2.0-IV1","segment.bytes":"1073741824",\
  "unclean.leader.election.enable":"false","max.message.bytes":"1000012","retention.ms":"604800000",\
  "flush.ms":"9223372036854775807","delete.retention.ms":"86400000","leader.replication.throttled.replicas":"",\
  "min.insync.replicas":"1","flush.messages":"9223372036854775807","compression.type":"producer",\
  "index.interval.bytes":"4096","min.cleanable.dirty.ratio":"0.5"},"partitions":\
  [{"partition":0,"leader":0,"replicas":[{"broker":0,"leader":true,"in_sync":true}]}]}
  ...

# Get info about a topic's partitions
curl "http://localhost:8082/topics/avrotest/partitions"

# Expected output from preceding command:
  [{"partition":0,"leader":0,"replicas":[{"broker":0,"leader":true,"in_sync":true}]}]

More Examples