Kafka REST API Cluster Admin for Kafka (v3)
The Kafka REST API (v3) is part of the Confluent Cloud API family. It focuses on managing a Confluent Cloud cluster and its Cluster Linking configuration, and on producing records to a Confluent Cloud topic.
The Kafka REST API is a set of cloud-native APIs that are compatible with the Kafka REST Proxy v3 APIs. The API endpoint is available by default for all Basic, Standard, and Dedicated clusters. You can look up the endpoint in the Confluent Cloud Console.
Alternatively, use the following Confluent CLI command to display the cluster details, including the REST endpoint:
confluent kafka cluster describe <cluster-ID>
+--------------+--------------------------------------------------------+
| Id           | lkc-vo9pz                                              |
| Name         | my-first-cluster                                       |
| Type         | STANDARD                                               |
| Ingress      | 100                                                    |
| Egress       | 100                                                    |
| Storage      | Infinite                                               |
| Provider     | gcp                                                    |
| Availability | single-zone                                            |
| Region       | us-west4                                               |
| Status       | UP                                                     |
| Endpoint     | SASL_SSL://pkc-abcde.us-west4.gcp.confluent.cloud:9092 |
| RestEndpoint | https://pkc-abcde.us-west4.gcp.confluent.cloud:443     |
+--------------+--------------------------------------------------------+
The RestEndpoint value can be used to perform operations against the APIs listed under the /kafka/v3/ group, as shown in the Cluster (v3) API reference.
Tip
- Kafka REST endpoints are per-cluster endpoints, not a single control plane API endpoint.
- You must have network access to the cluster as a prerequisite for using Kafka REST. For example, if you are using a laptop and your cluster is privately networked (not on the internet), you must configure a network path from your laptop to the cluster in order to use Kafka REST.
Using Base64 Encoded Data and Credentials
To communicate with the REST API, you must send your Confluent Cloud API key and API secret as BINARY, base64-encoded data. (To produce records, you also need write permissions on the target Kafka topic.)
This overview provides use cases and examples which require you to know how to create and use these credentials. The REST API Quick Start for Confluent Cloud Developers provides detailed information on how to do this.
Specifically, see Step 2: Create credentials to access the Kafka cluster resources for how to create credentials, base64-encode them, and use them in your API calls.
You would base64-encode data similarly. As a simple example, to base64-encode the word "bonjour", use the following command on macOS:
echo -n "bonjour" | base64
Once you have the base64 encoding of this word, you would use that string as data in your API call, in place of the word "bonjour".
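As a rough Python equivalent of the command above, the following sketch base64-encodes both a data value and a credential pair. It assumes the key and secret are joined with a colon before encoding, as in the standard HTTP Basic scheme. The credentials shown are made-up placeholders, and `basic_auth_header` and `encode_binary` are hypothetical helper names, not part of any Confluent library.

```python
import base64


def basic_auth_header(api_key: str, api_secret: str) -> str:
    """Return an Authorization header value for the given key pair.

    Assumption: key and secret are joined as "key:secret" before
    encoding, as in the standard HTTP Basic authentication scheme.
    """
    token = base64.b64encode(f"{api_key}:{api_secret}".encode("utf-8"))
    return "Basic " + token.decode("ascii")


def encode_binary(data: bytes) -> str:
    """Base64-encode raw bytes for use as BINARY record data."""
    return base64.b64encode(data).decode("ascii")


# Same result as: echo -n "bonjour" | base64
print(encode_binary(b"bonjour"))  # prints Ym9uam91cg==
# Placeholder credentials, for illustration only.
print(basic_auth_header("MYKEY", "MYSECRET"))
```

Keeping the two helpers separate mirrors the two distinct uses of base64 here: once for the credentials in the request header, and once for the record data in the request body.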
Use cases
The Kafka REST APIs let you fully manage your Confluent Cloud cluster and produce records to it. This lets you use Confluent as your central nervous system for the following use cases.
- Supercharge your business workflows – Automatically create topics, or grant privileges on existing ones, to onboard new customers or data partners.
- Integrate with third-party data partners, providers, and solutions – Allow your customers to directly post records to specific topics on your Confluent Cloud cluster, or build cluster links between their Confluent Cloud clusters and yours, all without having to learn and use the Kafka protocol.
- Integrate with serverless solutions in a cloud of your choice – Use the simple Kafka REST API to build workflows with bleeding-edge, serverless solutions of your choice.
- Scale beyond the connection limits for Kafka – If you have a large number of clients with very low throughput requirements, maintaining a Kafka protocol connection has additional overhead. The request-response semantics of Kafka REST APIs mean that you can scale your clients independently of Kafka connection limits.
Supported Operations
The Confluent Cloud REST APIs support both cluster administration operations and producing records (or events) directly to a topic.
Cluster Administration
The Kafka REST API (Cluster Admin for Kafka (v3)) provides an Admin API that lets you build workflows to better manage your Confluent Cloud cluster. This includes:
- Listing the cluster information
- Listing, creating and deleting topics on your Confluent Cloud cluster
- Listing and modifying the cluster and topic configuration
- Listing, creating and deleting ACLs on your Confluent Cloud cluster
- Listing information on consumer groups, consumers, and consumer lag (Dedicated clusters only)
- Listing partitions for a given topic
- Managing your Cluster Linking configuration
The detailed reference documentation and examples for these operations are listed here: https://docs.confluent.io/cloud/current/api.html#tag/Cluster-(v3)
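To make the shape of an admin call concrete, here is a minimal Python sketch that builds, but does not send, a request to list topics. The topics path is an assumption inferred from the records path used elsewhere on this page, so check it against the API reference. `build_list_topics_request` is a hypothetical helper name, and the endpoint and cluster ID are the sample values from the CLI output above.

```python
import urllib.request


def build_list_topics_request(rest_endpoint: str, cluster_id: str,
                              auth_header: str) -> urllib.request.Request:
    """Build (but do not send) a GET request that lists topics.

    Assumption: the topics collection lives at
    /kafka/v3/clusters/<cluster-id>/topics, inferred from the
    records path used elsewhere on this page.
    """
    url = f"{rest_endpoint.rstrip('/')}/kafka/v3/clusters/{cluster_id}/topics"
    return urllib.request.Request(
        url,
        headers={"Authorization": auth_header, "Accept": "application/json"},
        method="GET",
    )


request = build_list_topics_request(
    "https://pkc-abcde.us-west4.gcp.confluent.cloud:443",
    "lkc-vo9pz",
    "Basic <BASE64-encoded-key-and-secret>",
)
print(request.get_method(), request.full_url)
# To send it: urllib.request.urlopen(request), given network access
# to the cluster and valid credentials.
```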
Producing to a Kafka Topic
Important
Producing to a topic with the v3 API is currently available as an "Early Access" feature. Early access features are available to a limited set of customers for evaluation and non-production testing purposes, or to provide feedback to Confluent. For more examples, see Step 7: Produce data to the topic.
The Records API is currently listed under the Cluster Admin for Kafka (v3) APIs in the reference guide, but is covered here separately, as it is not truly applicable to admin functions.
Producing records to a Kafka topic involves writing and configuring a KafkaProducer within a client application that utilizes the Kafka binary protocol. For most workloads, this introduces additional complexity in getting data into the Kafka topic.
The REST API makes producing records to a Kafka topic as simple as making an HTTP POST request, without worrying about the details of the Kafka protocol.
There are two ways to use the API to produce records to Confluent Cloud, as covered in the sections below: streaming mode (recommended) and non-streaming mode.
Streaming mode (recommended)
Streaming mode is the more efficient way to send multiple records. Individual calls top out at under a hundred requests per second, whereas streaming mode can reach several thousand per second.
Streaming mode supports sending multiple records over a single stream. This can be achieved by setting an additional header "Transfer-Encoding: chunked" on the initial request, as shown in the following example:
curl -X POST -H "Transfer-Encoding: chunked" -H "Content-Type: application/json" -H \
"Authorization: Basic <BASE64-encoded-key-and-secret>" "https://<REST-endpoint>/kafka/v3/clusters/<cluster-id>/topics/<topic-name>/records" -T-
Once the stream has been established, you can start sending multiple records on the same stream, each on a different line, as shown in the following example:
{"value": {"type": "JSON", "data": "{\"Hello \" : \"World\"}"}}
{"value": {"type": "JSON", "data": "{\"Hi \" : \"World\"}"}}
{"value": {"type": "JSON", "data": "{\"Hey \" : \"World\"}"}}
Tip
As long as the connection can be established in streaming mode, the HTTP status code is 200, which indicates success. The individual records have their own responses, each containing its own response code.
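The line-per-record format above can be generated programmatically. The following Python sketch serializes values into newline-terminated records of that shape. `record_lines` is a hypothetical helper name, and placing the JSON text in "data" as a string follows the sample records above rather than any confirmed requirement.

```python
import json
from typing import Iterable, Iterator


def record_lines(values: Iterable[dict]) -> Iterator[bytes]:
    """Yield one newline-terminated produce record per input value.

    Each value is serialized to a JSON string and placed in the "data"
    field, matching the record shape in the example above.
    """
    for value in values:
        record = {"value": {"type": "JSON", "data": json.dumps(value)}}
        yield (json.dumps(record) + "\n").encode("utf-8")


for line in record_lines([{"Hello": "World"}, {"Hi": "World"}]):
    print(line.decode("utf-8"), end="")
```

A generator fits streaming mode because records can be produced lazily and written to the open stream one line at a time, instead of being collected into a single request body first.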
Non-streaming mode (not recommended)
Non-streaming mode follows the more traditional request-response model, in which each request sends one or more records.
Non-streaming mode performs worse than streaming mode, which is the recommended option.
Producing a single record to a topic
Individual calls are made to the REST Produce endpoint (/kafka/v3/clusters/<cluster-id>/topics/<topic-name>/records) as shown in the following example:
curl -X POST -H "Content-Type: application/json" -H \
"Authorization: Basic <BASE64-encoded-key-and-secret>" \
"https://<REST-endpoint>/kafka/v3/clusters/<cluster-id>/topics/<topic-name>/records" -d \
'{"value": {"type": "BINARY", "data": "<base64-encoded-data>"}}'
For this example, the endpoint returns the delivery report, complete with metadata about the accepted record:
{
  "cluster_id": "lkc-vo0pz",
  "topic_name": "testTopic1",
  "partition_id": 0,
  "offset": 67217,
  "timestamp": "2021-12-13T16:29:10.951Z",
  "value": {
    "type": "BINARY",
    "size": 6
  }
}
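For comparison with the curl call, the sketch below assembles the same kind of single-record request in Python without sending it. `build_produce_request` is a hypothetical helper name, the endpoint, cluster ID, and topic are the sample values from this page, and the Authorization value is left as the same placeholder used in the curl example.

```python
import base64
import json
import urllib.request


def build_produce_request(rest_endpoint: str, cluster_id: str, topic: str,
                          auth_header: str, payload: bytes) -> urllib.request.Request:
    """Build (but do not send) a POST that produces one BINARY record."""
    url = (f"{rest_endpoint.rstrip('/')}/kafka/v3/clusters/{cluster_id}"
           f"/topics/{topic}/records")
    # BINARY data must be base64-encoded before it goes into the body.
    body = {"value": {"type": "BINARY",
                      "data": base64.b64encode(payload).decode("ascii")}}
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json",
                 "Authorization": auth_header},
        method="POST",
    )


request = build_produce_request(
    "https://pkc-abcde.us-west4.gcp.confluent.cloud:443",
    "lkc-vo0pz", "testTopic1",
    "Basic <BASE64-encoded-key-and-secret>", b"bonjour",
)
print(request.data.decode("utf-8"))
# To send it: urllib.request.urlopen(request), given network access
# to the cluster and valid credentials.
```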
Producing a batch of records to a topic
Non-streaming mode also permits sending multiple records in a single request. Multiple records can be concatenated and placed in the data payload of a single, non-streamed request, as shown in the following example:
curl -X POST -H "Content-Type: application/json" -H \
"Authorization: Basic <BASE64-encoded-key-and-secret>" \
"https://<REST-endpoint>/kafka/v3/clusters/<cluster-id>/topics/<topic-name>/records" -d \
'{"value": {"type": "BINARY", "data": "<base64-encoded-data>"}} {"value": {"type": "BINARY", "data": "<base64-encoded-data>"}}'
The endpoint returns the delivery report for each of the accepted records, as shown below:
{
  "cluster_id": "lkc-vo0pz",
  "topic_name": "testTopic1",
  "partition_id": 0,
  "offset": 67217,
  "timestamp": "2021-12-13T16:29:10.951Z",
  "key": {
    "type": "BINARY",
    "size": 10
  },
  "value": {
    "type": "JSON",
    "size": 26
  }
}
{
  "cluster_id": "lkc-vo0pz",
  "topic_name": "testTopic1",
  "partition_id": 0,
  "offset": 67218,
  "timestamp": "2021-12-13T16:29:10.951Z",
  "key": {
    "type": "BINARY",
    "size": 10
  },
  "value": {
    "type": "JSON",
    "size": 26
  }
}
Note
The batch of records is not an array of records, but rather a concatenated stream of records. Sending an array of records is not supported.
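Because both the batch request and its response are concatenated JSON objects rather than a single JSON array, an ordinary one-shot JSON parse fails on them. The Python sketch below shows one way to build such a body and to split a concatenated response into individual reports. `concat_records` and `iter_json_objects` are hypothetical helper names, and the response text is a trimmed stand-in rather than real output.

```python
import json
from typing import Iterator, List


def concat_records(records: List[dict]) -> str:
    """Join records into one concatenated body (not a JSON array)."""
    return " ".join(json.dumps(record) for record in records)


def iter_json_objects(text: str) -> Iterator[dict]:
    """Yield each JSON object from a concatenated stream of objects."""
    decoder = json.JSONDecoder()
    pos = 0
    while pos < len(text):
        # raw_decode does not skip leading whitespace, so do it here.
        if text[pos].isspace():
            pos += 1
            continue
        obj, pos = decoder.raw_decode(text, pos)
        yield obj


body = concat_records([
    {"value": {"type": "BINARY", "data": "Ym9uam91cg=="}},
    {"value": {"type": "BINARY", "data": "Ym9uam91cg=="}},
])
# Stand-in for a response body; offsets copied from the example above.
reports = '{"offset": 67217}\n{"offset": 67218}'
print([report["offset"] for report in iter_json_objects(reports)])
```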
Data payload specification
The OpenAPI specification of the request body describes the parameters that can be passed in the request payload:
ProduceRequest:
  type: object
  properties:
    partition_id:
      type: integer
      nullable: true
    headers:
      type: array
      items:
        $ref: '#/components/schemas/ProduceRequestHeader'
    key:
      $ref: '#/components/schemas/ProduceRequestData'
    value:
      $ref: '#/components/schemas/ProduceRequestData'
    timestamp:
      type: string
      format: date-time
      nullable: true
ProduceRequestHeader:
  type: object
  required:
    - name
  properties:
    name:
      type: string
    value:
      type: string
      format: byte
      nullable: true
ProduceRequestData:
  type: object
  properties:
    type:
      type: string
      x-extensible-enum:
        - BINARY
        - JSON
    data:
      $ref: '#/components/schemas/AnyValue'
  nullable: true
AnyValue:
  nullable: true
All the fields, including data, are optional. An example payload using all the options is shown below:
{
  "partition_id": 0,
  "headers": [
    {
      "name": "Hello header"
    }
  ],
  "key": {
    "type": "BINARY",
    "data": "SGVsbG8ga2V5Cg=="
  },
  "value": {
    "type": "JSON",
    "data": "{\"Hello \" : \"World\"}"
  },
  "timestamp": "2021-12-13T16:29:10.951Z"
}
The key and value fields can be of type BINARY or JSON. For BINARY, the data must be base64-encoded.
An example JSON payload is shown here:
'{"value": {"type": "JSON", "data": "{\"Hello \" : \"World\"}"}}'
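Since every field in the payload is optional, a small builder that omits unset fields can keep requests minimal. The following Python sketch assembles a payload with a JSON value and an optional BINARY key. `build_payload` is a hypothetical helper name, and serializing the JSON value to a string follows the example payloads on this page rather than a confirmed requirement.

```python
import base64
import json
from typing import Optional


def build_payload(value: dict, key: Optional[bytes] = None,
                  partition_id: Optional[int] = None,
                  timestamp: Optional[str] = None) -> dict:
    """Assemble a produce payload, omitting any field left as None."""
    payload: dict = {"value": {"type": "JSON", "data": json.dumps(value)}}
    if key is not None:
        # BINARY data is sent base64-encoded.
        payload["key"] = {"type": "BINARY",
                          "data": base64.b64encode(key).decode("ascii")}
    if partition_id is not None:
        payload["partition_id"] = partition_id
    if timestamp is not None:
        payload["timestamp"] = timestamp
    return payload


payload = build_payload({"Hello": "World"}, key=b"Hello key\n",
                        partition_id=0,
                        timestamp="2021-12-13T16:29:10.951Z")
print(json.dumps(payload, indent=2))
```

The key bytes used here, "Hello key" followed by a newline, encode to the same base64 string that appears in the full example payload above.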