
Kafka REST API Cluster Admin for Kafka (v3)

The Kafka REST API (v3) is part of the Confluent Cloud API family. It focuses on managing Confluent Cloud clusters and Cluster Linking configuration, and on producing records to Confluent Cloud topics.

The Kafka REST API is a set of cloud-native APIs that are compatible with the Kafka REST Proxy v3 APIs. The API endpoint is available by default for all Basic, Standard, and Dedicated clusters and can be accessed via the Confluent Cloud Console.

(Image: Cluster settings page in the Confluent Cloud Console, showing the API endpoint)

Or, on the Confluent CLI, use the following command to get a readout of cluster details:

confluent kafka cluster describe <cluster-ID>
+--------------+--------------------------------------------------------+
| Id           | lkc-vo9pz                                              |
| Name         | my-first-cluster                                       |
| Type         | STANDARD                                               |
| Ingress      |                                                    100 |
| Egress       |                                                    100 |
| Storage      | Infinite                                               |
| Provider     | gcp                                                    |
| Availability | single-zone                                            |
| Region       | us-west4                                               |
| Status       | UP                                                     |
| Endpoint     | SASL_SSL://pkc-abcde.us-west4.gcp.confluent.cloud:9092 |
| RestEndpoint | https://pkc-abcde.us-west4.gcp.confluent.cloud:443     |
+--------------+--------------------------------------------------------+

This endpoint can be used to perform operations against the APIs listed under the /kafka/v3/ group, as shown in the Cluster (v3) API reference.
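
For example, a minimal sketch of a call against this endpoint, using the REST endpoint and cluster ID from the output above (the base64-encoded credentials for the Authorization header are covered in a later section), might look like this:

curl -H "Authorization: Basic <BASE64-encoded-key-and-secret>" \
  "https://pkc-abcde.us-west4.gcp.confluent.cloud:443/kafka/v3/clusters/lkc-vo9pz"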

Tip

  • Kafka REST endpoints are per-cluster endpoints, rather than a single control plane API endpoint.
  • You must have network access to the cluster as a prerequisite for using Kafka REST. For example, if you are using a laptop and your cluster is privately networked (not on the internet), you must configure a network path from your laptop to the cluster in order to use Kafka REST.

Using Base64 Encoded Data and Credentials

To communicate with the REST API, you must send your Confluent Cloud API key and API secret as base64-encoded credentials, and any BINARY record data must also be base64 encoded. (You also need write permissions on the target Kafka topic.)

This overview provides use cases and examples which require you to know how to create and use these credentials. The REST API Quick Start for Confluent Cloud Developers provides detailed information on how to do this.

Specifically, see Step 2: Create credentials to access the Kafka cluster resources for how to create credentials, base64 encode them, and use them in your API calls.
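
As a minimal sketch, assuming the standard base64 utility on macOS or Linux and placeholder values for the API key and secret, the credentials can be encoded like this:

# Join the API key and secret with a colon and base64-encode the result.
# <API_KEY> and <API_SECRET> are placeholders for your own credentials.
echo -n "<API_KEY>:<API_SECRET>" | base64

The resulting string is what you pass in the Authorization: Basic header of your REST API calls.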

You would base64 encode record data similarly. As a simple example, to base64 encode the word "bonjour", use the following command on macOS:

echo -n "bonjour" | base64

Once you have the base64 encoding of this word, you would use that string as data in your API call, in place of the word "bonjour".
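
For example, the command above outputs Ym9uam91cg== for "bonjour". Using that string as the data in a record value (in the payload format described later on this page) would look like this sketch:

{"value": {"type": "BINARY", "data": "Ym9uam91cg=="}}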

Use cases

The Kafka REST APIs allow you to fully manage your Confluent Cloud cluster as well as produce records to your Confluent Cloud topics. This enables you to use Confluent as your central nervous system for the following use cases.

  • Supercharge your business workflows – Automatically create topics and grant privileges on existing ones to onboard new customers or data partners.
  • Integrate with 3rd party data partners, providers and solutions – Allow your customers to directly post records to specific topics on your Confluent Cloud cluster, or build cluster links between their Confluent Cloud clusters and yours, all without having to learn and use the Kafka protocol.
  • Integrate with Serverless solutions in a cloud of your choice – Use the simple Kafka REST API to build workflows with bleeding edge, serverless solutions of your choice.
  • Scale beyond the connection limits for Kafka – If you have a large number of clients with very low throughput requirements, maintaining a Kafka protocol connection has additional overhead. The request-response semantics of Kafka REST APIs mean that you can scale your clients independently of Kafka connection limits.

Supported Operations

The Confluent Cloud REST APIs support both cluster administration operations and producing records (or events) directly to a topic.

Cluster Administration

The Kafka REST API (Cluster Admin for Kafka (v3)) provides an Admin API that enables you to build workflows to better manage your Confluent Cloud cluster. This includes:

  • Listing the cluster information
  • Listing, creating and deleting topics on your Confluent Cloud cluster
  • Listing and modifying the cluster and topic configuration
  • Listing, creating and deleting ACLs on your Confluent Cloud cluster
  • Listing information on consumer groups, consumers, and consumer lag (Dedicated clusters only)
  • Listing partitions for a given topic
  • Managing your Cluster Linking configuration

The detailed reference documentation and examples for these operations are available in the Cluster (v3) API reference: https://docs.confluent.io/cloud/current/api.html#tag/Cluster-(v3)
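
As an illustration, a minimal sketch of one such operation, listing the topics on a cluster (endpoint, cluster ID, and credentials are placeholders), might look like this:

# List topics on the cluster.
curl -H "Authorization: Basic <BASE64-encoded-key-and-secret>" \
  "https://<REST-endpoint>/kafka/v3/clusters/<cluster-id>/topics"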

Producing to a Kafka Topic

Important

Producing to a topic with the v3 API is currently available as an "Early Access" feature. Early access features are available to a limited set of customers for evaluation and non-production testing purposes, or to provide feedback to Confluent. For more examples, see Step 7: Produce data to the topic.

The Records API is currently listed under the Cluster Admin for Kafka (v3) APIs in the reference guide, but is covered here separately, as it is not truly applicable to admin functions.

Traditionally, producing records to a Kafka topic involves writing and configuring a KafkaProducer within a client application that uses the Kafka binary protocol. For many workloads, this introduces additional complexity in getting data into a Kafka topic.

The REST API makes producing records to a Kafka topic as simple as making an HTTP POST request, without worrying about the details of the Kafka protocol.

There are two ways to use the API to produce records to Confluent Cloud: streaming mode (recommended) and non-streaming mode. The examples in the following sections use non-streaming requests.

Producing a single record to a topic

Individual calls are made to the REST Produce endpoint (/kafka/v3/clusters/<cluster-id>/topics/<topic-name>/records) as shown in the following example:

curl -X POST \
  -H "Content-Type: application/json" \
  -H "Authorization: Basic <BASE64-encoded-key-and-secret>" \
  "https://<REST-endpoint>/kafka/v3/clusters/<cluster-id>/topics/<topic-name>/records" \
  -d '{"value": {"type": "BINARY", "data": "<base64-encoded-data>"}}'

For this example, the endpoint returns the delivery report, complete with metadata about the accepted record:

{
  "cluster_id": "lkc-vo0pz",
  "topic_name": "testTopic1",
  "partition_id": 0,
  "offset": 67217,
  "timestamp": "2021-12-13T16:29:10.951Z",
  "value": {
    "type": "BINARY",
    "size": 6
  }
}

Producing a batch of records to a topic

Non-streaming mode also permits sending multiple records in a single request. Multiple records can be concatenated and placed in the data payload of a single, non-streamed request, as shown in the following example:

curl -X POST \
  -H "Content-Type: application/json" \
  -H "Authorization: Basic <BASE64-encoded-key-and-secret>" \
  "https://<REST-endpoint>/kafka/v3/clusters/<cluster-id>/topics/<topic-name>/records" \
  -d '{"value": {"type": "BINARY", "data": "<base64-encoded-data>"}} {"value": {"type": "BINARY", "data": "<base64-encoded-data>"}}'

The endpoint returns a delivery report for each of the accepted records, as shown below:

{
  "cluster_id": lkc-vo0pz,
  "topic_name": "testTopic1",
  "partition_id": 0,
  "offset": 67217,
  "timestamp": "2021-12-13T16:29:10.951Z",
  "key": {
    "type": "BINARY",
    "size": 10
  },
  "value": {
    "type": "JSON",
    "size": 26
  }
}
{
  "cluster_id": lkc-vo0pz,
  "topic_name": "testTopic1",
  "partition_id": 0,
  "offset": 67218,
  "timestamp": "2021-12-13T16:29:10.951Z",
  "key": {
    "type": "BINARY",
    "size": 10
  },
  "value": {
    "type": "JSON",
    "size": 26
  }
}

Note

The batch of records is not an array of records, but rather a concatenated stream of records. Sending an array of records is not supported.
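
For a larger batch, you might build the concatenated body in a file first and pass it to curl. A minimal sketch, reusing the base64-encoded value Ym9uam91cg== from the earlier example and a hypothetical records.txt file:

# Concatenate the record objects into one request body; note there is no surrounding JSON array.
printf '%s %s' \
  '{"value": {"type": "BINARY", "data": "Ym9uam91cg=="}}' \
  '{"value": {"type": "BINARY", "data": "Ym9uam91cg=="}}' > records.txt

# Send the file contents as-is.
curl -X POST \
  -H "Content-Type: application/json" \
  -H "Authorization: Basic <BASE64-encoded-key-and-secret>" \
  "https://<REST-endpoint>/kafka/v3/clusters/<cluster-id>/topics/<topic-name>/records" \
  --data-binary @records.txt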

Data payload specification

The OpenAPI specification of the request body describes the parameters that can be passed in the request payload:

ProduceRequest:
  type: object
  properties:
    partition_id:
      type: integer
      nullable: true
    headers:
      type: array
      items:
        $ref: '#/components/schemas/ProduceRequestHeader'
    key:
      $ref: '#/components/schemas/ProduceRequestData'
    value:
      $ref: '#/components/schemas/ProduceRequestData'
    timestamp:
      type: string
      format: date-time
      nullable: true

ProduceRequestHeader:
  type: object
  required:
    - name
  properties:
    name:
      type: string
    value:
      type: string
      format: byte
      nullable: true

ProduceRequestData:
  type: object
  properties:
    type:
      type: string
      x-extensible-enum:
        - BINARY
        - JSON
    data:
      $ref: '#/components/schemas/AnyValue'
  nullable: true

AnyValue:
  nullable: true

All the fields, including data, are optional. An example payload using all the options is shown below:

{
  "partition_id": 0,
  "headers": [
    {
      "name": "Hello header"
    }
  ],
  "key": {
    "type": "BINARY",
    "data": "SGVsbG8ga2V5Cg=="
  },
  "value": {
    "type": "JSON",
    "data": "{\"Hello \" : \"World\"}"
  },
  "timestamp": "2021-12-13T16:29:10.951Z"
}

The key and value fields can be of type BINARY or JSON. For binary data, the data must be base64 encoded.

An example JSON payload is shown here:

'{"value": {"type": "JSON", "data": "{\"Hello \" : \"World\"}"}}'
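
A complete request that produces this JSON payload follows the same pattern as the earlier examples; a minimal sketch:

curl -X POST \
  -H "Content-Type: application/json" \
  -H "Authorization: Basic <BASE64-encoded-key-and-secret>" \
  "https://<REST-endpoint>/kafka/v3/clusters/<cluster-id>/topics/<topic-name>/records" \
  -d '{"value": {"type": "JSON", "data": "{\"Hello \" : \"World\"}"}}'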