Important

You are viewing documentation for an older version of Confluent Platform. For the latest, click here.

REST Proxy Quick Start

Start by running the REST Proxy and the services it depends on: ZooKeeper, Kafka, and Schema Registry. You can do this in one command with the Confluent CLI confluent local commands.

Important

The confluent local commands are intended for a single-node development environment and are not suitable for a production environment. The data that are produced are transient and are intended to be temporary. For production-ready workflows, see Install and Upgrade.

Prerequisites
$ confluent local start kafka-rest

Each service reads its configuration from its property files under etc.

Note

To manually start each service in its own terminal, run instead:

$ bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
$ bin/kafka-server-start ./etc/kafka/server.properties
$ bin/kafka-rest-start ./etc/kafka-rest/kafka-rest.properties

# optional, if you want to use Avro data format
$ bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

See the Confluent Platform quickstart for a more detailed explanation of how to get these services up and running.

Produce and Consume JSON Messages

# Produce a message using JSON with the value '{ "foo": "bar" }' to the topic jsontest
$ curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
      -H "Accept: application/vnd.kafka.v2+json" \
      --data '{"records":[{"value":{"foo":"bar"}}]}' "http://localhost:8082/topics/jsontest"

# Expected output from preceding command
  {
   "offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null
  }

# Create a consumer for JSON data, starting at the beginning of the topic's
# log and subscribe to a topic. Then consume some data using the base URL in the first response.
# Finally, close the consumer with a DELETE to make it leave the group and clean up
# its resources.
$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
      --data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' \
      http://localhost:8082/consumers/my_json_consumer

# Expected output from preceding command
 {
  "instance_id":"my_consumer_instance",
  "base_uri":"http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance"
 }

$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["jsontest"]}' \
 http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
# No content in response

$ curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
      http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records

# Expected output from preceding command
  [
   {"key":null,"value":{"foo":"bar"},"partition":0,"offset":0,"topic":"jsontest"}
  ]

$ curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
      http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance
# No content in response

Produce and Consume Avro Messages

# Produce a message using Avro embedded data, including the schema which will
# be registered with schema registry and used to validate and serialize
# before storing the data in Kafka
$ curl -X POST -H "Content-Type: application/vnd.kafka.avro.v2+json" \
      -H "Accept: application/vnd.kafka.v2+json" \
      --data '{"value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}", "records": [{"value": {"name": "testUser"}}]}' \
      "http://localhost:8082/topics/avrotest"

# Expected output from preceding command:
  {"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":21}

# Produce a message with Avro key and value.
# Note that if you use Avro values you must also use Avro keys, but the schemas can differ

$ curl -X POST -H "Content-Type: application/vnd.kafka.avro.v2+json" \
      -H "Accept: application/vnd.kafka.v2+json" \
      --data '{"key_schema": "{\"name\":\"user_id\"  ,\"type\": \"int\"   }", "value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}", "records": [{"key" : 1 , "value": {"name": "testUser"}}]}' \
      "http://localhost:8082/topics/avrokeytest2"

# Expected output from preceding command:
  {"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":2,"value_schema_id":1}

# Create a consumer for Avro data, starting at the beginning of the topic's
# log and subscribe to a topic. Then consume some data from a topic, which is decoded, translated to
# JSON, and included in the response. The schema used for deserialization is
# fetched automatically from schema registry. Finally, clean up.
$ curl -X POST  -H "Content-Type: application/vnd.kafka.v2+json" \
      --data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' \
      http://localhost:8082/consumers/my_avro_consumer

# Expected output from preceding command:
  {"instance_id":"my_consumer_instance","base_uri":"http://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance"}

$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["avrotest"]}' \
      http://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance/subscription
# No content in response

$ curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \
      http://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance/records

# Expected output from preceding command:
  [{"key":null,"value":{"name":"testUser"},"partition":0,"offset":1,"topic":"avrotest"}]

$ curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
      http://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance
# No content in response

Produce and Consume Binary Messages

# Produce a message using binary embedded data with value "Kafka" to the topic binarytest
$ curl -X POST -H "Content-Type: application/vnd.kafka.binary.v2+json" \
      -H "Accept: application/vnd.kafka.v2+json" \
      --data '{"records":[{"value":"S2Fma2E="}]}' "http://localhost:8082/topics/binarytest"

# Expected output from preceding command:
  {"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}

# Create a consumer for binary data, starting at the beginning of the topic's
# log. Then consume some data from a topic using the base URL in the first response.
# Finally, close the consumer with a DELETE to make it leave the group and clean up
# its resources.
$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
      --data '{"name": "my_consumer_instance", "format": "binary", "auto.offset.reset": "earliest"}' \
      http://localhost:8082/consumers/my_binary_consumer

# Expected output from preceding command:
  {"instance_id":"my_consumer_instance","base_uri":"http://localhost:8082/consumers/my_binary_consumer/instances/my_consumer_instance"}

$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["binarytest"]}' \
      http://localhost:8082/consumers/my_binary_consumer/instances/my_consumer_instance/subscription
# No content in response

$ curl -X GET -H "Accept: application/vnd.kafka.binary.v2+json" \
      http://localhost:8082/consumers/my_binary_consumer/instances/my_consumer_instance/records

# Expected output from preceding command:
 [{"key":null,"value":"S2Fma2E=","partition":0,"offset":0,"topic":"binarytest"}]

$ curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
      http://localhost:8082/consumers/my_binary_consumer/instances/my_consumer_instance
# No content in response

Inspect Topic Metadata

# Get a list of topics
$ curl "http://localhost:8082/topics"

# Expected output from preceding command:
  ["__consumer_offsets","_schemas","avrotest","binarytest","jsontest"]

# Get info about one topic
$ curl "http://localhost:8082/topics/avrotest"

# Expected output from preceding command:
  {"name":"avrotest","configs":{"message.downconversion.enable":"true","file.delete.delay.ms":"60000",\
  "segment.ms":"604800000","min.compaction.lag.ms":"0","retention.bytes":"-1","segment.index.bytes":"10485760",\
  "cleanup.policy":"delete","follower.replication.throttled.replicas":"",\
  "message.timestamp.difference.max.ms":"9223372036854775807","segment.jitter.ms":"0","preallocate":"false",\
  "message.timestamp.type":"CreateTime","message.format.version":"2.0-IV1","segment.bytes":"1073741824",\
  "unclean.leader.election.enable":"false","max.message.bytes":"1000012","retention.ms":"604800000",\
  "flush.ms":"9223372036854775807","delete.retention.ms":"86400000","leader.replication.throttled.replicas":"",\
  "min.insync.replicas":"1","flush.messages":"9223372036854775807","compression.type":"producer",\
  "index.interval.bytes":"4096","min.cleanable.dirty.ratio":"0.5"},"partitions":\
  [{"partition":0,"leader":0,"replicas":[{"broker":0,"leader":true,"in_sync":true}]}]}
  ...

# Get info about a topic's partitions
$ curl "http://localhost:8082/topics/avrotest/partitions"

# Expected output from preceding command:
  [{"partition":0,"leader":0,"replicas":[{"broker":0,"leader":true,"in_sync":true}]}]

More Examples

For an example that uses REST Proxy configured with security, see the Confluent Platform demo.