Important

You are viewing documentation for an older version of Confluent Platform. For the latest, click here.

Confluent REST Proxy API Reference

Content Types

The Confluent REST Proxy uses content types for both requests and responses to indicate these data properties:

  • Serialization format: json

  • API version (e.g. v2)

  • Embedded formats: json, binary, avro, protobuf and jsonschema

    Important

    The jsonschema and protobuf embedded types are supported beginning with REST Proxy v2.

REST Proxy supports the Avro®, JSON Schema, and Protobuf serialization formats. The versions of the REST Proxy API are v1 and v2.

The embedded format is the format of data you are producing or consuming, which are embedded into requests or responses in the serialization format. For example, you can provide binary data in a json-serialized request; in this case the data should be provided as a base64-encoded string and the content type will be application/vnd.kafka.binary.v2+json. If your data is JSON, you can use json as the embedded format and embed it directly; in this case the content type will be application/vnd.kafka.json.v2+json. With avro, protobuf, and jsonschema embedded types, you can directly embed JSON formatted data along with a schema (or schema ID) in the request. These types use Schema Registry, and the ID of the schema is serialized in addition to the data and payload.

  • The Avro content type is application/vnd.kafka.avro.v2+json.
  • The Protobuf content type is application/vnd.kafka.protobuf.v2+json.
  • The JSON schema content type is application/vnd.kafka.jsonschema.v2+json.

The format for the content type is:

application/vnd.kafka[.embedded_format].[api_version]+[serialization_format]

The embedded format can be omitted when there are no embedded messages (i.e. for metadata requests you can use application/vnd.kafka.v2+json). The preferred content type is application/vnd.kafka.[embedded_format].v1+json. However, other less specific content types are permitted, including application/vnd.kafka+json to indicate no specific API version requirement (the most recent stable version will be used), application/json, and application/octet-stream. The latter two are only supported for compatibility and ease of use. In all cases, if the embedded format is omitted, binary is assumed. Although using these less specific values is permitted, to remain compatible with future versions you should specify preferred content types in requests and check the content types of responses.

Your requests should specify the most specific format and version information possible via the HTTP Accept header:

Accept: application/vnd.kafka.v2+json

The server also supports content negotiation, so you may include multiple, weighted preferences:

Accept: application/vnd.kafka.v2+json; q=0.9, application/json; q=0.5

which can be useful when, for example, a new version of the API is preferred but you cannot be certain it is available yet.

Errors

All API endpoints use a standard error message format for any requests that return an HTTP status indicating an error (any 400 or 500 statuses). For example, a request entity that omits a required field may generate the following response:

HTTP/1.1 422 Unprocessable Entity
Content-Type: application/vnd.kafka.v1+json

{
    "error_code": 422,
    "message": "records may not be empty"
}

Although it is good practice to check the status code, you may safely parse the response of any non-DELETE API calls and check for the presence of an error_code field to detect errors.

Some error codes are used frequently across the entire API and you will probably want to have general purpose code to handle these, whereas most other error codes will need to be handled on a per-request basis.

Tip

To try out the REST Proxy API v3 preview, click here.

ANY /
Status Codes:
  • 401 Unauthorized
    • Error code 40101 – Kafka Authentication Error.
  • 403 Forbidden
    • Error code 40301 – Kafka Authorization Error.
  • 404 Not Found
    • Error code 40401 – Topic not found.
    • Error code 40402 – Partition not found.
  • 422 Unprocessable Entity – The request payload is either improperly formatted or contains semantic errors
  • 500 Internal Server Error
    • Error code 50001 – Zookeeper error.
    • Error code 50002 – Kafka error.
    • Error code 50003 – Retriable Kafka error. Although the operation failed, it’s possible that retrying the request will be successful.
    • Error code 50101 – Only SSL endpoints were found for the specified broker, but SSL is not supported for the invoked API yet.

REST Proxy API v2

Topics

The topics resource provides information about the topics in your Kafka cluster and their current state. It also lets you produce messages by making POST requests to specific topics.

GET /topics

Get a list of Kafka topics.

Response JSON Object:
 
  • topics (array) – List of topic names

Example request:

GET /topics HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.kafka.v2+json

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

["topic1", "topic2"]
GET /topics/(string: topic_name)

Get metadata about a specific topic.

Parameters:
  • topic_name (string) – Name of the topic to get metadata about
Response JSON Object:
 
  • name (string) – Name of the topic
  • configs (map) – Per-topic configuration overrides
  • partitions (array) – List of partitions for this topic
  • partitions[i].partition (int) – the ID of this partition
  • partitions[i].leader (int) – the broker ID of the leader for this partition
  • partitions[i].replicas (array) – list of replicas for this partition, including the leader
  • partitions[i].replicas[j].broker (array) – broker ID of the replica
  • partitions[i].replicas[j].leader (boolean) – true if this replica is the leader for the partition
  • partitions[i].replicas[j].in_sync (boolean) – true if this replica is currently in sync with the leader
Status Codes:

Example request:

GET /topics/test HTTP/1.1
Accept: application/vnd.kafka.v2+json

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "name": "test",
  "configs": {
     "cleanup.policy": "compact"
  },
  "partitions": [
    {
      "partition": 1,
      "leader": 1,
      "replicas": [
        {
          "broker": 1,
          "leader": true,
          "in_sync": true,
        },
        {
          "broker": 2,
          "leader": false,
          "in_sync": true,
        }
      ]
    },
    {
      "partition": 2,
      "leader": 2,
      "replicas": [
        {
          "broker": 1,
          "leader": false,
          "in_sync": true,
        },
        {
          "broker": 2,
          "leader": true,
          "in_sync": true,
        }
      ]
    }
  ]
}
POST /topics/(string: topic_name)

Produce messages to a topic, optionally specifying keys or partitions for the messages. If no partition is provided, one will be chosen based on the hash of the key. If no key is provided, the partition will be chosen for each message in a round-robin fashion.

For the avro, protobuf, and jsonschema embedded formats, you must provide information about schemas and the REST Proxy must be configured with the URL to access Schema Registry (schema.registry.url). Schemas may be provided as the full schema encoded as a string, or, after the initial request may be provided as the schema ID returned with the first response.

Parameters:
  • topic_name (string) – Name of the topic to produce the messages to
Request JSON Object:
 
  • key_schema (string) – Full schema encoded as a string (e.g. JSON serialized for Avro data)
  • key_schema_id (int) – ID returned by a previous request using the same schema. This ID corresponds to the ID of the schema in the registry.
  • value_schema (string) – Full schema encoded as a string (e.g. JSON serialized for Avro data)
  • value_schema_id (int) – ID returned by a previous request using the same schema. This ID corresponds to the ID of the schema in the registry.
Request JSON Array of Objects:
 
  • records – A list of records to produce to the topic.
  • records[i].key (object) – The message key, formatted according to the embedded format, or null to omit a key (optional)
  • records[i].value (object) – The message value, formatted according to the embedded format
  • records[i].partition (int) – Partition to store the message in (optional)
Response JSON Object:
 
  • key_schema_id (int) – The ID for the schema used to produce keys, or null if keys were not used
  • value_schema_id (int) – The ID for the schema used to produce values.
Response JSON Array of Objects:
 
  • offsets (object) – List of partitions and offsets the messages were published to
  • offsets[i].partition (int) – Partition the message was published to, or null if publishing the message failed
  • offsets[i].offset (long) – Offset of the message, or null if publishing the message failed
  • offsets[i].error_code (long) –

    An error code classifying the reason this operation failed, or null if it succeeded.

    • 1 - Non-retriable Kafka exception
    • 2 - Retriable Kafka exception; the message might be sent successfully if retried
  • offsets[i].error (string) – An error message describing why the operation failed, or null if it succeeded
Status Codes:
  • 404 Not Found
    • Error code 40401 – Topic not found
  • 422 Unprocessable Entity
    • Error code 42201 – Request includes keys and uses a format that requires schemas, but does not include the key_schema or key_schema_id fields
    • Error code 42202 – Request includes values and uses a format that requires schemas, but does not include the value_schema or value_schema_id fields
    • Error code 42205 – Request includes invalid schema.
  • 408 Request Timeout
    • Error code 40801 – Schema registration or lookup failed.

Example binary request:

POST /topics/test HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.kafka.binary.v2+json
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json

{
  "records": [
    {
      "key": "a2V5",
      "value": "Y29uZmx1ZW50"
    },
    {
      "value": "a2Fma2E=",
      "partition": 1
    },
    {
      "value": "bG9ncw=="
    }
  ]
}

Example binary response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "key_schema_id": null,
  "value_schema_id": null,
  "offsets": [
    {
      "partition": 2,
      "offset": 100
    },
    {
      "partition": 1,
      "offset": 101
    },
    {
      "partition": 2,
      "offset": 102
    }
  ]
}

Example Avro request:

POST /topics/test HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.kafka.avro.v2+json
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json

{
  "value_schema": "{\"name\":\"int\",\"type\": \"int\"}",
  "records": [
    {
      "value": 12
    },
    {
      "value": 24,
      "partition": 1
    }
  ]
}

Example Avro response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "key_schema_id": null,
  "value_schema_id": 32,
  "offsets": [
    {
      "partition": 2,
      "offset": 103
    },
    {
      "partition": 1,
      "offset": 104
    }
  ]
}

Example JSON request:

POST /topics/test HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.kafka.json.v2+json
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json

{
  "records": [
    {
      "key": "somekey",
      "value": {"foo": "bar"}
    },
    {
      "value": [ "foo", "bar" ],
      "partition": 1
    },
    {
      "value": 53.5
    }
  ]
}

Example JSON response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "key_schema_id": null,
  "value_schema_id": null,
  "offsets": [
    {
      "partition": 2,
      "offset": 100
    },
    {
      "partition": 1,
      "offset": 101
    },
    {
      "partition": 2,
      "offset": 102
    }
  ]
}

Partitions

The partitions resource provides per-partition metadata, including the current leaders and replicas for each partition. It also allows you to consume and produce messages to single partition using GET and POST requests.

GET /topics/(string: topic_name)/partitions

Get a list of partitions for the topic.

Parameters:
  • topic_name (string) – the name of the topic
Response JSON Array of Objects:
 
  • partition (int) – ID of the partition
  • leader (int) – Broker ID of the leader for this partition
  • replicas (array) – List of brokers acting as replicas for this partition
  • replicas[i].broker (int) – Broker ID of the replica
  • replicas[i].leader (boolean) – true if this broker is the leader for the partition
  • replicas[i].in_sync (boolean) – true if the replica is in sync with the leader
Status Codes:

Example request:

GET /topics/test/partitions HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

[
  {
    "partition": 1,
    "leader": 1,
    "replicas": [
      {
        "broker": 1,
        "leader": true,
        "in_sync": true,
      },
      {
        "broker": 2,
        "leader": false,
        "in_sync": true,
      },
      {
        "broker": 3,
        "leader": false,
        "in_sync": false,
      }
    ]
  },
  {
    "partition": 2,
    "leader": 2,
    "replicas": [
      {
        "broker": 1,
        "leader": false,
        "in_sync": true,
      },
      {
        "broker": 2,
        "leader": true,
        "in_sync": true,
      },
      {
        "broker": 3,
        "leader": false,
        "in_sync": false,
      }
    ]
  }
]
GET /topics/(string: topic_name)/partitions/(int: partition_id)

Get metadata about a single partition in the topic.

Parameters:
  • topic_name (string) – Name of the topic
  • partition_id (int) – ID of the partition to inspect
Response JSON Object:
 
  • partition (int) – ID of the partition
  • leader (int) – Broker ID of the leader for this partition
  • replicas (array) – List of brokers acting as replicas for this partition
  • replicas[i].broker (int) – Broker ID of the replica
  • replicas[i].leader (boolean) – true if this broker is the leader for the partition
  • replicas[i].in_sync (boolean) – true if the replica is in sync with the leader
Status Codes:
  • 404 Not Found
    • Error code 40401 – Topic not found
    • Error code 40402 – Partition not found

Example request:

GET /topics/test/partitions/1 HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "partition": 1,
  "leader": 1,
  "replicas": [
    {
      "broker": 1,
      "leader": true,
      "in_sync": true,
    },
    {
      "broker": 2,
      "leader": false,
      "in_sync": true,
    },
    {
      "broker": 3,
      "leader": false,
      "in_sync": false,
    }
  ]
}
GET /topics/(string: topic_name)/partitions/(int: partition_id)/offsets

Get a summary of the offsets in this topic partition.

Parameters:
  • topic_name (string) – Name of the topic
  • partition_id (int) – ID of the partition to inspect
Response JSON Object:
 
  • beginning_offset (int) – First offset in this partition
  • end_offset (int) – Last offset in this partition
Status Codes:
  • 404 Not Found
    • Error code 40401 – Topic not found
    • Error code 40402 – Partition not found

Example request:

GET /topics/test/partitions/1/offsets HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "beginning_offset": 10,
  "end_offset": 50,
}
POST /topics/(string: topic_name)/partitions/(int: partition_id)

Produce messages to one partition of the topic. For the Avro, JSON Schema, and Protobuf embedded formats, you must provide information about schemas. This may be provided as the full schema encoded as a string, or, after the initial request may be provided as the schema ID returned with the first response.

Parameters:
  • topic_name (string) – Topic to produce the messages to
  • partition_id (int) – Partition to produce the messages to
Request JSON Object:
 
  • key_schema (string) – Full schema encoded as a string (e.g. JSON serialized for Avro data)
  • key_schema_id (int) – ID returned by a previous request using the same schema. This ID corresponds to the ID of the schema in the registry.
  • value_schema (string) – Full schema encoded as a string (e.g. JSON serialized for Avro data)
  • value_schema_id (int) – ID returned by a previous request using the same schema. This ID corresponds to the ID of the schema in the registry.
  • records – A list of records to produce to the partition.
Request JSON Array of Objects:
 
  • records[i].key (object) – The message key, formatted according to the embedded format, or null to omit a key (optional)
  • records[i].value (object) – The message value, formatted according to the embedded format
Response JSON Object:
 
  • key_schema_id (int) – The ID for the schema used to produce keys, or null if keys were not used
  • value_schema_id (int) – The ID for the schema used to produce values.
Response JSON Array of Objects:
 
  • offsets (object) – List of partitions and offsets the messages were published to
  • offsets[i].partition (int) – Partition the message was published to. This will be the same as the partition_id parameter and is provided only to maintain consistency with responses from producing to a topic
  • offsets[i].offset (long) – Offset of the message
  • offsets[i].error_code (long) –

    An error code classifying the reason this operation failed, or null if it succeeded.

    • 1 - Non-retriable Kafka exception
    • 2 - Retriable Kafka exception; the message might be sent successfully if retried
  • offsets[i].error (string) – An error message describing why the operation failed, or null if it succeeded
Status Codes:
  • 404 Not Found
    • Error code 40401 – Topic not found
    • Error code 40402 – Partition not found
  • 422 Unprocessable Entity
    • Error code 42201 – Request includes keys and uses a format that requires schemas, but does not include the key_schema or key_schema_id fields
    • Error code 42202 – Request includes values and uses a format that requires schemas, but does not include the value_schema or value_schema_id fields
    • Error code 42205 – Request includes invalid schema.

Example binary request:

POST /topics/test/partitions/1 HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.kafka.binary.v2+json
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json

{
  "records": [
    {
      "key": "a2V5",
      "value": "Y29uZmx1ZW50"
    },
    {
      "value": "a2Fma2E="
    }
  ]
}

Example binary response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "key_schema_id": null,
  "value_schema_id": null,
  "offsets": [
    {
      "partition": 1,
      "offset": 100,
    },
    {
      "partition": 1,
      "offset": 101,
    }
  ]
}

Example Avro request:

POST /topics/test/partitions/1 HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.kafka.avro.v2+json
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json

{
  "value_schema": "{\"name\":\"int\",\"type\": \"int\"}"
  "records": [
    {
      "value": 25
    },
    {
      "value": 26
    }
  ]
}

Example Avro response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "key_schema_id": null,
  "value_schema_id": 32,
  "offsets": [
    {
      "partition": 1,
      "offset": 100,
    },
    {
      "partition": 1,
      "offset": 101,
    }
  ]
}

Example JSON request:

POST /topics/test/partitions/1 HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.kafka.json.v2+json
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json

{
  "records": [
    {
      "key": "somekey",
      "value": {"foo": "bar"}
    },
    {
      "value": 53.5
    }
  ]
}

Example JSON response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "key_schema_id": null,
  "value_schema_id": null,
  "offsets": [
    {
      "partition": 1,
      "offset": 100,
    },
    {
      "partition": 1,
      "offset": 101,
    }
  ]
}

Example PROTOBUF request:

POST /topics/test/partitions/1 HTTP/1.1
Content-Type: application/vnd.kafka.protobuf.v2+json
Accept: application/vnd.kafka.v2+json, application/json

{
  "value_schema": "syntax=\"proto3\"; message Foo { string f1 = 1; }"
  "records": [{"value": {"f1": "foo"}}]
}

Example PROTOBUF response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "key_schema_id": null,
  "value_schema_id": 32,
  "offsets": [
    {
      "partition": 1,
      "offset": 100,
    },
    {
      "partition": 1,
      "offset": 101,
    }
  ]
}

Example JSONSCHEMA request:

POST /topics/test/partitions/1 HTTP/1.1
Content-Type: application/vnd.kafka.jsonschema.v2+json
Accept: application/vnd.kafka.v2+json, application/json

{
  "value_schema": "{\"type\":\"object\",\"properties\":{\"f1\":{\"type\":\"string\"}}}",
  "records": [{"value": {"f1": "bar"}}]
}

Example JSONSCHEMA response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "key_schema_id": null,
  "value_schema_id": 32,
  "offsets": [
    {
      "partition": 1,
      "offset": 100,
    },
    {
      "partition": 1,
      "offset": 101,
    }
  ]
}

Consumers

The consumers resource provides access to the current state of consumer groups, allows you to create a consumer in a consumer group and consume messages from topics and partitions. REST Proxy can convert data stored in Kafka in serialized form into a JSON-compatible embedded format. These formats are supported:

  • Raw binary data is encoded as base64 strings
  • Avro data is converted into embedded
  • JSON objects, and JSON is embedded directly
  • Protobuf
  • JSON Schema

Because consumers are stateful, any consumer instances created with the REST API are tied to a specific REST Proxy instance. A full URL is provided when the instance is created and it should be used to construct any subsequent requests. Failing to use the returned URL for future consumer requests will result in 404 errors because the consumer instance will not be found. If a REST Proxy instance is shutdown, it will attempt to cleanly destroy any consumers before it is terminated.

POST /consumers/(string: group_name)

Create a new consumer instance in the consumer group. The format parameter controls the deserialization of data from Kafka and the content type that must be used in the Accept header of subsequent read API requests performed against this consumer. For example, if the creation request specifies avro for the format, subsequent read requests should use Accept: application/vnd.kafka.avro.v2+json.

Note that the response includes a URL including the host since the consumer is stateful and tied to a specific REST Proxy instance. Subsequent examples in this section use a Host header for this specific REST Proxy instance.

Parameters:
  • group_name (string) – The name of the consumer group to join
Request JSON Object:
 
  • name (string) – Name for the consumer instance, which will be used in URLs for the consumer. This must be unique, at least within REST Proxy process handling the request. If omitted, falls back on the automatically generated ID. Using automatically generated names is recommended for most use cases.
  • format (string) – The format of consumed messages, which is used to convert messages into a JSON-compatible form. Valid values: “binary”, “avro”, “json”, “jsonschema”, and protobuf. If unspecified, defaults to “binary”.
  • auto.offset.reset (string) – Sets the auto.offset.reset setting for the consumer
  • auto.commit.enable (string) – Sets the auto.commit.enable setting for the consumer
  • fetch.min.bytes (string) – Sets the fetch.min.bytes setting for this consumer specifically
  • consumer.request.timeout.ms (string) – Sets the consumer.request.timeout.ms setting for this consumer specifically. This setting controls the maximum total time to wait for messages for a request if the maximum request size has not yet been reached. It does not affect the underlying consumer->broker connection. Default value is taken from the REST Proxy config file
Response JSON Object:
 
  • instance_id (string) – Unique ID for the consumer instance in this group.
  • base_uri (string) – Base URI used to construct URIs for subsequent requests against this consumer instance. This will be of the form http://hostname:port/consumers/consumer_group/instances/instance_id.
Status Codes:
  • 409 Conflict
    • Error code 40902 – Consumer instance with the specified name already exists.
  • 422 Unprocessable Entity
    • Error code 42204 – Invalid consumer configuration. One of the settings specified in the request contained an invalid value.

Example request:

POST /consumers/testgroup/ HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json


{
  "name": "my_consumer",
  "format": "binary",
  "auto.offset.reset": "earliest",
  "auto.commit.enable": "false"
}

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "instance_id": "my_consumer",
  "base_uri": "http://proxy-instance.kafkaproxy.example.com/consumers/testgroup/instances/my_consumer"
}

Example PROTOBUF request:

POST /consumers/testgroup/ HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.kafka.protobuf.v2+json


{
  "name": "my_consumer",
  "format": "protobuf",
  "auto.offset.reset": "earliest",
  "auto.commit.enable": "false"
}

Example PROTOBUF response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.protobuf.v2+json

{
  "instance_id": "my_consumer",
  "base_uri": "http://proxy-instance.kafkaproxy.example.com/consumers/my_protobuf_consumer"
}

Example JSONSCHEMA request:

POST /consumers/testgroup/ HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.kafka.jsonschema.v2+json


{
  "name": "my_consumer",
  "format": "jsonschema",
  "auto.offset.reset": "earliest",
  "auto.commit.enable": "false"
}

Example JSONSCHEMA response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.jsonschema.v2+json

{
  "instance_id": "my_consumer",
  "base_uri": "http://proxy-instance.kafkaproxy.example.com/consumers/my_jsonschema_consumer"
}
DELETE /consumers/(string: group_name)/instances/(string: instance)

Destroy the consumer instance.

Note that this request must be made to the specific REST Proxy instance holding the consumer instance.

Parameters:
  • group_name (string) – The name of the consumer group
  • instance (string) – The ID of the consumer instance
Status Codes:
  • 404 Not Found
    • Error code 40403 – Consumer instance not found

Example request:

DELETE /consumers/testgroup/instances/my_consumer HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json

Example response:

HTTP/1.1 204 No Content
POST /consumers/(string: group_name)/instances/(string: instance)/offsets

Commit a list of offsets for the consumer. When the post body is empty, it commits all the records that have been fetched by the consumer instance.

Note that this request must be made to the specific REST Proxy instance holding the consumer instance.

Parameters:
  • group_name (string) – The name of the consumer group
  • instance (string) – The ID of the consumer instance
Request JSON Array of Objects:
 
  • offsets – A list of offsets to commit for partitions
  • offsets[i].topic (string) – Name of the topic
  • offsets[i].partition (int) – Partition ID
  • offset – the offset to commit
Status Codes:
  • 404 Not Found
    • Error code 40403 – Consumer instance not found

Example request:

POST /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json

{
  "offsets": [
    {
      "topic": "test",
      "partition": 0,
      "offset": 20
    },
    {
      "topic": "test",
      "partition": 1,
      "offset": 30
    }
  ]
}
GET /consumers/(string: group_name)/instances/(string: instance)/offsets

Get the last committed offsets for the given partitions (whether the commit happened by this process or another).

Note that this request must be made to the specific REST Proxy instance holding the consumer instance.

Parameters:
  • group_name (string) – The name of the consumer group
  • instance (string) – The ID of the consumer instance
Request JSON Array of Objects:
 
  • partitions – A list of partitions to find the last committed offsets for
  • partitions[i].topic (string) – Name of the topic
  • partitions[i].partition (int) – Partition ID
Response JSON Array of Objects:
 
  • offsets – A list of committed offsets
  • offsets[i].topic (string) – Name of the topic for which an offset was committed
  • offsets[i].partition (int) – Partition ID for which an offset was committed
  • offsets[i].offset (int) – Committed offset
  • offsets[i].metadata (string) – Metadata for the committed offset
Status Codes:
  • 404 Not Found
    • Error code 40402 – Partition not found
    • Error code 40403 – Consumer instance not found

Example request:

GET /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json

{
  "partitions": [
    {
      "topic": "test",
      "partition": 0
    },
    {
      "topic": "test",
      "partition": 1
    }

  ]
}

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{"offsets":
 [
  {
    "topic": "test",
    "partition": 0,
    "offset": 21,
    "metadata":""
  },
  {
    "topic": "test",
    "partition": 1,
    "offset": 31,
    "metadata":""
  }
 ]
}
POST /consumers/(string: group_name)/instances/(string: instance)/subscription

Subscribe to the given list of topics or a topic pattern to get dynamically assigned partitions. If a prior subscription exists, it would be replaced by the latest subscription.

Parameters:
  • group_name (string) – The name of the consumer group
  • instance (string) – The ID of the consumer instance
Request JSON Array of Objects:
 
  • topics – A list of topics to subscribe
  • topics[i].topic (string) – Name of the topic
Request JSON Object:
 
  • topic_pattern (string) – A REGEX pattern. topics_pattern and topics fields are mutually exclusive.
Status Codes:
  • 404 Not Found
    • Error code 40403 – Consumer instance not found
  • 409 Conflict
    • Error code 40903 – Subscription to topics, partitions and pattern are mutually exclusive.

Example request:

POST /consumers/testgroup/instances/my_consumer/subscription HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json

{
  "topics": [
    "test1",
    "test2"
  ]
}

Example response:

HTTP/1.1 204 No Content

Example request:

POST /consumers/testgroup/instances/my_consumer/subscription HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json

{
  "topic_pattern": "test.*"
}

Example response:

HTTP/1.1 204 No Content
GET /consumers/(string: group_name)/instances/(string: instance)/subscription

Get the current subscribed list of topics.

Parameters:
  • group_name (string) – The name of the consumer group
  • instance (string) – The ID of the consumer instance
Response JSON Array of Objects:
 
  • topics – A list of subscribed topics
  • topics[i] (string) – Name of the topic
Status Codes:
  • 404 Not Found
    • Error code 40403 – Consumer instance not found

Example request:

GET /consumers/testgroup/instances/my_consumer/subscription HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.v2+json
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "topics": [
    "test1",
    "test2"
  ]
}
DELETE /consumers/(string: group_name)/instances/(string: instance)/subscription

Unsubscribe from topics currently subscribed.

Note that this request must be made to the specific REST Proxy instance holding the consumer instance.

Parameters:
  • group_name (string) – The name of the consumer group
  • instance (string) – The ID of the consumer instance
Status Codes:
  • 404 Not Found
    • Error code 40403 – Consumer instance not found

Example request:

DELETE /consumers/testgroup/instances/my_consumer/subscription HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json

Example response:

HTTP/1.1 204 No Content
POST /consumers/(string: group_name)/instances/(string: instance)/assignments

Manually assign a list of partitions to this consumer.

Parameters:
  • group_name (string) – The name of the consumer group
  • instance (string) – The ID of the consumer instance
Request JSON Array of Objects:
 
  • partitions – A list of partitions to assign to this consumer
  • partitions[i].topic (string) – Name of the topic
  • partitions[i].partition (int) – Partition ID
Status Codes:
  • 404 Not Found
    • Error code 40403 – Consumer instance not found
  • 409 Conflict
    • Error code 40903 – Subscription to topics, partitions and pattern are mutually exclusive.

Example request:

POST /consumers/testgroup/instances/my_consumer/assignments HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json

{
  "partitions": [
    {
      "topic": "test",
      "partition": 0
    },
    {
      "topic": "test",
      "partition": 1
    }

  ]
}

Example response:

HTTP/1.1 204 No Content
GET /consumers/(string: group_name)/instances/(string: instance)/assignments

Get the list of partitions currently manually assigned to this consumer.

Parameters:
  • group_name (string) – The name of the consumer group
  • instance (string) – The ID of the consumer instance
Response JSON Array of Objects:
 
  • partitions – A list of partitions manually to assign to this consumer
  • partitions[i].topic (string) – Name of the topic
  • partitions[i].partition (int) – Partition ID
Status Codes:
  • 404 Not Found
    • Error code 40403 – Consumer instance not found

Example request:

GET /consumers/testgroup/instances/my_consumer/assignments HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.v2+json
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "partitions": [
    {
      "topic": "test",
      "partition": 0
    },
    {
      "topic": "test",
      "partition": 1
    }

  ]
}
POST /consumers/(string: group_name)/instances/(string: instance)/positions

Overrides the fetch offsets that the consumer will use for the next set of records to fetch.

Parameters:
  • group_name (string) – The name of the consumer group
  • instance (string) – The ID of the consumer instance
Request JSON Array of Objects:
 
  • offsets – A list of offsets
  • offsets[i].topic (string) – Name of the topic for
  • offsets[i].partition (int) – Partition ID
  • offsets[i].offset (int) – Seek to offset for the next set of records to fetch
Status Codes:
  • 404 Not Found
    • Error code 40403 – Consumer instance not found

Example request:

POST /consumers/testgroup/instances/my_consumer/positions HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json


{
  "offsets": [
    {
      "topic": "test",
      "partition": 0,
      "offset": 20
    },
    {
      "topic": "test",
      "partition": 1,
      "offset": 30
    }
  ]
}

Example response:

HTTP/1.1 204 No Content
POST /consumers/(string: group_name)/instances/(string: instance)/positions/beginning

Seek to the first offset for each of the given partitions.

Parameters:
  • group_name (string) – The name of the consumer group
  • instance (string) – The ID of the consumer instance
Request JSON Array of Objects:
 
  • partitions – A list of partitions
  • partitions[i].topic (string) – Name of the topic
  • partitions[i].partition (int) – Partition ID
Status Codes:
  • 404 Not Found
    • Error code 40403 – Consumer instance not found

Example request:

POST /consumers/testgroup/instances/my_consumer/positions/beginning HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json

{
  "partitions": [
    {
      "topic": "test",
      "partition": 0
    },
    {
      "topic": "test",
      "partition": 1
    }

  ]
}

Example response:

HTTP/1.1 204 No Content
POST /consumers/(string: group_name)/instances/(string: instance)/positions/end

Seek to the last offset for each of the given partitions.

Parameters:
  • group_name (string) – The name of the consumer group
  • instance (string) – The ID of the consumer instance
Request JSON Array of Objects:
 
  • partitions – A list of partitions
  • partitions[i].topic (string) – Name of the topic
  • partitions[i].partition (int) – Partition ID
Status Codes:
  • 404 Not Found
    • Error code 40403 – Consumer instance not found

Example request:

POST /consumers/testgroup/instances/my_consumer/positions/end HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json

{
  "partitions": [
    {
      "topic": "test",
      "partition": 0
    },
    {
      "topic": "test",
      "partition": 1
    }

  ]
}

Example response:

HTTP/1.1 204 No Content
GET /consumers/(string: group_name)/instances/(string: instance)/records

Fetch data for the topics or partitions specified using one of the subscribe/assign APIs.

The format of the embedded data returned by this request is determined by the format specified in the initial consumer instance creation request and must match the format of the Accept header. Mismatches will result in error code 40601.

Note that this request must be made to the specific REST Proxy instance holding the consumer instance.

Parameters:
  • group_name (string) – The name of the consumer group
  • instance (string) – The ID of the consumer instance
Query Parameters:
 
  • timeout – Maximum amount of milliseconds the REST Proxy will spend fetching records. Other parameters controlling actual time spent fetching records: max_bytes and fetch.min.bytes. Default value is undefined. This parameter is used only if it’s smaller than the consumer.timeout.ms that is defined either during consumer instance creation or in the REST Proxy’s config file.
  • max_bytes – The maximum number of bytes of unencoded keys and values that should be included in the response. This provides approximate control over the size of responses and the amount of memory required to store the decoded response. The actual limit will be the minimum of this setting and the server-side configuration consumer.request.max.bytes. Default is unlimited.
Response JSON Array of Objects:
 
  • topic (string) – The topic
  • key (string) – The message key, formatted according to the embedded format
  • value (string) – The message value, formatted according to the embedded format
  • partition (int) – Partition of the message
  • offset (long) – Offset of the message
Status Codes:
  • 404 Not Found
    • Error code 40403 – Consumer instance not found
  • 406 Not Acceptable
    • Error code 40601 – Consumer format does not match the embedded format requested by the Accept header.

Example binary request:

GET /consumers/testgroup/instances/my_consumer/records?timeout=3000&max_bytes=300000 HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.binary.v2+json

Example binary response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.binary.v2+json

[
  {
    "topic": "test",
    "key": "a2V5",
    "value": "Y29uZmx1ZW50",
    "partition": 1,
    "offset": 100,
  },
  {
    "topic": "test",
    "key": "a2V5",
    "value": "a2Fma2E=",
    "partition": 2,
    "offset": 101,
  }
]

Example Avro request:

GET /consumers/avrogroup/instances/my_avro_consumer/records?timeout=3000&max_bytes=300000 HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.avro.v2+json

Example Avro response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.avro.v2+json

[
  {
    "topic": "test",
    "key": 1,
    "value": {
      "id": 1,
      "name": "Bill"
    },
    "partition": 1,
    "offset": 100,
  },
  {
    "topic": "test",
    "key": 2,
    "value": {
      "id": 2,
      "name": "Melinda"
    },
    "partition": 2,
    "offset": 101,
  }
]

Example JSON request:

GET /consumers/jsongroup/instances/my_json_consumer/records?timeout=3000&max_bytes=300000 HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.json.v2+json

Example JSON response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.json.v2+json

[
  {
    "topic": "test",
    "key": "somekey",
    "value": {"foo":"bar"},
    "partition": 1,
    "offset": 10,
  },
  {
    "topic": "test",
    "key": "somekey",
    "value": ["foo", "bar"],
    "partition": 2,
    "offset": 11,
  }
]

Brokers

The brokers resource provides access to the current state of Kafka brokers in the cluster.

GET /brokers

Get a list of brokers.

Response JSON Object:
 
  • brokers (array) – List of broker IDs

Example request:

GET /brokers HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "brokers": [1, 2, 3]
}

REST Proxy API v1

Important

Beginning with the next major release of Confluent Platform, Confluent REST Proxy V1 will be removed.

Topics

The topics resource provides information about the topics in your Kafka cluster and their current state. It also lets you produce messages by making POST requests to specific topics.

GET /topics

Get a list of Kafka topics.

Response JSON Object:
 
  • topics (array) – List of topic names

Example request:

GET /topics HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

["topic1", "topic2"]
GET /topics/(string: topic_name)

Get metadata about a specific topic.

Parameters:
  • topic_name (string) – Name of the topic to get metadata about
Response JSON Object:
 
  • name (string) – Name of the topic
  • configs (map) – Per-topic configuration overrides
  • partitions (array) – List of partitions for this topic
  • partitions[i].partition (int) – the ID of this partition
  • partitions[i].leader (int) – the broker ID of the leader for this partition
  • partitions[i].replicas (array) – list of replicas for this partition, including the leader
  • partitions[i].replicas[j].broker (array) – broker ID of the replica
  • partitions[i].replicas[j].leader (boolean) – true if this replica is the leader for the partition
  • partitions[i].replicas[j].in_sync (boolean) – true if this replica is currently in sync with the leader
Status Codes:

Example request:

GET /topics/test HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

{
  "name": "test",
  "configs": {
     "cleanup.policy": "compact"
  },
  "partitions": [
    {
      "partition": 1,
      "leader": 1,
      "replicas": [
        {
          "broker": 1,
          "leader": true,
          "in_sync": true,
        },
        {
          "broker": 2,
          "leader": false,
          "in_sync": true,
        }
      ]
    },
    {
      "partition": 2,
      "leader": 2,
      "replicas": [
        {
          "broker": 1,
          "leader": false,
          "in_sync": true,
        },
        {
          "broker": 2,
          "leader": true,
          "in_sync": true,
        }
      ]
    }
  ]
}
POST /topics/(string: topic_name)

Produce messages to a topic, optionally specifying keys or partitions for the messages. If no partition is provided, one will be chosen based on the hash of the key. If no key is provided, the partition will be chosen for each message in a round-robin fashion.

Avro, JSON, JSON Schema, Protobuf, and binary message formats.

For the Avro, JSON Schema, and Protobuf embedded formats, you must provide information about schemas and the REST Proxy must be configured with the URL to access Schema Registry (schema.registry.url). Schemas may be provided as the full schema encoded as a string, or, after the initial request may be provided as the schema ID returned with the first response. Note that if you use Avro, JSON Schema, or Protobuf for value you must also use them for the key, but the key and value may have different schemas.

Parameters:
  • topic_name (string) – Name of the topic to produce the messages to
Request JSON Object:
 
  • key_schema (string) – Full schema encoded as a string (e.g. JSON serialized for Avro data). This is only needed for Avro format.
  • key_schema_id (int) – ID returned by a previous request using the same schema. This ID corresponds to the ID of the schema in the registry.
  • value_schema (string) – Full schema encoded as a string (e.g. JSON serialized for Avro data). This is only needed for Avro format.
  • value_schema_id (int) – ID returned by a previous request using the same schema. This ID corresponds to the ID of the schema in the registry.
Request JSON Array of Objects:
 
  • records – A list of records to produce to the topic.
  • records[i].key (object) – The message key, formatted according to the embedded format, or null to omit a key (optional)
  • records[i].value (object) – The message value, formatted according to the embedded format
  • records[i].partition (int) – Partition to store the message in (optional)
Response JSON Object:
 
  • key_schema_id (int) – The ID for the schema used to produce keys, or null if keys were not used
  • value_schema_id (int) – The ID for the schema used to produce values.
Response JSON Array of Objects:
 
  • offsets (object) – List of partitions and offsets the messages were published to
  • offsets[i].partition (int) – Partition the message was published to, or null if publishing the message failed
  • offsets[i].offset (long) – Offset of the message, or null if publishing the message failed
  • offsets[i].error_code (long) –

    An error code classifying the reason this operation failed, or null if it succeeded.

    • 1 - Non-retriable Kafka exception
    • 2 - Retriable Kafka exception; the message might be sent successfully if retried
  • offsets[i].error (string) – An error message describing why the operation failed, or null if it succeeded
Status Codes:
  • 404 Not Found
    • Error code 40401 – Topic not found
  • 422 Unprocessable Entity
    • Error code 42201 – Request includes keys and uses a format that requires schemas, but does not include the key_schema or key_schema_id fields
    • Error code 42202 – Request includes values and uses a format that requires schemas, but does not include the value_schema or value_schema_id fields
    • Error code 42205 – Request includes invalid schema.

Example binary request:

POST /topics/test HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.kafka.binary.v1+json
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json

{
  "records": [
    {
      "key": "a2V5",
      "value": "Y29uZmx1ZW50"
    },
    {
      "value": "a2Fma2E=",
      "partition": 1
    },
    {
      "value": "bG9ncw=="
    }
  ]
}

Example binary response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

{
  "key_schema_id": null,
  "value_schema_id": null,
  "offsets": [
    {
      "partition": 2,
      "offset": 100
    },
    {
      "partition": 1,
      "offset": 101
    },
    {
      "partition": 2,
      "offset": 102
    }
  ]
}

Example Avro request:

POST /topics/test HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.kafka.avro.v1+json
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json

{
  "value_schema": "{\"name\":\"int\",\"type\": \"int\"}"
  "records": [
    {
      "value": 12
    },
    {
      "value": 24,
      "partition": 1
    }
  ]
}

Example Avro response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

{
  "key_schema_id": null,
  "value_schema_id": 32,
  "offsets": [
    {
      "partition": 2,
      "offset": 103
    },
    {
      "partition": 1,
      "offset": 104
    }
  ]
}

Example JSON request:

POST /topics/test HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.kafka.json.v1+json
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json

{
  "records": [
    {
      "key": "somekey",
      "value": {"foo": "bar"}
    },
    {
      "value": [ "foo", "bar" ],
      "partition": 1
    },
    {
      "value": 53.5
    }
  ]
}

Example JSON response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

{
  "key_schema_id": null,
  "value_schema_id": null,
  "offsets": [
    {
      "partition": 2,
      "offset": 100
    },
    {
      "partition": 1,
      "offset": 101
    },
    {
      "partition": 2,
      "offset": 102
    }
  ]
}

Partitions

The partitions resource provides per-partition metadata, including the current leaders and replicas for each partition. It also allows you to consume and produce messages to single partition using GET and POST requests.

GET /topics/(string: topic_name)/partitions

Get a list of partitions for the topic.

Parameters:
  • topic_name (string) – the name of the topic
Response JSON Array of Objects:
 
  • partition (int) – ID of the partition
  • leader (int) – Broker ID of the leader for this partition
  • replicas (array) – List of brokers acting as replicas for this partition
  • replicas[i].broker (int) – Broker ID of the replica
  • replicas[i].leader (boolean) – true if this broker is the leader for the partition
  • replicas[i].in_sync (boolean) – true if the replica is in sync with the leader
Status Codes:
  • 404 Not Found
    • Error code 40401 – Topic not found

    Example request:

GET /topics/test/partitions HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

[
  {
    "partition": 1,
    "leader": 1,
    "replicas": [
      {
        "broker": 1,
        "leader": true,
        "in_sync": true,
      },
      {
        "broker": 2,
        "leader": false,
        "in_sync": true,
      },
      {
        "broker": 3,
        "leader": false,
        "in_sync": false,
      }
    ]
  },
  {
    "partition": 2,
    "leader": 2,
    "replicas": [
      {
        "broker": 1,
        "leader": false,
        "in_sync": true,
      },
      {
        "broker": 2,
        "leader": true,
        "in_sync": true,
      },
      {
        "broker": 3,
        "leader": false,
        "in_sync": false,
      }
    ]
  }
]
GET /topics/(string: topic_name)/partitions/(int: partition_id)

Get metadata about a single partition in the topic.

Parameters:
  • topic_name (string) – Name of the topic
  • partition_id (int) – ID of the partition to inspect
Response JSON Object:
 
  • partition (int) – ID of the partition
  • leader (int) – Broker ID of the leader for this partition
  • replicas (array) – List of brokers acting as replicas for this partition
  • replicas[i].broker (int) – Broker ID of the replica
  • replicas[i].leader (boolean) – true if this broker is the leader for the partition
  • replicas[i].in_sync (boolean) – true if the replica is in sync with the leader
Status Codes:
  • 404 Not Found
    • Error code 40401 – Topic not found
    • Error code 40402 – Partition not found

Example request:

GET /topics/test/partitions/1 HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

{
  "partition": 1,
  "leader": 1,
  "replicas": [
    {
      "broker": 1,
      "leader": true,
      "in_sync": true,
    },
    {
      "broker": 2,
      "leader": false,
      "in_sync": true,
    },
    {
      "broker": 3,
      "leader": false,
      "in_sync": false,
    }
  ]
}
GET /topics/(string: topic_name)/partitions/(int: partition_id)/messages[?offset=(int)][&timestamp=(string)][&count=(int)]

Consume messages from one partition of the topic.

Parameters:
  • topic_name (string) – Topic to consume the messages from
  • partition_id (int) – Partition to consume the messages from
Query Parameters:
 
  • offset (int) – Offset to start from
  • timestamp (string) – ISO 8601 timestamp to start from
  • count (int) – Number of messages to consume (optional). Default is 1.
Response JSON Array of Objects:
 
  • key (string) – The message key, formatted according to the embedded format
  • value (string) – The message value, formatted according to the embedded format
  • partition (int) – Partition of the message
  • offset (long) – Offset of the message
Status Codes:
  • 404 Not Found
    • Error code 40401 – Topic not found
    • Error code 40402 – Partition not found
    • Error code 40404 – Leader not available
  • 500 Internal Server Error
    • Error code 500 – General consumer error response, caused by an exception during the operation. An error message is included in the standard format which explains the cause.
  • 503 Service Unavailable
    • Error code 50301 – No SimpleConsumer is available at the time in the pool. The request can be retried. You can increase the pool size or the pool timeout to avoid this error in the future.

Example binary request with offset:

GET /topic/test/partitions/1/messages?offset=10&count=2 HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.binary.v1+json

Example binary response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.binary.v1+json

[
  {
    "key": "a2V5",
    "value": "Y29uZmx1ZW50",
    "partition": 1,
    "offset": 10,
  },
  {
    "key": "a2V5",
    "value": "a2Fma2E=",
    "partition": 1,
    "offset": 11,
  }
]

Example Avro request with offset:

GET /topic/test/partitions/1/messages?offset=1 HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.avro.v1+json

Example Avro response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.avro.v1+json

[
  {
    "key": 1,
    "value": {
      "id": 1,
      "name": "Bill"
    },
    "partition": 1,
    "offset": 1,
  }
]

Example JSON request with timestamp:

GET /topic/test/partitions/1/messages?offset=2019-10-15T13:30:00Z&count=2 HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.json.v1+json

Example JSON response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.json.v1+json

[
  {
    "key": "somekey",
    "value": {"foo":"bar"},
    "partition": 1,
    "offset": 10,
  },
  {
    "key": "somekey",
    "value": ["foo", "bar"],
    "partition": 1,
    "offset": 11,
  }
]
POST /topics/(string: topic_name)/partitions/(int: partition_id)

Produce messages to one partition of the topic. For the Avro, JSON Schema, and Protobuf embedded formats, you must provide information about schemas. This may be provided as the full schema encoded as a string, or, after the initial request may be provided as the schema ID returned with the first response.

Parameters:
  • topic_name (string) – Topic to produce the messages to
  • partition_id (int) – Partition to produce the messages to
Request JSON Object:
 
  • key_schema (string) – Full schema encoded as a string (e.g. JSON serialized for Avro data)
  • key_schema_id (int) – ID returned by a previous request using the same schema. This ID corresponds to the ID of the schema in the registry.
  • value_schema (string) – Full schema encoded as a string (e.g. JSON serialized for Avro data)
  • value_schema_id (int) – ID returned by a previous request using the same schema. This ID corresponds to the ID of the schema in the registry.
  • records – A list of records to produce to the partition.
Request JSON Array of Objects:
 
  • records[i].key (object) – The message key, formatted according to the embedded format, or null to omit a key (optional)
  • records[i].value (object) – The message value, formatted according to the embedded format
Response JSON Object:
 
  • key_schema_id (int) – The ID for the schema used to produce keys, or null if keys were not used
  • value_schema_id (int) – The ID for the schema used to produce values.
Response JSON Array of Objects:
 
  • offsets (object) – List of partitions and offsets the messages were published to
  • offsets[i].partition (int) – Partition the message was published to. This will be the same as the partition_id parameter and is provided only to maintain consistency with responses from producing to a topic
  • offsets[i].offset (long) – Offset of the message
  • offsets[i].error_code (long) –

    An error code classifying the reason this operation failed, or null if it succeeded.

    • 1 - Non-retriable Kafka exception
    • 2 - Retriable Kafka exception; the message might be sent successfully if retried
  • offsets[i].error (string) – An error message describing why the operation failed, or null if it succeeded
Status Codes:
  • 404 Not Found
    • Error code 40401 – Topic not found
    • Error code 40402 – Partition not found
  • 422 Unprocessable Entity
    • Error code 42201 – Request includes keys and uses a format that requires schemas, but does not include the key_schema or key_schema_id fields
    • Error code 42202 – Request includes values and uses a format that requires schemas, but does not include the value_schema or value_schema_id fields
    • Error code 42205 – Request includes invalid schema.

Example binary request:

POST /topics/test/partitions/1 HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.kafka.binary.v1+json
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json

{
  "records": [
    {
      "key": "a2V5",
      "value": "Y29uZmx1ZW50"
    },
    {
      "value": "a2Fma2E="
    }
  ]
}

Example binary response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

{
  "key_schema_id": null,
  "value_schema_id": null,
  "offsets": [
    {
      "partition": 1,
      "offset": 100,
    },
    {
      "partition": 1,
      "offset": 101,
    }
  ]
}

Example Avro request:

POST /topics/test/partitions/1 HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.kafka.avro.v1+json
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json

{
  "value_schema": "{\"name\":\"int\",\"type\": \"int\"}"
  "records": [
    {
      "value": 25
    },
    {
      "value": 26
    }
  ]
}

Example Avro response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

{
  "key_schema_id": null,
  "value_schema_id": 32,
  "offsets": [
    {
      "partition": 1,
      "offset": 100,
    },
    {
      "partition": 1,
      "offset": 101,
    }
  ]
}

Example JSON request:

POST /topics/test/partitions/1 HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.kafka.json.v1+json
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json

{
  "records": [
    {
      "key": "somekey",
      "value": {"foo": "bar"}
    },
    {
      "value": 53.5
    }
  ]
}

Example JSON response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

{
  "key_schema_id": null,
  "value_schema_id": null,
  "offsets": [
    {
      "partition": 1,
      "offset": 100,
    },
    {
      "partition": 1,
      "offset": 101,
    }
  ]
}

Consumers

Important

  • The Consumer API for REST Proxy v1 does not support Kafka security.
  • Beginning with the next major release of Confluent Platform, Confluent REST Proxy V1 will be removed.

The consumers resource provides access to the current state of consumer groups, allows you to create a consumer in a consumer group and consume messages from topics and partitions. REST Proxy can convert data stored in Kafka in serialized form into a JSON-compatible embedded format. These formats are supported:

  • Raw binary data is encoded as base64 strings
  • Avro data is converted into embedded
  • JSON objects, and JSON is embedded directly
  • Protobuf
  • JSON Schema

Because consumers are stateful, any consumer instances created with the REST API are tied to a specific REST Proxy instance. A full URL is provided when the instance is created and it should be used to construct any subsequent requests. Failing to use the returned URL for future consumer requests will result in 404 errors because the consumer instance will not be found. If a REST Proxy instance is shutdown, it will attempt to cleanly destroy any consumers before it is terminated.

Consumers may not change the set of topics they are subscribed to once they have started consuming messages. For example, if a consumer is created without specifying topic subscriptions, the first read from a topic will subscribe the consumer to that topic and attempting to read from another topic will cause an error.

POST /consumers/(string: group_name)

Create a new consumer instance in the consumer group. The format parameter controls the deserialization of data from Kafka and the content type that must be used in the Accept header of subsequent read API requests performed against this consumer. For example, if the creation request specifies avro for the format, subsequent read requests should use Accept: application/vnd.kafka.avro.v1+json.

Note that the response includes a URL including the host since the consumer is stateful and tied to a specific REST Proxy instance. Subsequent examples in this section use a Host header for this specific REST Proxy instance.

Parameters:
  • group_name (string) – The name of the consumer group to join
Request JSON Object:
 
  • id (string) – DEPRECATED Unique ID for the consumer instance in this group. If omitted, one will be automatically generated
  • name (string) – Name for the consumer instance, which will be used in URLs for the consumer. This must be unique, at least within the REST Proxy process handling the request. If omitted, falls back on the automatically generated ID. Using automatically generated names is recommended for most use cases.
  • format (string) – The format of consumed messages, which is used to convert messages into a JSON-compatible form. Valid values: “binary”, “avro”, “json”, “jsonschema”, and protobuf. If unspecified, defaults to “binary”.
  • auto.offset.reset (string) – Sets the auto.offset.reset setting for the consumer
  • auto.commit.enable (string) – Sets the auto.commit.enable setting for the consumer
Response JSON Object:
 
  • instance_id (string) – Unique ID for the consumer instance in this group. If provided in the initial request, this will be identical to id.
  • base_uri (string) – Base URI used to construct URIs for subsequent requests against this consumer instance. This will be of the form http://hostname:port/consumers/consumer_group/instances/instance_id.
Status Codes:
  • 409 Conflict
    • Error code 40902 – Consumer instance with the specified name already exists.
  • 422 Unprocessable Entity
    • Error code 42204 – Invalid consumer configuration. One of the settings specified in the request contained an invalid value.

Example request:

POST /consumers/testgroup/ HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json

{
  "name": "my_consumer",
  "format": "binary",
  "auto.offset.reset": "smallest",
  "auto.commit.enable": "false"
}

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

{
  "instance_id": "my_consumer",
  "base_uri": "http://proxy-instance.kafkaproxy.example.com/consumers/testgroup/instances/my_consumer"
}
POST /consumers/(string: group_name)/instances/(string: instance)/offsets

Commit offsets for the consumer. Returns a list of the partitions with the committed offsets.

The body of this request is empty. The offsets are determined by the current state of the consumer instance on the proxy. The returned state includes both consumed and committed offsets. After a successful commit, these should be identical; however, both are included so the output format is consistent with other API calls that return the offsets.

Note that this request must be made to the specific REST Proxy instance holding the consumer instance.

Parameters:
  • group_name (string) – The name of the consumer group
  • instance (string) – The ID of the consumer instance
Response JSON Array of Objects:
 
  • topic (string) – Name of the topic for which an offset was committed
  • partition (int) – Partition ID for which an offset was committed
  • consumed (long) – The offset of the most recently consumed message
  • committed (long) – The committed offset value. If the commit was successful, this should be identical to consumed.
Status Codes:
  • 404 Not Found
    • Error code 40403 – Consumer instance not found

Example request:

POST /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

[
  {
    "topic": "test",
    "partition": 1,
    "consumed": 100,
    "committed": 100
  },
  {
    "topic": "test",
    "partition": 2,
    "consumed": 200,
    "committed": 200
  },
  {
    "topic": "test2",
    "partition": 1,
    "consumed": 50,
    "committed": 50
  }
]
DELETE /consumers/(string: group_name)/instances/(string: instance)

Destroy the consumer instance.

Note that this request must be made to the specific REST Proxy instance holding the consumer instance.

Parameters:
  • group_name (string) – The name of the consumer group
  • instance (string) – The ID of the consumer instance
Status Codes:
  • 404 Not Found
    • Error code 40403 – Consumer instance not found

Example request:

DELETE /consumers/testgroup/instances/my_consumer HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json

Example response:

HTTP/1.1 204 No Content
GET /consumers/(string: group_name)/instances/(string: instance)/topics/(string: topic_name)

Consume messages from a topic. If the consumer is not yet subscribed to the topic, this adds it as a subscriber, possibly causing a consumer rebalance.

The format of the embedded data returned by this request is determined by the format specified in the initial consumer instance creation request and must match the format of the Accept header. Mismatches will result in error code 40601.

Note that this request must be made to the specific REST Proxy instance holding the consumer instance.

Parameters:
  • group_name (string) – The name of the consumer group
  • instance (string) – The ID of the consumer instance
  • topic_name (string) – The topic to consume messages from.
Query Parameters:
 
  • max_bytes – The maximum number of bytes of unencoded keys and values that should be included in the response. This provides approximate control over the size of responses and the amount of memory required to store the decoded response. The actual limit will be the minimum of this setting and the server-side configuration consumer.request.max.bytes. Default is unlimited.
Response JSON Array of Objects:
 
  • key (string) – The message key, formatted according to the embedded format
  • value (string) – The message value, formatted according to the embedded format
  • partition (int) – Partition of the message
  • offset (long) – Offset of the message
Status Codes:
  • 404 Not Found
    • Error code 40401 – Topic not found
    • Error code 40403 – Consumer instance not found
  • 406 Not Acceptable
    • Error code 40601 – Consumer format does not match the embedded format requested by the Accept header.
  • 409 Conflict
    • Error code 40901 – Consumer has already initiated a subscription. Consumers may subscribe to multiple topics, but all subscriptions must be initiated in a single request.
  • 500 Internal Server Error
    • Error code 500 – General consumer error response, caused by an exception during the operation. An error message is included in the standard format which explains the cause.

Example binary request:

GET /consumers/testgroup/instances/my_consumer/topics/test_topic HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.binary.v1+json

Example binary response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.binary.v1+json

[
  {
    "key": "a2V5",
    "value": "Y29uZmx1ZW50",
    "partition": 1,
    "offset": 100,
    "topic": "test_topic"
  },
  {
    "key": "a2V5",
    "value": "a2Fma2E=",
    "partition": 2,
    "offset": 101,
    "topic": "test_topic"
  }
]

Example Avro request:

GET /consumers/avrogroup/instances/my_avro_consumer/topics/test_avro_topic HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.avro.v1+json

Example Avro response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.avro.v1+json

[
  {
    "key": 1,
    "value": {
      "id": 1,
      "name": "Bill"
    },
    "partition": 1,
    "offset": 100,
    "topic": "test_avro_topic"
  },
  {
    "key": 2,
    "value": {
      "id": 2,
      "name": "Melinda"
    },
    "partition": 2,
    "offset": 101,
    "topic": "test_avro_topic"
  }
]

Example JSON request:

GET /consumers/jsongroup/instances/my_json_consumer/topics/test_json_topic HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.json.v1+json

Example JSON response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.json.v1+json

[
  {
    "key": "somekey",
    "value": {"foo":"bar"},
    "partition": 1,
    "offset": 10,
    "topic": "test_json_topic"
  },
  {
    "key": "somekey",
    "value": ["foo", "bar"],
    "partition": 2,
    "offset": 11,
    "topic": "test_json_topic"
  }
]

Brokers

The brokers resource provides access to the current state of Kafka brokers in the cluster.

GET /brokers

Get a list of brokers.

Response JSON Object:
 
  • brokers (array) – List of broker IDs

Example request:

GET /brokers HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

{
  "brokers": [1, 2, 3]
}

REST Proxy API v3

Important

The REST Proxy API v3 is currently available as a preview feature. A preview feature is a component of Confluent Platform that is being introduced to gain early feedback from developers. The API v3 can be used for evaluation and non-production testing purposes or to provide feedback to Confluent.

Content-Type

Starting with API v3, REST Proxy will rely on URL versioning instead of using Content-Type for API versioning. All v3 API endpoints are prefixed with the /v3 path segment.

The v3 Admin APIs use the JSON:API specification. The Content-Type for both request and response of these APIs is application/vnd.api+json.

Clusters

GET /v3/clusters

List the clusters. Because the REST Proxy can only connect to a single Kafka cluster, this list will always return a single cluster.

Response JSON Object:
 
  • links.self (string) – An absolute link for this collection.
  • data[i].links.self (string) – An absolute link for the i-th cluster.
  • data[i].type (string) – The Confluent resource type for the i-th cluster.
  • data[i].id (string) – The Confluent resource name for the i-th cluster.
  • data[i].attributes.cluster_id (string) – The cluster ID of the i-th cluster.
  • data[i].relationships.controller.links.related (string) – An absolute link for the controller broker of the i-th cluster.
  • data[i].relationships.brokers.links.related (string) – An absolute link for the collection of brokers of the i-th cluster.
  • data[i].relationships.topics.links.related (string) – An absolute link for the collection of topics of the i-th cluster.

Example request:

GET /v3/clusters HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.api+json

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.api+json

{
  "links": {
    "self": "http://kafkaproxy.example.com/v3/clusters"
  },
  "data": [
    {
      "links" {
        "self": "http://kafkaproxy.example.com/v3/clusters/cluster-1"
      },
      "type": "KafkaCluster",
      "id": "crn:///kafka=cluster-1",
      "attributes": {
        "cluster_id": "cluster-1"
      },
      "relationships": {
        "controller": {
          "links": {
            "related": "http://kafkaproxy.example.com/v3/clusters/cluster-1/brokers/1"
          }
        },
        "brokers": {
          "links": {
            "related": "http://kafkaproxy.example.com/v3/clusters/cluster-1/brokers"
          }
        },
        "topics": {
          "links": {
            "related": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics"
          }
        }
      }
    }
  ]
}
GET /v3/clusters/(string: cluster_id)

Get a cluster.

Parameters:
  • cluster_id (string) – The cluster ID.
Response JSON Object:
 
  • data.links.self (string) – An absolute link for this cluster.
  • data.type (string) – The Confluent resource type for this cluster.
  • data.id (string) – The Confluent resource name for this cluster.
  • data.attributes.cluster_id (string) – The cluster ID of this cluster.
  • data.relationships.controller.links.related (string) – An absolute link for the controller broker of this cluster.
  • data.relationships.brokers.links.related (string) – An absolute link for the collection of brokers of this cluster.
  • data.relationships.topics.links.related (string) – An absolute link for the collection of topics of this cluster.

Example request:

GET /v3/clusters/cluster-1 HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.api+json

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.api+json

{
  "data": {
    "links" {
      "self": "http://kafkaproxy.example.com/v3/clusters/cluster-1"
    },
    "type": "KafkaCluster",
    "id": "crn:///kafka=cluster-1",
    "attributes": {
      "cluster_id": "cluster-1"
    },
    "relationships": {
      "controller": {
        "links": {
          "related": "http://kafkaproxy.example.com/v3/clusters/cluster-1/brokers/1"
        }
      },
      "brokers": {
        "links": {
          "related": "http://kafkaproxy.example.com/v3/clusters/cluster-1/brokers"
        }
      },
      "topics": {
        "links": {
          "related": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics"
        }
      }
    }
  }
}

Brokers

GET /v3/clusters/(string: cluster_id)/brokers

List the brokers belonging to a given cluster.

Parameters:
  • cluster_id (string) – The cluster ID of the cluster the brokers belong to.
Response JSON Object:
 
  • links.self (string) – An absolute link for this collection.
  • data[i].links.self (string) – An absolute link for the i-th broker.
  • data[i].type (string) – The Confluent resource type for the i-th broker.
  • data[i].id (string) – The Confluent resource name for the i-th broker.
  • data[i].attributes.cluster_id (string) – The cluster ID of the cluster the i-th broker belongs to.
  • data[i].attributes.broker_id (int) – The broker ID of the i-th broker.
  • data[i].attributes.host (string) – The advertised hostname of the i-th broker.
  • data[i].attributes.port (int) – The advertised port of the i-th broker.
  • data[i].attributes.rack (string) – The rack of the i-th broker, or null if the i-th broker is not rack-aware.

Example request:

GET /v3/clusters/cluster-1/brokers HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.api+json

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.api+json

{
  "links": {
    "self": "http://kafkaproxy.example.com/v3/clusters/cluster-1/brokers"
  },
  "data": [
    {
      "links" {
        "self": "http://kafkaproxy.example.com/v3/clusters/cluster-1/brokers/1"
      },
      "type": "KafkaBroker",
      "id": "crn:///kafka=cluster-1/broker=1",
      "attributes": {
        "cluster_id": "cluster-1",
        "broker_id": 1,
        "host": "12.3.4.56",
        "port": 9090,
        "rack": "rack-1"
      }
    },
    {
      "links" {
        "self": "http://kafkaproxy.example.com/v3/clusters/cluster-1/brokers/2"
      },
      "type": "KafkaBroker",
      "id": "crn:///kafka=cluster-1/broker=2",
      "attributes": {
        "cluster_id": "cluster-1",
        "broker_id": 2,
        "host": "12.3.4.57",
        "port": 9091,
        "rack": null
      }
    }
  ]
}
GET /v3/clusters/(string: cluster_id)/brokers/(int: broker_id)

Get a broker.

Parameters:
  • cluster_id (string) – The cluster ID of the cluster the broker belongs to.
  • broker_id (int) – The broker ID of the broker.
Response JSON Object:
 
  • data.links.self (string) – An absolute link for this broker.
  • data.type (string) – The Confluent resource type for this broker.
  • data.id (string) – The Confluent resource name for this broker.
  • data.attributes.cluster_id (string) – The cluster ID of the cluster this broker belongs to.
  • data.attributes.broker_id (int) – The broker ID of this broker.
  • data.attributes.host (string) – The advertised hostname of this broker.
  • data.attributes.port (int) – The advertised port of this broker.
  • data.attributes.rack (string) – The rack of this broker, or null if this broker is not rack-aware.

Example request:

GET /v3/clusters/cluster-1/brokers/1 HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.api+json

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.api+json

{
  "data": {
    "links" {
      "self": "http://kafkaproxy.example.com/v3/clusters/cluster-1/brokers/1"
    },
    "type": "KafkaBroker",
    "id": "crn:///kafka=cluster-1/broker=1",
    "attributes": {
      "cluster_id": "cluster-1",
      "broker_id": 1,
      "host": "12.3.4.56",
      "port": 9090,
      "rack": "rack-1"
    }
  }
}

Topics

GET /v3/clusters/(string: cluster_id)/topics

List the topics belonging to a given cluster.

Parameters:
  • cluster_id (string) – The cluster ID of the cluster the topics belong to.
Response JSON Object:
 
  • links.self (string) – An absolute link for this collection.
  • data[i].links.self (string) – An absolute link for the i-th topic.
  • data[i].type (string) – The Confluent resource type for the i-th topic.
  • data[i].id (string) – The Confluent resource name for the i-th topic.
  • data[i].attributes.cluster_id (string) – The cluster ID of the cluster the i-th topic belongs to.
  • data[i].attributes.topic_name (string) – The name of the i-th topic.
  • data[i].attributes.is_internal (boolean) – Whether the i-th topic is an internal topic.
  • data[i].attributes.replication_factor (int) – The replication factor of the i-th topic.
  • data[i].relationships.configs.links.related (string) – An absolute link to the collection of configs of the i-th topic.
  • data[i].relationships.partitions.links.related (string) – An absolute link to the collection of partitions of the i-th topic.

Example request:

GET /v3/clusters/cluster-1/topics HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.api+json

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.api+json

{
  "links": {
    "self": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics"
  },
  "data": [
    {
      "links" {
        "self": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-1"
      },
      "type": "KafkaTopic",
      "id": "crn:///kafka=cluster-1/topic=topic-1",
      "attributes": {
        "cluster_id": "cluster-1",
        "topic_name": "topic-1",
        "is_internal": false,
        "replication_factor": 3
      },
      "relationships": {
        "configs": {
          "links": {
            "related": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-1/configs"
          }
        },
        "partitions": {
          "links": {
            "related": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-1/partitions"
          }
        }
      }
    },
    {
      "links" {
        "self": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-2"
      },
      "type": "KafkaTopic",
      "id": "crn:///kafka=cluster-1/topic=topic-2",
      "attributes": {
        "cluster_id": "cluster-1",
        "topic_name": "topic-2",
        "is_internal": false,
        "replication_factor": 3
      },
      "relationships": {
        "configs": {
          "links": {
            "related": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-2/configs"
          }
        },
        "partitions": {
          "links": {
            "related": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-2/partitions"
          }
        }
      }
    }
  ]
}
GET /v3/clusters/(string: cluster_id)/topics/(string: topic_name)

Get a topic.

Parameters:
  • cluster_id (string) – The cluster ID of the cluster the topic belongs to.
  • topic_name (string) – The name of the topic.
Response JSON Object:
 
  • data.links.self (string) – An absolute link for this topic.
  • data.type (string) – The Confluent resource type for this topic.
  • data.id (string) – The Confluent resource name for this topic.
  • data.attributes.cluster_id (string) – The cluster ID of the cluster this topic belongs to.
  • data.attributes.topic_name (string) – The name of this topic.
  • data.attributes.is_internal (boolean) – Whether this topic is an internal topic.
  • data.attributes.replication_factor (int) – The replication factor of this topic.
  • data.relationships.configs.links.related (string) – An absolute link to the collection of configs of this topic.
  • data.relationships.partitions.links.related (string) – An absolute link to the collection of partitions of this topic.

Example request:

GET /v3/clusters/cluster-1/topics/topic-1 HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.api+json

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.api+json

{
  "data": {
    "links" {
      "self": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-1"
    },
    "type": "KafkaTopic",
    "id": "crn:///kafka=cluster-1/topic=topic-1",
    "attributes": {
      "cluster_id": "cluster-1",
      "topic_name": "topic-1",
      "is_internal": false,
      "replication_factor": 3
    },
    "relationships": {
      "configs": {
        "links": {
          "related": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-1/configs"
        }
      },
      "partitions": {
        "links": {
          "related": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-1/partitions"
        }
      }
    }
  }
}
POST /v3/clusters/(string: cluster_id)/topics

Create a topic.

Parameters:
  • cluster_id (string) – The cluster ID of the cluster the topic belongs to.
Request JSON Object:
 
  • data.attributes.topic_name (string) – The name of the topic.
  • data.attributes.partitions_count (int) – The number of partitions of the topic.
  • data.attributes.replication_factor (int) – The replication factor of the topic.
  • data.attributes.configs[i].name (string) – The name of the i-th config of the topic.
  • data.attributes.configs[i].value (string) – The value of the i-th config of the topic.
Response JSON Object:
 
  • data.links.self (string) – An absolute link for this topic.
  • data.type (string) – The Confluent resource type for this topic.
  • data.id (string) – The Confluent resource name for this topic.
  • data.attributes.cluster_id (string) – The cluster ID of the cluster this topic belongs to.
  • data.attributes.topic_name (string) – The name of this topic.
  • data.attributes.is_internal (boolean) – Whether this topic is an internal topic.
  • data.attributes.replication_factor (int) – The replication factor of this topic.
  • data.relationships.configs.links.related (string) – An absolute link to the collection of configs of this topic.
  • data.relationships.partitions.links.related (string) – An absolute link to the collection of partitions of this topic.

Example request:

POST /v3/clusters/cluster-1/topics HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.api+json
Accept: application/vnd.api+json

{
  "data": {
    "attributes": {
      "topic_name": "topic-1",
      "partitions_count": 2,
      "replication_factor": 3,
      "configs": [
        {
          "name": "cleanup.policy",
          "value": "compact"
        }
      ]
    }
  }
}

Example response:

HTTP/1.1 201 Created
Content-Type: application/vnd.api+json

{
  "data": {
    "links" {
      "self": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-1"
    },
    "type": "KafkaTopic",
    "id": "crn:///kafka=cluster-1/topic=topic-1",
    "attributes": {
      "cluster_id": "cluster-1",
      "topic_name": "topic-1",
      "is_internal": false,
      "replication_factor": 3
    },
    "relationships": {
      "configs": {
        "links": {
          "related": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-1/configs"
        }
      },
      "partitions": {
        "links": {
          "related": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-1/partitions"
        }
      }
    }
  }
}
DELETE /v3/clusters/(string: cluster_id)/topics/(string: topic_name)

Delete a topic.

Parameters:
  • cluster_id (string) – The cluster ID of the cluster the topic belongs to.
  • topic_name (string) – The name of the topic.

Example request:

DELETE /v3/clusters/cluster-1/topics/topic-1 HTTP/1.1
Host: kafkaproxy.example.com

Example response:

HTTP/1.1 204 No Content

Topic Configuration

GET /v3/clusters/(string: cluster_id)/topics/(string: topic_name)/configs

List the configs of a given topic.

Parameters:
  • cluster_id (string) – The cluster ID of the cluster the configs belong to.
  • topic_name (string) – The name of the topic the configs belong to.
Response JSON Object:
 
  • links.self (string) – An absolute link for this collection.
  • data[i].links.self (string) – An absolute link for the i-th config.
  • data[i].type (string) – The Confluent resource type for the i-th config.
  • data[i].id (string) – The Confluent resource name for the i-th config.
  • data[i].attributes.cluster_id (string) – The cluster ID of the cluster the i-th config belongs to.
  • data[i].attributes.topic_name (string) – The name of the topic the i-th config belongs to.
  • data[i].attributes.name (string) – The name of the i-th config.
  • data[i].attributes.value (string) – The value of the i-th config.
  • data[i].attributes.is_default (boolean) – Whether the value of the i-th config is the default value, or it has been set.
  • data[i].attributes.is_read_only (boolean) – Whether the value of the i-th config is read only.
  • data[i].attributes.is_sensitive (boolean) – Whether the value of the i-th config is sensitive, in which case value will return null instead of the actual value.

Example request:

GET /v3/clusters/cluster-1/topics/topic-1/configs HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.api+json

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.api+json

{
  "links": {
    "self": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-1/configs"
  },
  "data": [
    {
      "links" {
        "self": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-1/configs/cleanup.policy"
      },
      "type": "KafkaTopicConfig",
      "id": "crn:///kafka=cluster-1/topic=topic-1/config=cleanup.policy",
      "attributes": {
        "cluster_id": "cluster-1",
        "topic_name": "topic-1",
        "name": "cleanup.policy",
        "value": "delete"
        "is_default": true,
        "is_read_only": false,
        "is_sensitive": false
      }
    },
    {
      "links" {
        "self": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-1/configs/compression.type"
      },
      "type": "KafkaTopic",
      "id": "crn:///kafka=cluster-1/topic=topic-1/config=compression.type",
      "attributes": {
        "cluster_id": "cluster-1",
        "topic_name": "topic-1",
        "name": "compression.type",
        "value": "producer"
        "is_default": true,
        "is_read_only": false,
        "is_sensitive": false
      }
    }
  ]
}
GET /v3/clusters/(string: cluster_id)/topics/(string: topic_name)/configs/(string: name)

Get a config.

Parameters:
  • cluster_id (string) – The cluster ID of the cluster the config belongs to.
  • topic_name (string) – The name of the topic the config belongs to.
  • name (string) –
Response JSON Object:
 
  • data.links.self (string) – An absolute link for this config.
  • data.type (string) – The Confluent resource type for this config.
  • data.id (string) – The Confluent resource name for this config.
  • data.attributes.cluster_id (string) – The cluster ID of the cluster this config belongs to.
  • data.attributes.topic_name (string) – The name of the topic this config belongs to.
  • data.attributes.name (string) – The name of this config.
  • data.attributes.value (string) – The value of this config.
  • data.attributes.is_default (boolean) – Whether the value of this config is the default value, or it has been set.
  • data.attributes.is_read_only (boolean) – Whether the value of this config is read only.
  • data.attributes.is_sensitive (boolean) – Whether the value of this config is sensitive, in which case value will return null instead of the actual value.

Example request:

GET /v3/clusters/cluster-1/topics/topic-1/configs/cleanup.policy HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.api+json

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.api+json

{
  "data": {
    "links" {
      "self": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-1/configs/cleanup.policy"
    },
    "type": "KafkaTopicConfig",
    "id": "crn:///kafka=cluster-1/topic=topic-1/config=cleanup.policy",
    "attributes": {
      "cluster_id": "cluster-1",
      "topic_name": "topic-1",
      "name": "cleanup.policy",
      "value": "delete"
      "is_default": true,
      "is_read_only": false,
      "is_sensitive": false
    }
  }
}
PUT /v3/clusters/(string: cluster_id)/topics/(string: topic_name)/configs/(string: name)

Set the value of a config.

Parameters:
  • cluster_id (string) – The cluster ID of the cluster the config belongs to.
  • topic_name (string) – The name of the topic the config belongs to.
  • name (string) – The name of the config.
Request JSON Object:
 
  • data.attributes.value (string) – The value of the config.

Example request:

PUT /v3/clusters/cluster-1/topics/topic-1/configs/cleanup.policy HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.api+json

{
  "data": {
    "attributes": {
      "value": "compact"
    }
  }
}

Example response:

HTTP/1.1 204 No Content
DELETE /v3/clusters/(string: cluster_id)/topics/(string: topic_name)/configs/(string: name)

Reset a config to its default value.

Parameters:
  • cluster_id (string) – The cluster ID of the cluster the config belongs to.
  • topic_name (string) – The name of the topic the config belongs to.
  • name (string) – The name of the config.

Example request:

DELETE /v3/clusters/cluster-1/topics/topic-1/configs/cleanup.policy HTTP/1.1
Host: kafkaproxy.example.com

Example response:

HTTP/1.1 204 No Content

Partitions

GET /v3/clusters/(string: cluster_id)/topics/(string: topic_name)/partitions

List the partitions of a given topic.

Parameters:
  • cluster_id (string) – The cluster ID of the cluster the Partitions belong to.
  • topic_name (string) – The name of the topic the Partitions belong to.
Response JSON Object:
 
  • links.self (string) – An absolute link for this collection.
  • data[i].links.self (string) – An absolute link for the i-th Partition.
  • data[i].type (string) – The Confluent resource type for the i-th Partition.
  • data[i].id (string) – The Confluent resource name for the i-th Partition.
  • data[i].attributes.cluster_id (string) – The cluster ID of the cluster the i-th Partitions belongs to.
  • data[i].attributes.topic_name (string) – The name of the topic the i-th Partition belongs to.
  • data[i].attributes.partition_id (int) – The Partition ID of the i-th Partition.
  • data[i].relationships.leader.links.related (string) – An absolute link to the leader replica of the i-th Partition.
  • data[i].relationships.replicas.links.related (string) – An absolute link to the collection of replicas of the i-th Partition.

Example request:

GET /v3/clusters/cluster-1/topics/topic-1/partitions HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.api+json

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.api+json

{
  "links": {
    "self": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-1/partitions"
  },
  "data": [
    {
      "links" {
        "self": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-1/partitions/0"
      },
      "type": "KafkaPartition",
      "id": "crn:///kafka=cluster-1/topic=topic-1/partition=0",
      "attributes": {
        "cluster_id": "cluster-1",
        "topic_name": "topic-1",
        "partition_id": 0
      },
      "relationships": {
        "leader": {
          "links": {
            "related": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-1/partitions/0/replicas/1"
          }
        },
        "replicas": {
          "links": {
            "related": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-1/partitions/0/replicas"
          }
        }
      }
    },
    {
      "links" {
        "self": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-1/partitions/1"
      },
      "type": "KafkaPartition",
      "id": "crn:///kafka=cluster-1/topic=topic-1/partition=1",
      "attributes": {
        "cluster_id": "cluster-1",
        "topic_name": "topic-1",
        "partition_id": 1
      },
      "relationships": {
        "leader": {
          "links": {
            "related": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-1/partitions/1/replicas/2"
          }
        },
        "replicas": {
          "links": {
            "related": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-1/partitions/1/replicas"
          }
        }
      }
    }
  ]
}
GET /v3/clusters/(string: cluster_id)/topics/(string: topic_name)/partitions/(int: partition_id)

Get a Partition.

Parameters:
  • cluster_id (string) – The cluster ID of the cluster the Partition belongs to.
  • topic_name (string) – The name of the topic the Partition belongs to.
  • partition_id (int) – The Partition ID of the Partition.
Response JSON Object:
 
  • data.links.self (string) – An absolute link for this Partition.
  • data.type (string) – The Confluent resource type for this Partition.
  • data.id (string) – The Confluent resource name for this Partition.
  • data.attributes.cluster_id (string) – The cluster ID of the cluster this Partition belongs to.
  • data.attributes.topic_name (string) – The name of the topic this Partition belongs to.
  • data.attributes.partition_id (int) – The Partition ID of this Partition.
  • data.relationships.leader.links.related (string) – An absolute link to the leader replica of this Partition.
  • data.relationships.replicas.links.related (string) – An absolute link to the collection of replicas of this Partition.

Example request:

GET /v3/clusters/cluster-1/topics/topic-1/partitions/0 HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.api+json

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.api+json

{
  "data": {
    "links" {
      "self": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-1/partitions/0"
    },
    "type": "KafkaPartition",
    "id": "crn:///kafka=cluster-1/topic=topic-1/partition=0",
    "attributes": {
      "cluster_id": "cluster-1",
      "topic_name": "topic-1",
      "partition_id": 0
    },
    "relationships": {
      "leader": {
        "links": {
          "related": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-1/partitions/0/replicas/1"
        }
      },
      "replicas": {
        "links": {
          "related": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-1/partitions/0/replicas"
        }
      }
    }
  }
}

Replicas

GET /v3/clusters/(string: cluster_id)/topics/(string: topic_name)/partitions/(int: partition_id)/replicas

List the replicas of a given Partition.

Parameters:
  • cluster_id (string) – The cluster ID of the cluster the replicas belong to.
  • topic_name (string) – The name of the topic the replicas belong to.
  • partition_id (int) – The Partition ID of the Partition the replicas belong to.
Response JSON Object:
 
  • links.self (string) – An absolute link for this collection.
  • data[i].links.self (string) – An absolute link for the i-th replica.
  • data[i].type (string) – The Confluent resource type for the i-th replica.
  • data[i].id (string) – The Confluent resource name for the i-th replica.
  • data[i].attributes.cluster_id (string) – The cluster ID of the cluster the i-th replica belongs to.
  • data[i].attributes.topic_name (string) – The name of the topic the i-th replica belongs to.
  • data[i].attributes.partition_id (int) – The Partition ID of the Partition the i-th replica belong to.
  • data[i].attributes.broker_id (int) – The broker ID of the broker the i-th replica belongs to.
  • data[i].relationships.broker.links.related (string) – An absolute link to the broker the i-th replica belongs to.

Example request:

GET /v3/clusters/cluster-1/topics/topic-1/partitions/0/replicas HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.api+json

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.api+json

{
  "links": {
    "self": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-1/partitions/0/replicas"
  },
  "data": [
    {
      "links" {
        "self": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-1/partitions/0/replicas/1"
      },
      "type": "KafkaReplica",
      "id": "crn:///kafka=cluster-1/topic=topic-1/partition=0/replica=1",
      "attributes": {
        "cluster_id": "cluster-1",
        "topic_name": "topic-1",
        "partition_id": 0,
        "broker_id": 1,
        "is_leader": true,
        "is_in_sync": true
      },
      "relationships": {
        "broker": {
          "links": {
            "related": "http://kafkaproxy.example.com/v3/clusters/cluster-1/brokers/1"
          }
        }
      }
    },
    {
      "links" {
        "self": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-1/partitions/1/replicas/2"
      },
      "type": "KafkaReplica",
      "id": "crn:///kafka=cluster-1/topic=topic-1/partition=0/replica=2",
      "attributes": {
        "cluster_id": "cluster-1",
        "topic_name": "topic-1",
        "partition_id": 0,
        "broker_id": 2,
        "is_leader": false,
        "is_in_sync": true
      },
      "relationships": {
        "broker": {
          "links": {
            "related": "http://kafkaproxy.example.com/v3/clusters/cluster-1/brokers/2"
          }
        }
      }
    }
  ]
}
GET /v3/clusters/(string: cluster_id)/topics/(string: topic_name)/partitions/(int: partition_id)/replicas/(int: broker_id)

Get a replica.

Parameters:
  • cluster_id (string) – The cluster ID of the cluster the replica belongs to.
  • topic_name (string) – The name of the topic the replica belongs to.
  • partition_id (int) – The Partition ID of the Partition the replica belongs to.
  • broker_id (int) –
Response JSON Object:
 
  • data.links.self (string) – An absolute link for this replica.
  • data.type (string) – The Confluent resource type for this replica.
  • data.id (string) – The Confluent resource name for this replica.
  • data.attributes.cluster_id (string) – The cluster ID of the cluster this replica belongs to.
  • data.attributes.topic_name (string) – The name of the topic this replica belongs to.
  • data.attributes.partition_id (int) – The Partition ID of the Partition this replica belongs to.
  • data.attributes.broker_id (int) – The broker ID of the broker this replica belongs to.
  • data.relationships.broker.links.related (string) – An absolute link to the broker this replica belongs to.

Example request:

GET /v3/clusters/cluster-1/topics/topic-1/partitions/0/replicas/1 HTTP/1.1
Host: kafkaproxy.example.com
Accept: application/vnd.api+json

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.api+json

{
  "data": {
    "links" {
      "self": "http://kafkaproxy.example.com/v3/clusters/cluster-1/topics/topic-1/partitions/0/replicas/1"
    },
    "type": "KafkaReplica",
    "id": "crn:///kafka=cluster-1/topic=topic-1/partition=0/replica=1",
    "attributes": {
      "cluster_id": "cluster-1",
      "topic_name": "topic-1",
      "partition_id": 0,
      "broker_id": 1,
      "is_leader": true,
      "is_in_sync": true
    },
    "relationships": {
      "broker": {
        "links": {
          "related": "http://kafkaproxy.example.com/v3/clusters/cluster-1/brokers/1"
        }
      }
    }
  }
}