Skip to main content

Produce Records

POST 

/kafka/v3/clusters/:cluster_id/topics/:topic_name/records

Generally Available

Produce records to the given topic, returning delivery reports for each record produced. This API can be used in streaming mode by setting "Transfer-Encoding: chunked" header. For as long as the connection is kept open, the server will keep accepting records. Records are streamed to and from the server as Concatenated JSON. For each record sent to the server, the server will asynchronously send back a delivery report, in the same order, each with its own error_code. An error_code of 200 indicates success. The HTTP status code will be HTTP 200 OK as long as the connection is successfully established. To identify records that have encountered an error, check the error_code of each delivery report.

Note that the cluster_id is validated only when running in Confluent Cloud.

This API currently does not support Schema Registry integration. Sending schemas is not supported. Only BINARY, JSON, and STRING formats are supported.

Request

Responses

The response containing a delivery report for a record produced to a topic. In streaming mode, for each record sent, a separate delivery report will be returned, in the same order, each with its own error_code.

OpenAPI definition (YAML)
paths:
  /kafka/v3/clusters/{cluster_id}/topics/{topic_name}/records:
    post:
      operationId: produceRecord
      description: '[![Generally Available](https://img.shields.io/badge/Lifecycle%20Stage-Generally%20Available-%2345c6e8)](#section/Versioning/API-Lifecycle-Policy)


        Produce records to the given topic, returning delivery reports for each

        record produced. This API can be used in streaming mode by setting

        "Transfer-Encoding: chunked" header. For as long as the connection is

        kept open, the server will keep accepting records. Records are streamed

        to and from the server as Concatenated JSON. For each record sent to the

        server, the server will asynchronously send back a delivery report, in

        the same order, each with its own error_code. An error_code of 200

        indicates success. The HTTP status code will be HTTP 200 OK as long as

        the connection is successfully established. To identify records that

        have encountered an error, check the error_code of each delivery report.


        Note that the cluster_id is validated only when running in Confluent Cloud.


        This API currently does not support Schema Registry integration. Sending

        schemas is not supported. Only BINARY, JSON, and STRING formats are

        supported.'
      tags:
      - Records (v3)
      security:
      - resource-api-key: []
      - external-access-token: []
      requestBody:
        description: A single record to be produced to Kafka. To produce multiple records in the same
          request, simply concatenate the records. The delivery reports are concatenated in the same order
          as the records are sent.
        content:
          application/json:
            schema:
              type: object
              properties:
                partition_id:
                  type: integer
                  nullable: true
                  format: int32
                headers:
                  type: array
                  items:
                    type: object
                    required:
                    - name
                    properties:
                      name:
                        type: string
                      value:
                        type: string
                        format: byte
                        nullable: true
                    title: ProduceRequestHeader
                key:
                  type: object
                  properties:
                    type:
                      type: string
                      enum:
                      - BINARY
                      - JSON
                      - STRING
                    data:
                      nullable: true
                      title: AnyValue
                  nullable: true
                  title: ProduceRequestData
                value:
                  type: object
                  properties:
                    type:
                      type: string
                      enum:
                      - BINARY
                      - JSON
                      - STRING
                    data:
                      nullable: true
                      title: AnyValue
                  nullable: true
                  title: ProduceRequestData
                timestamp:
                  type: string
                  format: date-time
                  nullable: true
              title: ProduceRequest
            examples:
              binary_and_json:
                description: If using type, one of "BINARY", "JSON" or "STRING" is required.
                value:
                  partition_id: 1
                  headers:
                  - name: Header-1
                    value: SGVhZGVyLTE=
                  - name: Header-2
                    value: SGVhZGVyLTI=
                  key:
                    type: BINARY
                    data: Zm9vYmFy
                  value:
                    type: JSON
                    data:
                      foo: bar
                  timestamp: '2021-02-05T19:14:42Z'
              string:
                description: If using type, one of "BINARY", "JSON" or "STRING" is required.
                value:
                  value:
                    type: STRING
                    data: My message
              empty_value:
                description: key or value can be omitted entirely.
                value:
                  key:
                    data: 1000
      responses:
        '200':
          description: 'The response containing a delivery report for a record produced to a topic. In
            streaming mode,

            for each record sent, a separate delivery report will be returned, in the same order,

            each with its own error_code.'
          content:
            application/json:
              schema:
                type: object
                required:
                - error_code
                properties:
                  error_code:
                    type: integer
                    format: int32
                  message:
                    type: string
                  cluster_id:
                    type: string
                  topic_name:
                    type: string
                  partition_id:
                    type: integer
                    format: int32
                  offset:
                    type: integer
                    format: int64
                  timestamp:
                    type: string
                    format: date-time
                    nullable: true
                  key:
                    type: object
                    required:
                    - size
                    - type
                    properties:
                      size:
                        type: integer
                      type:
                        type: string
                        enum:
                        - BINARY
                        - JSON
                        - STRING
                    nullable: true
                    title: ProduceResponseData
                  value:
                    type: object
                    required:
                    - size
                    - type
                    properties:
                      size:
                        type: integer
                      type:
                        type: string
                        enum:
                        - BINARY
                        - JSON
                        - STRING
                    nullable: true
                    title: ProduceResponseData
                title: ProduceResponse
              examples:
                produce_record_success:
                  description: The record was successfully produced to the topic.
                  value:
                    error_code: 200
                    cluster_id: cluster-1
                    topic_name: topic-1
                    partition_id: 1
                    offset: 0
                    timestamp: '2021-02-05T19:14:42Z'
                    key:
                      type: BINARY
                      size: 7
                    value:
                      type: JSON
                      size: 15
                produce_record_bad_binary_data:
                  description: Thrown when sending a BINARY value which is not a base64-encoded string.
                  value:
                    error_code: 400
                    message: 'Bad Request: data=1 is not a base64 string.'
        '400':
          description: Indicates a bad request error. It could be caused by an unexpected request body
            format or other forms of request validation failure.
          content:
            application/json:
              schema:
                type: object
                description: Describes a particular error encountered while performing an operation.
                properties:
                  id:
                    description: A unique identifier for this particular occurrence of the problem.
                    type: string
                    maxLength: 255
                  status:
                    description: The HTTP status code applicable to this problem, expressed as a string
                      value.
                    type: string
                  code:
                    description: An application-specific error code, expressed as a string value.
                    type: string
                  title:
                    description: A short, human-readable summary of the problem. It **SHOULD NOT** change
                      from occurrence to occurrence of the problem, except for purposes of localization.
                    type: string
                  detail:
                    description: A human-readable explanation specific to this occurrence of the problem.
                    type: string
                  source:
                    type: object
                    description: If this error was caused by a particular part of the API request, the
                      source will point to the query string parameter or request body property that caused
                      it.
                    properties:
                      pointer:
                        description: A JSON Pointer [RFC6901] to the associated entity in the request
                          document [e.g. "/spec" for a spec object, or "/spec/title" for a specific field].
                        type: string
                      parameter:
                        description: A string indicating which query parameter caused the error.
                        type: string
                  error_code:
                    type: integer
                    format: int32
                  message:
                    type: string
                    nullable: true
                additionalProperties: false
                title: Error
              examples:
                header_not_base64_encoded:
                  description: Thrown when headers in the produce-record are not base64 encoded.
                  value:
                    error_code: 400
                    message: 'Cannot deserialize value of type `byte[]` from String "": Unexpected end
                      of base64-encoded String: base64 variant ''MIME-NO-LINEFEEDS'' expects padding (one
                      or more ''='' characters) at the end. This Base64Variant might have been incorrectly
                      configured'
        '401':
          description: Indicates a client authentication error. Kafka authentication failures will contain
            error code 40101 in the response body.
          content:
            application/json:
              schema:
                type: object
                description: Describes a particular error encountered while performing an operation.
                properties:
                  id:
                    description: A unique identifier for this particular occurrence of the problem.
                    type: string
                    maxLength: 255
                  status:
                    description: The HTTP status code applicable to this problem, expressed as a string
                      value.
                    type: string
                  code:
                    description: An application-specific error code, expressed as a string value.
                    type: string
                  title:
                    description: A short, human-readable summary of the problem. It **SHOULD NOT** change
                      from occurrence to occurrence of the problem, except for purposes of localization.
                    type: string
                  detail:
                    description: A human-readable explanation specific to this occurrence of the problem.
                    type: string
                  source:
                    type: object
                    description: If this error was caused by a particular part of the API request, the
                      source will point to the query string parameter or request body property that caused
                      it.
                    properties:
                      pointer:
                        description: A JSON Pointer [RFC6901] to the associated entity in the request
                          document [e.g. "/spec" for a spec object, or "/spec/title" for a specific field].
                        type: string
                      parameter:
                        description: A string indicating which query parameter caused the error.
                        type: string
                  error_code:
                    type: integer
                    format: int32
                  message:
                    type: string
                    nullable: true
                additionalProperties: false
                title: Error
              examples:
                kafka_authentication_failed:
                  description: Thrown when using Basic authentication with wrong Kafka credentials.
                  value:
                    error_code: 40101
                    message: Authentication failed
        '403':
          description: Indicates a client authorization error. Kafka authorization failures will contain
            error code 40301 in the response body.
          content:
            application/json:
              schema:
                type: object
                description: Describes a particular error encountered while performing an operation.
                properties:
                  id:
                    description: A unique identifier for this particular occurrence of the problem.
                    type: string
                    maxLength: 255
                  status:
                    description: The HTTP status code applicable to this problem, expressed as a string
                      value.
                    type: string
                  code:
                    description: An application-specific error code, expressed as a string value.
                    type: string
                  title:
                    description: A short, human-readable summary of the problem. It **SHOULD NOT** change
                      from occurrence to occurrence of the problem, except for purposes of localization.
                    type: string
                  detail:
                    description: A human-readable explanation specific to this occurrence of the problem.
                    type: string
                  source:
                    type: object
                    description: If this error was caused by a particular part of the API request, the
                      source will point to the query string parameter or request body property that caused
                      it.
                    properties:
                      pointer:
                        description: A JSON Pointer [RFC6901] to the associated entity in the request
                          document [e.g. "/spec" for a spec object, or "/spec/title" for a specific field].
                        type: string
                      parameter:
                        description: A string indicating which query parameter caused the error.
                        type: string
                  error_code:
                    type: integer
                    format: int32
                  message:
                    type: string
                    nullable: true
                additionalProperties: false
                title: Error
              examples:
                kafka_authorization_failed:
                  description: Thrown when the caller is not authorized to perform the underlying operation.
                  value:
                    error_code: 40301
                    message: Request is not authorized
        '404':
          description: Indicates attempted access to an unreachable or non-existing resource like e.g.
            an unknown topic or partition. GET requests to endpoints not allowed in the accesslists will
            also result in this response.
          content:
            application/json:
              schema:
                type: object
                description: Describes a particular error encountered while performing an operation.
                properties:
                  id:
                    description: A unique identifier for this particular occurrence of the problem.
                    type: string
                    maxLength: 255
                  status:
                    description: The HTTP status code applicable to this problem, expressed as a string
                      value.
                    type: string
                  code:
                    description: An application-specific error code, expressed as a string value.
                    type: string
                  title:
                    description: A short, human-readable summary of the problem. It **SHOULD NOT** change
                      from occurrence to occurrence of the problem, except for purposes of localization.
                    type: string
                  detail:
                    description: A human-readable explanation specific to this occurrence of the problem.
                    type: string
                  source:
                    type: object
                    description: If this error was caused by a particular part of the API request, the
                      source will point to the query string parameter or request body property that caused
                      it.
                    properties:
                      pointer:
                        description: A JSON Pointer [RFC6901] to the associated entity in the request
                          document [e.g. "/spec" for a spec object, or "/spec/title" for a specific field].
                        type: string
                      parameter:
                        description: A string indicating which query parameter caused the error.
                        type: string
                  error_code:
                    type: integer
                    format: int32
                  message:
                    type: string
                    nullable: true
                additionalProperties: false
                title: Error
              examples:
                endpoint_not_found:
                  description: Thrown for generic HTTP 404 errors.
                  value:
                    error_code: 404
                    message: HTTP 404 Not Found
                cluster_not_found:
                  description: Thrown when using a non-existing cluster ID.
                  value:
                    error_code: 404
                    message: Cluster my-cluster cannot be found.
                unknown_topic_or_partition:
                  description: Thrown when using a non-existing topic name or partition ID.
                  value:
                    error_code: 40403
                    message: This server does not host this topic-partition.
        '413':
          description: This implies the client is sending a request payload that is larger than the maximum
            message size the server can accept.
          content:
            application/json:
              schema:
                type: object
                description: Describes a particular error encountered while performing an operation.
                properties:
                  id:
                    description: A unique identifier for this particular occurrence of the problem.
                    type: string
                    maxLength: 255
                  status:
                    description: The HTTP status code applicable to this problem, expressed as a string
                      value.
                    type: string
                  code:
                    description: An application-specific error code, expressed as a string value.
                    type: string
                  title:
                    description: A short, human-readable summary of the problem. It **SHOULD NOT** change
                      from occurrence to occurrence of the problem, except for purposes of localization.
                    type: string
                  detail:
                    description: A human-readable explanation specific to this occurrence of the problem.
                    type: string
                  source:
                    type: object
                    description: If this error was caused by a particular part of the API request, the
                      source will point to the query string parameter or request body property that caused
                      it.
                    properties:
                      pointer:
                        description: A JSON Pointer [RFC6901] to the associated entity in the request
                          document [e.g. "/spec" for a spec object, or "/spec/title" for a specific field].
                        type: string
                      parameter:
                        description: A string indicating which query parameter caused the error.
                        type: string
                  error_code:
                    type: integer
                    format: int32
                  message:
                    type: string
                    nullable: true
                additionalProperties: false
                title: Error
              examples:
                produce_records_expects_json:
                  description: Thrown by /records API if payload size exceeds the message max size
                  value:
                    error_code: 413
                    message: The request included a message larger than the maximum message size the server
                      can accept.
        '415':
          description: This implies the client is sending the request payload format in an unsupported
            format.
          content:
            application/json:
              schema:
                type: object
                description: Describes a particular error encountered while performing an operation.
                properties:
                  id:
                    description: A unique identifier for this particular occurrence of the problem.
                    type: string
                    maxLength: 255
                  status:
                    description: The HTTP status code applicable to this problem, expressed as a string
                      value.
                    type: string
                  code:
                    description: An application-specific error code, expressed as a string value.
                    type: string
                  title:
                    description: A short, human-readable summary of the problem. It **SHOULD NOT** change
                      from occurrence to occurrence of the problem, except for purposes of localization.
                    type: string
                  detail:
                    description: A human-readable explanation specific to this occurrence of the problem.
                    type: string
                  source:
                    type: object
                    description: If this error was caused by a particular part of the API request, the
                      source will point to the query string parameter or request body property that caused
                      it.
                    properties:
                      pointer:
                        description: A JSON Pointer [RFC6901] to the associated entity in the request
                          document [e.g. "/spec" for a spec object, or "/spec/title" for a specific field].
                        type: string
                      parameter:
                        description: A string indicating which query parameter caused the error.
                        type: string
                  error_code:
                    type: integer
                    format: int32
                  message:
                    type: string
                    nullable: true
                additionalProperties: false
                title: Error
              examples:
                produce_records_expects_json:
                  description: Thrown by /records API if payload format content-type doesn't match expected
                    application/json
                  value:
                    error_code: 415
                    message: HTTP 415 Unsupported Media Type
        '422':
          description: Indicates a bad request error. It could be caused by an unexpected request body
            format or other forms of request validation failure.
          content:
            application/json:
              schema:
                type: object
                description: Describes a particular error encountered while performing an operation.
                properties:
                  id:
                    description: A unique identifier for this particular occurrence of the problem.
                    type: string
                    maxLength: 255
                  status:
                    description: The HTTP status code applicable to this problem, expressed as a string
                      value.
                    type: string
                  code:
                    description: An application-specific error code, expressed as a string value.
                    type: string
                  title:
                    description: A short, human-readable summary of the problem. It **SHOULD NOT** change
                      from occurrence to occurrence of the problem, except for purposes of localization.
                    type: string
                  detail:
                    description: A human-readable explanation specific to this occurrence of the problem.
                    type: string
                  source:
                    type: object
                    description: If this error was caused by a particular part of the API request, the
                      source will point to the query string parameter or request body property that caused
                      it.
                    properties:
                      pointer:
                        description: A JSON Pointer [RFC6901] to the associated entity in the request
                          document [e.g. "/spec" for a spec object, or "/spec/title" for a specific field].
                        type: string
                      parameter:
                        description: A string indicating which query parameter caused the error.
                        type: string
                  error_code:
                    type: integer
                    format: int32
                  message:
                    type: string
                    nullable: true
                additionalProperties: false
                title: Error
              examples:
                produce_record_empty_request_body:
                  description: Thrown when the request body is empty.
                  value:
                    error_code: 422
                    message: Payload error. Request body is empty. Data is required.
        '429':
          description: Indicates that a rate limit threshold has been reached, and the client should retry
            again later.
          content:
            text/html:
              schema:
                type: string
              example:
                description: A sample response from Jetty's DoSFilter.
                value: <html> <head> <meta http-equiv="Content-Type" content="text/html;charset=utf-8"/>
                  <title>Error 429 Too Many Requests</title> </head> <body> <h2>HTTP ERROR 429 Too Many
                  Requests</h2> <table> <tr> <th>URI:</th> <td>/v3/clusters/my-cluster</td> </tr> <tr>
                  <th>STATUS:</th> <td>429</td> </tr> <tr> <th>MESSAGE:</th> <td>Too Many Requests</td>
                  </tr> <tr> <th>SERVLET:</th> <td>default</td> </tr> </table> </body> </html>
        5XX:
          description: A server-side problem that might not be addressable from the client side. Retriable
            Kafka errors will contain error code 50003 in the response body.
          content:
            application/json:
              schema:
                type: object
                description: Describes a particular error encountered while performing an operation.
                properties:
                  id:
                    description: A unique identifier for this particular occurrence of the problem.
                    type: string
                    maxLength: 255
                  status:
                    description: The HTTP status code applicable to this problem, expressed as a string
                      value.
                    type: string
                  code:
                    description: An application-specific error code, expressed as a string value.
                    type: string
                  title:
                    description: A short, human-readable summary of the problem. It **SHOULD NOT** change
                      from occurrence to occurrence of the problem, except for purposes of localization.
                    type: string
                  detail:
                    description: A human-readable explanation specific to this occurrence of the problem.
                    type: string
                  source:
                    type: object
                    description: If this error was caused by a particular part of the API request, the
                      source will point to the query string parameter or request body property that caused
                      it.
                    properties:
                      pointer:
                        description: A JSON Pointer [RFC6901] to the associated entity in the request
                          document [e.g. "/spec" for a spec object, or "/spec/title" for a specific field].
                        type: string
                      parameter:
                        description: A string indicating which query parameter caused the error.
                        type: string
                  error_code:
                    type: integer
                    format: int32
                  message:
                    type: string
                    nullable: true
                additionalProperties: false
                title: Error
              examples:
                generic_internal_server_error:
                  description: Thrown for generic HTTP 500 errors.
                  value:
                    error_code: 500
                    message: Internal Server Error
      parameters:
      - name: cluster_id
        description: The Kafka cluster ID.
        in: path
        required: true
        schema:
          type: string
        example: cluster-1
      - name: topic_name
        description: The topic name.
        in: path
        required: true
        schema:
          type: string
        example: topic-1
      servers:
      - url: https://pkc-00000.region.provider.confluent.cloud
        x-audience: business-unit-internal
        description: Confluent Cloud REST Endpoint. For example https://pkc-00000.region.provider.confluent.cloud
      jsonRequestBodyExample:
        partition_id: 0
        headers:
        - name: string
          value: string
        key:
          type: BINARY
        value:
          type: BINARY
        timestamp: '2024-07-29T15:51:28.071Z'