HTTP Source V2 Connector for Confluent Cloud

The fully-managed HTTP Source V2 connector for Confluent Cloud integrates Apache Kafka® with an API using HTTP or HTTPS. It allows you to configure one or more APIs seamlessly with an OpenAPI/Swagger specification file, reducing overall configuration time and delivering better performance than the HTTP Source Connector for Confluent Cloud. This page describes the features the HTTP Source V2 connector offers and everything you need to begin using the connector.

Note

If you require private networking for fully-managed connectors, make sure to set up the proper networking beforehand. For more information, see Manage Networking for Confluent Cloud Connectors.

Features

The HTTP Source V2 connector includes the following features:

  • Multiple API path support: The connector allows you to configure up to 15 API paths per connector, all sharing the same base URL and authentication mechanism.
  • OpenAPI Specification-based configuration: The connector provides seamless configuration through an OpenAPI specification file.
  • Secure access and data exchange: The connector supports the following authentication mechanisms:
    • Basic
    • Bearer
    • OAuth 2.0 Client Credentials grant flow
  • Client-side field level encryption (CSFLE) support: The connector supports CSFLE for sensitive data. For more information about CSFLE setup, see Manage CSFLE for connectors.
  • API error reporting management: You can configure the connector to notify you when an API error occurs through email or through the Confluent Cloud user interface. You can also configure the connector to ignore API errors.
  • API validation: The connector allows you to test the API using a test record and view the test API response in the Confluent Cloud user interface.
  • Template variables: The connector allows you to use the ${offset} template variable and gives you the ability to substitute template variables in parameters, headers, and body content.
  • At-least-once delivery: The connector guarantees that records are delivered at-least-once to the Kafka topic.
  • Supported data formats: The connector supports the Schema Registry-based formats Avro, JSON Schema, and Protobuf. Schema Registry must be enabled to use a Schema Registry-based format. For more information, see Schema Registry Enabled Environments.
  • Schema Registry and Schema Context support: The connector allows you to map an API to a specific schema context so that you can use the schema context feature in different environments.
  • Configurable retry functionality: The connector allows you to customize retry settings based on your requirements.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.

Manage CSFLE

If you plan to enable CSFLE for the HTTP Source V2 connector, be aware of the following sensitive information that may get written to your Kafka topics:

Warning

  • Error topic: The error topic may include sensitive data from the API response.

Limitations

Be sure to review the following information.

Manage custom offsets

You can manage the offsets for this connector. Offsets provide information on the point in the system from which the connector is accessing data. For more information, see Manage Offsets for Fully-Managed Connectors in Confluent Cloud.

To manage offsets, use the requests described for each offset mode below.

Chaining offset mode

This section describes how custom offsets work when the connector is configured to use chaining offset mode.

To get the current offset, make a GET request that specifies the environment, Kafka cluster, and connector name.

curl -X GET \
  'https://api.confluent.cloud/api/connect/v1/environments/<env-id>/clusters/<lkc-id>/connectors/<connector-name>/offsets' \
  -H 'Content-Type: application/json' \
  -H 'Authorization: Basic <base64(api key:api secret)>' | jq

Response

Successful calls return HTTP 200 with a JSON payload that describes the offset.

{
   "id": "lcc-devc61j0j6",
   "name": "HttpSourceV2ES",
   "offsets": [
      {
         "partition": {
           "url": "http://35.247.11.62:9200/inventory/_search"
         },
         "offset": {
           "offset": "1647948089985"
         }
      },
      {
         "partition": {
           "url": "http://35.247.11.62:9200/product/_search"
         },
         "offset": {
           "offset": "1647948089985"
         }
      },
      {
         "partition": {
           "url": "http://35.247.11.62:9200/order/_search"
         },
         "offset": {
           "offset": "1647948089985"
         }
      }
   ],
   "metadata": {
      "observed_at": "2024-04-09T09:59:36.678750446Z"
   }
}

Offset structure

Here, there are three partitions because three APIs are configured (a different topic for each API). partition is the key-value pair of url, where the value is the combination of http.api.base.url and api[i].http.api.path from the connector configuration. offset is the key-value pair of offset, where the value is taken from the api[i].http.offset.json.pointer location in the source record being processed.

For example, a source record looks similar to:

{
   "_index": "order",
   "_id": "1NSzDY4BSCpsoSyj5kz8",
   "_score": null,
   "_source": {
      "name": "Name2",
      "time": "1647948089985"
   },
   "sort": [
      1647948089985
   ]
}

In this example, api[i].http.offset.json.pointer is configured as /sort/0. In the source record, this points to the value 1647948089985, which is also the offset value in the GET connector offsets response.
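
For reference, a chaining-mode API entry that produces offsets like the one above might look similar to the following configuration fragment. This is a minimal sketch for an Elasticsearch-style search API; the JSON pointers, request body, and initial offset are assumptions for this example, not required values.

"api1.http.offset.mode": "CHAINING",
"api1.http.request.method": "POST",
"api1.http.response.data.json.pointer": "/hits/hits",
"api1.http.offset.json.pointer": "/sort/0",
"api1.http.request.body": "{\"size\": 100, \"sort\": [{\"time\": \"asc\"}], \"search_after\": [${offset}]}",
"api1.http.initial.offset": "0"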

Simple incrementing offset mode

This section describes how custom offsets work when the connector is configured to use simple incrementing offset mode.

To get the current offset, make a GET request that specifies the environment, Kafka cluster, and connector name.

curl -X GET \
  'https://api.confluent.cloud/api/connect/v1/environments/env-zpj85d/clusters/lkc-abcd123/connectors/HttpSourceV2SimpleIncrementing/offsets' \
  -H 'Content-Type: application/json' \
  -H 'Authorization: Basic UlJDWU5NVkU1NlBGMk5DRjpCWHF0ZmZaRXF5eDFxbm56bGlYQmx5b21vMXFQQ1JGdHZXZzc3SlFvaTcwTVdRZnFtZ3hEdzJ2VEZ5MEFCbjQr' | jq

Response

{
   "id": "lcc-devcoj89zo",
   "name": "HttpSourceV2SimpleIncrementing",
   "offsets": [
     {
       "partition": {
         "url": "https://api.github.com/repos/apache/kafka/issues/${offset}"
       },
       "offset": {
         "offset": "244"
       }
     },
     {
       "partition": {
         "url": "https://api.github.com/repos/apache/airflow/issues/${offset}"
       },
       "offset": {
         "offset": "244"
       }
     }
   ],
   "metadata": {
     "observed_at": "2024-04-10T10:18:50.218012085Z"
   }
}

Offset structure

In this example, there are two partitions because two APIs are configured (a different topic for each API). partition is the key-value pair of url, where the value is the combination of http.api.base.url and api[i].http.api.path from the connector configuration. offset is the key-value pair of offset, where the value equals api[i].http.initial.offset plus the number of records processed so far, minus one.
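
For reference, a simple incrementing API entry that produces offsets like the ones above might look similar to the following fragment. The base URL, path, and initial offset are assumptions for this GitHub-issues example.

"http.api.base.url": "https://api.github.com",
"api1.http.offset.mode": "SIMPLE_INCREMENTING",
"api1.http.api.path": "/repos/apache/kafka/issues/${offset}",
"api1.http.initial.offset": "1"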

Cursor pagination offset mode

This section describes how custom offsets work when the connector is configured to use cursor pagination offset mode.

To get the current offset, make a GET request that specifies the environment, Kafka cluster, and connector name.

curl -X GET \
  'https://api.confluent.cloud/api/connect/v1/environments/env-zpj85d/clusters/lkc-abcd123/connectors/HttpSourceV2Zendesk/offsets' \
  -H 'Content-Type: application/json' \
  -H 'Authorization: Basic UlJDWU5NVkU1NlBGMk5DRjpCWHF0ZmZaRXF5eDFxbm56bGlYQmx5b21vMXFQQ1JGdHZXZzc3SlFvaTcwTVdRZnFtZ3hEdzJ2VEZ5MEFCbjQr' | jq

Response

{
   "id": "lcc-devcxv93jq",
   "name": "HttpSourceV2Zendesk",
   "offsets": [
      {
            "partition": {
               "url": "https://example.com/api/v2/users/18266623893394/tickets/assigned"
            },
            "offset": {
               "offset": "eyJvIjoibmljZV9pZCIsInYiOiJhUmtBQUFBQUFBQUEifQ=="
            }
      }
   ],
   "metadata": {
      "observed_at": "2024-04-12T13:24:53.218799432Z"
   }
}

Offset structure

partition is the key-value pair of url, where the value is the combination of http.api.base.url and api[i].http.api.path from the connector configuration. offset is the key-value pair of offset, where the value is taken from the api[i].http.next.page.json.pointer location in the source record.

The sample record looks similar to:

{
   "url": "https://example.com/api/v2/tickets/25.json",
   "id": 25,
   "external_id": null,
   "via": {
     "channel": "sample_ticket",
     "source": {
       "from": {},
       "to": {},
       "rel": null
      }
   },
   "created_at": "2024-04-12T04:56:46Z",
   "updated_at": "2024-04-12T04:56:46Z",
   "generated_timestamp": 1712897807,
   "type": null,
   "subject": "SAMPLE TICKET: Gift card expiring",
   "raw_subject": "SAMPLE TICKET: Gift card expiring",
   "description": "Hey there, I was lucky enough to receive a gift card from a friend as a housewarming gift. Small problem, I’ve been so swamped with the move I totally forgot about it until now and it expires in a week!\n\nCan you extend the expiration date?\n\nHelp,\nLuka Jensen",
   "priority": "normal",
   "status": "open",
   "recipient": null,
   "requester_id": 18266648849426,
   "submitter_id": 18266648849426,
   "assignee_id": 18266623893394,
   "organization_id": null,
   "group_id": 18266630261906,
   "collaborator_ids": [],
   "follower_ids": [],
   "email_cc_ids": [],
   "forum_topic_id": null,
   "problem_id": null,
   "has_incidents": false,
   "is_public": true,
   "due_at": null,
   "tags": [
     "gift_cards",
     "sample_ticket"
   ],
   "custom_fields": [
     {
       "id": 18266619894546,
       "value": null
     }
   ],
   "satisfaction_rating": null,
   "sharing_agreement_ids": [],
   "custom_status_id": 18266630258322,
   "followup_ids": [],
   "ticket_form_id": 18266615254674,
   "brand_id": 18266615288210,
   "allow_channelback": false,
   "allow_attachments": true,
   "from_messaging_channel": false,
   "meta": {
     "has_more": false,
     "after_cursor": "eyJvIjoibmljZV9pZCIsInYiOiJhUmtBQUFBQUFBQUEifQ==",
     "before_cursor": "eyJvIjoibmljZV9pZCIsInYiOiJhUUVBQUFBQUFBQUEifQ=="
   },
   "links": {
     "prev": "https://example.com/api/v2/users/18266623893394/tickets/assigned.json?page%5Bbefore%5D=eyJvIjoibmljZV9pZCIsInYiOiJhUUVBQUFBQUFBQUEifQ%3D%3D",
     "next": "https://example.com/api/v2/users/18266623893394/tickets/assigned.json?page%5Bafter%5D=eyJvIjoibmljZV9pZCIsInYiOiJhUmtBQUFBQUFBQUEifQ%3D%3D"
   }
}

api[i].http.next.page.json.pointer is configured as /meta/after_cursor, so the value at that location becomes the offset for each record.
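
For reference, a cursor-pagination API entry matching the example above might look similar to the following fragment. The request parameter name (page[after]) is an assumption about the target API, not a connector requirement.

"api1.http.offset.mode": "CURSOR_PAGINATION",
"api1.http.api.path": "/api/v2/users/18266623893394/tickets/assigned",
"api1.http.next.page.json.pointer": "/meta/after_cursor",
"api1.http.request.parameters": "page[after]=${offset}"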

Quick Start

Use this quick start to get up and running with the Confluent Cloud HTTP Source V2 connector.

Prerequisites

  • Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud.
  • The Confluent CLI installed and configured for the cluster. For help, see Install the Confluent CLI.
  • Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON Schema, or Protobuf). For more information, see Schema Registry Enabled Environments.
  • OpenAPI Specification file version 3.0.
  • Relevant authentication credentials for both Kafka and your data system.
  • At least one Kafka topic must exist in your Confluent Cloud cluster before creating the source connector.

Using the Confluent Cloud Console

Step 1: Launch your Confluent Cloud cluster

See the Quick Start for Confluent Cloud for installation instructions.

Step 2: Add a connector

In the left navigation menu, click Connectors. If you already have connectors in your cluster, click + Add connector.

Step 3: Select your connector

Click the HTTP Source V2 connector card.


Step 4: Enter the connector details

Note

  • Ensure you have all your prerequisites completed.
  • An asterisk ( * ) designates a required entry.

At the Add HTTP Source V2 Connector screen, complete the following:

  1. Provide the connector name in the Connector name field.
  2. Add the OpenAPI specification file (OAS 3.0 or higher) by adding a URL endpoint or by uploading a YAML/JSON formatted specification file. A minimal example specification appears after this list. Note that you can convert Swagger 1.x or 2.0 definitions to OpenAPI 3.0 using the Swagger Converter.
    • To add a URL endpoint, enter the URL in the Add via URL field. Note that the maximum file size is 3 MB.
    • To upload a YAML/JSON formatted specification file, select Add a file, then click Upload file to upload the file. Note that the maximum file size is 1 MB.
  3. Select the Output Kafka record value format (data coming from the connector): AVRO, JSON_SR (JSON Schema), or PROTOBUF. A valid schema must be available in Schema Registry to use a schema-based message format (for example, Avro, JSON Schema, or Protobuf). For more information, see Schema Registry Enabled Environments.
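
The following is a minimal sketch of an OAS 3.0 specification in JSON format, assuming a single GET path on the example base URL used elsewhere on this page; your actual specification comes from the API provider.

{
  "openapi": "3.0.0",
  "info": { "title": "Example API", "version": "1.0.0" },
  "servers": [ { "url": "http://example.com/absenceManagement/v1" } ],
  "paths": {
    "/resource1": {
      "get": {
        "responses": { "200": { "description": "OK" } }
      }
    }
  }
}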

Step 5: Check for records

Verify that records are being produced to the Kafka topic.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.

Tip

When you launch a connector, a Dead Letter Queue topic is automatically created. See View Connector Dead Letter Queue Errors in Confluent Cloud for details.

Using the Confluent CLI

To set up and run the connector using the Confluent CLI, complete the following steps, but ensure you have met all prerequisites.

Step 1: List the available connectors

Enter the following command to list available connectors:

confluent connect plugin list

Step 2: List the connector configuration properties

Enter the following command to show the connector configuration properties:

confluent connect plugin describe <connector-plugin-name>

The command output shows the required and optional configuration properties.

Step 3: Create the connector configuration file

Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.

{
  "config": {
    "connector.class": "HttpSourceV2",
    "name": "HttpSourceV2Connector_0",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<my-kafka-api-key>",
    "kafka.api.secret": "<my-kafka-api-secret>",
    "schema.context.name": "default",
    "value.subject.name.strategy": "TopicNameStrategy",
    "output.data.format": "AVRO",
    "tasks.max": "1",
    "behavior.on.error": "IGNORE",
    "http.api.base.url": "http://example.com/absenceManagement/v1",
    "auth.type": "NONE",
    "https.ssl.enabled": "false",
    "apis.num": "1",
    "api1.http.request.method": "GET",
    "api1.http.connect.timeout.ms": "30000",
    "api1.http.request.timeout.ms": "30000",
    "api1.http.offset.mode": "SIMPLE_INCREMENTING",
    "api1.max.retries": "5",
    "api1.retry.backoff.policy": "EXPONENTIAL_WITH_JITTER",
    "api1.retry.backoff.ms": "3000",
    "api1.retry.on.status.codes": "400-",
    "api1.http.request.headers.separator": "|",
    "api1.http.request.parameters.separator": "&",
    "api1.request.interval.ms": "60000",
    "api1.http.path.parameters.separator": "|",
    "api1.test.api": "false",
  }
}

Note the following property definitions:

  • "connector.class": Identifies the connector plugin name.
  • "output.data.format": Sets the output Kafka record value format (data coming from the connector). Valid entries are: AVRO, JSON_SR (JSON Schema), or PROTOBUF. Note that you must have Confluent Cloud Schema Registry configured when using a schema-based format (for example, Avro).
  • "name": Sets a name for your new connector.
  • "kafka.auth.mode": Identifies the connector authentication mode you want to use. There are two options: SERVICE_ACCOUNT or KAFKA_API_KEY (the default). To use an API key and secret, specify the configuration properties kafka.api.key and kafka.api.secret, as shown in the example configuration (above). To use a service account, specify the Resource ID in the property kafka.service.account.id=<service-account-resource-ID>. To list the available service account resource IDs, use the following command:

    confluent iam service-account list
    

    For example:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "http.request.method": Enter an HTTP request method: GET and POST. Defaults to GET.

Note

(Optional) To enable CSFLE for data encryption, specify the following properties (a configuration sketch follows this note):

  • csfle.enabled: Flag to indicate whether the connector honors CSFLE rules.
  • sr.service.account.id: A service account used to access Schema Registry and the encryption rules or keys associated with the schema.

For more information on CSFLE setup, see Manage CSFLE for connectors.
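
Added to the connector configuration JSON, these properties might look similar to the following fragment; the service account ID is a placeholder.

"csfle.enabled": "true",
"sr.service.account.id": "<sr-service-account-resource-ID>"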

Single Message Transforms: For details about adding SMTs using the CLI, see the Single Message Transforms (SMT) documentation. For all property values and descriptions, see Configuration Properties.

Step 4: Load the configuration file and create the connector

Enter the following command to load the configuration and start the connector:

confluent connect cluster create --config-file <file-name>.json

For example:

confluent connect cluster create --config-file http-source-v2-config.json

Example output:

Created connector HttpSourceV2Connector_0 lcc-do6vzd

Step 5: Check the connector status

Enter the following command to check the connector status:

confluent connect cluster list

Example output:

ID           |             Name              | Status  | Type   | Trace |
+------------+-------------------------------+---------+--------+-------+
lcc-do6vzd   | HttpSourceV2Connector_0       | RUNNING | Source |       |

Step 6: Check for records

Verify that records are populating the Kafka topic.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.

Tip

When you launch a connector, a Dead Letter Queue topic is automatically created. See View Connector Dead Letter Queue Errors in Confluent Cloud for details.

Test API

Use the Test API functionality to test the API with a sample record and view the logs directly in the Confluent Cloud user interface.

Important

  • This feature is only available for publicly accessible endpoints.
  • Invoking the Test API on an API may change data on the end system, depending on the API’s behavior.

When using this feature with the HTTP Source V2 connector, configure the following required parameter in the user interface: ${offset}. This is the offset that the connector should read from. The connector uses this value to replace the ${offset} template variable wherever it is configured.


API Chaining

API chaining refers to obtaining the desired data by navigating through multiple APIs. The response from the first API determines the value for the template placeholders in the URL for the next API, thereby creating a ‘chain.’ For example, assume there is a SaaS system that supports the following APIs:

Parent API

  GET /users : fetches all the users registered in the service

Child API

  GET /users/${id}/places : fetches all the places visited by a particular user

If the requirement is to create a connector that captures all the places visited by registered users in the system, the steps would broadly include:

  1. Retrieve all users in the system using the /users endpoint.
  2. Obtain all places visited by calling the /users/${id}/places API for each user fetched in the first step.

The api.chaining.parent.child.relationship configuration property defines the relationship between the APIs. For more information, see Configuration Properties.
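
For illustration, a two-API chaining setup for the example above might include configuration similar to the following fragment. The paths and topic names are assumptions, and populating the ${id} placeholder from the parent response depends on additional per-API settings not shown here.

"apis.num": "2",
"api1.http.api.path": "/users",
"api1.topics": "users",
"api2.http.api.path": "/users/${id}/places",
"api2.topics": "places",
"api.chaining.parent.child.relationship": "api1::api2"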

Considerations

Note the following points about API chaining:

  • This feature can only be used with the Confluent Cloud API, Confluent CLI, or Terraform.
  • When this feature is enabled, an additional metadata topic is created in your cluster, and you are responsible for the associated costs.
  • The values for the placeholders in the child API path (like ${id} in the above example) and the offset field values are stored in the metadata topic in unencrypted form. If you use CSFLE with the HTTP Source V2 connector, ensure that the placeholder and offset fields do not contain personally identifiable information (PII).
  • Currently, only one level of chaining is supported. This means that a child API cannot have a child of its own.

Limitations

  • The ordering between the records may not be maintained in API chaining.
  • The connector may produce duplicate records in the Kafka topic.
  • Custom offsets are not supported in API chaining (for both parent and child APIs).

Permissions

You must provide permissions for the additional metadata topic and consumer group created as part of API chaining:

  1. The metadata topic is prefixed with HTTPv2-work-queue-.
  2. The consumer group is prefixed with HTTPv2-work-queue-consumers-.

SNAPSHOT_PAGINATION mode

Some APIs do not allow offset pagination (like /api/path/${offset}); instead, they provide a static URL that returns an increasing number of records over time, as shown in the example below:

GET /api/path/users at time t1       → R1 to R10
GET /api/path/users at time t2 (>t1) → R1 to R20
GET /api/path/users at time t3 (>t2) → R1 to R27

In such cases, you can use SNAPSHOT_PAGINATION mode to paginate the data. Set the api<index number>.http.offset.mode configuration to this value, as shown in the sketch after the following note.

Note

All records should be in sorted order according to some unique key. Set the same key in the api<index number>.http.offset.json.pointer configuration.
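
For illustration, a snapshot-pagination API entry for the /api/path/users example might look similar to the following fragment; the /id pointer is an assumed unique, sorted key in the response records.

"api1.http.offset.mode": "SNAPSHOT_PAGINATION",
"api1.http.api.path": "/api/path/users",
"api1.http.offset.json.pointer": "/id"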

Configuration Properties

Use the following configuration properties with the fully-managed HTTP Source V2 connector. When configuring your APIs, use the configuration properties listed in the API-1 Configs section and update the prefix for each API index accordingly. For example, the http.request.headers parameter becomes api<index number>.http.request.headers (for example, api2.http.request.headers, api3.http.request.headers, and so forth).
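
For example, the same header setting applied to a second and third API would look similar to the following fragment; the header values are illustrative only.

"api2.http.request.headers": "From:abcxyz@confluent.io|Content-Length:348",
"api3.http.request.headers": "Accept:application/json"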

How should we connect to your data?

name

Sets a name for your connector.

  • Type: string
  • Valid Values: A string at most 64 characters long
  • Importance: high

Kafka Cluster credentials

kafka.auth.mode

Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.

  • Type: string
  • Default: KAFKA_API_KEY
  • Valid Values: KAFKA_API_KEY, SERVICE_ACCOUNT
  • Importance: high
kafka.api.key

Kafka API Key. Required when kafka.auth.mode==KAFKA_API_KEY.

  • Type: password
  • Importance: high
kafka.service.account.id

The Service Account that will be used to generate the API keys to communicate with Kafka Cluster.

  • Type: string
  • Importance: high
kafka.api.secret

Secret associated with Kafka API key. Required when kafka.auth.mode==KAFKA_API_KEY.

  • Type: password
  • Importance: high

Schema Config

schema.context.name

Add a schema context name. A schema context represents an independent scope in Schema Registry. It is a separate sub-schema tied to topics in different Kafka clusters that share the same Schema Registry instance. If not used, the connector uses the default schema configured for Schema Registry in your Confluent Cloud environment.

  • Type: string
  • Default: default
  • Importance: medium

Output messages

output.data.format

Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, or PROTOBUF. Please configure Confluent Cloud Schema Registry.

  • Type: string
  • Default: JSON_SR
  • Importance: high

Number of tasks for this connector

tasks.max

Maximum number of tasks for the connector.

  • Type: int
  • Valid Values: [1,…]
  • Importance: high

Behavior on error

behavior.on.error

Error handling behavior for error responses from HTTP requests.

  • Type: string
  • Default: FAIL
  • Importance: low

Authentication

http.api.base.url

The HTTP API Base URL. For example: http://example.com/absenceManagement/v1.

  • Type: string
  • Importance: high
auth.type

Authentication type of the endpoint. Valid values are NONE, BASIC, OAUTH2 (Client Credentials grant type only), BEARER.

  • Type: string
  • Default: NONE
  • Importance: high
connection.user

The username to be used with an endpoint requiring basic authentication.

  • Type: string
  • Importance: medium
connection.password

The password to be used with an endpoint requiring basic authentication.

  • Type: password
  • Importance: medium
bearer.token

The bearer authentication token to be used with an endpoint requiring bearer token based authentication.

  • Type: password
  • Importance: medium
oauth2.token.url

The URL to be used for fetching the OAuth2 token. Client Credentials is the only supported grant type.

  • Type: string
  • Importance: medium
oauth2.client.id

The client id used when fetching the OAuth2 token.

  • Type: string
  • Importance: medium
oauth2.client.secret

The client secret used when fetching the OAuth2 token.

  • Type: password
  • Importance: medium
oauth2.token.property

The name of the property containing the OAuth2 token returned by the OAuth2 token URL (defaults to access_token).

  • Type: string
  • Default: access_token
  • Importance: medium
oauth2.client.scope

The scope parameter sent to the service when fetching the OAuth2 token.

  • Type: string
  • Default: any
  • Importance: medium
oauth2.client.auth.mode

Specifies how to encode client_id and client_secret in the OAuth2 authorization request. If set to header, the credentials are encoded as an ‘Authorization: Basic <base-64 encoded client_id:client_secret>’ HTTP header. If set to ‘url’, then client_id and client_secret are sent as URL encoded parameters.

  • Type: string
  • Default: header
  • Importance: medium
oauth2.client.headers

HTTP headers to be included in the OAuth2 client endpoint. Individual headers should be separated by |.

  • Type: password
  • Importance: low
https.ssl.enabled

Whether or not to connect to the endpoint via SSL.

  • Type: boolean
  • Default: false
  • Importance: medium
https.ssl.keystorefile

The key store containing the server certificate.

  • Type: password
  • Default: [hidden]
  • Importance: low
https.ssl.keystore.password

The store password for the key store file.

  • Type: password
  • Importance: high
https.ssl.key.password

The password for the private key in the key store file.

  • Type: password
  • Importance: high
https.ssl.truststorefile

The trust store containing a server CA certificate.

  • Type: password
  • Default: [hidden]
  • Importance: high
https.ssl.truststore.password

The trust store password containing a server CA certificate.

  • Type: password
  • Importance: high
https.ssl.protocol

The protocol to use for SSL connections.

  • Type: string
  • Default: TLSv1.3
  • Importance: medium

APIs

apis.num

The number of HTTP(S) APIs to configure. This value must be less than or equal to 15.

  • Type: int
  • Default: 1
  • Importance: high
api.chaining.parent.child.relationship

Comma-separated list of parent-child relationships for API chaining. For example, if api1 is the parent API and api2 and api3 are the child APIs of api1, set the value to api1::api2,api1::api3.

  • Type: string
  • Default: “”
  • Importance: high

Reporter

report.errors.as

Dictates the content of records produced to the error topic. If set to Error string, the value is a human-readable string describing the failure. The value includes some or all of the following information if available: HTTP response code, reason phrase, submitted payload, URL, response content, exception, and error message. If set to http_response, the value is the plain response content for the request that failed. In both modes, any information about the failure is also included in the error record's headers.

  • Type: string
  • Default: Error string
  • Importance: low

Additional Configs

header.converter

The converter class for the headers. This is used to serialize and deserialize the headers of the messages.

  • Type: string
  • Importance: low
producer.override.compression.type

The compression type for all data generated by the producer.

  • Type: string
  • Importance: low
producer.override.linger.ms

The producer groups together any records that arrive in between request transmissions into a single batched request. More details can be found in the documentation: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#linger-ms.

  • Type: long
  • Valid Values: [100,…,1000]
  • Importance: low
value.converter.allow.optional.map.keys

Allow optional string map key when converting from Connect Schema to Avro Schema. Applicable for Avro Converters.

  • Type: boolean
  • Importance: low
value.converter.auto.register.schemas

Specify if the Serializer should attempt to register the Schema.

  • Type: boolean
  • Importance: low
value.converter.connect.meta.data

Allow the Connect converter to add its metadata to the output schema. Applicable for Avro Converters.

  • Type: boolean
  • Importance: low
value.converter.enhanced.avro.schema.support

Enable enhanced schema support to preserve package information and Enums. Applicable for Avro Converters.

  • Type: boolean
  • Importance: low
value.converter.enhanced.protobuf.schema.support

Enable enhanced schema support to preserve package information. Applicable for Protobuf Converters.

  • Type: boolean
  • Importance: low
value.converter.flatten.unions

Whether to flatten unions (oneofs). Applicable for Protobuf Converters.

  • Type: boolean
  • Importance: low
value.converter.generate.index.for.unions

Whether to generate an index suffix for unions. Applicable for Protobuf Converters.

  • Type: boolean
  • Importance: low
value.converter.generate.struct.for.nulls

Whether to generate a struct variable for null values. Applicable for Protobuf Converters.

  • Type: boolean
  • Importance: low
value.converter.int.for.enums

Whether to represent enums as integers. Applicable for Protobuf Converters.

  • Type: boolean
  • Importance: low
value.converter.latest.compatibility.strict

Verify latest subject version is backward compatible when use.latest.version is true.

  • Type: boolean
  • Importance: low
value.converter.object.additional.properties

Whether to allow additional properties for object schemas. Applicable for JSON_SR Converters.

  • Type: boolean
  • Importance: low
value.converter.optional.for.nullables

Whether nullable fields should be specified with an optional label. Applicable for Protobuf Converters.

  • Type: boolean
  • Importance: low
value.converter.optional.for.proto2

Whether proto2 optionals are supported. Applicable for Protobuf Converters.

  • Type: boolean
  • Importance: low
value.converter.scrub.invalid.names

Whether to scrub invalid names by replacing invalid characters with valid characters. Applicable for Avro and Protobuf Converters.

  • Type: boolean
  • Importance: low
value.converter.use.latest.version

Use latest version of schema in subject for serialization when auto.register.schemas is false.

  • Type: boolean
  • Importance: low
value.converter.use.optional.for.nonrequired

Whether to set non-required properties to be optional. Applicable for JSON_SR Converters.

  • Type: boolean
  • Importance: low
value.converter.wrapper.for.nullables

Whether nullable fields should use primitive wrapper messages. Applicable for Protobuf Converters.

  • Type: boolean
  • Importance: low
value.converter.wrapper.for.raw.primitives

Whether a wrapper message should be interpreted as a raw primitive at root level. Applicable for Protobuf Converters.

  • Type: boolean
  • Importance: low
errors.tolerance

Use this property if you would like to configure the connector's error handling behavior. WARNING: This property should be used with caution for source connectors, as it may lead to data loss. If you set this property to ‘all’, the connector will not fail on errant records, but will instead log them (and send them to the DLQ for sink connectors) and continue processing. If you set this property to ‘none’, the connector task will fail on errant records.

  • Type: string
  • Default: none
  • Importance: low
key.converter.key.subject.name.strategy

How to construct the subject name for key schema registration.

  • Type: string
  • Default: TopicNameStrategy
  • Importance: low
value.converter.decimal.format

Specify the JSON/JSON_SR serialization format for Connect DECIMAL logical type values with two allowed literals:

BASE64 to serialize DECIMAL logical types as base64 encoded binary data and

NUMERIC to serialize Connect DECIMAL logical type values in JSON/JSON_SR as a number representing the decimal value.

  • Type: string
  • Default: BASE64
  • Importance: low
value.converter.flatten.singleton.unions

Whether to flatten singleton unions. Applicable for Avro and JSON_SR Converters.

  • Type: boolean
  • Default: false
  • Importance: low
value.converter.ignore.default.for.nullables

When set to true, this property ensures that the corresponding record in Kafka is NULL, instead of showing the default column value. Applicable for AVRO, PROTOBUF, and JSON_SR Converters.

  • Type: boolean
  • Default: false
  • Importance: low
value.converter.reference.subject.name.strategy

Set the subject reference name strategy for value. Valid entries are DefaultReferenceSubjectNameStrategy or QualifiedReferenceSubjectNameStrategy. Note that the subject reference name strategy can be selected only for PROTOBUF format with the default strategy being DefaultReferenceSubjectNameStrategy.

  • Type: string
  • Default: DefaultReferenceSubjectNameStrategy
  • Importance: low
value.converter.value.subject.name.strategy

Determines how to construct the subject name under which the value schema is registered with Schema Registry.

  • Type: string
  • Default: TopicNameStrategy
  • Importance: low

API-1 Configs

api1.http.api.path

The HTTP API path together with the ‘http.api.base.url’ will form the complete HTTP(S) URL. This path can be templated with offset information. For example: /resource1/${offset} where ${offset} will be substituted with the offset generated from the previous request’s response (or if it’s the first request, from ‘http.initial.offset’).

  • Type: string
  • Importance: high
api1.topics

The name of the Kafka topics to publish data to.

  • Type: string
  • Importance: high
api1.http.request.headers

HTTP headers to be included in each request. Header names and values should be separated by :. Distinct headers should be separated by |. For example: From:abcxyz@confluent.io|Content-Length:348.

  • Type: string
  • Importance: medium
api1.http.request.method

HTTP Request Method. Valid options are GET and POST.

  • Type: string
  • Default: GET
  • Importance: high
api1.http.request.parameters

HTTP parameters to be added to each request. Parameter names and values should be separated by =. Distinct parameters should be separated by &. Parameter values can be templated with offset information (for example: search_after=${offset}) where ${offset} will be substituted with the offset generated from the previous request’s response (or if it’s the first request, from ‘http.initial.offset’). The parameters are only set if ‘http.request.method’ = GET.

  • Type: string
  • Importance: medium
api1.http.connect.timeout.ms

The time in milliseconds to wait for a connection to be established.

  • Type: int
  • Default: 30000 (30 seconds)
  • Valid Values: [1,…,600000]
  • Importance: medium
api1.http.request.body

The payload to be sent along with each HTTP request. The value can be templated with offset (for example: search_after: ${offset}) where ${offset} will be substituted with the offset generated from the previous request’s response (or if it’s the first request, from ‘http.initial.offset’). The body is only set if ‘http.request.method’ = POST.

  • Type: string
  • Importance: medium
api1.http.request.timeout.ms

The time in milliseconds to wait for a request response from the server.

  • Type: int
  • Default: 30000 (30 seconds)
  • Valid Values: [1,…,600000]
  • Importance: medium
api1.http.offset.mode

This config indicates how offsets are computed and how requests are generated. If set to SIMPLE_INCREMENTING, the ${offset} used to generate requests is simply the previous offset (or http.initial.offset) incremented by 1 per sourced record. In this mode, http.initial.offset needs to be set to an integer value. If set to CHAINING, the config ‘http.offset.json.pointer’ needs to be set, and the offset for a record is set to the value for the configured key in the response data. If the value is CURSOR_PAGINATION, then the config ‘http.next.page.json.pointer’ needs to be set and the offset for the last record in each page will be set to the next page value. If set to SNAPSHOT_PAGINATION, only the config ‘http.offset.json.pointer’ needs to be set.

  • Type: string
  • Default: SIMPLE_INCREMENTING
  • Importance: high
api1.max.retries

The maximum number of times to retry on errors before failing the task.

  • Type: int
  • Default: 5
  • Importance: medium
api1.http.initial.offset

The initial offset to be used to generate the first request. This needs to be set if either one or more of the configs - ‘url’, ‘http.request.parameters’, or ‘http.request.body’ contain the template variable ${offset}.

  • Type: string
  • Default: “”
  • Importance: high
api1.retry.backoff.policy

The backoff policy to use for retries: CONSTANT_VALUE or EXPONENTIAL_WITH_JITTER.

  • Type: string
  • Default: EXPONENTIAL_WITH_JITTER
  • Importance: medium
api1.http.response.data.json.pointer

The JSON Pointer to the entity in the JSON response containing the actual data that should be written to Kafka as records. The entity can be an array (multiple records in a single response) or an object / scalar value (single record).

  • Type: string
  • Importance: high
api1.retry.backoff.ms

The initial duration in milliseconds to wait following an error before a retry attempt is made. Subsequent backoff attempts can be a constant value or exponential with jitter (can be configured using api*.retry.backoff.policy parameter). Jitter adds randomness to the exponential backoff algorithm to prevent synchronized retries.

  • Type: int
  • Default: 3000 (3 seconds)
  • Valid Values: [100,…]
  • Importance: medium
api1.http.offset.json.pointer

The JSON Pointer to the value in each record that corresponds to the offset for that record (it is relative to ‘http.response.data.json.pointer’). The offset will be available to the subsequent request as ${offset} and it will also be used for checkpointing and recovery in case of connector failures or restarts. This config should only be set if ‘http.offset.mode’ is set to CHAINING.

  • Type: string
  • Importance: medium
api1.retry.on.status.codes

Comma-separated list of HTTP status codes or range of codes to retry on. Ranges are specified with start and optional end code. Range boundaries are inclusive. For instance, 400- includes all codes greater than or equal to 400. 400-500 includes codes from 400 to 500, including 500. Multiple ranges and single codes can be specified together to achieve fine-grained control over retry behavior. For example, 404,408,500- will retry on 404 NOT FOUND, 408 REQUEST TIMEOUT, and all 5xx error codes. Note that some status codes will always be retried, such as unauthorized, timeouts and too many requests.

  • Type: string
  • Default: 400-
  • Importance: medium
api1.http.next.page.json.pointer

The JSON pointer to the value in the response which corresponds to the next page reference (either a page token, a full URL or a URL fragment). This will be stored as the offset and will be available to the subsequent request via the template variable ${offset}. This config should only be set if ‘http.offset.mode’ is set to CURSOR_PAGINATION.

  • Type: string
  • Importance: medium
api1.request.interval.ms

The time in milliseconds to wait between consecutive requests.

  • Type: int
  • Default: 60000 (1 minute)
  • Valid Values: [100,…]
  • Importance: medium
api1.http.path.parameters

HTTP path parameters to be added to the request. Parameter names and values should be separated by :. Distinct parameters should be separated by |. Parameter values can be templated with different template values like ${key}, ${topic}, ${offset}, or other field references from the Kafka record.

  • Type: string
  • Importance: medium

Auto-restart policy

auto.restart.on.user.error

Enable connector to automatically restart on user-actionable errors.

  • Type: boolean
  • Default: true
  • Importance: medium

Suggested Reading

The following blog post provides an introduction to the fully-managed HTTP V2 Source connector and a scenario walkthrough.

Blog post: Optimize SaaS Integration with Fully-Managed HTTP Connectors V2 for Confluent Cloud

Next Steps

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.
