HTTP Source Connector for Confluent Cloud

The fully-managed Kafka Connect HTTP Source connector periodically polls an HTTP API for data and then produces records from that source to Apache Kafka®.

Features

The HTTP Source connector supports the following features:

  • Offset modes: The connector supports the following modes:
    • SIMPLE_INCREMENTING: For the first request, the connector computes the offset based on the initial offset set in the http.initial.offset property. For subsequent requests, the connector increments the offset by the number of records in the previous response. For instance, the connector increments the offset by one for an object type response that indicates a single record. For an array type response, the connector increments the offset by the length of the array.
    • CHAINING: For the first request, the connector computes the offset based on the initial offset set in the http.initial.offset property. Upon receiving a successful response to the first request, the connector parses the response and generates one or more records (depending on whether the JSON response is an object or an array) with the offset value set to the value from the response at the path configured in the http.offset.json.pointer configuration property.
  • At least once delivery: The connector guarantees that records are delivered at least once to the Kafka topic.
  • Supports multiple tasks: The connector supports running one or more tasks. More tasks may improve performance when multiple entities are configured.
  • Supported output data formats: The connector supports Avro, JSON Schema (JSON-SR), Protobuf, and JSON (schemaless) output record value formats. Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON Schema, or Protobuf).
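The SIMPLE_INCREMENTING arithmetic described above can be sketched in a few lines of Python. This is only an illustration of the counting rule, not the connector's implementation; the function name next_simple_offset and the sample response bodies are hypothetical.

```python
import json


def next_simple_offset(current_offset, response_body):
    """Return the offset for the next request in SIMPLE_INCREMENTING mode."""
    payload = json.loads(response_body)
    # An array response holds len(payload) records; an object holds one record.
    record_count = len(payload) if isinstance(payload, list) else 1
    return current_offset + record_count


# http.initial.offset is configured as a string, so convert it first.
initial_offset = int("1000")
after_array = next_simple_offset(initial_offset, '[{"id": 1}, {"id": 2}, {"id": 3}]')
after_object = next_simple_offset(after_array, '{"id": 4}')
```

Here the array response advances the offset by three and the object response advances it by one.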

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.

Limitations

Be sure to review the following information.

Quick Start

Use this quick start to get up and running with the Confluent Cloud HTTP Source connector. The quick start provides the basics of selecting the connector and configuring it to get data from an HTTP API.

Prerequisites
  • Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud Platform (GCP).
  • The Confluent CLI installed and configured for the cluster. See Install the Confluent CLI.
  • Authorized access to an HTTP or HTTPS endpoint.
  • Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).

Using the Confluent Cloud Console

Step 1: Launch your Confluent Cloud cluster.

See the Quick Start for Confluent Cloud for installation instructions.

Step 2: Add a connector.

In the left navigation menu, click Data integration, and then click Connectors. If you already have connectors in your cluster, click + Add connector.

Step 3: Select your connector.

Click the HTTP Source connector card.

Note

  • Make sure you have all your prerequisites completed.
  • An asterisk ( * ) designates a required entry.
  • The steps provide information about how to use the required configuration properties. See Configuration Properties for other configuration property values and descriptions.

At the Add HTTP Source Connector screen, complete the following:

  1. Select the way you want to provide Kafka Cluster credentials. You can choose one of the following options:
    • Global Access: Allows your connector to access everything you have access to. With global access, connector access will be linked to your account. This option is not recommended for production.
    • Granular access: Limits the access for your connector. You will be able to manage connector access through a service account. This option is recommended for production.
    • Use an existing API key: Allows you to enter an API key and secret pair you have stored. You can enter an API key and secret (or generate these in the Cloud Console).
  2. Click Continue.

Step 5: Check for records.

Verify that records are being produced in the Kafka topic.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.

Using the Confluent CLI

To set up and run the connector using the Confluent CLI, complete the following steps.

Note

Make sure you have all your prerequisites completed.

Step 1: List the available connectors.

Enter the following command to list available connectors:

confluent connect plugin list

Step 2: Show the required connector configuration properties.

Enter the following command to show the required connector properties:

confluent connect plugin describe <connector-catalog-name>

For example:

confluent connect plugin describe HttpSource

Example output:

The following are required configs:
connector.class : HttpSource
name
kafka.api.key : ["kafka.api.key" is required when "kafka.auth.mode==KAFKA_API_KEY"]
kafka.api.secret : ["kafka.api.secret" is required when "kafka.auth.mode==KAFKA_API_KEY"]
topic.name.pattern
url
output.data.format
tasks.max

Step 3: Create the connector configuration file.

Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.

{
  "connector.class": "HttpSource",
  "name": "HttpSourceConnector_0",
  "kafka.api.key": "****************",
  "kafka.api.secret": "*********************************",
  "topic.name.pattern": "${entityName}",
  "entity.names": "AppName, User",
  "url": "http://example.com/api/v1/${entityName}/${offset}",
  "http.initial.offset": "1000",
  "http.offset.mode": "SIMPLE_INCREMENTING",
  "output.data.format": "JSON",
  "tasks.max": "1"
}
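Hand-edited JSON is easy to break (for example, with a trailing comma after the last property). One way to avoid that, sketched here under assumptions, is to generate the file from a Python dictionary. The helper names render_config and write_config and the placeholder credential values are hypothetical; the property names and values mirror the example configuration.

```python
import json

# Placeholder credentials: substitute your own values before use.
CONFIG = {
    "connector.class": "HttpSource",
    "name": "HttpSourceConnector_0",
    "kafka.api.key": "<my-kafka-api-key>",
    "kafka.api.secret": "<my-kafka-api-secret>",
    "topic.name.pattern": "${entityName}",
    "entity.names": "AppName, User",
    "url": "http://example.com/api/v1/${entityName}/${offset}",
    "http.initial.offset": "1000",
    "http.offset.mode": "SIMPLE_INCREMENTING",
    "output.data.format": "JSON",
    "tasks.max": "1",
}


def render_config(config):
    """Serialize the configuration as strictly valid, indented JSON text."""
    return json.dumps(config, indent=2)


def write_config(path, config):
    """Write the rendered configuration to the given file path."""
    with open(path, "w", encoding="utf-8") as handle:
        handle.write(render_config(config) + "\n")
```

Calling write_config("http-source-config.json", CONFIG) would produce a file that can be passed to the CLI with the --config option.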

Note the following property definitions:

  • "connector.class": Identifies the connector plugin name.
  • "name": Sets a name for your new connector.
  • "kafka.auth.mode": Identifies the connector authentication mode you want to use. There are two options: SERVICE_ACCOUNT or KAFKA_API_KEY (the default). To use an API key and secret, specify the configuration properties kafka.api.key and kafka.api.secret, as shown in the example configuration (above). To use a service account, specify the Resource ID in the property kafka.service.account.id=<service-account-resource-ID>. To list the available service account resource IDs, use the following command:

    confluent iam service-account list
    

    For example:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "topic.name.pattern": The name of the Kafka topic where the connector publishes data. If you want to use a separate topic for each entity, you can use the template variable ${entityName}. For example, a valid value for this property is Elasticsearch_Index_${entityName}. The variable ${entityName} is replaced with values from the configuration property entity.names.

  • "entity.names": A list of entities that the connector polls.

  • "url": The HTTP API URL. You can template the URL property with offset and entity information. For example: http://example.com/api/v1/${entityName}/${offset}. The connector substitutes ${offset} with the offset generated from the previous request’s response. If the request is the first one, the connector uses http.initial.offset. The connector substitutes ${entityName} with the property values from the connector configuration property entity.names.

  • "http.initial.offset": The initial offset used to generate the first request. You must set this property value if url, http.request.parameters, or http.request.body contains the template variable ${offset}.

  • "http.offset.mode": Indicates how offsets are computed and how requests are generated. If set to SIMPLE_INCREMENTING, the ${offset} used to generate requests is the previous offset (or http.initial.offset) incremented by one per sourced record. In this mode, you set http.initial.offset to an integer value and do not configure the property http.offset.json.pointer. If set to CHAINING, you must set the property http.offset.json.pointer, and the offset for a record is set to the value for the configured key in the response data.

  • "output.data.format": Set the output record value format (data going to the Kafka topic): AVRO, JSON, JSON_SR (JSON Schema), or PROTOBUF. Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON Schema, or Protobuf). For additional information, see Schema Registry Enabled Environments.

  • "tasks.max": The connector supports running multiple tasks per connector. More tasks may improve performance.
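The ${entityName} and ${offset} substitution described in the property definitions above can be approximated with Python's string.Template, which happens to use the same ${name} placeholder syntax. This is a rough sketch, not the connector's code: the expand function is hypothetical, and trimming whitespace around each entity name is an assumption.

```python
from string import Template


def expand(pattern, entity_names, offset):
    """Expand a templated value once per entity listed in entity.names."""
    # Assumption: entries are comma-separated and surrounding spaces are ignored.
    entities = [name.strip() for name in entity_names.split(",") if name.strip()]
    return [
        Template(pattern).safe_substitute(entityName=entity, offset=offset)
        for entity in entities
    ]


urls = expand("http://example.com/api/v1/${entityName}/${offset}", "AppName, User", "1000")
topics = expand("${entityName}", "AppName, User", "1000")
```

With the example configuration, the first request for each entity would target .../AppName/1000 and .../User/1000, and records would go to topics named AppName and User.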

Single Message Transforms:

See the Single Message Transforms (SMT) documentation for details about adding SMTs using the CLI.

Important

For TLS connections, you must supply the keystore and/or truststore file contents and the file passwords when creating the connector configuration JSON. The truststore and keystore files are binary files. For the https.ssl.keystorefile and the https.ssl.truststorefile properties, you encode the truststore or keystore file in base64, take the encoded string, add the data:text/plain;base64, prefix, and then use the entire string as the property entry. For example:

"https.ssl.keystorefile" : "data:text/plain;base64,/u3+7QAAAAIAAAACAAAAAQAGY2xpZ...omitted...==",
"https.ssl.keystore.password" : "<password>",
"https.ssl.key.password" : "<private-key-password>"
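As a minimal sketch of the encoding step, the following Python builds the property value from the raw bytes of a store file. The function name to_data_uri is hypothetical; the sample bytes are only the first four bytes of a typical JKS file, used here to keep the example short.

```python
import base64


def to_data_uri(file_bytes):
    """Base64-encode binary store contents and add the required prefix."""
    encoded = base64.b64encode(file_bytes).decode("ascii")
    return "data:text/plain;base64," + encoded


# Demonstration with a short byte string instead of a real keystore.
sample_value = to_data_uri(b"\xfe\xed\xfe\xed")
```

For a real file, you could read the bytes first, for example with pathlib.Path("keystore.jks").read_bytes(), where keystore.jks is a placeholder file name.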

For all property values and descriptions, see Configuration Properties.

Step 4: Load the configuration file and create the connector.

Enter the following command to load the configuration and start the connector:

confluent connect create --config <file-name>.json

For example:

confluent connect create --config http-source-config.json

Example output:

Created connector HttpSourceConnector_0 lcc-do6vzd

Step 5: Check the connector status.

Enter the following command to check the connector status:

confluent connect list

Example output:

ID           |             Name            | Status  | Type   | Trace
+------------+-----------------------------+---------+--------+-------+
lcc-do6vzd   | HttpSourceConnector_0       | RUNNING | source |       |

Step 6: Check the Kafka topic.

Verify that records are being produced to the Kafka topic.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.

Configuration Properties

Use the following configuration properties with this connector.

How should we connect to your data?

name

Sets a name for your connector.

  • Type: string
  • Valid Values: A string at most 64 characters long
  • Importance: high

Kafka Cluster credentials

kafka.auth.mode

Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.

  • Type: string
  • Default: KAFKA_API_KEY
  • Valid Values: KAFKA_API_KEY, SERVICE_ACCOUNT
  • Importance: high
kafka.api.key
  • Type: password
  • Importance: high
kafka.service.account.id

The service account that will be used to generate the API keys to communicate with the Kafka cluster.

  • Type: string
  • Importance: high
kafka.api.secret
  • Type: password
  • Importance: high

Which topic(s) do you want to send data to?

topic.name.pattern

The name of the Kafka topic to publish data to. The value can contain a template variable ${entityName} in case a separate Kafka topic should be used for each entity; the variable ${entityName} will be replaced with values from the config ‘entity.names’.

  • Type: string
  • Importance: high

Connection details

url

The HTTP API URL which can be templated with offset and entity information. For example: http://example.com/api/v1/${entityName}/${offset} where ${offset} will be substituted with the offset generated from the previous request’s response (or if it’s the first request, from ‘http.initial.offset’), and ${entityName} will be substituted with values from the config entity.names.

  • Type: string
  • Importance: high
http.request.method

HTTP Request Method. Valid options are GET and POST.

  • Type: string
  • Default: GET
  • Importance: high
http.request.headers

HTTP headers to be included in each request. Header names and values should be separated by :. Distinct headers should be separated by the config value for ‘http.request.headers.separator’ (defaults to |). For example: From:abcxyz@confluent.io|Content-Length:348.

  • Type: string
  • Importance: medium
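To make the header format concrete, here is a hedged sketch of how such a value could be split into individual headers. The parse_headers function is hypothetical, and splitting each entry on the first colon only (so that values containing colons survive) is an assumption about reasonable behavior rather than documented connector behavior.

```python
def parse_headers(value, separator="|"):
    """Split an http.request.headers style string into a name-to-value dict."""
    headers = {}
    for item in value.split(separator):
        if not item.strip():
            continue  # skip empty entries such as a trailing separator
        # Split on the first ":" only; the rest belongs to the header value.
        name, _, header_value = item.partition(":")
        headers[name.strip()] = header_value.strip()
    return headers


parsed_headers = parse_headers("From:abcxyz@confluent.io|Content-Length:348")
```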
http.request.parameters

HTTP parameters to be added to each request. Parameter names and values should be separated by =. Distinct parameters should be separated by the config value for ‘http.request.parameters.separator’ (defaults to &). Parameter values can be templated with offset and entity information (for example: entity=${entityName}&search_after=${offset}) where ${offset} will be substituted with the offset generated from the previous request’s response (or if it’s the first request, from ‘http.initial.offset’), and ${entityName} will be substituted with values from the config ‘entity.names’. The parameters are only set if ‘http.request.method’ = GET.

  • Type: string
  • Importance: medium
http.request.body

The payload to be sent along with each HTTP request. The value can be templated with offset and entity information (for example: {'entity': '${entityName}', 'search_after': '${offset}'}) where ${offset} will be substituted with the offset generated from the previous request’s response (or if it’s the first request, from ‘http.initial.offset’), and ${entityName} will be substituted with values from the config ‘entity.names’. The body is only set if ‘http.request.method’ = POST.

  • Type: string
  • Importance: medium
http.initial.offset

The initial offset to be used to generate the first request. This must be set if one or more of the configs ‘url’, ‘http.request.parameters’, or ‘http.request.body’ contain the template variable ${offset}.

  • Type: string
  • Importance: high
http.offset.mode

This config indicates how offsets are computed and how requests are generated. If set to SIMPLE_INCREMENTING, the ${offset} used to generate requests is simply the previous offset (or http.initial.offset) incremented by 1 per sourced record. In this mode, http.initial.offset needs to be set to an integer value, and http.offset.json.pointer does not need to be configured. If set to CHAINING, the config ‘http.offset.json.pointer’ needs to be set, and the offset for a record is set to the value for the configured key in the response data.

  • Type: string
  • Default: SIMPLE_INCREMENTING
  • Importance: high
http.response.data.json.pointer

The JSON Pointer to the entity in the JSON response containing the actual data that should be written to Kafka as records. The entity can be an array (multiple records in a single response) or an object / scalar value (single record).

  • Type: string
  • Importance: high
http.offset.json.pointer

The JSON Pointer to the value in each record that corresponds to the offset for that record (it is relative to ‘http.response.data.json.pointer’). The offset will be available to the subsequent request as ${offset} and it will also be used for checkpointing and recovery in case of connector failures or restarts. This config should only be set if ‘http.offset.mode’ is set to CHAINING.

  • Type: string
  • Importance: medium
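The relationship between the two pointers in CHAINING mode can be illustrated with a small JSON Pointer (RFC 6901) resolver. This is a sketch under assumptions: the functions resolve_pointer and extract_records, the response envelope, and the pointer values /data/items and /id are all hypothetical examples, not connector internals.

```python
import json


def resolve_pointer(document, pointer):
    """Resolve an RFC 6901 JSON Pointer against a parsed JSON document."""
    if pointer == "":
        return document
    if not pointer.startswith("/"):
        raise ValueError("a non-empty JSON Pointer must start with '/'")
    for token in pointer[1:].split("/"):
        # Unescape "~1" before "~0", as RFC 6901 requires.
        token = token.replace("~1", "/").replace("~0", "~")
        document = document[int(token)] if isinstance(document, list) else document[token]
    return document


def extract_records(response_body, data_pointer, offset_pointer):
    """Return (record, offset) pairs; the offset pointer is relative to each record."""
    entity = resolve_pointer(json.loads(response_body), data_pointer)
    records = entity if isinstance(entity, list) else [entity]
    return [(record, str(resolve_pointer(record, offset_pointer))) for record in records]


body = '{"data": {"items": [{"id": 101, "name": "a"}, {"id": 102, "name": "b"}]}}'
pairs = extract_records(body, "/data/items", "/id")
```

In this example, http.response.data.json.pointer would be /data/items and http.offset.json.pointer would be /id, giving per-record offsets of 101 and 102.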
request.interval.ms

The time in milliseconds to wait between consecutive requests.

  • Type: int
  • Default: 15000 (15 seconds)
  • Valid Values: [100,…]
  • Importance: medium
entity.names

A list of entities that should be polled. Values from this list will replace the template variable ${entityName} in the configs ‘topic.name.pattern’, ‘url’, ‘http.request.parameters’, ‘http.request.body’. This config doesn’t need to be set if none of the mentioned configs contain the template variable ${entityName}.

  • Type: list
  • Importance: medium
max.retries

The maximum number of times to retry on errors before failing the task.

  • Type: int
  • Default: 10
  • Importance: medium
retry.backoff.ms

The time in milliseconds to wait following an error before a retry attempt is made.

  • Type: int
  • Default: 3000 (3 seconds)
  • Valid Values: [100,…]
  • Importance: medium
retry.on.status.codes

Comma-separated list of HTTP status codes or range of codes to retry on. Ranges are specified with start and optional end code. Range boundaries are inclusive. For instance, 400- includes all codes greater than or equal to 400. 400-500 includes codes from 400 to 500, including 500. Multiple ranges and single codes can be specified together to achieve fine-grained control over retry behavior. For example, 404,408,500- will retry on 404 NOT FOUND, 408 REQUEST TIMEOUT, and all 5xx error codes. Note that some status codes will always be retried, such as unauthorized, timeouts and too many requests.

  • Type: string
  • Default: 400-
  • Importance: medium
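The range syntax can be checked with a short matcher such as the following. It is a sketch of the rules stated above only: the function name should_retry is hypothetical, and the status codes that the connector always retries regardless of this setting are not modeled.

```python
def should_retry(status, spec):
    """Return True if status matches a code or inclusive range in spec."""
    for part in spec.split(","):
        part = part.strip()
        if not part:
            continue
        start, dash, end = part.partition("-")
        low = int(start)
        if not dash:
            high = low  # a single code such as "404"
        elif end.strip() == "":
            high = None  # an open-ended range such as "400-"
        else:
            high = int(end)  # a closed range such as "400-500"
        if status >= low and (high is None or status <= high):
            return True
    return False
```

For example, should_retry(503, "404,408,500-") matches the open-ended 500- range, while should_retry(403, "404,408,500-") matches nothing.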
http.request.headers.separator

The character that separates multiple distinct headers in the config ‘http.request.headers’.

  • Type: string
  • Default: |
  • Importance: medium
http.request.parameters.separator

The character that separates multiple distinct request parameters in the config ‘http.request.parameters’.

  • Type: string
  • Default: &
  • Importance: medium

Authentication and SSL

auth.type

Authentication type of the endpoint. Valid values are none, basic, oauth2 (Client Credentials grant type only), and bearer.

  • Type: string
  • Default: none
  • Importance: high
connection.user

The username to be used with an endpoint requiring basic authentication.

  • Type: string
  • Importance: medium
connection.password

The password to be used with an endpoint requiring basic authentication.

  • Type: password
  • Importance: medium
bearer.token

The bearer authentication token to be used with an endpoint requiring bearer token based authentication.

  • Type: password
  • Importance: medium
oauth2.token.url

The URL to be used for fetching the OAuth2 token. Client Credentials is the only supported grant type.

  • Type: string
  • Importance: medium
oauth2.client.id

The client id used when fetching the OAuth2 token.

  • Type: string
  • Importance: medium
oauth2.client.secret

The client secret used when fetching the OAuth2 token.

  • Type: password
  • Importance: medium
oauth2.token.property

The name of the property containing the OAuth2 token returned by the OAuth2 token URL (defaults to access_token).

  • Type: string
  • Importance: medium
oauth2.client.scope

The scope parameter sent to the service when fetching the OAuth2 token.

  • Type: string
  • Importance: medium
oauth2.client.mode

Specifies how to encode client_id and client_secret in the OAuth2 authorization request. If set to header, the credentials are encoded as an ‘Authorization: Basic <base-64 encoded client_id:client_secret>’ HTTP header. If set to ‘url’, then client_id and client_secret are sent as URL encoded parameters.

  • Type: string
  • Importance: medium
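The difference between the two modes can be sketched as follows. This is an illustration of a standard OAuth2 Client Credentials token request, not the connector's code: the function client_credentials_request is hypothetical, and sending the url-mode credentials as form-encoded body parameters is an assumption.

```python
import base64
from urllib.parse import urlencode


def client_credentials_request(client_id, client_secret, mode, scope=None):
    """Build (headers, form_body) for a Client Credentials token request."""
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    params = {"grant_type": "client_credentials"}
    if scope:
        params["scope"] = scope
    if mode == "header":
        # Credentials travel in an Authorization: Basic header.
        raw = f"{client_id}:{client_secret}".encode("utf-8")
        headers["Authorization"] = "Basic " + base64.b64encode(raw).decode("ascii")
    elif mode == "url":
        # Credentials travel as URL-encoded parameters.
        params["client_id"] = client_id
        params["client_secret"] = client_secret
    else:
        raise ValueError("mode must be 'header' or 'url'")
    return headers, urlencode(params)


header_mode = client_credentials_request("my-id", "my-secret", "header")
url_mode = client_credentials_request("my-id", "my-secret", "url")
```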
https.ssl.enabled

Whether or not to connect to the endpoint via SSL.

  • Type: boolean
  • Default: false
  • Importance: medium
https.ssl.keystorefile

The key store containing the server certificate.

  • Type: password
  • Importance: low
https.ssl.keystore.password

The store password for the key store file.

  • Type: password
  • Importance: high
https.ssl.key.password

The password for the private key in the key store file.

  • Type: password
  • Importance: high
https.ssl.truststorefile

The trust store containing a server CA certificate.

  • Type: password
  • Importance: high
https.ssl.truststore.password

The password for the trust store containing a server CA certificate.

  • Type: password
  • Importance: high
https.ssl.protocol

The protocol to use for SSL connections.

  • Type: string
  • Default: TLSv1.3
  • Importance: medium

Output messages

output.data.format

Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format such as AVRO, JSON_SR, or PROTOBUF.

  • Type: string
  • Importance: high

Number of tasks for this connector

tasks.max
  • Type: int
  • Valid Values: [1,…]
  • Importance: high

Next Steps

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.