HTTP Source Connector for Confluent Cloud¶
Tip
Confluent recommends upgrading to version 2 of this connector if you use the OpenAPI spec to avoid future migration issues. For more information, see HTTP Source V2 Connector for Confluent Cloud.
The fully-managed HTTP Source connector for Confluent Cloud periodically polls for data from an HTTP API and then produces records from that source into Apache Kafka®.
Note
If you require private networking for fully-managed connectors, make sure to set up the proper networking beforehand. For more information, see Manage Networking for Confluent Cloud Connectors.
Features¶
The HTTP Source connector supports the following features:
Offset modes: The connector supports the following modes:
SIMPLE_INCREMENTING: For the first request, the connector computes the offset based on the initial offset set in the http.initial.offset property. For subsequent requests, the connector increments the offset by the number of records in the previous response. For instance, the connector increments the offset by one for an object type response that indicates a single record. For an array type response, the connector increments the offset by the length of the array.
CHAINING: For the first request, the connector computes the offset based on the initial offset set in the http.initial.offset property. Upon receiving a successful response to the first request, the connector parses the response and generates one or more records (depending on whether the JSON response is an object or an array) with the offset value set to the value from the response at the path configured in the http.offset.json.pointer configuration property.
CURSOR_PAGINATION: In this mode, the property http.next.page.json.pointer is used to configure the offset value. The offset for the last record in each page is set to the next page value.
For additional information, see Offset Mode Use Case Examples.
At least once delivery: The connector guarantees that records are delivered at least once to the Kafka topic.
Supports multiple tasks: The connector supports running one or more tasks. More tasks may improve performance when multiple entities are configured.
Supported output data formats: The connector supports Avro, JSON Schema (JSON-SR), Protobuf, and JSON (schemaless) output record value formats. Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON Schema, or Protobuf).
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.
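The three offset modes described above can be sketched as small functions. This is only an illustration of the documented rules, not the connector's code: the function names are hypothetical, the `key` arguments stand in for single-level JSON Pointers, and keeping the previous offset when no next page value is present is an assumption.

```python
from typing import Any, List


def next_offset_simple_incrementing(offset: int, response: Any) -> int:
    # An object response counts as one record; an array counts as len(array).
    count = len(response) if isinstance(response, list) else 1
    return offset + count


def next_offset_chaining(offset: Any, records: List[dict], key: str) -> Any:
    # `key` stands in for http.offset.json.pointer (single level only).
    # Each record carries its own offset; the next request uses the last one.
    return records[-1][key] if records else offset


def next_offset_cursor(offset: Any, response: dict, key: str) -> Any:
    # `key` stands in for http.next.page.json.pointer (single level only).
    # Assumption: keep the previous offset if no next page value is present.
    return response.get(key, offset)
```

For example, with an initial offset of 1000, an array response of three records moves the SIMPLE_INCREMENTING offset to 1003.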
Limitations¶
Be sure to review the following information.
- For connector limitations, see HTTP Source Connector limitations.
- If you plan to use one or more Single Message Transforms (SMTs), see SMT Limitations.
- If you plan to use Confluent Cloud Schema Registry, see Schema Registry Enabled Environments.
Quick Start¶
Use this quick start to get up and running with the Confluent Cloud HTTP Source connector. The quick start provides the basics of selecting the connector and configuring it to get data from an HTTP API.
- Prerequisites
- Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud.
- The Confluent CLI installed and configured for the cluster. See Install the Confluent CLI.
- Authorized access to an HTTP or HTTPS endpoint.
- Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).
Using the Confluent Cloud Console¶
Step 1: Launch your Confluent Cloud cluster¶
See the Quick Start for Confluent Cloud for installation instructions.
Step 2: Add a connector¶
In the left navigation menu, click Connectors. If you already have connectors in your cluster, click + Add connector.
Step 3: Select your connector¶
Click the HTTP Source connector card.
Step 4: Enter the connector details¶
Note
- Make sure you have all your prerequisites completed.
- An asterisk ( * ) designates a required entry.
- The steps provide information about how to use the required configuration properties. See Configuration Properties for other configuration property values and descriptions.
At the Add HTTP Source Connector screen, complete the following:
- Select the way you want to provide Kafka Cluster credentials. You can choose one of the following options:
  - My account: This setting allows your connector to globally access everything that you have access to. With a user account, the connector uses an API key and secret to access the Kafka cluster. This option is not recommended for production.
  - Service account: This setting limits the access for your connector by using a service account. This option is recommended for production.
  - Use an existing API key: This setting allows you to specify an API key and a secret pair. You can use an existing pair or create a new one. This method is not recommended for production environments.
- Click Continue.
- Enter the connection details:
  - HTTP(S) URL: The HTTP API URL. You can template the URL property with offset and entity information. For example: http://example.com/api/v1/${entityName}/${offset}. The connector substitutes ${offset} with the offset generated from the previous request’s response. If the request is the first one, the connector uses http.initial.offset. The connector substitutes ${entityName} with the property values from the connector configuration property Entity Names (entity.names). For additional information, see HTTP Source Connector limitations.
  - Endpoint Authentication Type: Select one of the authentication options:
    - basic: Enter a username and password for basic authentication.
    - bearer: For endpoints requiring client bearer token authentication, you must provide the token.
    - none: No endpoint authentication.
    - oauth2: Enter the following options when OAuth2 is selected:
      - OAuth2 token url: The URL used to fetch the OAuth2 token. Client Credentials is the only supported grant type.
      - OAuth2 Client ID: The client ID used when fetching the OAuth2 token.
      - OAuth2 Client Secret: The secret used when fetching the OAuth2 token.
      - OAuth2 Token Property Name: The name of the property containing the OAuth2 token returned by the OAuth2 token URL. Defaults to access_token.
      - OAuth2 Client Scope: The scope parameter sent when fetching the OAuth2 token.
      - OAuth2 Client Mode: Specifies how to encode client_id and client_secret in the OAuth2 authorization request. If set to header, the credentials are encoded as an Authorization: Basic <base-64 encoded client_id:client_secret> HTTP header. If set to url, then client_id and client_secret are sent as URL-encoded parameters.
  - SSL Enabled: Defaults to false. If you set this property to true, you must upload the Key store and/or Trust store files. You must also supply the required passwords for each uploaded file.
- Click Continue.
Add the Topic Name Pattern: The name of the Kafka topic where the connector publishes data. If you want to use a separate topic for each entity, you can use the template variable ${entityName}. For example, a valid value for this property is Elasticsearch_Index_${entityName}. The variable ${entityName} is replaced with values from the configuration property Entity Names (entity.names).
Select the HTTP Request Method: Defaults to GET. For GET, you add parameters to the HTTP Request Parameters property. For POST, you add the JSON payload request in the HTTP Request Body property.
Select the output record value format (data going to the Kafka topic): AVRO, JSON, JSON_SR (JSON Schema), or PROTOBUF. Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON Schema, or Protobuf). For additional information, see Schema Registry Enabled Environments.
Add the HTTP request details:
- HTTP Request Headers: The HTTP headers included in each request. Header names and values must be separated by :. Distinct headers must be separated by the http.request.headers.separator property value (defaults to |). For example: From:abcxyz@confluent.io|Content-Length:348.
- HTTP Request Parameters: The HTTP parameters added to each request. Used with the GET request method only. Parameter names and values should be separated by =. Distinct parameters should be separated by the http.request.parameters.separator property (defaults to &). Parameter values can be templated with offset and entity information. For example: entity=${entityName}&search_after=${offset}, where ${offset} is substituted with the offset generated from the previous request’s response or, for the first request, with the value of the http.initial.offset property. ${entityName} is substituted with values from the entity.names property.
- HTTP Request Body: The JSON payload parameters sent with each HTTP request. Used with the POST request method only. The value can be templated with offset and entity information. For example: {'entity': '${entityName}', 'search_after': '${offset}'}, where ${offset} is substituted with the offset generated from the previous request’s response or, for the first request, with the value of the http.initial.offset property. ${entityName} is substituted with values from the entity.names property.
- HTTP Initial Offset: The initial offset used to generate the first request. You must set this property value if url, http.request.parameters, or http.request.body contains the template variable ${offset}.
- HTTP Offset Mode: Indicates how the connector computes offsets and generates requests. If set to SIMPLE_INCREMENTING, the ${offset} used to generate requests is the previous offset (or http.initial.offset) incremented by one per sourced record. If the response is a JSON object indicating a single record, the offset is incremented by one, but if the response is a JSON array of n records, the offset increments by n. In SIMPLE_INCREMENTING mode, you set http.initial.offset to an integer value and the property http.offset.json.pointer is not configured. If you set this property to CHAINING, you must also set the property http.offset.json.pointer and the offset for a record is set to the value for the configured key in the response data. In CURSOR_PAGINATION mode, you set http.next.page.json.pointer to configure the path in each JSON response that can be used as the offset value/next page reference. The offset for the last record in each page is set to the next page value. For additional information, see Offset Mode Use Case Examples.
- HTTP Response Data JSON Pointer: The JSON Pointer to the entity in the JSON response containing the actual data the connector writes to Kafka as records. The entity can be an array (multiple records in a single response) or an object/scalar value (single record).
- HTTP Offset JSON Pointer: The JSON Pointer to the value in each record that corresponds to the offset for that record (relative to the http.response.data.json.pointer value). The offset is available to the subsequent request as ${offset} and is also used for checkpointing and recovery in case the connector fails and restarts. Used when HTTP Offset Mode is set to CHAINING.
- HTTP Next Page JSON Pointer: The JSON Pointer to the value in the response that corresponds to the next page reference (either a page token, a full URL, or a URL fragment). This is stored as the offset and is available to the subsequent request via the template variable ${offset}. You set this property only if HTTP Offset Mode is set to CURSOR_PAGINATION. Note that the next page reference in each JSON response must be a JSON string. For instance, the reference cannot be a JSON array, object, or number.
- Request Interval (ms): The time in milliseconds (ms) to wait between consecutive requests.
Show advanced configurations
Schema context: Select a schema context to use for this connector, if using a schema-based data format. This property defaults to the Default context, which configures the connector to use the default schema set up for Schema Registry in your Confluent Cloud environment. A schema context allows you to use separate schemas (like schema sub-registries) tied to topics in different Kafka clusters that share the same Schema Registry environment. For example, if you select a non-default context, a Source connector uses only that schema context to register a schema and a Sink connector uses only that schema context to read from. For more information about setting up a schema context, see What are schema contexts and when should you use them?.
Entity Names: A list of entities that the connector polls. Values from this list replace the template variable ${entityName} in the configuration properties topic.name.pattern, url, http.request.parameters, and http.request.body. You do not set this property if none of the referenced configuration properties contain the template variable ${entityName}.
Maximum Retries: The maximum number of times the connector retries a request when an error occurs, before the task fails.
Retry Backoff (ms): The time in ms to wait following an error before the connector retries the task.
Retry HTTP Status Codes: The HTTP error codes returned that prompt the connector to retry the request. Enter a comma-separated list of codes or range of codes. Ranges are specified with a start and optional end code. Range boundaries are inclusive. For example, 400- includes all codes greater than or equal to 400, and 400-500 includes codes from 400 to 500, including 500. Multiple ranges and single codes can be specified together to achieve fine-grained control over retry behavior. For example, 404,408,500- prompts the connector to retry on 404 NOT FOUND, 408 REQUEST TIMEOUT, and all 5xx error codes. Note that some status codes are always retried, such as unauthorized, timeouts, and too many requests.
HTTP Request Headers Separator: The character that separates multiple distinct request headers in the configuration property http.request.headers.
HTTP Request Parameters Separator: The character that separates multiple distinct request parameters in the configuration property http.request.parameters.
Transforms and Predicates: See the Single Message Transforms (SMT) documentation for details.
Click Continue.
- Based on the number of topic partitions you select, you will be provided with a recommended number of tasks.
- Click Continue.
Verify the connection details by previewing the running configuration.
Once you’ve validated that the properties are configured to your satisfaction, click Launch.
The status for the connector should go from Provisioning to Running.
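The two OAuth2 Client Mode settings described in the connection details can be illustrated with a short sketch. It only shows how `client_id` and `client_secret` might be encoded in each mode; the function name is hypothetical, and sending the parameters as a form-encoded body with `grant_type=client_credentials` is an assumption, not confirmed connector behavior.

```python
import base64
from typing import Dict, Optional, Tuple
from urllib.parse import urlencode


def client_credentials_request(
    client_id: str, client_secret: str, mode: str, scope: Optional[str] = None
) -> Tuple[Dict[str, str], str]:
    """Return (headers, form_body) for a hypothetical token request."""
    form = {"grant_type": "client_credentials"}
    if scope:
        form["scope"] = scope
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    if mode == "header":
        # Authorization: Basic <base-64 encoded client_id:client_secret>
        raw = f"{client_id}:{client_secret}".encode("utf-8")
        headers["Authorization"] = "Basic " + base64.b64encode(raw).decode("ascii")
    elif mode == "url":
        # Credentials travel as URL-encoded parameters instead of a header.
        form["client_id"] = client_id
        form["client_secret"] = client_secret
    else:
        raise ValueError("mode must be 'header' or 'url'")
    return headers, urlencode(form)
```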
Step 5: Check for records¶
Verify that records are being produced in the Kafka topic.
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.
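The range syntax accepted by Retry HTTP Status Codes (for example, 404,408,500-) can be sketched as a small parser. The function names are hypothetical, and the sketch does not model the status codes that the connector always retries.

```python
from typing import List, Optional, Tuple

CodeRange = Tuple[int, Optional[int]]  # (start, end); end=None means open-ended


def parse_retry_codes(spec: str) -> List[CodeRange]:
    """Parse a comma-separated list of codes and inclusive ranges."""
    ranges: List[CodeRange] = []
    for part in spec.split(","):
        part = part.strip()
        if not part:
            continue
        if "-" in part:
            start, end = part.split("-", 1)
            # "500-" has no end code, so the range is open-ended.
            ranges.append((int(start), int(end) if end.strip() else None))
        else:
            ranges.append((int(part), int(part)))
    return ranges


def should_retry(status: int, ranges: List[CodeRange]) -> bool:
    # Range boundaries are inclusive on both sides.
    return any(
        start <= status and (end is None or status <= end)
        for start, end in ranges
    )
```

With the value 404,408,500-, a 503 response matches the open-ended range and is retried, while a 400 response is not.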
Using the Confluent CLI¶
To set up and run the connector using the Confluent CLI, complete the following steps.
Note
Make sure you have all your prerequisites completed.
Step 1: List the available connectors¶
Enter the following command to list available connectors:
confluent connect plugin list
Step 2: List the connector configuration properties¶
Enter the following command to show the connector configuration properties:
confluent connect plugin describe <connector-plugin-name>
The command output shows the required and optional configuration properties.
Step 3: Create the connector configuration file¶
Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.
{
  "connector.class": "HttpSource",
  "name": "HttpSourceConnector_0",
  "kafka.api.key": "****************",
  "kafka.api.secret": "*********************************",
  "topic.name.pattern": "${entityName}",
  "entity.names": "AppName, User",
  "url": "http://example.com/api/v1/${entityName}/${offset}",
  "http.initial.offset": "1000",
  "http.offset.mode": "SIMPLE_INCREMENTING",
  "output.data.format": "JSON",
  "tasks.max": "1"
}
Note the following property definitions:
"connector.class": Identifies the connector plugin name.
"name": Sets a name for your new connector.
"kafka.auth.mode": Identifies the connector authentication mode you want to use. There are two options: SERVICE_ACCOUNT or KAFKA_API_KEY (the default). To use an API key and secret, specify the configuration properties kafka.api.key and kafka.api.secret, as shown in the example configuration (above). To use a service account, specify the Resource ID in the property kafka.service.account.id=<service-account-resource-ID>. To list the available service account resource IDs, use the following command:
confluent iam service-account list
For example:
confluent iam service-account list
   Id     | Resource ID |       Name        |    Description
+---------+-------------+-------------------+-------------------
   123456 | sa-l1r23m   | sa-1              | Service account 1
   789101 | sa-l4d56p   | sa-2              | Service account 2
"topic.name.pattern": The name of the Kafka topic where the connector publishes data. If you want to use a separate topic for each entity, you can use the template variable ${entityName}. For example, a valid value for this property is Elasticsearch_Index_${entityName}. The variable ${entityName} is replaced with values from the configuration property entity.names.
"entity.names": A list of entities that the connector polls.
"url": The HTTP API URL. You can template the URL property with offset and entity information. For example: http://example.com/api/v1/${entityName}/${offset}. The connector substitutes ${offset} with the offset generated from the previous request’s response. If the request is the first one, the connector uses http.initial.offset. The connector substitutes ${entityName} with the property values from the connector configuration property entity.names. For additional information, see HTTP Source Connector limitations.
"http.initial.offset": The initial offset used to generate the first request. You must set this property value if url, http.request.parameters, or http.request.body contains the template variable ${offset}.
"http.offset.mode": Indicates how offsets are computed and how requests are generated. If set to SIMPLE_INCREMENTING, the ${offset} used to generate requests is the previous offset (or http.initial.offset) incremented by one per sourced record. In this mode, you set http.initial.offset to an integer value and the property http.offset.json.pointer is not configured. If set to CHAINING, you must set the property http.offset.json.pointer and the offset for a record is set to the value for the configured key in the response data. In CURSOR_PAGINATION mode, you set http.next.page.json.pointer to configure the path in each JSON response that can be used as the offset value/next page reference. The offset for the last record in each page is set to the next page value. Note that the next page reference in each JSON response must be a JSON string. For instance, the reference cannot be a JSON array, object, or number. For additional information, see Offset Mode Use Case Examples.
"output.data.format": Set the output record value format (data going to the Kafka topic): AVRO, JSON, JSON_SR (JSON Schema), or PROTOBUF. Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON Schema, or Protobuf). For additional information, see Schema Registry Enabled Environments.
"tasks.max": The connector supports running multiple tasks per connector. More tasks may improve performance.
Single Message Transforms:
See the Single Message Transforms (SMT) documentation for details about adding SMTs using the CLI.
Important
For TLS connections, you must supply the keystore and/or truststore file contents and the file passwords when creating the connector configuration JSON. The truststore and keystore files are binary files. For the https.ssl.keystorefile and the https.ssl.truststorefile properties, you encode the truststore or keystore file in base64, take the encoded string, add the data:text/plain;base64, prefix, and then use the entire string as the property entry. For example:
"https.ssl.keystorefile" : "data:text/plain;base64,/u3+7QAAAAIAAAACAAAAAQAGY2xpZ...omitted...==",
"https.ssl.keystore.password" : "<password>",
"https.ssl.key.password" : "<private-key-password>"
For all property values and descriptions, see Configuration Properties.
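The keystore and truststore encoding described in the Important note above can be sketched as follows. The function name and the file path in the usage comment are hypothetical; the prefix matches the one used in the example property value.

```python
import base64


def to_data_uri(file_bytes: bytes) -> str:
    """Base64-encode binary store contents and add the data URI prefix."""
    encoded = base64.b64encode(file_bytes).decode("ascii")
    return "data:text/plain;base64," + encoded


# Usage with a hypothetical keystore path:
# with open("keystore.jks", "rb") as f:
#     keystore_value = to_data_uri(f.read())
```

The resulting string is what goes into the https.ssl.keystorefile or https.ssl.truststorefile property.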
Step 4: Load the properties file and create the connector¶
Enter the following command to load the configuration and start the connector:
confluent connect cluster create --config-file <file-name>.json
For example:
confluent connect cluster create --config-file http-source-config.json
Example output:
Created connector HttpSourceConnector_0 lcc-do6vzd
Step 5: Check the connector status¶
Enter the following command to check the connector status:
confluent connect cluster list
Example output:
ID | Name | Status | Type | Trace
+------------+-----------------------------+---------+--------+-------+
lcc-do6vzd | HttpSourceConnector_0 | RUNNING | source | |
Step 6: Check the Kafka topic¶
Verify that records are being produced in the Kafka topic.
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.
Offset Mode Use Case Examples¶
The following examples show how the connector polls JSON data from various HTTP-based APIs using each of the HTTP Offset Mode (http.offset.mode) settings:
SIMPLE_INCREMENTING offset mode with the Atlassian Confluence Cloud REST API¶
The Atlassian Confluence Cloud REST API provides the ability to query Atlassian Confluence Cloud resources using GET requests. This GET API supports the following two query parameters:
start: The index of the first item returned in the page of results. The base index is 0.
limit: The number of objects to return per page. This may be restricted by system limits.
The following shows an example request and response using the Atlassian Confluence Cloud REST API.
Request
GET /wiki/rest/api/space?start=0&limit=1
Response body
{
  "results": [
    {
      "id": 12345657891,
      "key": "~11121314",
      "name": "2020 ~ jsmith",
      "type": "personal",
      "status": "current",
      "_expandable": {
        "settings": "/rest/api/space/~11121314/settings",
        "metadata": "",
        "operations": "",
        "lookAndFeel": "/rest/api/settings/lookandfeel?spaceKey=~11121314",
        "identifiers": "",
        "permissions": "",
        "icon": "",
        "description": "",
        "theme": "/rest/api/space/~823923446/theme",
        "history": "",
        "homepage": "/rest/api/content/151617181920"
      },
      "_links": {
        "webui": "/spaces/~11121314",
        "self": "https://confluentinc.atlassian.net/wiki/rest/api/space/~11121314"
      }
    }
  ],
  "start": 0,
  "limit": 1,
  "size": 1,
  "_links": {
    "base": "https://confluentinc.atlassian.net/wiki",
    "context": "/wiki",
    "next": "/rest/api/space?next=true&limit=1&start=1",
    "self": "https://confluentinc.atlassian.net/wiki/rest/api/space"
  }
}
The following example connector configuration snippet shows the properties used to continuously poll data from the Atlassian Confluence Cloud REST API. Note that this REST API supports basic authentication using a username and password.
"url": "https://confluentinc.atlassian.net/wiki/rest/api/space",
"http.request.parameters": "start=${offset}&limit=1",
"http.offset.mode": "SIMPLE_INCREMENTING",
"http.initial.offset": "0",
"http.response.data.json.pointer": "/results",
"auth.type": "basic",
"connection.user": "username",
"connection.password": "*****"
Using the configuration above, the connector makes the initial GET request to https://confluentinc.atlassian.net/wiki/rest/api/space?start=0&limit=1 and then the next request to https://confluentinc.atlassian.net/wiki/rest/api/space?start=1&limit=1, and so on. Once the start parameter begins referencing records which aren’t yet created, the Confluence Cloud REST API returns an empty array in /results. The connector parses this response and finds 0 records to produce. The same request is repeated until the connector finds new records. Note that the offset does not advance until records are found.
CHAINING offset mode with the Elasticsearch Search REST API¶
Elasticsearch has a Search REST API that returns documents that match the query defined in the request. The Search API supports the query parameters from and size to page through results. The Search API and these parameters do not support paging through more than 10,000 documents. To page through larger result sets, the Search API provides the parameter search_after that retrieves the next page of documents using a set of sort values from the previous page.
The following shows an example request and response using the Search REST API.
Request
POST /test-index/_search
Request body
{
  "size": 100,
  "sort": [
    {
      "time": "asc"
    }
  ],
  "search_after": [
    1647948089978
  ]
}
Response body
{
  "took": 614,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 3,
      "relation": "eq"
    },
    "max_score": null,
    "hits": [
      {
        "_index": "test-index",
        "_id": "...",
        "_score": null,
        "_source": {
          "name": "Name1",
          "time": "1647948089979"
        },
        "sort": [
          1647948089979
        ]
      },
      {
        "_index": "test-index",
        "_id": "...",
        "_score": null,
        "_source": {
          "name": "Name2",
          "time": "1647948092349"
        },
        "sort": [
          1647948092349
        ]
      },
      {
        "_index": "test-index",
        "_id": "...",
        "_score": null,
        "_source": {
          "name": "Name3",
          "time": "1647948097246"
        },
        "sort": [
          1647948097246
        ]
      }
    ]
  }
}
The following example connector configuration snippet shows the properties used to continuously poll data from the Elasticsearch Search REST API. Note that Elasticsearch REST APIs support basic authentication using a username and password.
"url": "http://domain/test-index/_search",
"http.offset.mode": "CHAINING",
"http.request.body": "{\"size\": 100, \"sort\": [{\"@time\": \"asc\"}], \"search_after\": [${offset}]}",
"http.request.method": "POST",
"http.initial.offset": "1647948000000",
"http.response.data.json.pointer": "/hits/hits",
"http.offset.json.pointer": "/sort/0",
"auth.type": "basic",
"connection.user": "username",
"connection.password": "*****"
In the example above, the connector makes the initial POST request to http://domain/test-index/_search with request body {"size": 100, "sort": [{"@time": "asc"}], "search_after": [1647948000000]}. Using the response example above, the connector sends the next request with request body {"size": 100, "sort": [{"@time": "asc"}], "search_after": [1647948097246]}, and so on.
Note that if the size parameter isn’t set to a value large enough, data loss can occur if the number of documents with the same sort parameter value exceeds the size value.
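The chaining step described above can be sketched as follows. This is an illustration under stated assumptions, not the connector's implementation: `next_request` is a hypothetical helper, the JSON Pointers /hits/hits and /sort/0 are written as plain dictionary and list lookups, and Python's `string.Template` is used only because it happens to share the ${offset} placeholder syntax.

```python
import json
from string import Template
from typing import Tuple

# Same body template as the http.request.body value in the example configuration.
BODY_TEMPLATE = '{"size": 100, "sort": [{"@time": "asc"}], "search_after": [${offset}]}'


def next_request(response: dict, previous_offset: int) -> Tuple[str, int]:
    """Return the next request body and the offset it was built from."""
    hits = response["hits"]["hits"]  # http.response.data.json.pointer = /hits/hits
    offset = previous_offset
    for hit in hits:
        # http.offset.json.pointer = /sort/0, relative to each record.
        offset = hit["sort"][0]
    # The offset of the last record feeds the next request; if there were
    # no hits, the previous offset is reused.
    body = Template(BODY_TEMPLATE).substitute(offset=offset)
    return body, offset
```

Applied to the example response, the last record's sort value 1647948097246 becomes the search_after value of the next request.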
CURSOR_PAGINATION offset mode with the GCS List Objects REST API¶
The Google Cloud Storage (GCS) List Objects REST API allows listing all the objects in a GCS bucket in paginated form. The API takes the query parameter pageToken to denote the page of responses that need to be returned.
The following shows an example request and response using the Google Cloud Storage (GCS) List Objects REST API.
Request
GET storage/v1/b/{bucket_name}/o?pageToken={page_token}
Response body
{
  "kind": "storage#objects",
  "nextPageToken": "CkV0b3BpY3MvZGVtby9MS9ob3V...=",
  "items": [
    {
      "kind": "storage#object",
      "id": "{object_id}",
      "selfLink": "https://www.googleapis.com/storage/v1/b/{bucket_name}/o/{object_id}",
      "mediaLink": "https://storage.googleapis.com/download/storage/v1/b/{bucket_name}/o/{object_id}",
      "name": "{object_name}",
      "bucket": "{bucket_id}",
      "generation": "1669124982018947",
      "metageneration": "1",
      "contentType": "font/otf",
      "storageClass": "STANDARD",
      "size": "145520",
      "md5Hash": "...",
      "crc32c": "...",
      "etag": "...",
      "timeCreated": "2022-11-22T13:49:42.022Z",
      "updated": "2022-11-22T13:49:42.022Z",
      "timeStorageClassUpdated": "2022-11-22T13:49:42.022Z"
    },
    ....
The following example connector configuration snippet shows the properties used to get paginated data from the GCS List Objects REST API.
"url": "https://storage.googleapis.com/storage/v1/b/test-bucket/o",
"http.offset.mode": "CURSOR_PAGINATION",
"http.request.method": "GET",
"http.request.parameters": "pageToken=${offset}",
"http.response.data.json.pointer": "/items",
"http.next.page.json.pointer": "/nextPageToken",
"auth.type": "bearer",
"bearer.token": "*****"
In the example above, the connector makes the initial GET request to https://storage.googleapis.com/storage/v1/b/test-bucket/o?pageToken= which returns the first page of results. Using the response example above, the connector parses and produces the records to Kafka. The connector then makes a subsequent request to https://storage.googleapis.com/storage/v1/b/test-bucket/o?pageToken=CkV0b3BpY3MvZGVtby9MS9ob3V...=, and so on.
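The page-walking behavior described above can be sketched against a fake set of pages. The helper names and the `pages` dictionary are hypothetical, the initial offset is assumed to be an empty string (matching the empty pageToken in the first request), and stopping when no nextPageToken is returned is an assumption made only so the example terminates.

```python
from typing import Dict, List, Optional, Tuple

BASE_URL = "https://storage.googleapis.com/storage/v1/b/test-bucket/o"


def build_url(offset: str) -> str:
    # http.request.parameters = "pageToken=${offset}"
    return f"{BASE_URL}?pageToken={offset}"


def handle_page(response: Dict) -> Tuple[List[Dict], Optional[str]]:
    records = response.get("items", [])          # /items
    next_offset = response.get("nextPageToken")  # /nextPageToken
    return records, next_offset


def walk(pages: Dict[str, Dict], initial_offset: str = "") -> Tuple[List[str], List[str]]:
    """Follow next page tokens through `pages`, keyed by page token."""
    offset: Optional[str] = initial_offset
    names: List[str] = []
    urls: List[str] = []
    while offset is not None:
        urls.append(build_url(offset))
        records, offset = handle_page(pages[offset])
        names.extend(record["name"] for record in records)
    return names, urls
```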
Configuration Properties¶
Use the following configuration properties with the fully-managed connector. For self-managed connector property definitions and other details, see the connector docs in Self-managed connectors for Confluent Platform.
How should we connect to your data?¶
name
Sets a name for your connector.
- Type: string
- Valid Values: A string at most 64 characters long
- Importance: high
Kafka Cluster credentials¶
kafka.auth.mode
Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.
- Type: string
- Default: KAFKA_API_KEY
- Valid Values: KAFKA_API_KEY, SERVICE_ACCOUNT
- Importance: high
kafka.api.key
Kafka API Key. Required when kafka.auth.mode==KAFKA_API_KEY.
- Type: password
- Importance: high
kafka.service.account.id
The Service Account that will be used to generate the API keys to communicate with Kafka Cluster.
- Type: string
- Importance: high
kafka.api.secret
Secret associated with Kafka API key. Required when kafka.auth.mode==KAFKA_API_KEY.
- Type: password
- Importance: high
Which topic(s) do you want to send data to?¶
topic.name.pattern
The name of the Kafka topic to publish data to. The value can contain a template variable ${entityName} in case a separate Kafka topic should be used for each entity; the variable ${entityName} will be replaced with values from the config ‘entity.names’.
- Type: string
- Importance: high
Schema Config¶
schema.context.name
Add a schema context name. A schema context represents an independent scope in Schema Registry. It is a separate sub-schema tied to topics in different Kafka clusters that share the same Schema Registry instance. If not used, the connector uses the default schema configured for Schema Registry in your Confluent Cloud environment.
- Type: string
- Default: default
- Importance: medium
Connection details¶
url
The HTTP API URL which can be templated with offset and entity information. For example: http://example.com/api/v1/${entityName}/${offset} where ${offset} will be substituted with the offset generated from the previous request’s response (or if it’s the first request, from ‘http.initial.offset’), and ${entityName} will be substituted with values from the config entity.names.
- Type: string
- Importance: high
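The ${entityName} and ${offset} substitution in url and topic.name.pattern can be sketched with Python's `string.Template`, which happens to use the same placeholder syntax. The `expand` helper is hypothetical, and trimming whitespace around entity names is an assumption.

```python
from string import Template
from typing import List, Tuple


def expand(url_pattern: str, topic_pattern: str,
           entity_names: str, offset: str) -> List[Tuple[str, str]]:
    """Return one (request URL, topic name) pair per configured entity."""
    plan = []
    for entity in (name.strip() for name in entity_names.split(",")):
        values = {"entityName": entity, "offset": offset}
        # safe_substitute leaves any unknown placeholders untouched.
        plan.append((
            Template(url_pattern).safe_substitute(values),
            Template(topic_pattern).safe_substitute(values),
        ))
    return plan
```

With the quick start values (entity.names set to "AppName, User" and an initial offset of 1000), this yields one URL and one topic per entity.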
http.request.method
HTTP Request Method. Valid options are GET and POST.
- Type: string
- Default: GET
- Importance: high
http.request.headers
HTTP headers to be included in each request. Header names and values should be separated by :. Distinct headers should be separated by the config value for ‘http.request.headers.separator’ (defaults to |). For example: From:abcxyz@confluent.io|Content-Length:348.
- Type: string
- Importance: medium
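The header string format can be illustrated with a small parser. The function name is hypothetical, and splitting each header on its first colon only (so that values may themselves contain colons) is an assumption about how such a string would sensibly be read.

```python
from typing import Dict


def parse_headers(spec: str, separator: str = "|") -> Dict[str, str]:
    """Split a header string into a name-to-value mapping."""
    headers: Dict[str, str] = {}
    for item in spec.split(separator):
        if not item.strip():
            continue
        # Split on the first ":" only; the value may contain more colons.
        name, _, value = item.partition(":")
        headers[name.strip()] = value.strip()
    return headers
```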
http.request.parameters
HTTP parameters to be added to each request. Parameter names and values should be separated by =. Distinct parameters should be separated by the config value for ‘http.request.parameters.separator’ (defaults to &). Parameter values can be templated with offset and entity information (for example: entity=${entityName}&search_after=${offset}) where ${offset} will be substituted with the offset generated from the previous request’s response (or if it’s the first request, from ‘http.initial.offset’), and ${entityName} will be substituted with values from the config ‘entity.names’. The parameters are only set if ‘http.request.method’ = GET.
- Type: string
- Importance: medium
http.request.body
The payload to be sent along with each HTTP request. The value can be templated with offset and entity information (for example: {‘entity’: ‘${entityName}’, ‘search_after’: ‘${offset}’}) where ${offset} will be substituted with the offset generated from the previous request’s response (or if it’s the first request, from ‘http.initial.offset’), and ${entityName} will be substituted with values from the config ‘entity.names’. The body is only set if ‘http.request.method’ = POST.
- Type: string
- Importance: medium
http.initial.offset
The initial offset to be used to generate the first request. This needs to be set if one or more of the configs ‘url’, ‘http.request.parameters’, or ‘http.request.body’ contain the template variable ${offset}.
- Type: string
- Importance: high
http.offset.mode
This config indicates how offsets are computed and how requests are generated. If set to SIMPLE_INCREMENTING, the ${offset} used to generate requests is simply the previous offset (or http.initial.offset) incremented by 1 per sourced record. In this mode, http.initial.offset needs to be set to an integer value. If set to CHAINING, the config ‘http.offset.json.pointer’ needs to be set, and the offset for a record is set to the value for the configured key in the response data. If the value is CURSOR_PAGINATION, then the config ‘http.next.page.json.pointer’ needs to be set and the offset for the last record in each page will be set to the next page value.
- Type: string
- Default: SIMPLE_INCREMENTING
- Importance: high
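As a rough illustration of SIMPLE_INCREMENTING, the sketch below advances an integer offset by the number of records in each response: one for an object, or the array length for an array. The function name and the sample responses are hypothetical, and the connector's internal logic may differ.

```python
def next_simple_offset(current_offset: int, response_data) -> int:
    """Advance the offset by one per sourced record (illustrative only)."""
    # An array response yields one record per element; anything else is one record.
    record_count = len(response_data) if isinstance(response_data, list) else 1
    return current_offset + record_count


# http.initial.offset is a string config, so it is parsed as an integer first.
offset = int("0")
offset = next_simple_offset(offset, [{"id": 1}, {"id": 2}, {"id": 3}])  # array response
offset = next_simple_offset(offset, {"id": 4})  # single object response
print(offset)
```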
http.response.data.json.pointer
The JSON Pointer to the entity in the JSON response containing the actual data that should be written to Kafka as records. The entity can be an array (multiple records in a single response) or an object / scalar value (single record).
- Type: string
- Importance: high
http.offset.json.pointer
The JSON Pointer to the value in each record that corresponds to the offset for that record (it is relative to ‘http.response.data.json.pointer’). The offset will be available to the subsequent request as ${offset} and it will also be used for checkpointing and recovery in case of connector failures or restarts. This config should only be set if ‘http.offset.mode’ is set to CHAINING.
- Type: string
- Importance: medium
http.next.page.json.pointer
The JSON pointer to the value in the response which corresponds to the next page reference (either a page token, a full URL or a URL fragment). This will be stored as the offset and will be available to the subsequent request via the template variable ${offset}. This config should only be set if ‘http.offset.mode’ is set to CURSOR_PAGINATION.
- Type: string
- Importance: medium
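The three JSON pointer settings above can be pictured with a minimal JSON Pointer (RFC 6901) resolver. This is a sketch under stated assumptions: the response shape, the pointer values /data, /id, and /paging/next, and the helper resolve_pointer are all hypothetical and only show how the data pointer, the per-record offset pointer, and the next-page pointer relate to one another.

```python
def resolve_pointer(document, pointer: str):
    """Resolve an RFC 6901 JSON Pointer against parsed JSON (minimal sketch)."""
    if pointer == "":
        return document  # the empty pointer refers to the whole document
    if not pointer.startswith("/"):
        raise ValueError("a non-empty JSON Pointer must start with '/'")
    current = document
    for token in pointer[1:].split("/"):
        # Unescape in the order required by RFC 6901: "~1" first, then "~0".
        token = token.replace("~1", "/").replace("~0", "~")
        if isinstance(current, list):
            current = current[int(token)]
        else:
            current = current[token]
    return current


# Hypothetical API response.
response = {
    "data": [{"id": 101, "name": "a"}, {"id": 102, "name": "b"}],
    "paging": {"next": "token-2"},
}

# http.response.data.json.pointer: locate the records (an array or a single value).
data = resolve_pointer(response, "/data")
records = data if isinstance(data, list) else [data]

# CHAINING: http.offset.json.pointer is resolved relative to each record.
chaining_offsets = [resolve_pointer(record, "/id") for record in records]

# CURSOR_PAGINATION: http.next.page.json.pointer is resolved against the response.
next_page = resolve_pointer(response, "/paging/next")

print(chaining_offsets, next_page)
```

In CHAINING mode the last record's offset would feed ${offset} in the next request, while in CURSOR_PAGINATION mode the next-page value plays that role.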
request.interval.ms
The time in milliseconds to wait between consecutive requests.
- Type: int
- Default: 15000 (15 seconds)
- Valid Values: [100,…]
- Importance: medium
entity.names
A list of entities that should be polled. Values from this list will replace the template variable ${entityName} in the configs ‘topic.name.pattern’, ‘url’, ‘http.request.parameters’, ‘http.request.body’. This config doesn’t need to be set if none of the mentioned configs contain the template variable ${entityName}.
- Type: list
- Importance: medium
max.retries
The maximum number of times to retry on errors before failing the task.
- Type: int
- Default: 10
- Importance: medium
retry.backoff.ms
The time in milliseconds to wait following an error before a retry attempt is made.
- Type: int
- Default: 3000 (3 seconds)
- Valid Values: [100,…]
- Importance: medium
retry.on.status.codes
Comma-separated list of HTTP status codes or range of codes to retry on. Ranges are specified with start and optional end code. Range boundaries are inclusive. For instance, 400- includes all codes greater than or equal to 400. 400-500 includes codes from 400 to 500, including 500. Multiple ranges and single codes can be specified together to achieve fine-grained control over retry behavior. For example, 404,408,500- will retry on 404 NOT FOUND, 408 REQUEST TIMEOUT, and all 5xx error codes. Note that some status codes will always be retried, such as unauthorized, timeouts and too many requests.
- Type: string
- Default: 400-
- Importance: medium
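The range syntax above can be read as follows. This Python sketch parses a retry.on.status.codes style value into inclusive ranges and checks a status code against them. The function names are hypothetical, and the sketch does not model the status codes that the connector always retries regardless of this setting.

```python
def parse_retry_codes(spec: str) -> list:
    """Parse a value such as '404,408,500-' into (start, end) ranges (illustrative only)."""
    ranges = []
    for part in spec.split(","):
        part = part.strip()
        if not part:
            continue
        if "-" in part:
            start, _, end = part.partition("-")
            # An omitted end code means the range is open-ended.
            ranges.append((int(start), int(end) if end else None))
        else:
            ranges.append((int(part), int(part)))
    return ranges


def should_retry(status: int, ranges: list) -> bool:
    """Return True if the status falls in any range; boundaries are inclusive."""
    return any(
        start <= status and (end is None or status <= end) for start, end in ranges
    )


ranges = parse_retry_codes("404,408,500-")
print(should_retry(503, ranges), should_retry(400, ranges))
```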
http.request.headers.separator
The character that separates multiple distinct headers in the config ‘http.request.headers’.
- Type: string
- Default: |
- Importance: medium
http.request.parameters.separator
The character that separates multiple distinct request parameters in the config ‘http.request.parameters’.
- Type: string
- Default: &
- Importance: medium
http.request.sensitive.headers
Sensitive HTTP headers (for example, credentials) to be included in all requests. Individual headers should be separated by the Header Separator.
- Type: password
- Importance: high
Authentication and SSL¶
auth.type
Authentication type of the endpoint. Valid values are none, basic, oauth2, and bearer. For oauth2, only the Client Credentials grant type is supported.
- Type: string
- Default: none
- Importance: high
connection.user
The username to be used with an endpoint requiring basic authentication.
- Type: string
- Importance: medium
connection.password
The password to be used with an endpoint requiring basic authentication.
- Type: password
- Importance: medium
bearer.token
The bearer authentication token to be used with an endpoint requiring bearer token based authentication.
- Type: password
- Importance: medium
oauth2.token.url
The URL to be used for fetching the OAuth2 token. Client Credentials is the only supported grant type.
- Type: string
- Importance: medium
oauth2.client.id
The client id used when fetching the OAuth2 token.
- Type: string
- Importance: medium
oauth2.client.secret
The client secret used when fetching the OAuth2 token.
- Type: password
- Importance: medium
oauth2.token.property
The name of the property containing the OAuth2 token returned by the OAuth2 token URL (defaults to access_token).
- Type: string
- Importance: medium
oauth2.client.scope
The scope parameter sent to the service when fetching the OAuth2 token.
- Type: string
- Importance: medium
oauth2.client.mode
Specifies how to encode client_id and client_secret in the OAuth2 authorization request. If set to header, the credentials are encoded as an ‘Authorization: Basic <base-64 encoded client_id:client_secret>’ HTTP header. If set to url, then client_id and client_secret are sent as URL-encoded parameters.
- Type: string
- Importance: medium
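The difference between the two oauth2.client.mode values can be sketched as follows. This is an illustration, not the connector's code: build_token_request is a hypothetical helper, and the sketch assumes that url mode places the credentials in a form-encoded request body alongside the standard grant_type=client_credentials parameter.

```python
import base64
from urllib.parse import urlencode


def build_token_request(client_id, client_secret, mode, scope=None):
    """Return (headers, form_body) for a Client Credentials token request."""
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    form = {"grant_type": "client_credentials"}
    if scope:
        form["scope"] = scope
    if mode == "header":
        # Authorization: Basic <base-64 encoded client_id:client_secret>
        raw = f"{client_id}:{client_secret}".encode("utf-8")
        headers["Authorization"] = "Basic " + base64.b64encode(raw).decode("ascii")
    elif mode == "url":
        # Assumption: credentials travel as URL-encoded form parameters.
        form["client_id"] = client_id
        form["client_secret"] = client_secret
    else:
        raise ValueError(f"unsupported mode: {mode}")
    return headers, urlencode(form)


# Placeholder credentials for illustration only.
print(build_token_request("my-id", "my-secret", "url", scope="read")[1])
```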
https.ssl.enabled
Whether or not to connect to the endpoint via SSL.
- Type: boolean
- Default: false
- Importance: medium
https.ssl.keystorefile
The key store containing the server certificate.
- Type: password
- Importance: low
https.ssl.keystore.password
The store password for the key store file.
- Type: password
- Importance: high
https.ssl.key.password
The password for the private key in the key store file.
- Type: password
- Importance: high
https.ssl.truststorefile
The trust store containing a server CA certificate.
- Type: password
- Importance: high
https.ssl.truststore.password
The password for the trust store containing a server CA certificate.
- Type: password
- Importance: high
https.ssl.protocol
The protocol to use for SSL connections.
- Type: string
- Default: TLSv1.3
- Importance: medium
Output messages¶
output.data.format
Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format such as AVRO, JSON_SR, or PROTOBUF.
- Type: string
- Importance: high
Number of tasks for this connector¶
tasks.max
Maximum number of tasks for the connector.
- Type: int
- Valid Values: [1,…]
- Importance: high
Next Steps¶
For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.