Produce Records
POST /kafka/v3/clusters/:cluster_id/topics/:topic_name/records
Produce records to the given topic, returning a delivery report for each record produced. This API can be used in streaming mode by setting the "Transfer-Encoding: chunked" header. For as long as the connection is kept open, the server keeps accepting records. Records are streamed to and from the server as Concatenated JSON. For each record sent, the server asynchronously sends back a delivery report, in the same order, each with its own error_code. An error_code of 200 indicates success. The HTTP status code is 200 OK as long as the connection is successfully established, so to identify records that encountered an error, check the error_code of each delivery report.
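Because delivery reports arrive as Concatenated JSON rather than as a JSON array, a client has to split the response body into individual objects before it can check each error_code. The following is a minimal sketch of that step, assuming the whole response text is already in memory. The function names and the sample payload are illustrative and not part of the API; the sample is modeled on the response examples in the OpenAPI definition.

```python
import json
from typing import Iterator


def iter_delivery_reports(stream_text: str) -> Iterator[dict]:
    """Yield each JSON object from a Concatenated JSON string, in order."""
    decoder = json.JSONDecoder()
    pos = 0
    length = len(stream_text)
    while pos < length:
        # Skip any whitespace between consecutive objects.
        while pos < length and stream_text[pos].isspace():
            pos += 1
        if pos >= length:
            break
        # raw_decode parses one JSON value and returns the index where it ended.
        report, pos = decoder.raw_decode(stream_text, pos)
        yield report


def failed_reports(stream_text: str) -> list[tuple[int, dict]]:
    """Return (index, report) for every report whose error_code is not 200."""
    return [
        (index, report)
        for index, report in enumerate(iter_delivery_reports(stream_text))
        if report.get("error_code") != 200
    ]


# Hypothetical response body: one success followed by one failure.
sample = (
    '{"error_code": 200, "cluster_id": "cluster-1", "topic_name": "topic-1", '
    '"partition_id": 1, "offset": 0}'
    '{"error_code": 400, "message": "Bad Request: data=1 is not a base64 string."}'
)

for index, report in failed_reports(sample):
    print(index, report["error_code"], report["message"])
```

Since reports come back in the same order as the records, the index of a failed report identifies which record it belongs to. A real streaming client would feed a growing buffer into the decoder instead of a complete string.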
Note that the cluster_id is validated only when running in Confluent Cloud.
This API currently does not support Schema Registry integration. Sending schemas is not supported. Only BINARY, JSON, and STRING formats are supported.
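As an illustration of the three supported formats, the sketch below builds a record body in Python. It assumes, based on the examples and error messages in the OpenAPI definition, that BINARY data and header values must be base64-encoded strings. The helper names (`b64`, `make_data`, `make_record`) are hypothetical and exist only for this example.

```python
import base64
import json
from typing import Any, Mapping, Optional


def b64(raw: bytes) -> str:
    """Base64-encode bytes into an ASCII string."""
    return base64.b64encode(raw).decode("ascii")


def make_data(data_type: str, data: Any) -> dict:
    """Build a key or value object for one of the supported formats."""
    if data_type == "BINARY":
        if not isinstance(data, (bytes, bytearray)):
            raise TypeError("BINARY data must be bytes")
        return {"type": "BINARY", "data": b64(bytes(data))}
    if data_type == "STRING":
        if not isinstance(data, str):
            raise TypeError("STRING data must be str")
        return {"type": "STRING", "data": data}
    if data_type == "JSON":
        json.dumps(data)  # raises TypeError if data is not JSON-serializable
        return {"type": "JSON", "data": data}
    raise ValueError(f"unsupported type: {data_type!r}")


def make_record(
    value: Optional[dict] = None,
    key: Optional[dict] = None,
    partition_id: Optional[int] = None,
    headers: Optional[Mapping[str, bytes]] = None,
) -> dict:
    """Assemble a record, leaving out every field that was not provided."""
    record: dict = {}
    if partition_id is not None:
        record["partition_id"] = partition_id
    if headers:
        record["headers"] = [
            {"name": name, "value": b64(raw)} for name, raw in headers.items()
        ]
    if key is not None:
        record["key"] = key
    if value is not None:
        record["value"] = value
    return record


record = make_record(
    partition_id=1,
    headers={"Header-1": b"Header-1", "Header-2": b"Header-2"},
    key=make_data("BINARY", b"foobar"),
    value=make_data("JSON", {"foo": "bar"}),
)
print(json.dumps(record))
```

Encoding the bytes on the client side avoids the 400 delivery report that the definition describes for BINARY data that is not a base64 string.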
Request
Responses
- 200: The response containing a delivery report for a record produced to a topic. In streaming mode, for each record sent, a separate delivery report will be returned, in the same order, each with its own error_code.
- 400: Indicates a bad request error. It could be caused by an unexpected request body format or other forms of request validation failure.
- 401: Indicates a client authentication error. Kafka authentication failures will contain error code 40101 in the response body.
- 403: Indicates a client authorization error. Kafka authorization failures will contain error code 40301 in the response body.
- 404: Indicates attempted access to an unreachable or non-existing resource, such as an unknown topic or partition. GET requests to endpoints not allowed in the access lists will also result in this response.
- 413: This implies the client is sending a request payload that is larger than the maximum message size the server can accept.
- 415: This implies the client is sending the request payload in an unsupported format.
- 422: Indicates a bad request error. It could be caused by an unexpected request body format or other forms of request validation failure.
- 429: Indicates that a rate limit threshold has been reached, and the client should retry later.
- 5XX: A server-side problem that might not be addressable from the client side. Retriable Kafka errors will contain error code 50003 in the response body.
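The status codes above suggest different client reactions. The sketch below shows one possible way to map an error_code (or HTTP status) to an action. It is not an official policy: the `Action` names are made up for this example, and the rule that five-digit codes such as 40101 or 50003 carry the HTTP status in their first three digits is an assumption inferred from the codes listed here. Treating only 429 and 50003 as retriable is likewise a conservative choice, not something the API mandates.

```python
from enum import Enum


class Action(Enum):
    OK = "ok"
    RETRY_LATER = "retry_later"
    CHECK_CREDENTIALS = "check_credentials"
    CHECK_PERMISSIONS = "check_permissions"
    FIX_REQUEST = "fix_request"
    SERVER_ERROR = "server_error"


def http_status_of(error_code: int) -> int:
    """Reduce a code to its HTTP status.

    Assumption: five-digit codes (40101, 40301, 40403, 50003) embed the
    HTTP status in their first three digits.
    """
    return error_code // 100 if error_code >= 10000 else error_code


def classify(error_code: int) -> Action:
    """Suggest a client-side reaction for a delivery report or response code."""
    if error_code == 200:
        return Action.OK
    # 429 signals rate limiting; 50003 marks retriable Kafka errors.
    if error_code in (429, 50003):
        return Action.RETRY_LATER
    status = http_status_of(error_code)
    if status == 401:
        return Action.CHECK_CREDENTIALS
    if status == 403:
        return Action.CHECK_PERMISSIONS
    if 400 <= status < 500:
        return Action.FIX_REQUEST
    # Other server-side problems might not be addressable by the client.
    return Action.SERVER_ERROR


for code in (200, 40101, 40301, 40403, 422, 429, 50003, 500):
    print(code, classify(code).value)
```

Note that a 429 response is returned as text/html rather than JSON, so for that case the input to `classify` would be the HTTP status itself, not an error_code parsed from the body.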
OpenAPI definition (YAML)
paths:
/kafka/v3/clusters/{cluster_id}/topics/{topic_name}/records:
post:
operationId: produceRecord
description: 'Produce records to the given topic, returning delivery reports for each
record produced. This API can be used in streaming mode by setting
"Transfer-Encoding: chunked" header. For as long as the connection is
kept open, the server will keep accepting records. Records are streamed
to and from the server as Concatenated JSON. For each record sent to the
server, the server will asynchronously send back a delivery report, in
the same order, each with its own error_code. An error_code of 200
indicates success. The HTTP status code will be HTTP 200 OK as long as
the connection is successfully established. To identify records that
have encountered an error, check the error_code of each delivery report.
Note that the cluster_id is validated only when running in Confluent Cloud.
This API currently does not support Schema Registry integration. Sending
schemas is not supported. Only BINARY, JSON, and STRING formats are
supported.'
tags:
- Records (v3)
security:
- resource-api-key: []
- external-access-token: []
requestBody:
description: A single record to be produced to Kafka. To produce multiple records in the same
request, simply concatenate the records. The delivery reports are concatenated in the same order
as the records are sent.
content:
application/json:
schema:
type: object
properties:
partition_id:
type: integer
nullable: true
format: int32
headers:
type: array
items:
type: object
required:
- name
properties:
name:
type: string
value:
type: string
format: byte
nullable: true
title: ProduceRequestHeader
key:
type: object
properties:
type:
type: string
enum:
- BINARY
- JSON
- STRING
data:
nullable: true
title: AnyValue
nullable: true
title: ProduceRequestData
value:
type: object
properties:
type:
type: string
enum:
- BINARY
- JSON
- STRING
data:
nullable: true
title: AnyValue
nullable: true
title: ProduceRequestData
timestamp:
type: string
format: date-time
nullable: true
title: ProduceRequest
examples:
binary_and_json:
description: If using type, one of "BINARY", "JSON" or "STRING" is required.
value:
partition_id: 1
headers:
- name: Header-1
value: SGVhZGVyLTE=
- name: Header-2
value: SGVhZGVyLTI=
key:
type: BINARY
data: Zm9vYmFy
value:
type: JSON
data:
foo: bar
timestamp: '2021-02-05T19:14:42Z'
string:
description: If using type, one of "BINARY", "JSON" or "STRING" is required.
value:
value:
type: STRING
data: My message
empty_value:
description: key or value can be omitted entirely.
value:
key:
data: 1000
responses:
'200':
description: 'The response containing a delivery report for a record produced to a topic. In
streaming mode,
for each record sent, a separate delivery report will be returned, in the same order,
each with its own error_code.'
content:
application/json:
schema:
type: object
required:
- error_code
properties:
error_code:
type: integer
format: int32
message:
type: string
cluster_id:
type: string
topic_name:
type: string
partition_id:
type: integer
format: int32
offset:
type: integer
format: int64
timestamp:
type: string
format: date-time
nullable: true
key:
type: object
required:
- size
- type
properties:
size:
type: integer
type:
type: string
enum:
- BINARY
- JSON
- STRING
nullable: true
title: ProduceResponseData
value:
type: object
required:
- size
- type
properties:
size:
type: integer
type:
type: string
enum:
- BINARY
- JSON
- STRING
nullable: true
title: ProduceResponseData
title: ProduceResponse
examples:
produce_record_success:
description: The record was successfully produced to the topic.
value:
error_code: 200
cluster_id: cluster-1
topic_name: topic-1
partition_id: 1
offset: 0
timestamp: '2021-02-05T19:14:42Z'
key:
type: BINARY
size: 7
value:
type: JSON
size: 15
produce_record_bad_binary_data:
description: Thrown when sending a BINARY value which is not a base64-encoded string.
value:
error_code: 400
message: 'Bad Request: data=1 is not a base64 string.'
'400':
description: Indicates a bad request error. It could be caused by an unexpected request body
format or other forms of request validation failure.
content:
application/json:
schema:
type: object
description: Describes a particular error encountered while performing an operation.
properties:
id:
description: A unique identifier for this particular occurrence of the problem.
type: string
maxLength: 255
status:
description: The HTTP status code applicable to this problem, expressed as a string
value.
type: string
code:
description: An application-specific error code, expressed as a string value.
type: string
title:
description: A short, human-readable summary of the problem. It **SHOULD NOT** change
from occurrence to occurrence of the problem, except for purposes of localization.
type: string
detail:
description: A human-readable explanation specific to this occurrence of the problem.
type: string
source:
type: object
description: If this error was caused by a particular part of the API request, the
source will point to the query string parameter or request body property that caused
it.
properties:
pointer:
description: A JSON Pointer [RFC6901] to the associated entity in the request
document [e.g. "/spec" for a spec object, or "/spec/title" for a specific field].
type: string
parameter:
description: A string indicating which query parameter caused the error.
type: string
error_code:
type: integer
format: int32
message:
type: string
nullable: true
additionalProperties: false
title: Error
examples:
header_not_base64_encoded:
description: Thrown when headers in the produce-record are not base64 encoded.
value:
error_code: 400
message: 'Cannot deserialize value of type `byte[]` from String "": Unexpected end
of base64-encoded String: base64 variant ''MIME-NO-LINEFEEDS'' expects padding (one
or more ''='' characters) at the end. This Base64Variant might have been incorrectly
configured'
'401':
description: Indicates a client authentication error. Kafka authentication failures will contain
error code 40101 in the response body.
content:
application/json:
schema:
type: object
description: Describes a particular error encountered while performing an operation.
properties:
id:
description: A unique identifier for this particular occurrence of the problem.
type: string
maxLength: 255
status:
description: The HTTP status code applicable to this problem, expressed as a string
value.
type: string
code:
description: An application-specific error code, expressed as a string value.
type: string
title:
description: A short, human-readable summary of the problem. It **SHOULD NOT** change
from occurrence to occurrence of the problem, except for purposes of localization.
type: string
detail:
description: A human-readable explanation specific to this occurrence of the problem.
type: string
source:
type: object
description: If this error was caused by a particular part of the API request, the
source will point to the query string parameter or request body property that caused
it.
properties:
pointer:
description: A JSON Pointer [RFC6901] to the associated entity in the request
document [e.g. "/spec" for a spec object, or "/spec/title" for a specific field].
type: string
parameter:
description: A string indicating which query parameter caused the error.
type: string
error_code:
type: integer
format: int32
message:
type: string
nullable: true
additionalProperties: false
title: Error
examples:
kafka_authentication_failed:
description: Thrown when using Basic authentication with wrong Kafka credentials.
value:
error_code: 40101
message: Authentication failed
'403':
description: Indicates a client authorization error. Kafka authorization failures will contain
error code 40301 in the response body.
content:
application/json:
schema:
type: object
description: Describes a particular error encountered while performing an operation.
properties:
id:
description: A unique identifier for this particular occurrence of the problem.
type: string
maxLength: 255
status:
description: The HTTP status code applicable to this problem, expressed as a string
value.
type: string
code:
description: An application-specific error code, expressed as a string value.
type: string
title:
description: A short, human-readable summary of the problem. It **SHOULD NOT** change
from occurrence to occurrence of the problem, except for purposes of localization.
type: string
detail:
description: A human-readable explanation specific to this occurrence of the problem.
type: string
source:
type: object
description: If this error was caused by a particular part of the API request, the
source will point to the query string parameter or request body property that caused
it.
properties:
pointer:
description: A JSON Pointer [RFC6901] to the associated entity in the request
document [e.g. "/spec" for a spec object, or "/spec/title" for a specific field].
type: string
parameter:
description: A string indicating which query parameter caused the error.
type: string
error_code:
type: integer
format: int32
message:
type: string
nullable: true
additionalProperties: false
title: Error
examples:
kafka_authorization_failed:
description: Thrown when the caller is not authorized to perform the underlying operation.
value:
error_code: 40301
message: Request is not authorized
'404':
description: Indicates attempted access to an unreachable or non-existing resource like e.g.
an unknown topic or partition. GET requests to endpoints not allowed in the accesslists will
also result in this response.
content:
application/json:
schema:
type: object
description: Describes a particular error encountered while performing an operation.
properties:
id:
description: A unique identifier for this particular occurrence of the problem.
type: string
maxLength: 255
status:
description: The HTTP status code applicable to this problem, expressed as a string
value.
type: string
code:
description: An application-specific error code, expressed as a string value.
type: string
title:
description: A short, human-readable summary of the problem. It **SHOULD NOT** change
from occurrence to occurrence of the problem, except for purposes of localization.
type: string
detail:
description: A human-readable explanation specific to this occurrence of the problem.
type: string
source:
type: object
description: If this error was caused by a particular part of the API request, the
source will point to the query string parameter or request body property that caused
it.
properties:
pointer:
description: A JSON Pointer [RFC6901] to the associated entity in the request
document [e.g. "/spec" for a spec object, or "/spec/title" for a specific field].
type: string
parameter:
description: A string indicating which query parameter caused the error.
type: string
error_code:
type: integer
format: int32
message:
type: string
nullable: true
additionalProperties: false
title: Error
examples:
endpoint_not_found:
description: Thrown for generic HTTP 404 errors.
value:
error_code: 404
message: HTTP 404 Not Found
cluster_not_found:
description: Thrown when using a non-existing cluster ID.
value:
error_code: 404
message: Cluster my-cluster cannot be found.
unknown_topic_or_partition:
description: Thrown when using a non-existing topic name or partition ID.
value:
error_code: 40403
message: This server does not host this topic-partition.
'413':
description: This implies the client is sending a request payload that is larger than the maximum
message size the server can accept.
content:
application/json:
schema:
type: object
description: Describes a particular error encountered while performing an operation.
properties:
id:
description: A unique identifier for this particular occurrence of the problem.
type: string
maxLength: 255
status:
description: The HTTP status code applicable to this problem, expressed as a string
value.
type: string
code:
description: An application-specific error code, expressed as a string value.
type: string
title:
description: A short, human-readable summary of the problem. It **SHOULD NOT** change
from occurrence to occurrence of the problem, except for purposes of localization.
type: string
detail:
description: A human-readable explanation specific to this occurrence of the problem.
type: string
source:
type: object
description: If this error was caused by a particular part of the API request, the
source will point to the query string parameter or request body property that caused
it.
properties:
pointer:
description: A JSON Pointer [RFC6901] to the associated entity in the request
document [e.g. "/spec" for a spec object, or "/spec/title" for a specific field].
type: string
parameter:
description: A string indicating which query parameter caused the error.
type: string
error_code:
type: integer
format: int32
message:
type: string
nullable: true
additionalProperties: false
title: Error
examples:
produce_records_expects_json:
description: Thrown by /records API if payload size exceeds the message max size
value:
error_code: 413
message: The request included a message larger than the maximum message size the server
can accept.
'415':
description: This implies the client is sending the request payload format in an unsupported
format.
content:
application/json:
schema:
type: object
description: Describes a particular error encountered while performing an operation.
properties:
id:
description: A unique identifier for this particular occurrence of the problem.
type: string
maxLength: 255
status:
description: The HTTP status code applicable to this problem, expressed as a string
value.
type: string
code:
description: An application-specific error code, expressed as a string value.
type: string
title:
description: A short, human-readable summary of the problem. It **SHOULD NOT** change
from occurrence to occurrence of the problem, except for purposes of localization.
type: string
detail:
description: A human-readable explanation specific to this occurrence of the problem.
type: string
source:
type: object
description: If this error was caused by a particular part of the API request, the
source will point to the query string parameter or request body property that caused
it.
properties:
pointer:
description: A JSON Pointer [RFC6901] to the associated entity in the request
document [e.g. "/spec" for a spec object, or "/spec/title" for a specific field].
type: string
parameter:
description: A string indicating which query parameter caused the error.
type: string
error_code:
type: integer
format: int32
message:
type: string
nullable: true
additionalProperties: false
title: Error
examples:
produce_records_expects_json:
description: Thrown by /records API if payload format content-type doesn't match expected
application/json
value:
error_code: 415
message: HTTP 415 Unsupported Media Type
'422':
description: Indicates a bad request error. It could be caused by an unexpected request body
format or other forms of request validation failure.
content:
application/json:
schema:
type: object
description: Describes a particular error encountered while performing an operation.
properties:
id:
description: A unique identifier for this particular occurrence of the problem.
type: string
maxLength: 255
status:
description: The HTTP status code applicable to this problem, expressed as a string
value.
type: string
code:
description: An application-specific error code, expressed as a string value.
type: string
title:
description: A short, human-readable summary of the problem. It **SHOULD NOT** change
from occurrence to occurrence of the problem, except for purposes of localization.
type: string
detail:
description: A human-readable explanation specific to this occurrence of the problem.
type: string
source:
type: object
description: If this error was caused by a particular part of the API request, the
source will point to the query string parameter or request body property that caused
it.
properties:
pointer:
description: A JSON Pointer [RFC6901] to the associated entity in the request
document [e.g. "/spec" for a spec object, or "/spec/title" for a specific field].
type: string
parameter:
description: A string indicating which query parameter caused the error.
type: string
error_code:
type: integer
format: int32
message:
type: string
nullable: true
additionalProperties: false
title: Error
examples:
produce_record_empty_request_body:
description: Thrown when the request body is empty.
value:
error_code: 422
message: Payload error. Request body is empty. Data is required.
'429':
description: Indicates that a rate limit threshold has been reached, and the client should retry
again later.
content:
text/html:
schema:
type: string
example:
description: A sample response from Jetty's DoSFilter.
value: <html> <head> <meta http-equiv="Content-Type" content="text/html;charset=utf-8"/>
<title>Error 429 Too Many Requests</title> </head> <body> <h2>HTTP ERROR 429 Too Many
Requests</h2> <table> <tr> <th>URI:</th> <td>/v3/clusters/my-cluster</td> </tr> <tr>
<th>STATUS:</th> <td>429</td> </tr> <tr> <th>MESSAGE:</th> <td>Too Many Requests</td>
</tr> <tr> <th>SERVLET:</th> <td>default</td> </tr> </table> </body> </html>
5XX:
description: A server-side problem that might not be addressable from the client side. Retriable
Kafka errors will contain error code 50003 in the response body.
content:
application/json:
schema:
type: object
description: Describes a particular error encountered while performing an operation.
properties:
id:
description: A unique identifier for this particular occurrence of the problem.
type: string
maxLength: 255
status:
description: The HTTP status code applicable to this problem, expressed as a string
value.
type: string
code:
description: An application-specific error code, expressed as a string value.
type: string
title:
description: A short, human-readable summary of the problem. It **SHOULD NOT** change
from occurrence to occurrence of the problem, except for purposes of localization.
type: string
detail:
description: A human-readable explanation specific to this occurrence of the problem.
type: string
source:
type: object
description: If this error was caused by a particular part of the API request, the
source will point to the query string parameter or request body property that caused
it.
properties:
pointer:
description: A JSON Pointer [RFC6901] to the associated entity in the request
document [e.g. "/spec" for a spec object, or "/spec/title" for a specific field].
type: string
parameter:
description: A string indicating which query parameter caused the error.
type: string
error_code:
type: integer
format: int32
message:
type: string
nullable: true
additionalProperties: false
title: Error
examples:
generic_internal_server_error:
description: Thrown for generic HTTP 500 errors.
value:
error_code: 500
message: Internal Server Error
parameters:
- name: cluster_id
description: The Kafka cluster ID.
in: path
required: true
schema:
type: string
example: cluster-1
- name: topic_name
description: The topic name.
in: path
required: true
schema:
type: string
example: topic-1
servers:
- url: https://pkc-00000.region.provider.confluent.cloud
x-audience: business-unit-internal
description: Confluent Cloud REST Endpoint. For example https://pkc-00000.region.provider.confluent.cloud
jsonRequestBodyExample:
partition_id: 0
headers:
- name: string
value: string
key:
type: BINARY
value:
type: BINARY
timestamp: '2024-07-29T15:51:28.071Z'
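To tie the definition together, here is a hedged sketch of how a client might assemble the POST request with Python's standard library. It only builds the request object and does not send it. The function name, the placeholder credentials, and the use of HTTP Basic authentication with an API key and secret are assumptions for illustration; the definition names the security schemes (resource-api-key, external-access-token) without detailing them here. Multiple records are simply concatenated in the body, as the requestBody description states.

```python
import base64
import json
import urllib.request
from urllib.parse import quote


def build_produce_request(
    base_url: str,
    cluster_id: str,
    topic_name: str,
    records: list,
    api_key: str,
    api_secret: str,
) -> urllib.request.Request:
    """Build (but do not send) a POST request carrying one or more records."""
    path = (
        f"/kafka/v3/clusters/{quote(cluster_id, safe='')}"
        f"/topics/{quote(topic_name, safe='')}/records"
    )
    # Concatenated JSON: serialize each record and join them with no separator.
    body = "".join(json.dumps(record) for record in records).encode("utf-8")
    # Assumption: the API key and secret are sent as HTTP Basic credentials.
    token = base64.b64encode(f"{api_key}:{api_secret}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        base_url.rstrip("/") + path,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
    )


# Placeholder endpoint from the servers section; credentials are dummies.
request = build_produce_request(
    "https://pkc-00000.region.provider.confluent.cloud",
    "cluster-1",
    "topic-1",
    [
        {"value": {"type": "STRING", "data": "My message"}},
        {"key": {"data": 1000}},
    ],
    api_key="key",
    api_secret="secret",
)
print(request.get_method(), request.full_url)
```

Passing the request to `urllib.request.urlopen` would send it. The response body would then contain one delivery report per record, in the same order.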