Kafka REST APIs

The Kafka REST API (v3) is a part of the Confluent Cloud API, focused on managing the Confluent Cloud cluster, Cluster Linking configuration, and producing records to a Confluent Cloud topic.

The Kafka REST API is a set of cloud-native APIs that are compatible with the Kafka REST Proxy v3 APIs. The API endpoint is available by default for all Basic, Standard, Enterprise and Dedicated Kafka clusters and can be accessed via the Confluent Cloud Console. The cluster types documentation includes details on what’s supported for Kafka REST Produce in terms of limits per cluster for each type (Basic, Standard, Enterprise and Dedicated).


Alternatively, use the following Confluent CLI command to display the cluster details, including the REST endpoint:

confluent kafka cluster describe <cluster-ID>
+--------------+--------------------------------------------------------+
| Id           | lkc-vo9pz                                              |
| Name         | my-first-cluster                                       |
| Type         | STANDARD                                               |
| Ingress      |                                                    100 |
| Egress       |                                                    100 |
| Storage      | Infinite                                               |
| Provider     | gcp                                                    |
| Availability | single-zone                                            |
| Region       | us-west4                                               |
| Status       | UP                                                     |
| Endpoint     | SASL_SSL://pkc-abcde.us-west4.gcp.confluent.cloud:9092 |
| RestEndpoint | https://pkc-abcde.us-west4.gcp.confluent.cloud:443     |
+--------------+--------------------------------------------------------+

Use the RestEndpoint value to perform operations against the APIs listed under the /kafka/v3/ group, as shown in the Cluster (v3) API reference.

Tip

  • Kafka REST endpoints are per-cluster endpoints, rather than a single control plane API endpoint.
  • You must have network access to the cluster as a prerequisite for using Kafka REST. For example, if you are using a laptop and your cluster is privately networked (not on the internet), you must configure a network path from your laptop to the cluster in order to use Kafka REST.

Using Base64 Encoded Data and Credentials

To communicate with the REST API you must send your Confluent Cloud API key and API secret as BINARY, base64-encoded data. (You also need write permissions to the target Kafka topic.)

This overview provides use cases and examples which require you to know how to create and use these credentials. The Kafka REST API Quick Start for Confluent Cloud Developers provides detailed information on how to do this.

Specifically, see Step 2: Create credentials to access the Kafka cluster resources for how to create credentials, base64 encode them, and use them in your API calls.

You base64 encode data in a similar way. As a simple example, to base64 encode the word “bonjour”, use the following command on macOS:

echo -n "bonjour" | base64

Once you have the base64 encoding of this word, use that string as the data in your API call, in place of the word “bonjour”.
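
The same encoding can be done programmatically. The following Python sketch uses only the standard library. The function names are illustrative, the key and secret are placeholders, and the credential format assumes the standard HTTP Basic scheme, in which the key and secret are joined with a colon before encoding.

```python
import base64


def b64_text(text: str) -> str:
    """Base64-encode a UTF-8 string, like `echo -n "<text>" | base64`."""
    return base64.b64encode(text.encode("utf-8")).decode("ascii")


def basic_auth_header(api_key: str, api_secret: str) -> str:
    """Build an Authorization header value for HTTP Basic authentication."""
    return "Basic " + b64_text(f"{api_key}:{api_secret}")


print(b64_text("bonjour"))  # Ym9uam91cg==
print(basic_auth_header("<api-key>", "<api-secret>"))
```

The first function produces the string to use as record data, and the second produces the value to send in the Authorization header.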

Use cases

The Kafka REST APIs allow you to fully manage your Confluent Cloud cluster as well as produce records to topics on that cluster. This lets you use Confluent as your central nervous system for the following use cases.

  • Supercharge your business workflows – Automatically create topics and grant privileges on existing ones to onboard new customers or data partners.
  • Integrate with third-party data partners, providers, and solutions – Allow your customers to directly post records to specific topics on your Confluent Cloud cluster, or build cluster links between their Confluent Cloud clusters and yours, all without having to learn and use the Kafka protocol.
  • Integrate with serverless solutions in a cloud of your choice – Use the simple Kafka REST API to build workflows with bleeding-edge, serverless solutions of your choice.
  • Scale beyond the connection limits for Kafka – If you have a large number of clients with very low throughput requirements, maintaining a Kafka protocol connection has additional overhead. The request-response semantics of Kafka REST APIs mean that you can scale your clients independently of Kafka connection limits.

Cluster Admin API

The Confluent Cloud REST APIs support both cluster administration operations and producing records (or events) directly to a topic.

The KAFKA API (v3) provides an Admin API that enables you to build workflows to better manage your Confluent Cloud cluster. This includes:

  • Listing the cluster information
  • Listing, creating and deleting topics on your Confluent Cloud cluster
  • Listing and modifying the cluster and topic configuration
  • Listing, creating and deleting ACLs on your Confluent Cloud cluster. For additional details, see Principal IDs for ACLs.
  • Listing information on consumer groups, consumers, and consumer lag (Dedicated clusters only)
  • Listing partitions for a given topic
  • Managing your Cluster Linking configuration

For detailed reference documentation and examples, see Cluster (v3).

Principal IDs for ACLs

Confluent Cloud currently supports two REST API principal formats for ACLs: Resource ID and Integer ID. For example, a principal using an Integer ID looks like this: User:1234. A principal using a Resource ID looks like this: User:sa-1234, where sa stands for service account. The preferred principal format is Resource ID. Note that the principal format Integer ID will be deprecated and is not recommended when creating principals for ACLs.

For additional information, see ACL operation details.
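
As an illustration of the two formats, the following Python sketch classifies a principal string so that a workflow could flag Integer ID principals before creating ACLs. The function name and return values are hypothetical, and the check assumes only what is stated above: an Integer ID is all digits, and anything else after the User: prefix is treated as a Resource ID.

```python
def principal_format(principal: str) -> str:
    """Return "integer_id" or "resource_id" for a principal such as User:sa-1234."""
    kind, _, identifier = principal.partition(":")
    if kind != "User" or not identifier:
        raise ValueError(f"unexpected principal: {principal!r}")
    # Integer IDs consist only of digits; Resource IDs carry a prefix such as "sa-".
    if identifier.isascii() and identifier.isdigit():
        return "integer_id"
    return "resource_id"


print(principal_format("User:1234"))     # integer_id
print(principal_format("User:sa-1234"))  # resource_id
```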

Produce API

The Records API is listed under the KAFKA API (v3) APIs in the reference guide, but is covered here separately, as it is not truly applicable to admin functions.

Producing records to a Kafka topic involves writing and configuring a KafkaProducer within a client application that utilizes the Kafka binary protocol. For most workloads, this introduces additional complexity in getting data into the Kafka topic.

The REST API makes producing records to a Kafka topic as simple as making an HTTP POST request, without worrying about the details of the Kafka protocol.

There are two ways to use the API to produce records to Confluent Cloud, as covered in the sections below: streaming mode (recommended) and non-streaming mode.

Producing a single record to a topic

Individual calls are made to the REST Produce endpoint (/kafka/v3/clusters/<cluster-id>/topics/<topic-name>/records) as shown in the following example:

curl -X POST \
  -H "Content-Type: application/json" \
  -H "Authorization: Basic <BASE64-encoded-key-and-secret>" \
  "https://<REST-endpoint>/kafka/v3/clusters/<cluster-id>/topics/<topic-name>/records" \
  -d '{"value": {"type": "BINARY", "data": "<base64-encoded-data>"}}'

For this example, the endpoint returns the delivery report, complete with metadata about the accepted record:

{
  "error_code": 200,
  "cluster_id": "lkc-vo0pz",
  "topic_name": "testTopic1",
  "partition_id": 0,
  "offset": 67217,
  "timestamp": "2021-12-13T16:29:10.951Z",
  "value": {
    "type": "BINARY",
    "size": 6
  }
}
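
The curl call above can also be expressed in Python. The following sketch builds the equivalent request with the standard library but does not send it. The function name is illustrative, the endpoint, cluster ID, and topic are taken from the sample outputs in this document, and the API key and secret are placeholders.

```python
import base64
import json
import urllib.request


def build_produce_request(rest_endpoint, cluster_id, topic, api_key, api_secret, payload: bytes):
    """Build (but do not send) a POST request that produces one BINARY record."""
    url = f"{rest_endpoint}/kafka/v3/clusters/{cluster_id}/topics/{topic}/records"
    # HTTP Basic credentials: base64 of "<key>:<secret>".
    credentials = base64.b64encode(f"{api_key}:{api_secret}".encode("utf-8")).decode("ascii")
    # BINARY record data must itself be base64 encoded.
    body = {"value": {"type": "BINARY", "data": base64.b64encode(payload).decode("ascii")}}
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {credentials}",
        },
        method="POST",
    )


req = build_produce_request(
    "https://pkc-abcde.us-west4.gcp.confluent.cloud:443",  # RestEndpoint from the CLI output
    "lkc-vo9pz", "testTopic1", "<api-key>", "<api-secret>", b"bonjour",
)
print(req.get_method(), req.full_url)
# To send the request, pass it to urllib.request.urlopen(req) and read the response body.
```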

Producing a batch of records to a topic

Non-streaming mode also permits sending multiple records in a single request. Multiple records can be concatenated and placed in the data payload of a single, non-streamed request, as shown in the following example:

curl -X POST \
  -H "Content-Type: application/json" \
  -H "Authorization: Basic <BASE64-encoded-key-and-secret>" \
  "https://<REST-endpoint>/kafka/v3/clusters/<cluster-id>/topics/<topic-name>/records" \
  -d '{"value": {"type": "BINARY", "data": "<base64-encoded-data>"}} {"value": {"type": "BINARY", "data": "<base64-encoded-data>"}}'

The endpoint returns the delivery report for each of the accepted records, as shown below:

{
  "error_code": 200,
  "cluster_id": "lkc-vo0pz",
  "topic_name": "testTopic1",
  "partition_id": 0,
  "offset": 67217,
  "timestamp": "2021-12-13T16:29:10.951Z",
  "key": {
    "type": "BINARY",
    "size": 10
  },
  "value": {
    "type": "JSON",
    "size": 26
  }
}
{
  "error_code": 200,
  "cluster_id": "lkc-vo0pz",
  "topic_name": "testTopic1",
  "partition_id": 0,
  "offset": 67218,
  "timestamp": "2021-12-13T16:29:10.951Z",
  "key": {
    "type": "BINARY",
    "size": 10
  },
  "value": {
    "type": "JSON",
    "size": 26
  }
}

Note

The batch of records is not an array of records, but rather a concatenated stream of records. Sending an array of records is not supported.
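
Because both the request body and the response are concatenated JSON objects rather than a JSON array, a standard JSON parser cannot read them in a single call. The following Python sketch shows one way to build and split such a stream with the standard library. The function names are illustrative and not part of any Confluent client.

```python
import json


def encode_record_stream(records):
    """Serialize records as concatenated JSON objects (not a JSON array)."""
    return " ".join(json.dumps(record) for record in records)


def decode_record_stream(text):
    """Split a stream of concatenated JSON objects into a list of Python objects."""
    decoder = json.JSONDecoder()
    objects, pos = [], 0
    while pos < len(text):
        if text[pos].isspace():  # skip whitespace between objects
            pos += 1
            continue
        # raw_decode parses one JSON value and returns the index where it ended.
        obj, pos = decoder.raw_decode(text, pos)
        objects.append(obj)
    return objects


body = encode_record_stream([
    {"value": {"type": "STRING", "data": "first"}},
    {"value": {"type": "STRING", "data": "second"}},
])
print(body)
print(len(decode_record_stream(body)))  # 2
```

The same decode function can be applied to the response body to recover one delivery report per record.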

Data payload specification

The OpenAPI specification of the request body describes the parameters that can be passed in the request payload:

ProduceRequest:
    type: object
    properties:
      partition_id:
        type: integer
        nullable: true
        format: int32
      headers:
        type: array
        items:
          $ref: '#/components/schemas/ProduceRequestHeader'
      key:
        $ref: '#/components/schemas/ProduceRequestData'
      value:
        $ref: '#/components/schemas/ProduceRequestData'
      timestamp:
        type: string
        format: date-time
        nullable: true

ProduceRequestHeader:
    type: object
    required:
      - name
    properties:
      name:
        type: string
      value:
        type: string
        format: byte
        nullable: true

ProduceRequestData:
    type: object
    properties:
      type:
        type: string
        x-extensible-enum:
          - BINARY
          - JSON
          - STRING
      data:
        $ref: '#/components/schemas/AnyValue'
    nullable: true

AnyValue:
    nullable: true

All the fields, including data, are optional. An example payload using all the options is shown below:

{
  "partition_id": 0,
  "headers": [
    {
      "name": "Header-1",
      "value": "SGVhZGVyLTE="
    }
  ],
  "key": {
    "type": "BINARY",
    "data": "SGVsbG8ga2V5Cg=="
  },
  "value": {
    "type": "JSON",
    "data": "{\"Hello \" : \"World\"}"
  },
  "timestamp": "2021-12-13T16:29:10.951Z"
}

The key and value can be of type BINARY, JSON, or STRING. For the BINARY type, the data must be base64 encoded. Header values must also be base64 encoded.

An example JSON payload is shown here:

'{"value": {"type": "JSON", "data": "{\"Hello \" : \"World\"}"}}'

An example STRING payload is shown here:

'{"value": {"type": "STRING", "data": "Hello World"}}'
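
To tie the specification to the encoding rules, the following Python sketch assembles a request payload from plain values. The helper names are hypothetical. As simplifying assumptions, the key is always sent as BINARY, and JSON data is passed as a nested object, which the AnyValue schema appears to allow.

```python
import base64
import json


def _b64(raw: bytes) -> str:
    return base64.b64encode(raw).decode("ascii")


def make_record(value, value_type="JSON", key=None, headers=None,
                partition_id=None, timestamp=None):
    """Build a ProduceRequest dict; BINARY data and header values are base64 encoded."""
    def data_field(data, data_type):
        if data_type == "BINARY":
            data = _b64(data)  # BINARY data is expected as bytes
        return {"type": data_type, "data": data}

    record = {"value": data_field(value, value_type)}
    if key is not None:
        record["key"] = data_field(key, "BINARY")  # assumption: keys are sent as BINARY
    if headers:
        # Each header follows ProduceRequestHeader: a name plus a base64-encoded value.
        record["headers"] = [{"name": n, "value": _b64(v)} for n, v in headers.items()]
    if partition_id is not None:
        record["partition_id"] = partition_id
    if timestamp is not None:
        record["timestamp"] = timestamp
    return record


record = make_record({"Hello": "World"}, key=b"Hello key\n",
                     headers={"Header-1": b"Header-1"}, partition_id=0)
print(json.dumps(record, indent=2))
```

Optional fields that are not supplied are left out of the payload, which matches the specification's statement that all fields are optional.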

Connection bias and request limits

REST API request limits are set for each cluster type available in Confluent Cloud. For example, a Dedicated cluster has a limit of 300 REST API requests per second for each Confluent Kafka Unit (CKU), so a cluster with eight CKUs has a limit of 2400 requests per second.

The distribution of API connection requests for REST instances is balanced across clusters. However, a long-lived connection may bias API requests to keep flowing through that connection. This connection bias may cause a 429 status code (Too Many Requests) to be returned, even though the limit has not been exceeded.

The best practice for avoiding connection bias is to close connections. To do this, add a Connection: close header on requests. For example:

// Ask the server to close the connection after each request to avoid connection bias.
Request.Post(restUrl).connectTimeout(connectTimeout).socketTimeout(socketTimeout)
                   .addHeader("Content-Type", CONTENT_TYPE)
                   .addHeader("Accept", CONTENT_TYPE)
                   .addHeader("Authorization", "Basic " + this.basicAuth)
                   .addHeader("Connection", "close")