HTTP Sink V2 Connector for Confluent Cloud

The fully-managed HTTP Sink V2 connector for Confluent Cloud integrates Apache Kafka® with an API using HTTP or HTTPS. It allows you to configure one or more APIs seamlessly with an OpenAPI/Swagger specification file, reducing overall configuration time and delivering better performance than the HTTP Sink Connector for Confluent Cloud. This page describes the features the HTTP Sink V2 connector offers and everything you need to begin using it.

Features

The HTTP Sink V2 connector includes the following features:

  • Multiple API path support: The connector allows you to configure up to 15 API paths that share the same base URL and authentication mechanism.

  • OpenAPI Specification-based configuration: The connector provides seamless configuration through an OpenAPI specification file.

  • Secure access and data exchange: The connector supports the following authentication mechanisms:

    • Basic
    • Bearer
    • OAuth 2.0 Client Credentials grant flow
  • API error reporting management: You can configure the connector to notify you when an API error occurs, through email or through the Confluent Cloud user interface. You can also configure the connector to ignore API errors.

  • API validation: The connector allows you to test the API using a test record and view the test API response logs in the Confluent Cloud user interface.

  • Template variables: The connector allows you to specify template variables, such as ${topic} and ${key}, along with fields from the Kafka record for use in an HTTP request:

    • Headers
    • Query parameters
    • Path parameters
    • Body parameters

    The connector constructs a unique URL using these parameters and enables substitution of template variables in headers, parameters, and body content.

  • Supported data formats: The connector supports Avro, Bytes, JSON (schemaless), JSON Schema, and Protobuf data formats. Schema Registry must be enabled to use a Schema Registry-based format like Avro, JSON Schema, or Protobuf. For additional information, see Schema Registry Enabled Environments.

  • Custom offset support: The connector allows you to configure custom offsets using the Confluent Cloud user interface to prevent data loss and data duplication.

  • Configurable retry functionality: The connector allows you to customize retry settings based on your requirements.
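As an illustration of the template variable support listed above, a connector configuration might reference the record key and topic in headers, path parameters, and the request body. The property names come from the Configuration Properties reference on this page; the values are hypothetical:

```json
{
  "api1.http.request.headers": "X-Source-Topic:${topic}",
  "api1.http.path.parameters": "customerId:${key}",
  "api1.http.request.body": "{\"search_after\": \"${key}\", \"source\": \"${topic}\"}"
}
```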

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Managed and Custom Connectors section.

Limitations

Be sure to review the following information.

Quick Start

Use this quick start to get up and running with the Confluent Cloud HTTP Sink V2 connector.

Prerequisites
  • Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud.
  • The Confluent CLI installed and configured for the cluster. For help, see Install the Confluent CLI.
  • Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf). For more information, see Schema Registry Enabled Environments.
  • An OpenAPI Specification file (version 3.0 or higher).
  • Relevant authentication credentials for both Kafka and your data system.
  • At least one source Kafka topic must exist in your Confluent Cloud cluster before creating the Sink connector.

Using the Confluent Cloud Console

Step 1: Launch your Confluent Cloud cluster

See the Quick Start for Confluent Cloud for installation instructions.

Step 2: Add a connector

In the left navigation menu, click Connectors. If you already have connectors in your cluster, click + Add connector.

Step 3: Select your connector

Click the HTTP Sink V2 connector card.

HTTP Sink V2 Connector Card

Step 4: Enter the connector details

Note

  • Ensure you have all your prerequisites completed.
  • An asterisk ( * ) designates a required entry.

At the Add HTTP Sink V2 Connector screen, complete the following:

  1. Provide the connector name in the Connector name field.
  2. Add the OpenAPI specification file (OAS 3.0 or higher) by adding a URL endpoint or by uploading a YAML/JSON formatted specification file. Note that you can convert Swagger 1.x or 2.0 definitions to OpenAPI 3.0 using the Swagger Converter.
    • To add a URL endpoint, enter the URL in the Add via URL field. Note that the maximum file size is 3 MB.
    • To upload a YAML/JSON formatted specification file, select Add a file, then click Upload file to upload the file. Note that the maximum file size is 1 MB.
  3. Select the Input Kafka record value format (data coming from the Kafka topic): AVRO, BYTES, JSON, JSON_SR (JSON Schema), or PROTOBUF. A valid schema must be available in Schema Registry to use a schema-based message format (for example, Avro, JSON Schema, or Protobuf). For more information, see Schema Registry Enabled Environments. Note that to consume STRING data, select schemaless JSON.
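For reference, a minimal OpenAPI 3.0 specification of the kind the connector accepts might look like the following sketch. The server URL matches the example used elsewhere on this page; the path and schema are hypothetical:

```yaml
openapi: 3.0.0
info:
  title: Absence Management API
  version: "1.0"
servers:
  - url: http://example.com/absenceManagement/v1
paths:
  /absences:
    post:
      summary: Create an absence record
      requestBody:
        required: true
        content:
          application/json:
            schema:
              type: object
              properties:
                msg:
                  type: string
      responses:
        "200":
          description: OK
```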

Step 5: Check for records

Verify that records are being produced at the endpoint.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Managed and Custom Connectors section.

Note that Dead Letter Queue (DLQ), success, and error topics are automatically created for the connector. For more details, see Confluent Cloud Dead Letter Queue.

Using the Confluent CLI

To set up and run the connector using the Confluent CLI, complete the following steps, but ensure you have met all prerequisites.

Step 1: List the available connectors

Enter the following command to list available connectors:

confluent connect plugin list

Step 2: List the connector configuration properties

Enter the following command to show the connector configuration properties:

confluent connect plugin describe <connector-plugin-name>

The command output shows the required and optional configuration properties.

Step 3: Create the connector configuration file

Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.

{
  "topics": "topic_0",
  "schema.context.name": "default",
  "value.subject.name.strategy": "TopicNameStrategy",
  "input.data.format": "AVRO",
  "connector.class": "HttpSinkV2",
  "name": "HttpSinkV2Connector_0",
  "kafka.auth.mode": "KAFKA_API_KEY",
  "kafka.api.key": "<my-kafka-api-key>",
  "kafka.api.secret": "<my-kafka-api-secret>",
  "max.poll.interval.ms": "300000",
  "max.poll.records": "500",
  "tasks.max": "1",
  "http.api.base.url": "http://example.com/absenceManagement/v1",
  "auth.type": "NONE",
  "https.ssl.enabled": "false",
  "https.host.verifier.enabled": "true",
  "behavior.on.error": "FAIL",
  "apis.num": "1",
  "api1.http.request.method": "POST",
  "api1.http.connect.timeout.ms": "30000",
  "api1.http.request.timeout.ms": "30000",
  "api1.behavior.on.null.values": "IGNORE",
  "api1.max.retries": "5",
  "api1.request.body.format": "JSON",
  "api1.retry.backoff.policy": "EXPONENTIAL_WITH_JITTER",
  "api1.max.batch.size": "1",
  "api1.retry.backoff.ms": "3000",
  "api1.retry.on.status.codes": "400-",
  "api1.http.request.headers.separator": "|",
  "api1.http.request.parameters.separator": "&",
  "api1.batch.separator": ",",
  "api1.batch.json.as.array": "false",
  "api1.http.path.parameters.separator": "|",
  "api1.test.api": "false",
  "api1.allow.get.request.body": "false"
}

Note the following property definitions:

  • "connector.class": Identifies the connector plugin name.
  • "input.data.format": Sets the input Kafka record value format (data coming from the Kafka topic). Valid entries are AVRO, JSON_SR, PROTOBUF, JSON, or BYTES. You must have Confluent Cloud Schema Registry configured if using a schema-based message format (for example, Avro, JSON Schema, or Protobuf). Note that you can select schemaless JSON to consume STRING data.
  • "name": Sets a name for your new connector.
  • "kafka.auth.mode": Identifies the connector authentication mode you want to use. There are two options: SERVICE_ACCOUNT or KAFKA_API_KEY (the default). To use an API key and secret, specify the configuration properties kafka.api.key and kafka.api.secret, as shown in the example configuration (above). To use a service account, specify the Resource ID in the property kafka.service.account.id=<service-account-resource-ID>. To list the available service account resource IDs, use the following command:

    confluent iam service-account list
    

    For example:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "api1.http.request.method": Enter an HTTP API Request Method: PUT, POST, GET, or PATCH. Defaults to POST.

  • "topics": Enter the topic name or a comma-separated list of topic names.

Single Message Transforms: For details about adding SMTs using the CLI, see the Single Message Transforms (SMT) documentation. For all property values and descriptions, see Configuration Properties.
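Before loading the file, it can help to confirm it is well-formed JSON; a common pitfall is a trailing comma after the last property. The following sketch is not part of the Confluent workflow and assumes python3 is available locally; it validates a minimal stand-in file:

```shell
# Write a minimal stand-in configuration and validate it before use.
cat > /tmp/http-sink-v2-config.json <<'EOF'
{
  "name": "HttpSinkV2Connector_0",
  "connector.class": "HttpSinkV2",
  "tasks.max": "1"
}
EOF
python3 -m json.tool /tmp/http-sink-v2-config.json > /dev/null && echo "config OK"
```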

Step 4: Load the configuration file and create the connector

Enter the following command to load the configuration and start the connector:

confluent connect cluster create --config-file <file-name>.json

For example:

confluent connect cluster create --config-file http-sink-v2-config.json

Example output:

Created connector HttpSinkV2Connector_0 lcc-do6vzd

Step 5: Check the connector status

Enter the following command to check the connector status:

confluent connect cluster list

Example output:

ID           |             Name              | Status  | Type | Trace |
+------------+-------------------------------+---------+------+-------+
lcc-do6vzd   | HttpSinkV2Connector_0         | RUNNING | sink |       |

Step 6: Check for records

Verify that records are populating the endpoint.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Managed and Custom Connectors section.

Note that Dead Letter Queue (DLQ), success, and error topics are automatically created for the connector. For more details, see Confluent Cloud Dead Letter Queue.

Test API

Use the Test API functionality to test the API with a sample record and view the logs directly in the Confluent Cloud user interface.

Important

  • This feature is only available for publicly accessible endpoints.
  • Invoking the Test API on an API may change data on the end system, depending on the API’s behavior.

When using this feature with the HTTP Sink V2 connector, add your details to the following fields:

  • ${topic}: Topic that the API is expected to consume from. Note that the connector will not actually read anything from the topic; the topic name configured here is used wherever you have configured the ${topic} template variable.

  • Test message: This message should reflect the data present in the Kafka topic. You must configure the expected Kafka key, headers, and values as applicable as shown in the following example:

    {
      "key": "key1",
      "headers": [
        {
          "header1": "h1"
        },
        {
          "header2": "h2"
        }
      ],
      "value": "{\"msg\": \"hello world\"}"
    }
    

Configuration Properties

Use the following configuration properties with the fully-managed HTTP Sink V2 connector.

Which topics do you want to get data from?

topics

Identifies the topic name or a comma-separated list of topic names.

  • Type: list
  • Importance: high

Schema Config

schema.context.name

Add a schema context name. A schema context represents an independent scope in Schema Registry. It is a separate sub-schema tied to topics in different Kafka clusters that share the same Schema Registry instance. If not used, the connector uses the default schema configured for Schema Registry in your Confluent Cloud environment.

  • Type: string
  • Default: default
  • Importance: medium
value.subject.name.strategy

Determines how to construct the subject name under which the value schema is registered with Schema Registry.

  • Type: string
  • Default: TopicNameStrategy
  • Valid Values: RecordNameStrategy, TopicNameStrategy, TopicRecordNameStrategy
  • Importance: medium

Input messages

input.data.format

Sets the input Kafka record value format. Valid entries are AVRO, JSON_SR, PROTOBUF, JSON or BYTES. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.

  • Type: string
  • Default: JSON
  • Importance: high

How should we connect to your data?

name

Sets a name for your connector.

  • Type: string
  • Valid Values: A string at most 64 characters long
  • Importance: high

Kafka Cluster credentials

kafka.auth.mode

Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.

  • Type: string
  • Default: KAFKA_API_KEY
  • Valid Values: KAFKA_API_KEY, SERVICE_ACCOUNT
  • Importance: high
kafka.api.key

Kafka API Key. Required when kafka.auth.mode==KAFKA_API_KEY.

  • Type: password
  • Importance: high
kafka.service.account.id

The Service Account that will be used to generate the API keys to communicate with the Kafka cluster.

  • Type: string
  • Importance: high
kafka.api.secret

Secret associated with Kafka API key. Required when kafka.auth.mode==KAFKA_API_KEY.

  • Type: password
  • Importance: high

CSFLE

csfle.enabled

Determines whether the connector honors CSFLE rules.

  • Type: boolean
  • Default: false
  • Importance: high
csfle.onFailure

Configures the behavior for decryption failures. If set to ERROR, the connector will FAIL. If set to NONE, the connector will ignore the decryption failure and proceed to write the data in its encrypted form.

  • Type: string
  • Default: ERROR
  • Valid Values: ERROR, NONE
  • Importance: medium
sr.service.account.id

The Service Account that will be used to generate the API keys to communicate with the Schema Registry cluster.

  • Type: string
  • Default: “”
  • Importance: high

Reporter

report.errors.as

Dictates the content of records produced to the error topic. If set to Error string, the value is a human-readable string describing the failure. The value includes some or all of the following information, if available: HTTP response code, reason phrase, submitted payload, URL, response content, exception, and error message. If set to http_response, the value is the plain response content for the request that failed to write the record. In both modes, any available information about the failure is also included in the error record's headers.

  • Type: string
  • Default: Error string
  • Importance: low

Consumer configuration

max.poll.interval.ms

The maximum delay between subsequent consume requests to Kafka. This configuration property may be used to improve the performance of the connector, if the connector cannot send records to the sink system. Defaults to 300000 milliseconds (5 minutes).

  • Type: long
  • Default: 300000 (5 minutes)
  • Valid Values: [60000,…,1800000]
  • Importance: low
max.poll.records

The maximum number of records to consume from Kafka in a single request. This configuration property may be used to improve the performance of the connector, if the connector cannot send records to the sink system. Defaults to 500 records.

  • Type: long
  • Default: 500
  • Valid Values: [1,…,500]
  • Importance: low

Number of tasks for this connector

tasks.max

Maximum number of tasks for the connector.

  • Type: int
  • Valid Values: [1,…]
  • Importance: high

Authentication

http.api.base.url

The HTTP API Base URL. For example: http://example.com/absenceManagement/v1.

  • Type: string
  • Importance: high
auth.type

Authentication type of the endpoint. Valid values are NONE, BASIC, OAUTH2 (Client Credentials grant type only), BEARER.

  • Type: string
  • Default: NONE
  • Importance: high
connection.user

The username to be used with an endpoint requiring basic authentication.

  • Type: string
  • Importance: medium
connection.password

The password to be used with an endpoint requiring basic authentication.

  • Type: password
  • Importance: medium
bearer.token

The bearer authentication token to be used with an endpoint requiring bearer token based authentication.

  • Type: password
  • Importance: medium
oauth2.token.url

The URL to be used for fetching the OAuth2 token. Client Credentials is the only supported grant type.

  • Type: string
  • Importance: medium
oauth2.client.id

The client id used when fetching the OAuth2 token.

  • Type: string
  • Importance: medium
oauth2.client.secret

The client secret used when fetching the OAuth2 token.

  • Type: password
  • Importance: medium
oauth2.token.property

The name of the property containing the OAuth2 token returned by the OAuth2 token URL (defaults to access_token).

  • Type: string
  • Default: access_token
  • Importance: medium
oauth2.client.scope

The scope parameter sent to the service when fetching the OAuth2 token.

  • Type: string
  • Default: any
  • Importance: medium
oauth2.client.auth.mode

Specifies how to encode client_id and client_secret in the OAuth2 authorization request. If set to header, the credentials are encoded as an Authorization: Basic <base-64 encoded client_id:client_secret> HTTP header. If set to url, client_id and client_secret are sent as URL-encoded parameters.

  • Type: string
  • Default: header
  • Importance: medium
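To illustrate the header mode, the Authorization header value is the base64 encoding of client_id:client_secret. A sketch with hypothetical credentials, not a Confluent tool:

```shell
# Hypothetical client credentials; substitute your own.
CLIENT_ID="my-client-id"
CLIENT_SECRET="my-client-secret"
# header mode sends: Authorization: Basic base64(client_id:client_secret)
printf 'Authorization: Basic %s\n' "$(printf '%s:%s' "$CLIENT_ID" "$CLIENT_SECRET" | base64)"
# prints: Authorization: Basic bXktY2xpZW50LWlkOm15LWNsaWVudC1zZWNyZXQ=
```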
oauth2.client.headers

HTTP headers to be included in the OAuth2 client endpoint request. Individual headers should be separated by |.

  • Type: password
  • Importance: low
https.ssl.enabled

Whether or not to connect to the endpoint via SSL.

  • Type: boolean
  • Default: false
  • Importance: medium
https.ssl.keystorefile

The key store containing the server certificate.

  • Type: password
  • Importance: low
https.ssl.keystore.password

The store password for the key store file.

  • Type: password
  • Importance: high
https.ssl.key.password

The password for the private key in the key store file.

  • Type: password
  • Importance: high
https.ssl.truststorefile

The trust store containing a server CA certificate.

  • Type: password
  • Importance: high
https.ssl.truststore.password

The password for the trust store containing the server CA certificate.

  • Type: password
  • Importance: high
https.ssl.protocol

The protocol to use for SSL connections.

  • Type: string
  • Default: TLSv1.3
  • Importance: medium

Behavior On Error

behavior.on.error

Error handling behavior setting for handling error response from HTTP requests.

  • Type: string
  • Default: FAIL
  • Importance: low

APIs

apis.num

The number of HTTP(S) APIs to configure. This value must be less than or equal to 15.

  • Type: int
  • Default: 1
  • Importance: high

API-1 Configs

api1.http.api.path

The HTTP API path which, together with http.api.base.url, forms the complete HTTP(S) URL. This path can be templated with offset information. For example: /resource1/${offset}, where ${offset} will be substituted with the offset generated from the previous request's response (or, for the first request, from http.initial.offset).

  • Type: string
  • Importance: high
api1.topics

List of topics for this API

  • Type: list
  • Default: “”
  • Importance: high
api1.http.request.headers

HTTP headers to be included in each request. Header names and values should be separated by :. Distinct headers should be separated by |. For example: From:abcxyz@confluent.io|Content-Length:348.

  • Type: string
  • Importance: medium
api1.http.request.method

The HTTP request method for the API. Valid values are PUT, POST, GET, or PATCH.

  • Type: string
  • Default: POST
  • Importance: high
api1.http.request.parameters

HTTP parameters to be added to each request. Parameter names and values should be separated by =. Distinct parameters should be separated by &.

  • Type: string
  • Importance: medium
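For example, following the name=value and & separator convention above (the parameter names and templated value are hypothetical):

```
"api1.http.request.parameters": "limit=10&source=${topic}"
```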
api1.http.connect.timeout.ms

The time in milliseconds to wait for a connection to be established.

  • Type: int
  • Default: 30000 (30 seconds)
  • Importance: medium
api1.http.request.timeout.ms

The time in milliseconds to wait for a request response from the server.

  • Type: int
  • Default: 30000 (30 seconds)
  • Importance: medium
api1.behavior.on.null.values

How to handle records with a non-null key and a null value (that is, Kafka tombstone records). Valid options are IGNORE, DELETE, and FAIL.

  • Type: string
  • Default: IGNORE
  • Importance: low
api1.max.retries

The maximum number of times to retry on errors before failing the task.

  • Type: int
  • Default: 5
  • Importance: medium
api1.request.body.format

Used to produce request body in either JSON or String format. The default value is JSON.

  • Type: string
  • Default: JSON
  • Importance: medium
api1.batch.key.pattern

Pattern used to build the key for a given batch. ${key} and ${topic} can be used to include message attributes here.

  • Type: string
  • Importance: high
api1.retry.backoff.policy

The backoff policy to use for retries: CONSTANT_VALUE or EXPONENTIAL_WITH_JITTER.

  • Type: string
  • Default: EXPONENTIAL_WITH_JITTER
  • Importance: medium
api1.max.batch.size

The number of records accumulated in a batch before the HTTP API is invoked. Note that Basic and Standard Clusters may experience throughput limitations, even with a higher batch size.

  • Type: int
  • Default: 1
  • Importance: high
api1.retry.backoff.ms

The initial duration in milliseconds to wait following an error before a retry attempt is made. Subsequent backoff attempts can be a constant value or exponential with jitter (can be configured using api*.retry.backoff.policy parameter). Jitter adds randomness to the exponential backoff algorithm to prevent synchronized retries.

  • Type: int
  • Default: 3000 (3 seconds)
  • Valid Values: [100,…]
  • Importance: medium
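As an illustration of how the initial delay grows under the exponential policy, the following sketch assumes a simple doubling formula; the connector's exact internals may differ, and jitter would add randomness on top of each base delay:

```shell
# Assumed illustration: the base delay doubles on each retry attempt,
# starting from the configured initial backoff (api1.retry.backoff.ms).
initial=3000
for attempt in 0 1 2 3; do
  echo "attempt $attempt: base delay $(( initial * (1 << attempt) ))ms"
done
# prints base delays of 3000ms, 6000ms, 12000ms, and 24000ms
```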
api1.batch.prefix

Prefix added to record batches. This is applied once at the beginning of the batch of records.

  • Type: string
  • Importance: high
api1.retry.on.status.codes

Comma-separated list of HTTP status codes or range of codes to retry on. Ranges are specified with start and optional end code. Range boundaries are inclusive. For instance, 400- includes all codes greater than or equal to 400. 400-500 includes codes from 400 to 500, including 500. Multiple ranges and single codes can be specified together to achieve fine-grained control over retry behavior. For example, 404,408,500- will retry on 404 NOT FOUND, 408 REQUEST TIMEOUT, and all 5xx error codes. Note that some status codes will always be retried, such as unauthorized, timeouts and too many requests.

  • Type: string
  • Default: 400-
  • Importance: medium
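The range semantics above can be sketched as a small helper. This is a hypothetical illustration of the syntax, not connector code, and it ignores the always-retried status codes mentioned above:

```shell
# Hypothetical check: does a status code fall inside a
# retry.on.status.codes expression such as "404,408,500-"?
should_retry() {
  local code=$1 spec=$2 part start end
  IFS=',' read -ra parts <<< "$spec"
  for part in "${parts[@]}"; do
    if [[ $part == *-* ]]; then
      start=${part%-*}        # range start (inclusive)
      end=${part#*-}          # range end (inclusive); empty means open-ended
      [[ -z $end ]] && end=599
      (( code >= start && code <= end )) && return 0
    else
      (( code == part )) && return 0
    fi
  done
  return 1
}
should_retry 503 "404,408,500-" && echo "retry 503"
should_retry 200 "404,408,500-" || echo "no retry 200"
```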
api1.batch.suffix

Suffix added to record batches. This is applied once at the end of the batch of records.

  • Type: string
  • Importance: high
api1.http.path.parameters

HTTP path parameters to be added to the request. Parameter names and values should be separated by :. Distinct parameters should be separated by |. Parameter values can be templated with different template values like ${key}, ${topic}, ${offset}, or other field references from the Kafka record.

  • Type: string
  • Importance: medium
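For example, following the name:value and | separator convention above (the parameter names are hypothetical, and this assumes parameter values are substituted into matching placeholders in api1.http.api.path):

```
"api1.http.api.path": "/customers/${customerId}/orders",
"api1.http.path.parameters": "customerId:${key}|region:${topic}"
```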
api1.batch.json.as.array

Whether or not to use an array to bundle JSON records. Only used when request.body.format is set to JSON. This can be disabled only when max.batch.size is set to 1.

  • Type: boolean
  • Default: false
  • Importance: high
api1.http.request.body

The custom payload that will be sent to the destination instead of the record. The value can be templated with the key, topic, and other record fields (for example, search_after: ${key}, where ${key} will be substituted with the key obtained from the record).

  • Type: string
  • Importance: medium

Next Steps

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.
