HTTP Source V2 Connector for Confluent Cloud¶
The fully-managed HTTP Source V2 connector for Confluent Cloud integrates Apache Kafka® with an API using HTTP or HTTPS. It allows you to configure one or more APIs with an OpenAPI/Swagger specification file, which reduces overall configuration time and helps you achieve better performance than the HTTP Source Connector for Confluent Cloud. This page describes the features the HTTP Source V2 connector offers and everything you need to begin using the connector.
Features¶
The HTTP Source V2 connector includes the following features:
- Multiple API path support: The connector allows you to configure up to 15 API paths per connector having the same base URL and authentication mechanism.
- OpenAPI Specification-based configuration: The connector provides seamless configuration through an OpenAPI specification file.
- Secure access and data exchange: The connector supports the following
authentication mechanisms:
- Basic
- Bearer
- OAuth 2.0 Client Credentials grant flow
- API error reporting management: You can configure the connector to notify you when an API error occurs through email or through the Confluent Cloud user interface. You can also configure the connector to ignore API errors.
- API validation: The connector allows you to test the API using a test record and view the test API response in the Confluent Cloud user interface.
- Template variables: The connector allows you to use the ${offset} template variable and gives you the ability to substitute template variables in parameters, headers, and body content.
- At-least-once delivery: The connector guarantees that records are delivered at-least-once to the Kafka topic.
- Supported data formats: The connector supports Schema Registry-based formats: Avro, JSON Schema, and Protobuf data formats. Schema Registry must be enabled to use a Schema Registry-based format. For more information, see Schema Registry Enabled Environments.
- Schema Registry and Schema Context support: The connector allows you to map an API to a specific schema context so that you can use the schema context feature in different environments.
- Configurable retry functionality: The connector allows you to customize retry settings based on your requirements.
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Managed and Custom Connectors section.
Limitations¶
Be sure to review the following information.
- For connector limitations, see HTTP Source V2 Connector limitations.
- If you plan to use one or more Single Message Transformations (SMTs), see SMT Limitations.
- If you plan to use Confluent Cloud Schema Registry, see Schema Registry Enabled Environments.
Manage custom offsets¶
You can manage the offsets for this connector. Offsets provide information on the point in the system from which the connector is accessing data. For more information, see Manage Offsets for Fully-Managed Connectors in Confluent Cloud.
To manage offsets:
- Manage offsets using Confluent Cloud APIs. For more information, see Cluster API reference.
Chaining offset mode¶
This section describes the configuration used to apply chaining offset mode for all custom offset functionalities.
To get the current offset, make a GET
request that specifies the
environment, Kafka cluster, and connector name.
curl -X GET \
'https://api.confluent.cloud/api/connect/v1/environments/<env-id>/clusters/<lkc-id>/connectors/<connector-name>/offsets' \
-H 'Content-Type: application/json' \
-H 'Authorization: Basic <base64(api key:api secret)>' | jq
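The `<base64(api key:api secret)>` placeholder in the Authorization header is the standard HTTP Basic encoding of the key and secret joined by a colon. The following Python sketch shows one way to compute it. The helper name `basic_auth_header` and the credentials are hypothetical and used for illustration only.

```python
import base64


def basic_auth_header(api_key: str, api_secret: str) -> str:
    """Return the value for an HTTP Basic Authorization header."""
    # Join key and secret with a colon, then base64-encode the UTF-8 bytes.
    token = base64.b64encode(f"{api_key}:{api_secret}".encode("utf-8"))
    return "Basic " + token.decode("ascii")


# Dummy credentials, not real Confluent Cloud API keys.
print(basic_auth_header("my-api-key", "my-api-secret"))
```

Pass the returned string as the value of the Authorization header in place of the placeholder.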
Response
Successful calls return HTTP 200
with a JSON payload that describes
the offset.
{
"id": "lcc-devc61j0j6",
"name": "HttpSourceV2ES",
"offsets": [
{
"partition": {
"url": "http://35.247.11.62:9200/inventory/_search"
},
"offset": {
"offset": "1647948089985"
}
},
{
"partition": {
"url": "http://35.247.11.62:9200/product/_search"
},
"offset": {
"offset": "1647948089985"
}
},
{
"partition": {
"url": "http://35.247.11.62:9200/order/_search"
},
"offset": {
"offset": "1647948089985"
}
}
],
"metadata": {
"observed_at": "2024-04-09T09:59:36.678750446Z"
}
}
Offset structure
Here, there are three partitions because three APIs are configured (a different topic for each API). partition is the key-value pair of url, which is a combination of http.api.base.url plus api[i].http.api.path from the connector configuration. offset is the key-value pair of offset and the value of the api[i].http.offset.json.pointer field for the source record value being processed.
For example, a source record looks similar to:
{
"_index": "order",
"_id": "1NSzDY4BSCpsoSyj5kz8",
"_score": null,
"_source": {
"name": "Name2",
"time": "1647948089985"
},
"sort": [
1647948089985
]
}
The configured value of api[i].http.offset.json.pointer is /sort/0. In the source record, this value is 1647948089985, which is also the offset value in the GET connector API result.
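To make the mapping concrete, the following Python sketch derives one offset entry from the sample source record. It is an illustration rather than the connector's implementation: the helper names `resolve_json_pointer` and `partition_url` are hypothetical, and the split of the URL into a base URL of `http://35.247.11.62:9200` and a path of `/order/_search` is an assumption.

```python
def resolve_json_pointer(document, pointer: str):
    """Resolve an RFC 6901 JSON pointer such as "/sort/0" against parsed JSON."""
    if pointer == "":
        return document
    if not pointer.startswith("/"):
        raise ValueError("a non-empty JSON pointer must start with '/'")
    current = document
    for raw_token in pointer[1:].split("/"):
        # RFC 6901 escapes: "~1" means "/", "~0" means "~" (decode in this order).
        token = raw_token.replace("~1", "/").replace("~0", "~")
        if isinstance(current, list):
            current = current[int(token)]
        else:
            current = current[token]
    return current


def partition_url(base_url: str, api_path: str) -> str:
    """Join http.api.base.url and api[i].http.api.path into the partition URL."""
    return base_url.rstrip("/") + "/" + api_path.lstrip("/")


# Trimmed copy of the sample source record.
record = {
    "_index": "order",
    "_source": {"name": "Name2", "time": "1647948089985"},
    "sort": [1647948089985],
}

offset_entry = {
    # Assumed split between base URL and API path.
    "partition": {"url": partition_url("http://35.247.11.62:9200", "/order/_search")},
    # The offsets API reports offset values as strings.
    "offset": {"offset": str(resolve_json_pointer(record, "/sort/0"))},
}
print(offset_entry)
```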
To update the offset, make a POST request that specifies the environment, Kafka cluster, and connector name. Include a JSON payload that specifies the new offset and a patch type.
curl -X POST \
'https://api.confluent.cloud/api/connect/v1/environments/env-zpj85d/clusters/lkc-devcjg7py8/connectors/HttpSourceV2ES/offsets/request' \
-H 'Content-Type: application/json' \
-H 'Authorization: Basic UlJDWU5NVkU1NlBGMk5DRjpCWHF0ZmZaRXF5eDFxbm56bGlYQmx5b21vMXFQQ1JGdHZXZzc3SlFvaTcwTVdRZnFtZ3hEdzJ2VEZ5MEFCbjQr' \
-d '{
"type":"PATCH",
"offsets": [
{
"partition": {
"url": "http://35.247.11.62:9200/inventory/_search"
},
"offset": {
"offset": "1647948089979"
}
},
{
"partition": {
"url": "http://35.247.11.62:9200/product/_search"
},
"offset": {
"offset": "1647948089979"
}
},
{
"partition": {
"url": "http://35.247.11.62:9200/order/_search"
},
"offset": {
"offset": "1647948089979"
}
}
]
}' | jq
Response
{
"id": "lcc-devc61j0j6",
"name": "HttpSourceV2ES",
"offsets": [
{
"partition": {
"url": "http://35.247.11.62:9200/inventory/_search"
},
"offset": {
"offset": "1647948089979"
}
},
{
"partition": {
"url": "http://35.247.11.62:9200/product/_search"
},
"offset": {
"offset": "1647948089979"
}
},
{
"partition": {
"url": "http://35.247.11.62:9200/order/_search"
},
"offset": {
"offset": "1647948089979"
}
}
],
"requested_at": "2024-04-10T07:47:35.999516104Z",
"type": "PATCH"
}
Outcome
The connector sources records from the source system again, starting from source offset 1647948089979. You can verify this in your Kafka topic.
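When several APIs share the same new offset, the PATCH payload can be generated instead of written by hand. The following Python sketch builds the same request body as the example above. The function name `build_patch_request` is hypothetical, and the sketch only constructs the JSON; it does not send the request.

```python
import json


def build_patch_request(offsets_by_url: dict) -> dict:
    """Build the JSON body for a PATCH offset request."""
    return {
        "type": "PATCH",
        "offsets": [
            # Each entry pairs a partition URL with the offset to resume from.
            {"partition": {"url": url}, "offset": {"offset": str(offset)}}
            for url, offset in offsets_by_url.items()
        ],
    }


base = "http://35.247.11.62:9200"
body = build_patch_request(
    {f"{base}/{index}/_search": "1647948089979" for index in ("inventory", "product", "order")}
)
print(json.dumps(body, indent=2))
```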
To get the status of a previous offset request, make a GET
request
that specifies the environment, Kafka cluster, and connector name.
curl -X GET \
'https://api.confluent.cloud/api/connect/v1/environments/env-zpj85d/clusters/lkc-devcjg7py8/connectors/HttpSourceV2ES/offsets/request/status' \
-H 'Content-Type: application/json' \
-H 'Authorization: Basic UlJDWU5NVkU1NlBGMk5DRjpCWHF0ZmZaRXF5eDFxbm56bGlYQmx5b21vMXFQQ1JGdHZXZzc3SlFvaTcwTVdRZnFtZ3hEdzJ2VEZ5MEFCbjQr' | jq
Response
{
"request": {
"id": "lcc-devc61j0j6",
"name": "HttpSourceV2ES",
"offsets": [
{
"partition": {
"url": "http://35.247.11.62:9200/inventory/_search"
},
"offset": {
"offset": "1647948089979"
}
},
{
"partition": {
"url": "http://35.247.11.62:9200/product/_search"
},
"offset": {
"offset": "1647948089979"
}
},
{
"partition": {
"url": "http://35.247.11.62:9200/order/_search"
},
"offset": {
"offset": "1647948089979"
}
}
],
"requested_at": "2024-04-10T07:47:35.999516104Z",
"type": "PATCH"
},
"status": {
"phase": "APPLIED",
"message": "The Connect framework-managed offsets for this connector have been altered successfully. However, if this connector manages offsets externally, they will need to be manually altered in the system that the connector uses."
},
"previous_offsets": [
{
"partition": {
"url": "http://35.247.11.62:9200/inventory/_search"
},
"offset": {
"offset": "1647948089985"
}
},
{
"partition": {
"url": "http://35.247.11.62:9200/product/_search"
},
"offset": {
"offset": "1647948089985"
}
},
{
"partition": {
"url": "http://35.247.11.62:9200/order/_search"
},
"offset": {
"offset": "1647948089985"
}
}
],
"applied_at": "2024-04-10T07:47:38.021730317Z"
}
Outcome
The status endpoint always shows the status of the most recent PATCH/DELETE operation.
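If you script offset changes, you may want to confirm that a request was applied before continuing. The following Python sketch summarizes a parsed status response. The function name `summarize_offset_request` is hypothetical, and only the APPLIED phase shown in the example above is assumed; other phase values are not covered here.

```python
def summarize_offset_request(status_response: dict) -> dict:
    """Extract the request type, applied flag, and offset changes from a status response."""
    request = status_response.get("request", {})
    previous = {
        entry["partition"]["url"]: entry["offset"]["offset"]
        for entry in status_response.get("previous_offsets", [])
    }
    requested = {
        entry["partition"]["url"]: entry["offset"]["offset"]
        for entry in request.get("offsets", [])
    }
    return {
        "type": request.get("type"),
        "applied": status_response.get("status", {}).get("phase") == "APPLIED",
        # Map each URL to (previous offset, requested offset).
        "changes": {url: (previous.get(url), new) for url, new in requested.items()},
    }


# Trimmed copy of the status response shown above.
order_url = "http://35.247.11.62:9200/order/_search"
sample = {
    "request": {
        "offsets": [{"partition": {"url": order_url}, "offset": {"offset": "1647948089979"}}],
        "type": "PATCH",
    },
    "status": {"phase": "APPLIED"},
    "previous_offsets": [{"partition": {"url": order_url}, "offset": {"offset": "1647948089985"}}],
}
summary = summarize_offset_request(sample)
print(summary)
```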
To delete the offset, make a POST
request that specifies the
environment, Kafka cluster, and connector name. Include a JSON payload that
specifies the delete type.
curl -X POST \
'https://api.confluent.cloud/api/connect/v1/environments/env-zpj85d/clusters/lkc-devcjg7py8/connectors/HttpSourceV2ES/offsets/request' \
-H 'Content-Type: application/json' \
-H 'Authorization: Basic UlJDWU5NVkU1NlBGMk5DRjpCWHF0ZmZaRXF5eDFxbm56bGlYQmx5b21vMXFQQ1JGdHZXZzc3SlFvaTcwTVdRZnFtZ3hEdzJ2VEZ5MEFCbjQr' \
-d '{
"type":"DELETE"
}' | jq
Response
{
"id": "lcc-devc61j0j6",
"name": "HttpSourceV2ES",
"offsets": [],
"requested_at": "2024-04-10T08:02:28.847218614Z",
"type": "DELETE"
}
Outcome
The connector starts sourcing records from the beginning. You can verify
this in your Kafka topics. You will see an influx of the same messages as
if the connector was created again with the same configurations. Note that
you can also check the status of your DELETE
request using the following
command:
curl -X GET \
'https://api.confluent.cloud/api/connect/v1/environments/env-zpj85d/clusters/lkc-devcjg7py8/connectors/HttpSourceV2ES/offsets/request/status' \
-H 'Content-Type: application/json' \
-H 'Authorization: Basic UlJDWU5NVkU1NlBGMk5DRjpCWHF0ZmZaRXF5eDFxbm56bGlYQmx5b21vMXFQQ1JGdHZXZzc3SlFvaTcwTVdRZnFtZ3hEdzJ2VEZ5MEFCbjQr' | jq
The output should be similar to:
{
"request": {
"id": "lcc-devc61j0j6",
"name": "HttpSourceV2ES",
"offsets": [],
"requested_at": "2024-04-10T08:02:28.847218614Z",
"type": "DELETE"
},
"status": {
"phase": "APPLIED",
"message": "The Connect framework-managed offsets for this connector have been reset successfully. However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses."
},
"previous_offsets": [
{
"partition": {
"url": "http://35.247.11.62:9200/inventory/_search"
},
"offset": {
"offset": "1647948089985"
}
},
{
"partition": {
"url": "http://35.247.11.62:9200/product/_search"
},
"offset": {
"offset": "1647948089985"
}
},
{
"partition": {
"url": "http://35.247.11.62:9200/order/_search"
},
"offset": {
"offset": "1647948089985"
}
}
],
"applied_at": "2024-04-10T08:02:30.041866120Z"
}
Simple incrementing offset mode¶
This section describes the configuration used to apply simple incrementing offset mode for all custom offset functionalities.
To get the current offset, make a GET
request that specifies the
environment, Kafka cluster, and connector name.
curl -X GET \
'https://api.confluent.cloud/api/connect/v1/environments/env-zpj85d/clusters/lkc-devcjg7py8/connectors/HttpSourceV2SimpleIncrementing/offsets' \
-H 'Content-Type: application/json' \
-H 'Authorization: Basic UlJDWU5NVkU1NlBGMk5DRjpCWHF0ZmZaRXF5eDFxbm56bGlYQmx5b21vMXFQQ1JGdHZXZzc3SlFvaTcwTVdRZnFtZ3hEdzJ2VEZ5MEFCbjQr' | jq
Response
{
"id": "lcc-devcoj89zo",
"name": "HttpSourceV2SimpleIncrementing",
"offsets": [
{
"partition": {
"url": "https://api.github.com/repos/apache/kafka/issues/${offset}"
},
"offset": {
"offset": "244"
}
},
{
"partition": {
"url": "https://api.github.com/repos/apache/airflow/issues/${offset}"
},
"offset": {
"offset": "244"
}
}
],
"metadata": {
"observed_at": "2024-04-10T10:18:50.218012085Z"
}
}
Offset structure
In this example, there are two partitions because two APIs are configured (a different topic for each API). partition is the key-value pair of url, which is a combination of http.api.base.url plus api[i].http.api.path from the connector configuration. offset is the key-value pair of offset and the value of api[i].http.initial.offset plus the number of records processed incrementally minus one.
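The arithmetic can be sketched in Python as follows. This is an illustration of the rule described above, not the connector's code. The function names are hypothetical, the initial offset of 1 is an assumed value, and Python's `string.Template` is used only because it happens to share the `${offset}` placeholder syntax.

```python
from string import Template


def stored_offset(initial_offset: int, records_processed: int) -> int:
    """Offset reported by the API: initial offset + records processed - 1."""
    return initial_offset + records_processed - 1


def request_url(url_template: str, offset: int) -> str:
    """Substitute the ${offset} template variable into a URL."""
    return Template(url_template).substitute(offset=offset)


# Assumed values: an initial offset of 1 and 244 records processed so far.
print(stored_offset(1, 244))
print(request_url("https://api.github.com/repos/apache/kafka/issues/${offset}", 10))
```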
To update the offset, make a POST request that specifies the environment, Kafka cluster, and connector name. Include a JSON payload that specifies the new offset and a patch type.
curl -X POST \
'https://api.confluent.cloud/api/connect/v1/environments/env-zpj85d/clusters/lkc-devcjg7py8/connectors/HttpSourceV2SimpleIncrementing/offsets/request' \
-H 'Content-Type: application/json' \
-H 'Authorization: Basic UlJDWU5NVkU1NlBGMk5DRjpCWHF0ZmZaRXF5eDFxbm56bGlYQmx5b21vMXFQQ1JGdHZXZzc3SlFvaTcwTVdRZnFtZ3hEdzJ2VEZ5MEFCbjQr' \
-d '{
"type":"PATCH",
"offsets": [
{
"partition": {
"url": "https://api.github.com/repos/apache/kafka/issues/${offset}"
},
"offset": {
"offset": "10"
}
},
{
"partition": {
"url": "https://api.github.com/repos/apache/airflow/issues/${offset}"
},
"offset": {
"offset": "10"
}
}
]
}' | jq
Response
{
"id": "lcc-devcoj89j9",
"name": "HttpSourceV2SimpleIncrementing",
"offsets": [
{
"partition": {
"url": "https://api.github.com/repos/apache/kafka/issues/${offset}"
},
"offset": {
"offset": "10"
}
},
{
"partition": {
"url": "https://api.github.com/repos/apache/airflow/issues/${offset}"
},
"offset": {
"offset": "10"
}
}
],
"requested_at": "2024-04-10T10:55:48.418477341Z",
"type": "PATCH"
}
Outcome
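The connector starts sourcing records from https://api.github.com/repos/apache/airflow/issues/10 and https://api.github.com/repos/apache/kafka/issues/10.
To get the status of a previous offset request, make a GET request that specifies the environment, Kafka cluster, and connector name.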
curl -X GET \
'https://api.confluent.cloud/api/connect/v1/environments/env-zpj85d/clusters/lkc-devcjg7py8/connectors/HttpSourceV2SimpleIncrementing/offsets/request/status' \
-H 'Content-Type: application/json' \
-H 'Authorization: Basic UlJDWU5NVkU1NlBGMk5DRjpCWHF0ZmZaRXF5eDFxbm56bGlYQmx5b21vMXFQQ1JGdHZXZzc3SlFvaTcwTVdRZnFtZ3hEdzJ2VEZ5MEFCbjQr' | jq
Response
{
"request": {
"id": "lcc-devcoj89j9",
"name": "HttpSourceV2SimpleIncrementing",
"offsets": [
{
"partition": {
"url": "https://api.github.com/repos/apache/kafka/issues/${offset}"
},
"offset": {
"offset": "10"
}
},
{
"partition": {
"url": "https://api.github.com/repos/apache/airflow/issues/${offset}"
},
"offset": {
"offset": "10"
}
}
],
"requested_at": "2024-04-10T10:55:48.418477341Z",
"type": "PATCH"
},
"status": {
"phase": "APPLIED",
"message": "The Connect framework-managed offsets for this connector have been altered successfully. However, if this connector manages offsets externally, they will need to be manually altered in the system that the connector uses."
},
"previous_offsets": [
{
"partition": {
"url": "https://api.github.com/repos/apache/kafka/issues/${offset}"
},
"offset": {
"offset": "128"
}
},
{
"partition": {
"url": "https://api.github.com/repos/apache/airflow/issues/${offset}"
},
"offset": {
"offset": "129"
}
}
],
"applied_at": "2024-04-10T10:55:49.552437779Z"
}
To delete the offset, make a POST
request that specifies the
environment, Kafka cluster, and connector name. Include a JSON payload that
specifies the delete type.
curl -X POST \
'https://api.confluent.cloud/api/connect/v1/environments/env-zpj85d/clusters/lkc-devcjg7py8/connectors/HttpSourceV2SimpleIncrementing/offsets/request' \
-H 'Content-Type: application/json' \
-H 'Authorization: Basic UlJDWU5NVkU1NlBGMk5DRjpCWHF0ZmZaRXF5eDFxbm56bGlYQmx5b21vMXFQQ1JGdHZXZzc3SlFvaTcwTVdRZnFtZ3hEdzJ2VEZ5MEFCbjQr' \
-d '{
"type":"DELETE"
}' | jq
Response
{
"id": "lcc-devcoj89zo",
"name": "HttpSourceV2SimpleIncrementing",
"offsets": [],
"requested_at": "2024-04-10T10:45:10.890811731Z",
"type": "DELETE"
}
Cursor pagination offset mode¶
This section describes the configuration used to apply cursor pagination offset mode for all custom offset functionalities.
To get the current offset, make a GET
request that specifies the
environment, Kafka cluster, and connector name.
curl -X GET \
'https://api.confluent.cloud/api/connect/v1/environments/env-zpj85d/clusters/lkc-devcjg7py8/connectors/HttpSourceV2Zendesk/offsets' \
-H 'Content-Type: application/json' \
-H 'Authorization: Basic UlJDWU5NVkU1NlBGMk5DRjpCWHF0ZmZaRXF5eDFxbm56bGlYQmx5b21vMXFQQ1JGdHZXZzc3SlFvaTcwTVdRZnFtZ3hEdzJ2VEZ5MEFCbjQr' | jq
Response
{
"id": "lcc-devcxv93jq",
"name": "HttpSourceV2Zendesk",
"offsets": [
{
"partition": {
"url": "https://example.com/api/v2/users/18266623893394/tickets/assigned"
},
"offset": {
"offset": "eyJvIjoibmljZV9pZCIsInYiOiJhUmtBQUFBQUFBQUEifQ=="
}
}
],
"metadata": {
"observed_at": "2024-04-12T13:24:53.218799432Z"
}
}
Offset structure
partition
is the key-value pair of url
, which is a combination of
http.api.base.url
plus api[i].http.api.path
from the connector
configuration. offset
is the key-value pair of offset
and
api[i].http.next.page.json.pointer
value of the source record.
The sample record looks similar to:
{
"url": "https://example.com/api/v2/tickets/25.json",
"id": 25,
"external_id": null,
"via": {
"channel": "sample_ticket",
"source": {
"from": {},
"to": {},
"rel": null
}
},
"created_at": "2024-04-12T04:56:46Z",
"updated_at": "2024-04-12T04:56:46Z",
"generated_timestamp": 1712897807,
"type": null,
"subject": "SAMPLE TICKET: Gift card expiring",
"raw_subject": "SAMPLE TICKET: Gift card expiring",
"description": "Hey there, I was lucky enough to receive a gift card from a friend as a housewarming gift. Small problem, I’ve been so swamped with the move I totally forgot about it until now and it expires in a week!\n\nCan you extend the expiration date?\n\nHelp,\nLuka Jensen",
"priority": "normal",
"status": "open",
"recipient": null,
"requester_id": 18266648849426,
"submitter_id": 18266648849426,
"assignee_id": 18266623893394,
"organization_id": null,
"group_id": 18266630261906,
"collaborator_ids": [],
"follower_ids": [],
"email_cc_ids": [],
"forum_topic_id": null,
"problem_id": null,
"has_incidents": false,
"is_public": true,
"due_at": null,
"tags": [
"gift_cards",
"sample_ticket"
],
"custom_fields": [
{
"id": 18266619894546,
"value": null
}
],
"satisfaction_rating": null,
"sharing_agreement_ids": [],
"custom_status_id": 18266630258322,
"followup_ids": [],
"ticket_form_id": 18266615254674,
"brand_id": 18266615288210,
"allow_channelback": false,
"allow_attachments": true,
"from_messaging_channel": false,
"meta": {
"has_more": false,
"after_cursor": "eyJvIjoibmljZV9pZCIsInYiOiJhUmtBQUFBQUFBQUEifQ==",
"before_cursor": "eyJvIjoibmljZV9pZCIsInYiOiJhUUVBQUFBQUFBQUEifQ=="
},
"links": {
"prev": "https://example.com/api/v2/users/18266623893394/tickets/assigned.json?page%5Bbefore%5D=eyJvIjoibmljZV9pZCIsInYiOiJhUUVBQUFBQUFBQUEifQ%3D%3D",
"next": "https://example.com/api/v2/users/18266623893394/tickets/assigned.json?page%5Bafter%5D=eyJvIjoibmljZV9pZCIsInYiOiJhUmtBQUFBQUFBQUEifQ%3D%3D"
}
}
http.next.page.json.pointer has been configured to /meta/after_cursor, which is also the offset value for each record.
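The following Python sketch shows how a cursor read from /meta/after_cursor could be turned into the next page request. It is a sketch under assumptions: the function name `next_page_url` is hypothetical, and the `page[after]` query parameter name is taken from the links.next value in the sample record, so other APIs may pass the cursor differently.

```python
from urllib.parse import urlencode


def next_page_url(api_url: str, response: dict, cursor_param: str = "page[after]") -> str:
    """Build the next page URL from the cursor at /meta/after_cursor."""
    cursor = response["meta"]["after_cursor"]
    # urlencode percent-encodes the brackets and the base64 padding.
    return api_url + "?" + urlencode({cursor_param: cursor})


# Trimmed copy of the sample record.
response = {
    "meta": {
        "has_more": False,
        "after_cursor": "eyJvIjoibmljZV9pZCIsInYiOiJhUmtBQUFBQUFBQUEifQ==",
    }
}
url = next_page_url("https://example.com/api/v2/users/18266623893394/tickets/assigned", response)
print(url)
```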
To update the offset, make a POST request that specifies the environment, Kafka cluster, and connector name. Include a JSON payload that specifies the new offset and a patch type.
curl -X POST \
'https://api.confluent.cloud/api/connect/v1/environments/env-zpj85d/clusters/lkc-devcjg7py8/connectors/HttpSourceV2Zendesk/offsets/request' \
-H 'Content-Type: application/json' \
-H 'Authorization: Basic UlJDWU5NVkU1NlBGMk5DRjpCWHF0ZmZaRXF5eDFxbm56bGlYQmx5b21vMXFQQ1JGdHZXZzc3SlFvaTcwTVdRZnFtZ3hEdzJ2VEZ5MEFCbjQr' \
-d '{
"type":"PATCH",
"offsets": [
{
"partition": {
"url": "https://example.com/api/v2/users/18266623893394/tickets/assigned"
},
"offset": {
"offset": "eyJvIjoibmljZV9pZCIsInYiOiJhUUVBQUFBQUFBQUEifQ=="
}
}
]
}' | jq
Response
{
"id": "lcc-devcxv93jq",
"name": "HttpSourceV2Zendesk",
"offsets": [
{
"partition": {
"url": "https://example.com/api/v2/users/18266623893394/tickets/assigned"
},
"offset": {
"offset": "eyJvIjoibmljZV9pZCIsInYiOiJhUUVBQUFBQUFBQUEifQ=="
}
}
],
"requested_at": "2024-04-12T13:39:48.497054666Z",
"type": "PATCH"
}
Outcome
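The connector starts sourcing from “https://example.com/api/v2/users/18266623893394/tickets/assigned?page[after]=eyJvIjoibmljZV9pZCIsInYiOiJhUUVBQUFBQUFBQUEifQ==”. You can verify this in your topic and see records being processed again.
To get the status of a previous offset request, make a GET request that specifies the environment, Kafka cluster, and connector name.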
curl -X GET \
'https://api.confluent.cloud/api/connect/v1/environments/env-zpj85d/clusters/lkc-devcjg7py8/connectors/HttpSourceV2Zendesk/offsets/request/status' \
-H 'Content-Type: application/json' \
-H 'Authorization: Basic UlJDWU5NVkU1NlBGMk5DRjpCWHF0ZmZaRXF5eDFxbm56bGlYQmx5b21vMXFQQ1JGdHZXZzc3SlFvaTcwTVdRZnFtZ3hEdzJ2VEZ5MEFCbjQr' | jq
Response
{
"request": {
"id": "lcc-devcxv93jq",
"name": "HttpSourceV2Zendesk",
"offsets": [
{
"partition": {
"url": "https://example.com/api/v2/users/18266623893394/tickets/assigned"
},
"offset": {
"offset": "eyJvIjoibmljZV9pZCIsInYiOiJhUUVBQUFBQUFBQUEifQ=="
}
}
],
"requested_at": "2024-04-12T13:39:48.497054666Z",
"type": "PATCH"
},
"status": {
"phase": "APPLIED",
"message": "The Connect framework-managed offsets for this connector have been altered successfully. However, if this connector manages offsets externally, they will need to be manually altered in the system that the connector uses."
},
"previous_offsets": [
{
"partition": {
"url": "https://example.com/api/v2/users/18266623893394/tickets/assigned"
},
"offset": {
"offset": "eyJvIjoibmljZV9pZCIsInYiOiJhUmtBQUFBQUFBQUEifQ=="
}
}
],
"applied_at": "2024-04-12T13:39:51.574576416Z"
}
Outcome
The status endpoint shows the status of the most recent PATCH/DELETE operation on offsets.
To delete the offset, make a POST request that specifies the environment, Kafka cluster, and connector name. Include a JSON payload that specifies the delete type.
curl -X POST \
'https://api.confluent.cloud/api/connect/v1/environments/env-zpj85d/clusters/lkc-devcjg7py8/connectors/HttpSourceV2Zendesk/offsets/request' \
-H 'Content-Type: application/json' \
-H 'Authorization: Basic UlJDWU5NVkU1NlBGMk5DRjpCWHF0ZmZaRXF5eDFxbm56bGlYQmx5b21vMXFQQ1JGdHZXZzc3SlFvaTcwTVdRZnFtZ3hEdzJ2VEZ5MEFCbjQr' \
-d '{
"type":"DELETE"
}' | jq
Response
{
"id": "lcc-devcxv93jq",
"name": "HttpSourceV2Zendesk",
"offsets": [],
"requested_at": "2024-04-12T13:43:30.704714773Z",
"type": "DELETE"
}
Outcome
The connector deletes the current offset and starts sourcing records again, as if the connector were created again with the same configurations.
Quick Start¶
Use this quick start to get up and running with the Confluent Cloud HTTP Source V2 connector.
Prerequisites¶
- Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud.
- The Confluent CLI installed and configured for the cluster. For help, see Install the Confluent CLI.
- Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON Schema, or Protobuf). For more information, see Schema Registry Enabled Environments.
- OpenAPI Specification file version 3.0.
- Relevant authentication credentials for both Kafka and your data system.
- At least one source Kafka topic must exist in your Confluent Cloud cluster before creating the Source connector.
Using the Confluent Cloud Console¶
Step 1: Launch your Confluent Cloud cluster¶
See the Quick Start for Confluent Cloud for installation instructions.
Step 2: Add a connector¶
In the left navigation menu, click Connectors. If you already have connectors in your cluster, click + Add connector.
Step 4: Enter the connector details¶
Note
- Ensure you have all your prerequisites completed.
- An asterisk ( * ) designates a required entry.
At the Add HTTP Source V2 Connector screen, complete the following:
- Provide the connector name in the Connector name field.
- Add the OpenAPI specification file (OAS 3.0 or higher) by adding a URL
endpoint or by uploading a YAML/JSON formatted specification file. Note
that you can convert Swagger 1.x or 2.0 definitions to OpenAPI 3.0
using the Swagger Converter.
- To add a URL endpoint, enter the URL in the Add via URL field. Note that the maximum file size is 3 MB.
- To upload a YAML/JSON formatted specification file, select Add a file, then click Upload file to upload the file. Note that the maximum file size is 1 MB.
- Select the Output Kafka record value format (data coming from the connector): AVRO, JSON_SR (JSON Schema), or PROTOBUF. A valid schema must be available in Schema Registry to use a schema-based message format (for example, Avro, JSON Schema, or Protobuf). For more information, see Schema Registry Enabled Environments.
- Select the way you want to provide Kafka Cluster credentials. You can
choose one of the following options:
- Global Access: Allows your connector to access everything you have access to. With global access, connector access will be linked to your account. This option is not recommended for production.
- Granular access: Limits the access for your connector. You will be able to manage connector access through a service account. This option is recommended for production.
- Use an existing API key: Allows you to enter an API key and secret pair you have stored. You can enter an API key and secret (or generate these in the Cloud Console).
- Click Continue.
Server connection.
Enter your HTTP API base URL in the URL field. For example: http://example.com/absenceManagement/v1.
Authentication.
Enter the following authentication details to use for the server connection. Note that you can change the authentication type of the endpoint by clicking Change method. Supported methods are:
- Basic: The connector authenticates with a username and password. If you select Basic, enter the following details:
  - Username: The username to be used with an endpoint requiring authentication.
  - Password: The password to be used with an endpoint requiring authentication.
- Bearer: Enter the following details:
  - Bearer token: The bearer authentication token to be used with an endpoint requiring bearer token-based authentication.
- None: This is the default. The endpoint requires no authentication.
- OAuth 2.0 Client Credentials grant flow: The connector authenticates using OAuth credentials. Enter the following details:
  - Client ID: The client ID used when fetching the OAuth2 token.
  - Client secret: The secret used when fetching the OAuth2 token.
  - Client authentication mode: Specifies how to encode the Client ID and Client Secret in the OAuth2 authorization request. If set to header, the credentials are encoded as an 'Authorization: Basic <base-64 encoded client_id:client_secret>' HTTP header. If set to url, then Client ID and Client Secret are sent as URL encoded parameters.
  - Scope: The scope parameter sent when fetching the OAuth2 token.
  - Token property name: The name of the property containing the OAuth2 token returned by the OAuth2 token URL. Defaults to access_token.
  - Token server URL: The URL to be used for fetching the OAuth2 token.
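The difference between the header and url client authentication modes can be sketched in Python as follows. This is an illustration of the OAuth 2.0 Client Credentials grant in general, not the connector's implementation. The function name `client_credentials_request` and the credentials are hypothetical, and whether the connector sends the url mode parameters in the request body or the query string is not stated here, so the sketch simply returns the encoded parameter string.

```python
import base64
from urllib.parse import urlencode


def client_credentials_request(client_id, client_secret, mode="header", scope=None):
    """Return (headers, encoded_params) for a client credentials token request."""
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    params = {"grant_type": "client_credentials"}
    if scope:
        params["scope"] = scope
    if mode == "header":
        # Credentials travel in an Authorization: Basic header.
        raw = f"{client_id}:{client_secret}".encode("utf-8")
        headers["Authorization"] = "Basic " + base64.b64encode(raw).decode("ascii")
    elif mode == "url":
        # Credentials travel as URL-encoded parameters.
        params["client_id"] = client_id
        params["client_secret"] = client_secret
    else:
        raise ValueError("mode must be 'header' or 'url'")
    return headers, urlencode(params)


def extract_token(token_response: dict, token_property: str = "access_token") -> str:
    """Read the token from the property named by Token property name."""
    return token_response[token_property]


print(client_credentials_request("id", "secret", mode="header"))
print(client_credentials_request("id", "secret", mode="url"))
```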
(Optional) To customize SSL for your HTTP URL, enable Customize SSL and configure the following SSL settings accordingly:
- SSL Protocol: Enter the protocol to use for SSL connections.
- Key store: Upload the key store file containing the server certificate.
- Key store password: Enter the password used to access the key store.
- Key password: Enter the password for the private key in the key store file.
- Trust store: Upload the trust store file containing a server CA certificate.
- Trust store password: Enter the password used to access the trust store.
Click Authenticate. Follow the authentication steps. If successful, you should see a message similar to “Authenticated successfully”.
Click Continue.
- Select one or more endpoint paths for connector requests. You can select up to 15 paths. Add any resource IDs or sub-resource IDs as needed for the API endpoint path. The connector supports GET and POST operations.
- Click Continue.
On the Configuration page, configure the following.
Select topics
Choose the topic(s) you want to get data from. After selecting the desired topic(s), click on the Request configuration tab.
- Under Path variables, configure the HTTP path parameters to be added to the request. Parameter values can be templated with different template values like ${key}, ${topic}, ${offset}, or other field references from the Kafka record.
- Under Query parameters, configure the HTTP parameters to be added to the request.
- Under Headers, configure the HTTP headers to be included in each request.
- In the Body field, enter the payload to be sent along with the HTTP request.
- Click on the Settings tab.
For Offset mode: Define how offsets should be computed and generated. If set to Simple incrementing, the ${offset} used to generate requests is simply the previous offset (or initial offset) incremented by the number of records in the response. In this mode, the initial offset needs to be set to an integer value. If set to Chaining, the configuration http.offset.json.pointer needs to be set, and the offset for a record is set to the value at the JSON pointer in the record data. If set to Cursor pagination, the configuration http.next.page.json.pointer needs to be set, and the offset for the last record in each page is set to the next page value.
For Initial offset, define the offset to be used to generate the first request. This needs to be set if one or more of the following configurations contain the template variable ${offset}: url, http.request.parameters, or http.request.body.
(Optional) Add additional settings by clicking Show additional settings.
(Optional) Configure the following advanced configurations, and then, click Continue:
Advanced configurations
Schema context
Schema context: Select a schema context to use for this connector, if using a schema-based data format. This property defaults to the Default context, which configures the connector to use the default schema set up for Schema Registry in your Confluent Cloud environment. A schema context allows you to use separate schemas (like schema sub-registries) tied to topics in different Kafka clusters that share the same Schema Registry environment. For example, if you select a non-default context, a Source connector uses only that schema context to register a schema and a Sink connector uses only that schema context to read from. For more information about setting up a schema context, see What are schema contexts and when should you use them?.
Error configurations
Behavior on errors: Select the error handling behavior setting for handling error responses from HTTP requests. Valid options are Fail connector and Ignore errors. This defaults to Fail connector, which is recommended.
Error record format: Dictates the content of records produced to the error topic. If set to Error string, the value is a human-readable string describing the failure. The value includes some or all of the following information if available: HTTP response code, reason phrase, submitted payload, URL, response content, exception, and error message. If set to http_response, the value is the plain response content for a failed record.
Subject naming strategy
Key: Determines how to construct the subject name under which the key schema is registered with Schema Registry.
Value: Determines how to construct the subject name under which the value schema is registered with Schema Registry. For more details, see subject name strategy topic.
Processing position
Define a specific offset position for this connector to begin processing from. If adding a new offset manually, refer to Manage custom offsets.
For all property values and definitions, see Configuration Properties.
(Optional) If you want to test the API, click the Test tab and follow the instructions. Otherwise, continue to the next step.
A successful test returns a 200 OK response. Click Close. For more help with using the Test API, see the Test API section.
Click Continue.
- Enter the number of tasks for the connector to use in the Tasks field. This number should be equal to the number of HTTP APIs you configured.
- Click Continue.
Verify the connection details.
Click Launch connector.
The status for the connector should go from Provisioning to Running.
Step 5: Check for records¶
Verify that records are being produced to the Kafka topic.
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Managed and Custom Connectors section.
Tip
When you launch a connector, a Dead Letter Queue topic is automatically created. See Confluent Cloud Dead Letter Queue for details.
Using the Confluent CLI¶
To set up and run the connector using the Confluent CLI, complete the following steps. Before you begin, ensure you have met all prerequisites.
Step 1: List the available connectors¶
Enter the following command to list available connectors:
confluent connect plugin list
Step 2: List the connector configuration properties¶
Enter the following command to show the connector configuration properties:
confluent connect plugin describe <connector-plugin-name>
The command output shows the required and optional configuration properties.
Step 3: Create the connector configuration file¶
Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.
{
  "config": {
    "connector.class": "HttpSourceV2",
    "name": "HttpSourceV2Connector_0",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<my-kafka-api-key>",
    "kafka.api.secret": "<my-kafka-api-secret>",
    "schema.context.name": "default",
    "value.subject.name.strategy": "TopicNameStrategy",
    "output.data.format": "AVRO",
    "tasks.max": "1",
    "behavior.on.error": "IGNORE",
    "http.api.base.url": "http://example.com/absenceManagement/v1",
    "auth.type": "NONE",
    "https.ssl.enabled": "false",
    "apis.num": "1",
    "api1.http.request.method": "GET",
    "api1.http.connect.timeout.ms": "30000",
    "api1.http.request.timeout.ms": "30000",
    "api1.http.offset.mode": "SIMPLE_INCREMENTING",
    "api1.max.retries": "5",
    "api1.retry.backoff.policy": "EXPONENTIAL_WITH_JITTER",
    "api1.retry.backoff.ms": "3000",
    "api1.retry.on.status.codes": "400-",
    "api1.http.request.headers.separator": "|",
    "api1.http.request.parameters.separator": "&",
    "api1.request.interval.ms": "60000",
    "api1.http.path.parameters.separator": "|",
    "api1.test.api": "false"
  }
}
Note the following property definitions:
"connector.class": Identifies the connector plugin name.
"output.data.format": Sets the output Kafka record value format (data coming from the connector). Valid entries are: AVRO, JSON_SR (JSON Schema), or PROTOBUF. Note that you must have Confluent Cloud Schema Registry configured when using a schema-based format (for example, Avro).
"name": Sets a name for your new connector.
"kafka.auth.mode": Identifies the connector authentication mode you want to use. There are two options: SERVICE_ACCOUNT or KAFKA_API_KEY (the default). To use an API key and secret, specify the configuration properties kafka.api.key and kafka.api.secret, as shown in the example configuration (above). To use a service account, specify the Resource ID in the property kafka.service.account.id=<service-account-resource-ID>. To list the available service account resource IDs, use the following command:
confluent iam service-account list
For example:
confluent iam service-account list
   Id     | Resource ID |       Name        |    Description
+---------+-------------+-------------------+-------------------
   123456 | sa-l1r23m   | sa-1              | Service account 1
   789101 | sa-l4d56p   | sa-2              | Service account 2
"api1.http.request.method": Enter an HTTP request method: GET or POST. Defaults to GET.
Single Message Transforms: For details about adding SMTs using the CLI, see the Single Message Transforms (SMT) documentation. For all property values and descriptions, see Configuration Properties.
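Before loading the configuration file, it can help to confirm that it parses as strict JSON, because a stray trailing comma after the last property is enough to make the file invalid. The following Python sketch is not part of the Confluent CLI. The function name check_config_text and the EXPECTED_KEYS list are assumptions made for illustration, and the authoritative list of required properties is the output of confluent connect plugin describe.

```python
import json

# Hypothetical key set used only by this sketch; it is not an official
# list of required properties for the connector.
EXPECTED_KEYS = ("connector.class", "name", "kafka.auth.mode", "apis.num")


def check_config_text(text):
    """Parse a connector configuration document and return a list of problems."""
    try:
        document = json.loads(text)
    except json.JSONDecodeError as err:
        # Python's json module is strict, so trailing commas are rejected here.
        return [f"invalid JSON at line {err.lineno}, column {err.colno}: {err.msg}"]
    config = document.get("config") if isinstance(document, dict) else None
    if not isinstance(config, dict):
        return ['missing top-level "config" object']
    return [f"missing key: {key}" for key in EXPECTED_KEYS if key not in config]


if __name__ == "__main__":
    sample = '{"config": {"name": "HttpSourceV2Connector_0",}}'
    for problem in check_config_text(sample):
        print(problem)
```

An empty list means the document parsed and contains every key the sketch looks for. It does not mean the connector will accept the configuration.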
Step 4: Load the configuration file and create the connector¶
Enter the following command to load the configuration and start the connector:
confluent connect cluster create --config-file <file-name>.json
For example:
confluent connect cluster create --config-file http-source-v2-config.json
Example output:
Created connector HttpSourceV2Connector_0 lcc-do6vzd
Step 5: Check the connector status¶
Enter the following command to check the connector status:
confluent connect cluster list
Example output:
ID           | Name                    | Status  | Type   | Trace |
+------------+-------------------------+---------+--------+-------+
lcc-do6vzd   | HttpSourceV2Connector_0 | RUNNING | Source |       |
Step 6: Check for records¶
Verify that records are being produced to the Kafka topic.
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Managed and Custom Connectors section.
Tip
When you launch a connector, a Dead Letter Queue topic is automatically created. See Confluent Cloud Dead Letter Queue for details.
Test API¶
Use the Test API functionality to test the API with a sample record and view the logs directly in the Confluent Cloud user interface.
Important
- This feature is only available for publicly accessible endpoints.
- Invoking the Test API on an API may change data on the end system, depending on the API’s behavior.
When using this feature with the HTTP Source V2 connector, configure the following required parameter in the user interface: ${offset}. This is the offset that the connector should read from. The connector uses this value to replace the ${offset} template variable wherever it is configured.
Configuration Properties¶
Use the following configuration properties with the fully-managed HTTP Source V2 connector. When configuring your APIs, use the configuration properties listed in the API-1 Configs section, and update the prefix of each property name to match the API index. For example, the http.request.headers parameter takes the form api<index number>.http.request.headers (for example, api2.http.request.headers, api3.http.request.headers, and so forth).
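The prefixing rule can be sketched in Python as follows. The helper names prefix_api_settings and build_api_configs are hypothetical and exist only in this example. The 15-API limit comes from the apis.num property description, and the header values are placeholders.

```python
MAX_APIS = 15  # upper bound stated for apis.num


def prefix_api_settings(index, settings):
    """Return the settings with every key prefixed by api<index>."""
    if not 1 <= index <= MAX_APIS:
        raise ValueError(f"API index must be between 1 and {MAX_APIS}")
    return {f"api{index}.{name}": value for name, value in settings.items()}


def build_api_configs(apis):
    """Combine per-API settings (un-prefixed keys) into one flat config mapping."""
    config = {"apis.num": str(len(apis))}
    for index, settings in enumerate(apis, start=1):
        config.update(prefix_api_settings(index, settings))
    return config


if __name__ == "__main__":
    flat = build_api_configs([
        {"http.request.method": "GET", "http.request.headers": "Accept:application/json"},
        {"http.request.method": "POST", "http.request.headers": "Accept:text/plain"},
    ])
    for key in sorted(flat):
        print(f"{key} = {flat[key]}")
```

Keeping the per-API settings un-prefixed until the last step means one API definition can be reused at a different index without renaming its keys.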
How should we connect to your data?¶
name
Sets a name for your connector.
- Type: string
- Valid Values: A string at most 64 characters long
- Importance: high
Kafka Cluster credentials¶
kafka.auth.mode
Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.
- Type: string
- Default: KAFKA_API_KEY
- Valid Values: KAFKA_API_KEY, SERVICE_ACCOUNT
- Importance: high
kafka.api.key
Kafka API Key. Required when kafka.auth.mode==KAFKA_API_KEY.
- Type: password
- Importance: high
kafka.service.account.id
The Service Account that will be used to generate the API keys to communicate with Kafka Cluster.
- Type: string
- Importance: high
kafka.api.secret
Secret associated with Kafka API key. Required when kafka.auth.mode==KAFKA_API_KEY.
- Type: password
- Importance: high
CSFLE¶
csfle.enabled
Determines whether the connector honors client-side field level encryption (CSFLE) rules.
- Type: boolean
- Default: false
- Importance: high
sr.service.account.id
The Service Account that will be used to generate the API keys to communicate with SR Cluster.
- Type: string
- Default: “”
- Importance: high
Schema Config¶
schema.context.name
Add a schema context name. A schema context represents an independent scope in Schema Registry. It is a separate sub-schema tied to topics in different Kafka clusters that share the same Schema Registry instance. If not used, the connector uses the default schema configured for Schema Registry in your Confluent Cloud environment.
- Type: string
- Default: default
- Importance: medium
value.subject.name.strategy
Determines how to construct the subject name under which the value schema is registered with Schema Registry.
- Type: string
- Default: TopicNameStrategy
- Valid Values: RecordNameStrategy, TopicNameStrategy, TopicRecordNameStrategy
- Importance: medium
Output messages¶
output.data.format
Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, or PROTOBUF. Confluent Cloud Schema Registry must be configured to use these schema-based formats.
- Type: string
- Default: JSON_SR
- Importance: high
Number of tasks for this connector¶
tasks.max
Maximum number of tasks for the connector.
- Type: int
- Valid Values: [1,…]
- Importance: high
Behavior On Error¶
behavior.on.error
Error handling behavior for error responses from HTTP requests. The default, FAIL, stops the connector on an error response. The example configuration above uses IGNORE instead.
- Type: string
- Default: FAIL
- Importance: low
Authentication¶
http.api.base.url
The HTTP API Base URL. For example: http://example.com/absenceManagement/v1.
- Type: string
- Importance: high
auth.type
Authentication type of the endpoint. Valid values are NONE, BASIC, OAUTH2 (Client Credentials grant type only), and BEARER.
- Type: string
- Default: NONE
- Importance: high
connection.user
The username to be used with an endpoint requiring basic authentication.
- Type: string
- Importance: medium
connection.password
The password to be used with an endpoint requiring basic authentication.
- Type: password
- Importance: medium
bearer.token
The bearer authentication token to be used with an endpoint requiring bearer token based authentication.
- Type: password
- Importance: medium
oauth2.token.url
The URL to be used for fetching the OAuth2 token. Client Credentials is the only supported grant type.
- Type: string
- Importance: medium
oauth2.client.id
The client id used when fetching the OAuth2 token.
- Type: string
- Importance: medium
oauth2.client.secret
The client secret used when fetching the OAuth2 token.
- Type: password
- Importance: medium
oauth2.token.property
The name of the property containing the OAuth2 token returned by the OAuth2 token URL (defaults to access_token).
- Type: string
- Default: access_token
- Importance: medium
oauth2.client.scope
The scope parameter sent to the service when fetching the OAuth2 token.
- Type: string
- Default: any
- Importance: medium
oauth2.client.auth.mode
Specifies how to encode client_id and client_secret in the OAuth2 authorization request. If set to header, the credentials are encoded as an ‘Authorization: Basic <base-64 encoded client_id:client_secret>’ HTTP header. If set to url, then client_id and client_secret are sent as URL encoded parameters.
- Type: string
- Default: header
- Importance: medium
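The difference between the two oauth2.client.auth.mode values can be illustrated with a short Python sketch. This is an approximation of a generic OAuth 2.0 Client Credentials token request and not the connector's internal code. The function name client_credentials_request is hypothetical, and sending the url-mode parameters in a form-encoded body is an assumption, because the description above only says they are URL encoded.

```python
import base64
from urllib.parse import urlencode


def client_credentials_request(client_id, client_secret, mode="header", scope=None):
    """Return (headers, form_body) for a Client Credentials token request."""
    form = {"grant_type": "client_credentials"}
    if scope:
        form["scope"] = scope
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    if mode == "header":
        # Authorization: Basic <base-64 encoded client_id:client_secret>
        raw = f"{client_id}:{client_secret}".encode("utf-8")
        headers["Authorization"] = "Basic " + base64.b64encode(raw).decode("ascii")
    elif mode == "url":
        # Credentials travel as URL encoded parameters instead of a header.
        form["client_id"] = client_id
        form["client_secret"] = client_secret
    else:
        raise ValueError("mode must be 'header' or 'url'")
    return headers, urlencode(form)


if __name__ == "__main__":
    for mode in ("header", "url"):
        headers, body = client_credentials_request("my-client", "my-secret", mode)
        print(mode, headers, body)
```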
oauth2.client.headers
HTTP headers to be included in the OAuth2 client endpoint. Individual headers should be separated by |
- Type: password
- Importance: low
https.ssl.enabled
Whether or not to connect to the endpoint via SSL.
- Type: boolean
- Default: false
- Importance: medium
https.ssl.keystorefile
The key store containing the server certificate.
- Type: password
- Importance: low
https.ssl.keystore.password
The store password for the key store file.
- Type: password
- Importance: high
https.ssl.key.password
The password for the private key in the key store file.
- Type: password
- Importance: high
https.ssl.truststorefile
The trust store containing a server CA certificate.
- Type: password
- Importance: high
https.ssl.truststore.password
The password for the trust store containing a server CA certificate.
- Type: password
- Importance: high
https.ssl.protocol
The protocol to use for SSL connections
- Type: string
- Default: TLSv1.3
- Importance: medium
APIs¶
apis.num
The number of http(s) APIs to configure. This value should be less than or equal to 15
- Type: int
- Default: 1
- Importance: high
Reporter¶
report.errors.as
Dictates the content of records produced to the error topic. If set to Error string, the value is a human-readable string describing the failure. The value includes some or all of the following information if available: HTTP response code, reason phrase, submitted payload, URL, response content, exception, and error message. If set to http_response, the value is the plain response content for the request which failed to write the record. In both modes, any information about the failure is also included in the error record's headers.
- Type: string
- Default: Error string
- Importance: low
API-1 Configs¶
api1.http.api.path
The HTTP API path together with the ‘http.api.base.url’ will form the complete HTTP(S) URL. This path can be templated with offset information. For example: /resource1/${offset}, where ${offset} will be substituted with the offset generated from the previous request’s response (or if it’s the first request, from ‘http.initial.offset’).
- Type: string
- Importance: high
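As a rough illustration of how a templated path combines with the base URL, the following Python sketch uses string.Template, which happens to share the ${name} placeholder syntax. The connector's own substitution logic is not documented here, so treat this as an approximation. The function name render_url is hypothetical.

```python
from string import Template


def render_url(base_url, path_template, offset):
    """Join the base URL and the API path after substituting ${offset}."""
    # safe_substitute leaves any other placeholder untouched instead of raising.
    path = Template(path_template).safe_substitute(offset=offset)
    return base_url.rstrip("/") + "/" + path.lstrip("/")


if __name__ == "__main__":
    print(render_url("http://example.com/absenceManagement/v1", "/resource1/${offset}", "42"))
```

For the first request, the value passed as offset would be the configured initial offset. For later requests, it would be the offset derived from the previous response.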
api1.topics
The name of the Kafka topics to publish data to.
- Type: string
- Importance: high
api1.http.request.headers
HTTP headers to be included in each request. Header names and values should be separated by :. Distinct headers should be separated by |. For example: From:abcxyz@confluent.io|Content-Length:348.
- Type: string
- Importance: medium
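The header format can be checked locally with a small parser such as the Python sketch below. The function name parse_headers is hypothetical, and splitting on the first colon only (so that values containing colons survive) is an assumption about sensible behavior, not a statement about how the connector parses the value.

```python
def parse_headers(raw, separator="|"):
    """Split 'Name:value|Name:value' into a dict of header names and values."""
    headers = {}
    for item in raw.split(separator):
        if not item.strip():
            continue  # ignore empty segments such as a trailing separator
        name, colon, value = item.partition(":")
        if not colon:
            raise ValueError(f"header {item!r} has no ':' between name and value")
        headers[name.strip()] = value.strip()
    return headers


if __name__ == "__main__":
    print(parse_headers("From:abcxyz@confluent.io|Content-Length:348"))
```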
api1.http.request.method
HTTP Request Method. Valid options are GET and POST.
- Type: string
- Default: GET
- Importance: high
api1.http.request.parameters
HTTP parameters to be added to each request. Parameter names and values should be separated by =. Distinct parameters should be separated by &. Parameter values can be templated with offset information (for example: search_after=${offset}), where ${offset} will be substituted with the offset generated from the previous request’s response (or if it’s the first request, from ‘http.initial.offset’). The parameters are only set if ‘http.request.method’ = GET.
- Type: string
- Importance: medium
api1.http.connect.timeout.ms
The time in milliseconds to wait for a connection to be established
- Type: int
- Default: 30000 (30 seconds)
- Importance: medium
api1.http.request.body
The payload to be sent along with each HTTP request. The value can be templated with offset (for example: search_after: ${offset}), where ${offset} will be substituted with the offset generated from the previous request’s response (or if it’s the first request, from ‘http.initial.offset’). The body is only set if ‘http.request.method’ = POST.
- Type: string
- Importance: medium
api1.http.request.timeout.ms
The time in milliseconds to wait for a request response from the server
- Type: int
- Default: 30000 (30 seconds)
- Importance: medium
api1.http.offset.mode
This config indicates how offsets are computed and how requests are generated. If set to SIMPLE_INCREMENTING, the ${offset} used to generate requests is simply the previous offset (or http.initial.offset) incremented by 1 per sourced record. In this mode, http.initial.offset needs to be set to an integer value. If set to CHAINING, the config ‘http.offset.json.pointer’ needs to be set, and the offset for a record is set to the value for the configured key in the response data. If set to CURSOR_PAGINATION, the config ‘http.next.page.json.pointer’ needs to be set, and the offset for the last record in each page is set to the next page value.
- Type: string
- Default: SIMPLE_INCREMENTING
- Importance: high
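Each offset mode depends on a companion setting, and that dependency can be checked before a configuration is submitted. The Python sketch below encodes only the rules stated in the description above. The function name check_offset_settings is hypothetical, and it takes one API's settings with un-prefixed keys.

```python
def check_offset_settings(api):
    """Return a list of problems in one API's offset-related settings."""
    mode = api.get("http.offset.mode", "SIMPLE_INCREMENTING")
    problems = []
    if mode == "SIMPLE_INCREMENTING":
        # This mode needs an integer starting point.
        try:
            int(api.get("http.initial.offset", ""))
        except (TypeError, ValueError):
            problems.append("http.initial.offset must be set to an integer")
    elif mode == "CHAINING":
        if not api.get("http.offset.json.pointer"):
            problems.append("http.offset.json.pointer must be set")
    elif mode == "CURSOR_PAGINATION":
        if not api.get("http.next.page.json.pointer"):
            problems.append("http.next.page.json.pointer must be set")
    else:
        problems.append(f"unknown http.offset.mode: {mode}")
    return problems


if __name__ == "__main__":
    print(check_offset_settings({"http.offset.mode": "CHAINING"}))
```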
api1.max.retries
The maximum number of times to retry on errors before failing the task.
- Type: int
- Default: 5
- Importance: medium
api1.http.initial.offset
The initial offset to be used to generate the first request. This needs to be set if one or more of the configs ‘url’, ‘http.request.parameters’, or ‘http.request.body’ contain the template variable ${offset}.
- Type: string
- Default: “”
- Importance: high
api1.retry.backoff.policy
The backoff policy to use in terms of retry - CONSTANT_VALUE or EXPONENTIAL_WITH_JITTER
- Type: string
- Default: EXPONENTIAL_WITH_JITTER
- Importance: medium
api1.http.response.data.json.pointer
The JSON Pointer to the entity in the JSON response containing the actual data that should be written to Kafka as records. The entity can be an array (multiple records in a single response) or an object / scalar value (single record).
- Type: string
- Importance: high
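To see which part of a response a given pointer selects, the following Python sketch resolves a JSON Pointer using the RFC 6901 escaping rules and then applies the array versus object or scalar rule described above. The function names and the sample response are hypothetical, and the connector's own implementation may differ in edge cases.

```python
def resolve_pointer(document, pointer):
    """Resolve an RFC 6901 JSON Pointer against already-parsed JSON."""
    if pointer == "":
        return document  # the empty pointer refers to the whole document
    if not pointer.startswith("/"):
        raise ValueError("a non-empty JSON Pointer must start with '/'")
    current = document
    for token in pointer[1:].split("/"):
        # Unescape "~1" before "~0", as RFC 6901 requires.
        token = token.replace("~1", "/").replace("~0", "~")
        if isinstance(current, list):
            current = current[int(token)]
        else:
            current = current[token]
    return current


def extract_records(response, data_pointer):
    """Return the records selected by the pointer as a list."""
    data = resolve_pointer(response, data_pointer)
    # An array yields one record per element; anything else is a single record.
    return data if isinstance(data, list) else [data]


if __name__ == "__main__":
    response = {"data": {"items": [{"id": 1}, {"id": 2}]}, "meta": {"count": 2}}
    print(extract_records(response, "/data/items"))
    print(extract_records(response, "/meta"))
```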
api1.retry.backoff.ms
The initial duration in milliseconds to wait following an error before a retry attempt is made. Subsequent backoff attempts can be a constant value or exponential with jitter (can be configured using api*.retry.backoff.policy parameter). Jitter adds randomness to the exponential backoff algorithm to prevent synchronized retries.
- Type: int
- Default: 3000 (3 seconds)
- Valid Values: [100,…]
- Importance: medium
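The two backoff policies can be compared with a short Python sketch. The connector's exact formula is not given here, so the exponential branch uses the common "full jitter" variant (a random delay between zero and an exponentially growing ceiling) purely as an assumption. The function name backoff_delays and its rng parameter are hypothetical.

```python
import random


def backoff_delays(initial_ms, max_retries, policy="EXPONENTIAL_WITH_JITTER", rng=random.random):
    """Return the wait in milliseconds before each retry attempt."""
    delays = []
    for attempt in range(max_retries):
        if policy == "CONSTANT_VALUE":
            delays.append(initial_ms)
        elif policy == "EXPONENTIAL_WITH_JITTER":
            ceiling = initial_ms * (2 ** attempt)  # doubles on every attempt
            delays.append(int(rng() * ceiling))    # assumed "full jitter" variant
        else:
            raise ValueError(f"unknown policy: {policy}")
    return delays


if __name__ == "__main__":
    print(backoff_delays(3000, 5, "CONSTANT_VALUE"))
    print(backoff_delays(3000, 5))
```

Because the jittered delays are random, tasks that fail at the same moment are unlikely to retry at the same moment, which is the purpose of jitter described above.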
api1.http.offset.json.pointer
The JSON Pointer to the value in each record that corresponds to the offset for that record (it is relative to ‘http.response.data.json.pointer’). The offset will be available to the subsequent request as ${offset} and it will also be used for checkpointing and recovery in case of connector failures or restarts. This config should only be set if ‘http.offset.mode’ is set to CHAINING.
- Type: string
- Importance: medium
api1.retry.on.status.codes
Comma-separated list of HTTP status codes or range of codes to retry on. Ranges are specified with start and optional end code. Range boundaries are inclusive. For instance, 400- includes all codes greater than or equal to 400. 400-500 includes codes from 400 to 500, including 500. Multiple ranges and single codes can be specified together to achieve fine-grained control over retry behavior. For example, 404,408,500- will retry on 404 NOT FOUND, 408 REQUEST TIMEOUT, and all 5xx error codes. Note that some status codes will always be retried, such as unauthorized, timeouts and too many requests.
- Type: string
- Default: 400-
- Importance: medium
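The range syntax can be made concrete with a small Python parser. This sketch follows the rules stated above (comma-separated entries, inclusive boundaries, optional end code) and does not model the status codes that are always retried. The function names are hypothetical.

```python
def parse_status_ranges(spec):
    """Parse '404,408,500-' into a list of (low, high) pairs; high may be None."""
    ranges = []
    for part in spec.split(","):
        part = part.strip()
        if not part:
            continue
        if "-" in part:
            start, end = part.split("-", 1)
            # An empty end code means the range is open-ended.
            ranges.append((int(start), int(end) if end else None))
        else:
            code = int(part)
            ranges.append((code, code))
    return ranges


def should_retry(status, ranges):
    """Return True if the status code falls inside any inclusive range."""
    return any(low <= status and (high is None or status <= high) for low, high in ranges)


if __name__ == "__main__":
    ranges = parse_status_ranges("404,408,500-")
    for status in (400, 404, 408, 503):
        print(status, should_retry(status, ranges))
```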
api1.http.next.page.json.pointer
The JSON pointer to the value in the response which corresponds to the next page reference (either a page token, a full URL or a URL fragment). This will be stored as the offset and will be available to the subsequent request via the template variable ${offset}. This config should only be set if ‘http.offset.mode’ is set to CURSOR_PAGINATION.
- Type: string
- Importance: medium
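The cursor-pagination flow can be simulated in Python with an in-memory stand-in for the HTTP API. Everything here is illustrative: the function names, the fetch callback, and the sample pages are hypothetical, and the sketch stops when no next page is present, whereas a running connector keeps polling at the configured request interval.

```python
def resolve_pointer(document, pointer):
    """Minimal RFC 6901 resolver for parsed JSON (dicts, lists, scalars)."""
    current = document
    for token in pointer.split("/")[1:]:
        token = token.replace("~1", "/").replace("~0", "~")
        current = current[int(token)] if isinstance(current, list) else current[token]
    return current


def read_all_pages(fetch, initial_offset, data_pointer, next_page_pointer, max_pages=100):
    """Follow next-page references; return (records, last offset used)."""
    offset = initial_offset
    records = []
    for _ in range(max_pages):
        response = fetch(offset)  # stands in for a request templated with ${offset}
        records.extend(resolve_pointer(response, data_pointer))
        try:
            next_offset = resolve_pointer(response, next_page_pointer)
        except (KeyError, IndexError):
            break  # no next-page reference in this response
        if next_offset is None:
            break
        offset = next_offset  # stored as the offset for the next request
    return records, offset


if __name__ == "__main__":
    pages = {
        "start": {"items": [1, 2], "paging": {"next": "p2"}},
        "p2": {"items": [3], "paging": {"next": None}},
    }
    print(read_all_pages(pages.__getitem__, "start", "/items", "/paging/next"))
```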
api1.request.interval.ms
The time in milliseconds to wait between consecutive requests.
- Type: int
- Default: 60000 (1 minute)
- Valid Values: [100,…]
- Importance: medium
api1.http.path.parameters
HTTP path parameters to be added to the request. Parameter names and values should be separated by :. Distinct parameters should be separated by |. Parameter values can be templated with template values such as ${key}, ${topic}, ${offset}, or other field references from the Kafka record.
- Type: string
- Importance: medium
Next Steps¶
For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.