HTTP Source Connector for Confluent Cloud

Tip

Confluent recommends upgrading to version 2 of this connector if you use the OpenAPI spec to avoid future migration issues. For more information, see HTTP Source V2 Connector for Confluent Cloud.

The fully-managed HTTP Source connector for Confluent Cloud periodically polls for data from an HTTP API and then produces records from that source into Apache Kafka®.

Note

If you require private networking for fully-managed connectors, make sure to set up the proper networking beforehand. For more information, see Manage Networking for Confluent Cloud Connectors.

Features

The HTTP Source connector supports the following features:

  • Offset modes: The connector supports the following modes:

    • SIMPLE_INCREMENTING: For the first request, the connector computes the offset based on the initial offset set in the http.initial.offset property. For subsequent requests, the connector increments the offset by the number of records in the previous response. For instance, the connector increments the offset by one for an object type response that indicates a single record. For an array type response, the connector increments the offset by the length of the array.

    • CHAINING: For the first request, the connector computes the offset based on the initial offset set in the http.initial.offset property. Upon receiving a successful response to the first request, the connector parses the response and generates one or more records (depending on whether the JSON response is an object or an array) with the offset value set to the value from the response at the path configured in the http.offset.json.pointer configuration property.

    • CURSOR_PAGINATION: In this mode, the property http.next.page.json.pointer is used to configure the offset value. The offset for the last record in each page is set to the next page value.

    For additional information, see Offset Mode Use Case Examples.
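    The offset arithmetic for the three modes can be sketched as follows. This is an illustrative Python model of the documented behavior, not connector code; the response shapes and pointer values are assumptions for the example.

```python
# Illustrative model of how the next ${offset} is derived in each mode.
# Not connector source code; response shapes are hypothetical.

def next_offset_simple_incrementing(current_offset, response):
    """SIMPLE_INCREMENTING: advance by the number of records sourced."""
    count = len(response) if isinstance(response, list) else 1
    return current_offset + count

def resolve_pointer(doc, pointer):
    """Minimal JSON Pointer lookup ("/sort/0" -> doc["sort"][0])."""
    for token in pointer.lstrip("/").split("/"):
        doc = doc[int(token)] if isinstance(doc, list) else doc[token]
    return doc

def next_offset_chaining(response, offset_json_pointer):
    """CHAINING: the offset comes from the last record, at the path
    configured in http.offset.json.pointer."""
    record = response[-1] if isinstance(response, list) else response
    return resolve_pointer(record, offset_json_pointer)

def next_offset_cursor_pagination(response, next_page_json_pointer):
    """CURSOR_PAGINATION: the next page reference becomes the offset."""
    return resolve_pointer(response, next_page_json_pointer)
```

    For example, an array response of three records moves a SIMPLE_INCREMENTING offset of 0 to 3, while a CHAINING record {"sort": [200]} with pointer /sort/0 yields 200.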

  • At least once delivery: The connector guarantees that records are delivered at least once to the Kafka topic.

  • Supports multiple tasks: The connector supports running one or more tasks. More tasks may improve performance when multiple entities are configured.

  • Supported output data formats: The connector supports Avro, JSON Schema (JSON-SR), Protobuf, and JSON (schemaless) output record value formats. Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON Schema, or Protobuf).

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.

Limitations

Be sure to review the following information.

Quick Start

Use this quick start to get up and running with the Confluent Cloud HTTP Source connector. The quick start provides the basics of selecting the connector and configuring it to get data from an HTTP API.

Prerequisites
  • Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud.

  • The Confluent CLI installed and configured for the cluster. See Install the Confluent CLI.

  • Authorized access to an HTTP or HTTPS endpoint.

  • Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).

Using the Confluent Cloud Console

Step 1: Launch your Confluent Cloud cluster

To create and launch a Kafka cluster in Confluent Cloud, see Create a Kafka cluster in Confluent Cloud.

Step 2: Add a connector

In the left navigation menu, click Connectors. If you already have connectors in your cluster, click + Add connector.

Step 3: Select your connector

Click the HTTP Source connector card.

HTTP Source Connector Card

Note

  • Make sure you have all your prerequisites completed.

  • An asterisk ( * ) designates a required entry.

  • The steps provide information about how to use the required configuration properties. See Configuration Properties for other configuration property values and descriptions.

At the Add HTTP Source Connector screen, complete the following:

  1. Select the way you want to provide Kafka Cluster credentials. You can choose one of the following options:

    • My account: This setting allows your connector to globally access everything that you have access to. With a user account, the connector uses an API key and secret to access the Kafka cluster. This option is not recommended for production.

    • Service account: This setting limits the access for your connector by using a service account. This option is recommended for production.

    • Use an existing API key: This setting allows you to specify an API key and a secret pair. You can use an existing pair or create a new one. This method is not recommended for production environments.

    Note

    Freight clusters support only service accounts for Kafka authentication.

  2. Click Continue.

  1. Configure the authentication properties:

    • HTTP(S) URL: The HTTP API URL. You can template the URL property with offset and entity information. For example: http://example.com/api/v1/${entityName}/${offset}. The connector substitutes ${offset} with the offset generated from the previous request’s response. If the request is the first one, the connector uses http.initial.offset. The connector substitutes ${entityName} with the property values from the connector configuration property Entity Names (entity.names). For additional information, see HTTP Source Connector limitations.

    • Endpoint Authentication Type: Select one of the authentication options:

      • basic: Enter a username and password for basic authentication.

      • bearer: For endpoints requiring client bearer token authentication, you must provide the token.

      • none: No endpoint authentication.

      • oauth2: Enter the following options when OAuth2 is selected:

        • OAuth2 token url: The URL used to fetch the OAuth2 token. Client Credentials is the only supported grant type.

        • OAuth2 Client ID: The client ID used when fetching the OAuth2 token.

        • OAuth2 Client Secret: The secret used when fetching the OAuth2 token.

        • OAuth2 Token Property Name: The name of the property containing the OAuth2 token returned by the OAuth2 token URL. Defaults to access_token.

        • OAuth2 Client Scope: The scope parameter sent when fetching the OAuth2 token.

        • OAuth2 Client Mode: Specifies how to encode client_id and client_secret in the OAuth2 authorization request. If set to header, the credentials are encoded as an Authorization: Basic <base64-encoded client_id:client_secret> HTTP header. If set to url, then client_id and client_secret are sent as URL-encoded parameters.

      • SSL Enabled: Defaults to false. If you set this property to true, you must upload the Key store and/or Trust store files. You must also supply the required passwords for each uploaded file.

    • Auth Username: The username to be used with an endpoint requiring basic authentication.

    • Auth Password: The password to be used with an endpoint requiring basic authentication.

    • Bearer Token: The bearer authentication token to be used with an endpoint requiring bearer token based authentication.

    • OAuth2 Token URL: The URL to be used for fetching the OAuth2 token. Client Credentials is the only supported grant type.

    • OAuth2 Client ID: The client id used when fetching the OAuth2 token.

    • OAuth2 Client Secret: The client secret used when fetching the OAuth2 token.

    • OAuth2 Token Property Name: The name of the property containing the OAuth2 token returned by the OAuth2 token URL (defaults to access_token).

    • OAuth2 Client Scope: The scope parameter sent to the service when fetching the OAuth2 token.

    • OAuth2 Client Mode: Specifies how to encode client_id and client_secret in the OAuth2 authorization request. If set to header, the credentials are encoded as an Authorization: Basic <base64-encoded client_id:client_secret> HTTP header. If set to url, then client_id and client_secret are sent as URL-encoded parameters.

    • SSL Enabled: Whether or not to connect to the endpoint via SSL.

    • Key Store: The key store containing the server certificate.

    • Keystore Password: The store password for the key store file.

    • Key Password: The password for the private key in the key store file.

    • Trust Store: The trust store containing a server CA certificate.

    • Trust Store Password: The trust store password containing a server CA certificate.

    • SSL Protocol: The protocol to use for SSL connections.
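    When OAuth2 Client Mode is set to header, the client credentials are combined into a Basic Authorization header. A minimal sketch of that encoding, with placeholder credentials (not connector code):

```python
import base64

def basic_auth_header(client_id: str, client_secret: str) -> str:
    """Encode client_id:client_secret as an HTTP Basic Authorization
    header value, as done when OAuth2 Client Mode is set to header."""
    token = base64.b64encode(f"{client_id}:{client_secret}".encode("utf-8"))
    return "Basic " + token.decode("ascii")

# Placeholder credentials for illustration only.
print(basic_auth_header("my-client-id", "my-client-secret"))
```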

  2. Click Continue.

  • Topic Name Pattern: Add the Topic Name Pattern. The name of the Kafka topic where the connector publishes data. If you want to use a separate topic for each entity, you can use the template variable ${entityName}. For example, a valid value for this property is Elasticsearch_Index_${entityName}. The variable ${entityName} is replaced with values from the configuration property Entity Names (entity.names).

  • HTTP Request Method: Select the HTTP Request Method. Defaults to GET. For GET you add parameters to the HTTP Request Parameters property. For POST, add the JSON payload request in the HTTP Request Body property.

  • HTTP Request Headers: The HTTP headers included in each request. Header names and values must be separated by :. Distinct headers must be separated by the http.request.headers.separator property value (defaults to |). For example: From:abcxyz@confluent.io|Content-Length:348.

  • HTTP Request Parameters: The HTTP parameters added to each request. Used with the GET request method only. Parameter names and values should be separated by =. Distinct parameters should be separated by the http.request.parameters.separator property (defaults to &). Parameter values can be templated with offset and entity information. For example: entity=${entityName}&search_after=${offset}, where ${offset} is substituted with the offset generated from the previous request’s response. Or, if it’s the first request, the offset comes from the http.initial.offset property. ${entityName} is substituted with values from the entity.names property.

  • HTTP Request Body: The JSON payload parameters sent with each HTTP request. Used with the POST request method only. The value can be templated with offset and entity information. For example: {"entity": "${entityName}", "search_after": "${offset}"}, where ${offset} is substituted with the offset generated from the previous request’s response. Or, if it’s the first request, the offset comes from the http.initial.offset property. ${entityName} is substituted with values from the entity.names property.

  • HTTP Initial Offset: The initial offset used to generate the first request. You must set this property value if url, http.request.parameters, or http.request.body contains the template variable ${offset}.

  • HTTP Offset Mode: Indicates how the connector computes offsets and generates requests. If set to SIMPLE_INCREMENTING, the ${offset} used to generate requests is the previous offset (or http.initial.offset) incremented by one per sourced record. If the response is a JSON object indicating a single record, the offset is incremented by one, but if the response is a JSON array of n records, the offset increments by n. In SIMPLE_INCREMENTING mode, you set http.initial.offset to an integer value and the property http.offset.json.pointer is not configured. If you set this property to CHAINING, you must also set the property http.offset.json.pointer and the offset for a record is set to the value for the configured key in the response data. In CURSOR_PAGINATION mode, you set http.next.page.json.pointer to configure the path in each JSON response that can be used as the offset value/next page reference. The offset for the last record in each page is set to the next page value. For additional information, see Offset Mode Use Case Examples.

  • HTTP Response Data JSON Pointer: The JSON Pointer to the entity in the JSON response containing the actual data the connector writes to Kafka as records. The entity can be an array (multiple records in a single response) or an object/scalar value (single record).

  • HTTP Offset JSON Pointer: The JSON Pointer to the value in each record that corresponds to the offset for that record (relative to the http.response.data.json.pointer value). The offset is available to the subsequent request as ${offset} and is also used for checkpointing and recovery in case the connector fails and restarts. Used when HTTP Offset Mode is set to CHAINING.

  • Use value of HTTP Offset JSON Pointer as string: By default, the value at http.offset.json.pointer is converted to a string before it is used as the offset. Set this property to false if the conversion is not needed.

  • HTTP Next Page JSON Pointer: The JSON Pointer to the value in the response that corresponds to the next page reference (either a page token, a full URL, or a URL fragment). This is stored as the offset and is available to the subsequent request via the template variable ${offset}. You set this property only if HTTP Offset Mode is set to CURSOR_PAGINATION. Note that the next page reference in each JSON response must be a JSON string. For instance, the reference cannot be a JSON array, object, or number.

  • Request Interval (ms): The time in milliseconds (ms) to wait between consecutive requests.

  • Sensitive HTTP Headers: Sensitive HTTP headers (for example, credentials) to include in all requests. Separate individual headers with the Header Separator value.
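To illustrate how the JSON pointer properties above select records and offsets, here is a simplified sketch using a hypothetical response shaped like the Elasticsearch example later on this page (not connector code):

```python
def json_pointer(doc, pointer):
    """Minimal JSON Pointer resolver (RFC 6901 subset, no escaping)."""
    for token in pointer.lstrip("/").split("/"):
        doc = doc[int(token)] if isinstance(doc, list) else doc[token]
    return doc

# Hypothetical API response shaped like the Elasticsearch example below.
response = {"hits": {"hits": [
    {"_source": {"name": "Name1"}, "sort": [1647948089979]},
    {"_source": {"name": "Name2"}, "sort": [1647948092349]},
]}}

# http.response.data.json.pointer selects the records to produce.
records = json_pointer(response, "/hits/hits")
# http.offset.json.pointer selects each record's offset (relative to a record).
offsets = [json_pointer(r, "/sort/0") for r in records]
print(offsets[-1])  # the offset carried into the next request as ${offset}
```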

Output messages

  • Select output record value format: Select the output record value format (data going to the Kafka topic): AVRO, JSON, JSON_SR (JSON Schema), or PROTOBUF. Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON Schema, or Protobuf). For additional information, see Schema Registry Enabled Environments.

Show advanced configurations
  • Schema context: Select a schema context to use for this connector, if using a schema-based data format. This property defaults to the Default context, which configures the connector to use the default schema set up for Schema Registry in your Confluent Cloud environment. A schema context allows you to use separate schemas (like schema sub-registries) tied to topics in different Kafka clusters that share the same Schema Registry environment. For example, if you select a non-default context, a Source connector uses only that schema context to register a schema and a Sink connector uses only that schema context to read from. For more information about setting up a schema context, see What are schema contexts and when should you use them?.

  • Entity Names: A list of entities that the connector polls. Values from this list replace the template variable ${entityName} in the configuration properties topic.name.pattern, url, http.request.parameters, and http.request.body. You do not set this property if none of the referenced configuration properties contain the template variable ${entityName}.

  • Maximum Retries: The maximum number of times the connector retries a request when an error occurs, before the task fails.

  • Retry Backoff (ms): The time in ms to wait following an error before the connector retries the task.

  • Retry HTTP Status Codes: The HTTP error codes returned that prompt the connector to retry the request. Enter a comma-separated list of codes or ranges of codes. Ranges are specified with a start and optional end code. Range boundaries are inclusive. For example: 400- includes all codes greater than or equal to 400, and 400-500 includes codes from 400 to 500, including 500. Multiple ranges and single codes can be specified together for fine-grained control over retry behavior. For example: 404,408,500- prompts the connector to retry on 404 NOT FOUND, 408 REQUEST TIMEOUT, and all 5xx error codes. Note that some status codes are always retried, such as unauthorized, timeouts, and too many requests.

  • HTTP Request Headers Separator: The character that separates multiple distinct request headers in the configuration property http.request.headers.

  • HTTP Request Parameters Separator: The character that separates multiple distinct request parameters in the configuration property http.request.parameters.
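The retry-code ranges accepted by Retry HTTP Status Codes behave as in this illustrative parser (a sketch of the documented semantics, not the connector's implementation; the 599 cap for open-ended ranges is an assumption based on the valid HTTP status code space):

```python
def parse_retry_codes(spec: str):
    """Parse a spec like '404,408,500-' into a membership predicate.
    Ranges are inclusive; a missing end code means 'and above'."""
    rules = []
    for part in spec.split(","):
        part = part.strip()
        if "-" in part:
            start, _, end = part.partition("-")
            # Assumption: open-ended ranges cap at 599, the top of the
            # HTTP status code space.
            rules.append((int(start), int(end) if end else 599))
        else:
            rules.append((int(part), int(part)))
    return lambda code: any(lo <= code <= hi for lo, hi in rules)

should_retry = parse_retry_codes("404,408,500-")
print(should_retry(503))  # falls in the open-ended 500- range
```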

Auto-restart policy

  • Enable Connector Auto-restart: Control the auto-restart behavior of the connector and its task in the event of user-actionable errors. Defaults to true, enabling the connector to automatically restart in case of user-actionable errors. Set this property to false to disable auto-restart for failed connectors. In such cases, you would need to manually restart the connector.

Additional Configs

  • Value Converter Reference Subject Name Strategy: Set the subject reference name strategy for value. Valid entries are DefaultReferenceSubjectNameStrategy or QualifiedReferenceSubjectNameStrategy. Note that the subject reference name strategy can be selected only for PROTOBUF format with the default strategy being DefaultReferenceSubjectNameStrategy.

  • Value Converter Decimal Format: Specify the JSON/JSON_SR serialization format for Connect DECIMAL logical type values with two allowed literals: BASE64 to serialize DECIMAL logical types as base64 encoded binary data and NUMERIC to serialize Connect DECIMAL logical type values in JSON/JSON_SR as a number representing the decimal value.

  • Key Converter Schema ID Serializer: The class name of the schema ID serializer for keys. This is used to serialize schema IDs in the message headers.

  • Value Converter Connect Meta Data: Allow the Connect converter to add its metadata to the output schema. Applicable for Avro Converters.

  • Value Converter Value Subject Name Strategy: Determines how to construct the subject name under which the value schema is registered with Schema Registry.

  • Key Converter Key Subject Name Strategy: How to construct the subject name for key schema registration.

  • Value Converter Schema ID Serializer: The class name of the schema ID serializer for values. This is used to serialize schema IDs in the message headers.

Transforms

Processing position

  • Set offsets: Click Set offsets to define a specific offset for this connector to begin processing data from. For more information on managing offsets, see Manage offsets.

  • Click Continue.

  1. Based on the number of topic partitions you select, you will be provided with a recommended number of tasks.

  2. Click Continue.

  1. Verify the connection details by previewing the running configuration.

  2. Once you’ve validated that the properties are configured to your satisfaction, click Launch.

    The status for the connector should go from Provisioning to Running.

Step 5: Check for records

Verify that records are being produced in the Kafka topic.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.

Using the Confluent CLI

To set up and run the connector using the Confluent CLI, complete the following steps.

Note

Make sure you have all your prerequisites completed.

Step 1: List the available connectors

Enter the following command to list available connectors:

confluent connect plugin list

Step 2: List the connector configuration properties

Enter the following command to show the connector configuration properties:

confluent connect plugin describe <connector-plugin-name>

The command output shows the required and optional configuration properties.

Step 3: Create the connector configuration file

Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.

{
  "connector.class": "HttpSource",
  "name": "HttpSourceConnector_0",
  "kafka.api.key": "****************",
  "kafka.api.secret": "*********************************",
  "topic.name.pattern": "${entityName}",
  "entity.names": "AppName, User",
  "url": "http://example.com/api/v1/${entityName}/${offset}",
  "http.initial.offset": "1000",
  "http.offset.mode": "SIMPLE_INCREMENTING",
  "output.data.format": "JSON",
  "tasks.max": "1"
}

Note the following property definitions:

  • "connector.class": Identifies the connector plugin name.

  • "name": Sets a name for your new connector.

  • "kafka.auth.mode": Identifies the connector authentication mode you want to use. There are two options: SERVICE_ACCOUNT or KAFKA_API_KEY (the default). To use an API key and secret, specify the configuration properties kafka.api.key and kafka.api.secret, as shown in the example configuration (above). To use a service account, specify the Resource ID in the property kafka.service.account.id=<service-account-resource-ID>. To list the available service account resource IDs, use the following command:

    confluent iam service-account list
    

    For example:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "topic.name.pattern": The name of the Kafka topic where the connector publishes data. If you want to use a separate topic for each entity, you can use the template variable ${entityName}. For example, a valid value for this property is Elasticsearch_Index_${entityName}. The variable ${entityName} is replaced with values from the configuration property entity.names.

  • "entity.names": A list of entities that the connector polls.

  • "url": The HTTP API URL. You can template the URL property with offset and entity information. For example: http://example.com/api/v1/${entityName}/${offset}. The connector substitutes ${offset} with the offset generated from the previous request’s response. If the request is the first one, the connector uses http.initial.offset. The connector substitutes ${entityName} with the property values from the connector configuration property entity.names. For additional information, see HTTP Source Connector limitations.

  • "http.initial.offset": The initial offset used to generate the first request. You must set this property value if url, http.request.parameters, or http.request.body contains the template variable ${offset}.

  • "http.offset.mode": Indicates how offsets are computed and how requests are generated. If set to SIMPLE_INCREMENTING, the ${offset} used to generate requests is the previous offset (or http.initial.offset) incremented by one per sourced record. In this mode, you set http.initial.offset to an integer value and the property http.offset.json.pointer is not configured. If set to CHAINING, you must set the property http.offset.json.pointer and the offset for a record is set to the value for the configured key in the response data. In CURSOR_PAGINATION mode, you set http.next.page.json.pointer to configure the path in each JSON response that can be used as the offset value/next page reference. The offset for the last record in each page is set to the next page value. Note that the next page reference in each JSON response must be a JSON string. For instance, the reference cannot be a JSON array, object, or number. For additional information, see Offset Mode Use Case Examples.

  • "output.data.format": Set the output record value format (data going to the Kafka topic): AVRO, JSON, JSON_SR (JSON Schema), or PROTOBUF. Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON Schema, or Protobuf). For additional information, see Schema Registry Enabled Environments.

  • "tasks.max": The connector supports running multiple tasks per connector. More tasks may improve performance.

Single Message Transforms:

See the Single Message Transforms (SMT) documentation for details about adding SMTs using the CLI.

Important

For TLS connections, you must supply the keystore and/or truststore file contents and the file passwords when creating the connector configuration JSON. The truststore and keystore files are binary files. For the https.ssl.keystorefile and the https.ssl.truststorefile properties, you encode the truststore or keystore file in base64, take the encoded string, add the data:text/plain;base64, prefix, and then use the entire string as the property entry. For example:

"https.ssl.keystorefile" : "data:text/plain;base64,/u3+7QAAAAIAAAACAAAAAQAGY2xpZ...omitted...==",
"https.ssl.keystore.password" : "<password>",
"https.ssl.key.password" : "<private-key-password>"
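One way to produce the encoded property value described above is a short script like this (the file path is a placeholder):

```python
import base64

def keystore_property_value(path: str) -> str:
    """Read a binary keystore/truststore file and build the
    data:text/plain;base64,... value expected in the connector config."""
    with open(path, "rb") as f:
        encoded = base64.b64encode(f.read()).decode("ascii")
    return "data:text/plain;base64," + encoded

# Placeholder path for illustration; point this at your actual keystore.
# print(keystore_property_value("keystore.jks"))
```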

For all property values and descriptions, see Configuration Properties.

Step 4: Load the properties file and create the connector

Enter the following command to load the configuration and start the connector:

confluent connect cluster create --config-file <file-name>.json

For example:

confluent connect cluster create --config-file http-source-config.json

Example output:

Created connector HttpSourceConnector_0 lcc-do6vzd

Step 5: Check the connector status

Enter the following command to check the connector status:

confluent connect cluster list

Example output:

ID           |             Name            | Status  | Type   | Trace
+------------+-----------------------------+---------+--------+-------+
lcc-do6vzd   | HttpSourceConnector_0       | RUNNING | source |       |

Step 6: Check the Kafka topic

Verify that records are being produced in the Kafka topic.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.

Offset Mode Use Case Examples

The following examples show how the connector polls JSON data from various HTTP-based APIs using each of the HTTP Offset Mode (http.offset.mode) settings:

SIMPLE_INCREMENTING offset mode with the Atlassian Confluence Cloud REST API

The Atlassian Confluence Cloud REST API provides the ability to query Atlassian Confluence Cloud resources using GET requests. This GET API supports the following two query parameters:

  • start: The index of the first item returned in the page of results. The base index is 0.

  • limit: The number of objects to return per page. This may be restricted by system limits.

The following shows an example request and response using the Atlassian Confluence Cloud REST API.

Request

GET /wiki/rest/api/space?start=0&limit=1
Response body
{
  "results": [
    {
      "id": 12345657891,
      "key": "~11121314",
      "name": "2020 ~ jsmith",
      "type": "personal",
      "status": "current",
      "_expandable": {
        "settings": "/rest/api/space/~11121314/settings",
        "metadata": "",
        "operations": "",
        "lookAndFeel": "/rest/api/settings/lookandfeel?spaceKey=~11121314",
        "identifiers": "",
        "permissions": "",
        "icon": "",
        "description": "",
        "theme": "/rest/api/space/~823923446/theme",
        "history": "",
        "homepage": "/rest/api/content/151617181920"
      },
      "_links": {
        "webui": "/spaces/~11121314",
        "self": "https://confluentinc.atlassian.net/wiki/rest/api/space/~11121314"
      }
    }
  ],
  "start": 0,
  "limit": 1,
  "size": 1,
  "_links": {
    "base": "https://confluentinc.atlassian.net/wiki",
    "context": "/wiki",
    "next": "/rest/api/space?next=true&limit=1&start=1",
    "self": "https://confluentinc.atlassian.net/wiki/rest/api/space"
  }
}

The following example connector configuration snippet shows the properties used to continuously poll data from the Atlassian Confluence Cloud REST API. Note that this REST API supports basic authentication using a username and password.

"url": "https://confluentinc.atlassian.net/wiki/rest/api/space",
"http.request.parameters": "start=${offset}&limit=1",
"http.offset.mode": "SIMPLE_INCREMENTING",
"http.initial.offset": "0",
"http.response.data.json.pointer": "/results",
"auth.type": "basic",
"connection.user": "username",
"connection.password": "*****"

Using the configuration above, the connector makes the initial GET request to https://confluentinc.atlassian.net/wiki/rest/api/space?start=0&limit=1 and then the next request to https://confluentinc.atlassian.net/wiki/rest/api/space?start=1&limit=1, and so on. Once the start parameter references records that don’t yet exist, the Confluence Cloud REST API returns an empty array in /results. The connector parses this response, finds zero records to produce, and repeats the same request until it finds new records. Note that the offset does not advance until records are found.

CHAINING offset mode with the Elasticsearch Search REST API

Elasticsearch has a Search REST API that returns documents that match the query defined in the request. The Search API supports query parameters from and size to page through results. The Search API and these parameters do not support paging through more than 10,000 documents. To support these use cases, the Search API provides the parameter search_after that retrieves the next page of documents using a set of sort values from the previous page.

The following shows an example request and response using the Search REST API.

Request

POST /test-index/_search

Request body

{
    "size": 100,
    "sort": [
        {
            "time": "asc"
        }
    ],
    "search_after": [
        1647948089978
    ]
}
Response body
{
  "took": 614,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 3,
      "relation": "eq"
    },
    "max_score": null,
    "hits": [
      {
        "_index": "test-index",
        "_id": "...",
        "_score": null,
        "_source": {
          "name": "Name1",
          "time": "1647948089979"
        },
        "sort": [
          1647948089979
        ]
      },
      {
        "_index": "test-index",
        "_id": "...",
        "_score": null,
        "_source": {
          "name": "Name2",
          "time": "1647948092349"
        },
        "sort": [
          1647948092349
        ]
      },
      {
        "_index": "test-index",
        "_id": "...",
        "_score": null,
        "_source": {
          "name": "Name3",
          "time": "1647948097246"
        },
        "sort": [
          1647948097246
        ]
      }
    ]
  }
}

The following example connector configuration snippet shows the properties used to continuously poll data from the Elasticsearch Search REST API. Note that Elasticsearch REST APIs support basic authentication using a username and password.

"url": "http://domain/test-index/_search",
"http.offset.mode": "CHAINING",
"http.request.body": "{\"size\": 100, \"sort\": [{\"@time\": \"asc\"}], \"search_after\": [${offset}]}",
"http.request.method": "POST",
"http.initial.offset": "1647948000000",
"http.response.data.json.pointer": "/hits/hits",
"http.offset.json.pointer": "/sort/0",
"auth.type": "basic",
"connection.user": "username",
"connection.password": "*****"

In the example above, the connector makes the initial POST request to http://domain/test-index/_search with request body {"size": 100, "sort":[{"@time": "asc"}], "search_after": [1647948000000]}. Using the response example above, the connector sends the next request with request body {"size":100, "sort": [{"@time": "asc"}], "search_after": [1647948097246]}, and so on.

Note that if the size parameter isn’t set to a large enough value, data loss can occur when the number of documents with the same sort value exceeds the size value.

CURSOR_PAGINATION offset mode with the GCS List Objects REST API

The Google Cloud Storage (GCS) List Objects REST API allows listing all the objects in a GCS bucket in paginated form. The API takes the query parameter pageToken to denote the page of responses that need to be returned.

The following shows an example request and response using the Google Cloud Storage (GCS) List Objects REST API.

Request

GET storage/v1/b/{bucket_name}/o?pageToken={page_token}
Response body
{
  "kind": "storage#objects",
  "nextPageToken": "CkV0b3BpY3MvZGVtby9MS9ob3V...=",
  "items": [
    {
      "kind": "storage#object",
      "id": "{object_id}",
      "selfLink": "https://www.googleapis.com/storage/v1/b/{bucket_name}/o/{object_id}",
      "mediaLink": "https://storage.googleapis.com/download/storage/v1/b/{bucket_name}/o/{object_id}",
      "name": "{object_name}",
      "bucket": "{bucket_id}",
      "generation": "1669124982018947",
      "metageneration": "1",
      "contentType": "font/otf",
      "storageClass": "STANDARD",
      "size": "145520",
      "md5Hash": "...",
      "crc32c": "...",
      "etag": "...",
      "timeCreated": "2022-11-22T13:49:42.022Z",
      "updated": "2022-11-22T13:49:42.022Z",
      "timeStorageClassUpdated": "2022-11-22T13:49:42.022Z"
     },
     ....

The following example connector configuration snippet shows the properties used to get paginated data from the GCS List Objects REST API.

"url": "https://storage.googleapis.com/storage/v1/b/test-bucket/o",
"http.offset.mode": "CURSOR_PAGINATION",
"http.request.method": "GET",
"http.request.parameters": "pageToken=${offset}",
"http.response.data.json.pointer": "/items",
"http.next.page.json.pointer": "/nextPageToken",
"auth.type": "bearer",
"bearer.token": "*****"

In the example above, the connector makes the initial GET request to https://storage.googleapis.com/storage/v1/b/test-bucket/o?pageToken= which returns the first page of results. Using the response example above, the connector parses and produces the records to Kafka. The connector then makes a subsequent request to https://storage.googleapis.com/storage/v1/b/test-bucket/o?pageToken=CkV0b3BpY3MvZGVtby9MS9ob3V...=, and so on.
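The cursor loop can be sketched as follows (the in-memory pages dict is a stand-in for the GCS API; unlike the real connector, which keeps polling on request.interval.ms, this sketch simply stops at the last page):

```python
def fetch_all_pages(fetch_page, initial_token=""):
    """CURSOR_PAGINATION sketch: ${offset} starts at the initial (empty) token
    and is replaced by the value at http.next.page.json.pointer each round."""
    token, items = initial_token, []
    while True:
        page = fetch_page(token)
        items.extend(page.get("items", []))
        token = page.get("nextPageToken")
        if not token:  # no next page: stop (the connector would keep re-polling)
            return items

# Stand-in for GET .../o?pageToken=${offset} returning two pages.
pages = {
    "": {"items": [{"name": "object-1"}], "nextPageToken": "tok-2"},
    "tok-2": {"items": [{"name": "object-2"}]},
}
print([o["name"] for o in fetch_all_pages(lambda t: pages[t])])  # ['object-1', 'object-2']
```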

Configuration Properties

Use the following configuration properties with the fully-managed connector. For self-managed connector property definitions and other details, see the connector docs in Self-managed connectors for Confluent Platform.

How should we connect to your data?

name

Sets a name for your connector.

  • Type: string

  • Valid Values: A string at most 64 characters long

  • Importance: high

Kafka Cluster credentials

kafka.auth.mode

Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode whenever possible.

  • Type: string

  • Valid Values: SERVICE_ACCOUNT, KAFKA_API_KEY

  • Importance: high

kafka.api.key

Kafka API Key. Required when kafka.auth.mode==KAFKA_API_KEY.

  • Type: password

  • Importance: high

kafka.service.account.id

The Service Account that will be used to generate the API keys to communicate with the Kafka cluster.

  • Type: string

  • Importance: high

kafka.api.secret

Secret associated with Kafka API key. Required when kafka.auth.mode==KAFKA_API_KEY.

  • Type: password

  • Importance: high

Which topic(s) do you want to send data to?

topic.name.pattern

The name of the Kafka topic to publish data to. The value can contain a template variable ${entityName} in case a separate Kafka topic should be used for each entity; the variable ${entityName} will be replaced with values from the config ‘entity.names’.

  • Type: string

  • Importance: high

Schema Config

schema.context.name

Add a schema context name. A schema context represents an independent scope in Schema Registry. It is a separate sub-schema tied to topics in different Kafka clusters that share the same Schema Registry instance. If not used, the connector uses the default schema configured for Schema Registry in your Confluent Cloud environment.

  • Type: string

  • Default: default

  • Importance: medium

Connection details

url

The HTTP API URL which can be templated with offset and entity information. For example: http://example.com/api/v1/${entityName}/${offset} where ${offset} will be substituted with the offset generated from the previous request’s response (or if it’s the first request, from ‘http.initial.offset’), and ${entityName} will be substituted with values from the config entity.names.

  • Type: string

  • Importance: high

http.request.method

HTTP Request Method. Valid options are GET and POST.

  • Type: string

  • Default: GET

  • Importance: high

http.request.headers

HTTP headers to be included in each request. Header names and values should be separated by :. Distinct headers should be separated by the config value for ‘http.request.headers.separator’ (defaults to |). For example: From:abcxyz@confluent.io|Content-Length:348.

  • Type: string

  • Importance: medium

http.request.parameters

HTTP parameters to be added to each request. Parameter names and values should be separated by =. Distinct parameters should be separated by the config value for ‘http.request.parameters.separator’ (defaults to &). Parameter values can be templated with offset and entity information (for example: entity=${entityName}&search_after=${offset}) where ${offset} will be substituted with the offset generated from the previous request’s response (or if it’s the first request, from ‘http.initial.offset’), and ${entityName} will be substituted with values from the config ‘entity.names’. The parameters are only set if ‘http.request.method’ = GET.

  • Type: string

  • Importance: medium

http.request.body

The payload to be sent along with each HTTP request. The value can be templated with offset and entity information (for example: {‘entity’: ‘${entityName}’, ‘search_after’: ‘${offset}’}) where ${offset} will be substituted with the offset generated from the previous request’s response (or if it’s the first request, from ‘http.initial.offset’), and ${entityName} will be substituted with values from the config ‘entity.names’. The body is only set if ‘http.request.method’ = POST.

  • Type: string

  • Importance: medium

http.initial.offset

The initial offset to be used to generate the first request. This needs to be set if either one or more of the configs - ‘url’, ‘http.request.parameters’, or ‘http.request.body’ contain the template variable ${offset}.

  • Type: string

  • Importance: high

http.offset.mode

This config indicates how offsets are computed and how requests are generated. If set to SIMPLE_INCREMENTING, the ${offset} used to generate requests is simply the previous offset (or http.initial.offset) incremented by 1 per sourced record. In this mode, http.initial.offset needs to be set to an integer value. If set to CHAINING, the config ‘http.offset.json.pointer’ needs to be set, and the offset for a record is set to the value for the configured key in the response data. If the value is CURSOR_PAGINATION, then the config ‘http.next.page.json.pointer’ needs to be set and the offset for the last record in each page will be set to the next page value.

  • Type: string

  • Default: SIMPLE_INCREMENTING

  • Importance: high
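For example, the SIMPLE_INCREMENTING bookkeeping amounts to the following (an illustrative sketch, not the connector’s actual code):

```python
def next_offset(prev_offset, response_json):
    """SIMPLE_INCREMENTING: advance by the number of records produced from
    the previous response (1 for an object/scalar, len() for an array)."""
    count = len(response_json) if isinstance(response_json, list) else 1
    return int(prev_offset) + count

print(next_offset(100, [{"id": 1}, {"id": 2}, {"id": 3}]))  # 103
print(next_offset(103, {"id": 4}))                          # 104
```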

http.response.data.json.pointer

The JSON Pointer to the entity in the JSON response containing the actual data that should be written to Kafka as records. The entity can be an array (multiple records in a single response) or an object / scalar value (single record).

  • Type: string

  • Importance: high

http.offset.json.pointer

The JSON Pointer to the value in each record that corresponds to the offset for that record (it is relative to ‘http.response.data.json.pointer’). The offset will be available to the subsequent request as ${offset} and it will also be used for checkpointing and recovery in case of connector failures or restarts. This config should only be set if ‘http.offset.mode’ is set to CHAINING.

  • Type: string

  • Importance: medium

use.http.offset.json.pointer.as.string

By default, the value at ‘http.offset.json.pointer’ is converted to a string before being used as the offset. Set this to false if this conversion is not needed.

  • Type: boolean

  • Default: true

  • Importance: medium

http.next.page.json.pointer

The JSON pointer to the value in the response which corresponds to the next page reference (either a page token, a full URL or a URL fragment). This will be stored as the offset and will be available to the subsequent request via the template variable ${offset}. This config should only be set if ‘http.offset.mode’ is set to CURSOR_PAGINATION.

  • Type: string

  • Importance: medium

request.interval.ms

The time in milliseconds to wait between consecutive requests.

  • Type: int

  • Default: 15000 (15 seconds)

  • Valid Values: [100,…]

  • Importance: medium

entity.names

A list of entities that should be polled. Values from this list will replace the template variable ${entityName} in the configs ‘topic.name.pattern’, ‘url’, ‘http.request.parameters’, ‘http.request.body’. This config doesn’t need to be set if none of the mentioned configs contain the template variable ${entityName}.

  • Type: list

  • Importance: medium
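For illustration, a hypothetical snippet polling two entities into per-entity topics (the endpoint and entity names are placeholders) might look like:

```json
"url": "http://example.com/api/v1/${entityName}/${offset}",
"topic.name.pattern": "http-${entityName}",
"entity.names": "orders,customers"
```

With this configuration, records for orders are produced to http-orders and records for customers to http-customers.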

max.retries

The maximum number of times to retry on errors before failing the task.

  • Type: int

  • Default: 10

  • Importance: medium

retry.backoff.ms

The time in milliseconds to wait following an error before a retry attempt is made.

  • Type: int

  • Default: 3000 (3 seconds)

  • Valid Values: [100,…]

  • Importance: medium

retry.on.status.codes

Comma-separated list of HTTP status codes or ranges of codes to retry on. Ranges are specified with a start and optional end code. Range boundaries are inclusive. For instance, 400- includes all codes greater than or equal to 400, and 400-500 includes codes from 400 to 500, including 500. Multiple ranges and single codes can be specified together to achieve fine-grained control over retry behavior. For example, 404,408,500- will retry on 404 NOT FOUND, 408 REQUEST TIMEOUT, and all 5xx error codes. Note that some status codes will always be retried, such as unauthorized, timeouts, and too many requests.

  • Type: string

  • Default: 400-

  • Importance: medium
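The matching rule described above can be sketched as follows (an illustrative parser, not the connector’s implementation):

```python
def should_retry(status_code, spec):
    """True if status_code matches a retry.on.status.codes spec: single codes
    ("404") or inclusive ranges with an optional end ("400-500", "500-")."""
    for entry in spec.split(","):
        if "-" in entry:
            start, _, end = entry.partition("-")
            if int(start) <= status_code and (not end or status_code <= int(end)):
                return True
        elif int(entry) == status_code:
            return True
    return False

print(should_retry(503, "404,408,500-"))  # True
print(should_retry(405, "404,408,500-"))  # False
print(should_retry(500, "400-500"))       # True (range ends are inclusive)
```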

http.request.headers.separator

The character that separates multiple distinct headers in the config ‘http.request.headers’.

  • Type: string

  • Default: |

  • Importance: medium

http.request.sensitive.headers

Sensitive HTTP headers (for example, credentials) to be included in all requests. Individual headers should be separated by the config value for ‘http.request.headers.separator’.

  • Type: password

  • Importance: high

http.request.parameters.separator

The character that separates multiple distinct request parameters in the config ‘http.request.parameters’.

  • Type: string

  • Default: &

  • Importance: medium

Authentication and SSL

auth.type

Authentication type of the endpoint. Valid values are none, basic, oauth2 (Client Credentials grant type only), and bearer.

  • Type: string

  • Default: none

  • Importance: high

connection.user

The username to be used with an endpoint requiring basic authentication.

  • Type: string

  • Importance: medium

connection.password

The password to be used with an endpoint requiring basic authentication.

  • Type: password

  • Importance: medium

bearer.token

The bearer authentication token to be used with an endpoint requiring bearer token based authentication.

  • Type: password

  • Importance: medium

oauth2.token.url

The URL to be used for fetching the OAuth2 token. Client Credentials is the only supported grant type.

  • Type: string

  • Importance: medium

oauth2.client.id

The client id used when fetching the OAuth2 token.

  • Type: string

  • Importance: medium

oauth2.client.secret

The client secret used when fetching the OAuth2 token.

  • Type: password

  • Importance: medium

oauth2.token.property

The name of the property containing the OAuth2 token returned by the OAuth2 token URL (defaults to access_token).

  • Type: string

  • Importance: medium

oauth2.client.scope

The scope parameter sent to the service when fetching the OAuth2 token.

  • Type: string

  • Importance: medium

oauth2.client.mode

Specifies how to encode client_id and client_secret in the OAuth2 authorization request. If set to header, the credentials are encoded as an ‘Authorization: Basic <base-64 encoded client_id:client_secret>’ HTTP header. If set to ‘url’, then client_id and client_secret are sent as URL encoded parameters.

  • Type: string

  • Importance: medium
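The two encodings can be sketched as follows (illustrative only; the function and variable names are not the connector’s internals):

```python
import base64
from urllib.parse import urlencode

def token_request(client_id, client_secret, mode="header"):
    """Build token-request headers/body for each oauth2.client.mode setting."""
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    body = {"grant_type": "client_credentials"}
    if mode == "header":  # Authorization: Basic base64(client_id:client_secret)
        creds = base64.b64encode(f"{client_id}:{client_secret}".encode()).decode()
        headers["Authorization"] = "Basic " + creds
    else:                 # mode == "url": credentials as URL-encoded form parameters
        body.update(client_id=client_id, client_secret=client_secret)
    return headers, urlencode(body)

headers, body = token_request("id", "secret", mode="header")
print(headers["Authorization"])  # Basic aWQ6c2VjcmV0
```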

https.ssl.enabled

Whether or not to connect to the endpoint via SSL.

  • Type: boolean

  • Default: false

  • Importance: medium

https.ssl.keystorefile

The key store containing the server certificate.

  • Type: password

  • Importance: low

https.ssl.keystore.password

The store password for the key store file.

  • Type: password

  • Importance: high

https.ssl.key.password

The password for the private key in the key store file.

  • Type: password

  • Importance: high

https.ssl.truststorefile

The trust store containing a server CA certificate.

  • Type: password

  • Importance: high

https.ssl.truststore.password

The password for the trust store containing a server CA certificate.

  • Type: password

  • Importance: high

https.ssl.protocol

The protocol to use for SSL connections.

  • Type: string

  • Default: TLSv1.3

  • Importance: medium

Output messages

output.data.format

Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. Note that you need to have Confluent Cloud Schema Registry configured when using a schema-based message format like AVRO, JSON_SR, or PROTOBUF.

  • Type: string

  • Importance: high

Number of tasks for this connector

tasks.max

Maximum number of tasks for the connector.

  • Type: int

  • Valid Values: [1,…]

  • Importance: high

Auto-restart policy

auto.restart.on.user.error

Enable connector to automatically restart on user-actionable errors.

  • Type: boolean

  • Default: true

  • Importance: medium

Additional Configs

value.converter.connect.meta.data

Allow the Connect converter to add its metadata to the output schema. Applicable for Avro Converters.

  • Type: boolean

  • Importance: low

key.converter.key.schema.id.serializer

The class name of the schema ID serializer for keys. This is used to serialize schema IDs in the message headers.

  • Type: string

  • Default: io.confluent.kafka.serializers.schema.id.PrefixSchemaIdSerializer

  • Importance: low

key.converter.key.subject.name.strategy

How to construct the subject name for key schema registration.

  • Type: string

  • Default: TopicNameStrategy

  • Importance: low

value.converter.decimal.format

Specify the JSON/JSON_SR serialization format for Connect DECIMAL logical type values with two allowed literals:

BASE64 to serialize DECIMAL logical types as base64 encoded binary data and

NUMERIC to serialize Connect DECIMAL logical type values in JSON/JSON_SR as a number representing the decimal value.

  • Type: string

  • Default: BASE64

  • Importance: low

value.converter.reference.subject.name.strategy

Set the subject reference name strategy for value. Valid entries are DefaultReferenceSubjectNameStrategy or QualifiedReferenceSubjectNameStrategy. Note that the subject reference name strategy can be selected only for PROTOBUF format with the default strategy being DefaultReferenceSubjectNameStrategy.

  • Type: string

  • Default: DefaultReferenceSubjectNameStrategy

  • Importance: low

value.converter.value.schema.id.serializer

The class name of the schema ID serializer for values. This is used to serialize schema IDs in the message headers.

  • Type: string

  • Default: io.confluent.kafka.serializers.schema.id.PrefixSchemaIdSerializer

  • Importance: low

value.converter.value.subject.name.strategy

Determines how to construct the subject name under which the value schema is registered with Schema Registry.

  • Type: string

  • Default: TopicNameStrategy

  • Importance: low

Frequently asked questions

Find answers to frequently asked questions about the HTTP Source connector for Confluent Cloud.

Deployment model and product fit

Can I run HTTP Source as a self-managed connector on my own Connect cluster?

Yes. The HTTP Source connector is available both as a fully-managed connector on Confluent Cloud and as a self-managed connector for Confluent Platform. For self-managed connector documentation, see the connector docs in Self-managed connectors for Confluent Platform.

Tip

Confluent recommends upgrading to HTTP Source V2 Connector for Confluent Cloud if you use the OpenAPI spec to avoid future migration issues.

Networking and connectivity

Why do I see http.api.base.url: Could not connect to the configured host:port?

This error indicates the connector cannot reach the target URL from the Confluent Cloud workers. Common causes include:

  • Networking mismatch: If the cluster is PRIVATE_LINK or uses VPC Peering, outbound traffic may require additional configuration.

  • Firewall restrictions: Confluent egress IP ranges or PrivateLink/VPC routes are not allowlisted in your upstream gateway or firewall.

  • DNS resolution: The hostname cannot be resolved from the Confluent Cloud connector environment.

Checklist:

  1. Check network type: Is your cluster PUBLIC, PRIVATE_LINK, or using VPC_PEERING?

  2. Verify connectivity: For Private Link clusters, ensure the VPC path from Confluent to your endpoint is correctly configured.

  3. Allowlist IPs: Confirm the Confluent egress IPs (or VPC CIDRs) are allowlisted at your API perimeter.

  4. Port check: Ensure port 443 (or your custom port) is open.

  5. Test endpoint: Verify the endpoint is accessible using curl or similar tools from a comparable network path.

Which IPs or CIDRs do I need to allowlist for HTTP Source?

Data egress originates from Confluent-managed infrastructure.

  • For public clusters, use the documented egress IPs for your specific region and cloud provider.

  • For Private Link or VPC Peering clusters, configure the appropriate private networking.

Authentication and OAuth

Which OAuth2 flows are supported? Can I use username/password grants?

The HTTP Source connector supports OAuth2 Client Credentials flow only. The connector obtains an access token from the configured oauth2.token.url and includes it as Authorization: Bearer <token> in API requests.

Not Supported: OAuth2 Resource Owner Password Credentials (username + password) grant type is not implemented.

Configuration example:

{
  "auth.type": "oauth2",
  "oauth2.client.id": "<client-id>",
  "oauth2.client.secret": "<client-secret>",
  "oauth2.token.url": "https://idp.example.com/oauth2/token",
  "oauth2.client.scope": "api.read"
}

Tip

If your OAuth provider does not accept the default scope parameter, set oauth2.client.scope to an empty string using the Confluent CLI or REST API.

Why does OAuth fail with 401 Unauthorized or invalid_client?

This error indicates the OAuth2 token request is failing. Common causes include:

  • Wrong client credentials: Verify oauth2.client.id and oauth2.client.secret are correct.

  • Incorrect oauth2.client.mode: By default, credentials are sent in the Authorization header. If your OAuth provider expects credentials in the request body, set:

    "oauth2.client.mode": "url"
    

    This sends client_id and client_secret as URL-encoded form parameters instead of as a Basic Auth header.

  • Unreachable token URL: The oauth2.token.url may not be accessible from Confluent Cloud egress IPs. Verify the URL is reachable and allowlisted.

  • Invalid scope: Some OAuth providers reject the default oauth2.client.scope value of any. Set an empty string or a valid scope value.

Resolution:

  1. Verify your OAuth provider’s expected authentication method (header vs body).

  2. Test the token request using curl with matching configuration:

    # If oauth2.client.mode=header (default)
    curl -X POST https://idp.example.com/oauth2/token \
      -u "<client-id>:<client-secret>" \
      -d "grant_type=client_credentials&scope=api.read"
    
    # If oauth2.client.mode=url
    curl -X POST https://idp.example.com/oauth2/token \
      -d "client_id=<client-id>&client_secret=<client-secret>&grant_type=client_credentials&scope=api.read"
    
  3. Update connector configuration to match the working curl command.

OAuth token fetch times out with Unable to complete OAuth token request

This error indicates the connector cannot reach the OAuth token URL within the default 30-second timeout:

Unable to complete OAuth token request POST https://idp.example.com/oauth2/token HTTP/1.1 within 30 seconds

Common causes:

  • Network connectivity: The OAuth provider’s token endpoint is not reachable from Confluent Cloud egress IPs.

  • Firewall blocking: The IdP firewall is not allowlisting Confluent egress IP ranges.

  • Slow OAuth provider: The token endpoint takes longer than 30 seconds to respond.

Resolution:

  1. Verify the oauth2.token.url is correct and publicly accessible (or accessible via your Private Link setup).

  2. Confirm Confluent egress IPs are allowlisted at your IdP.

  3. Test the OAuth endpoint from a comparable network path using curl.

Can I use basic authentication instead of OAuth?

Yes. Set auth.type to basic and provide connection.user and connection.password:

{
  "auth.type": "basic",
  "connection.user": "<username>",
  "connection.password": "<password>"
}

The connector sends credentials as an Authorization: Basic <base64-encoded-credentials> header.

Can I use a static bearer token or API key?

Yes. Set auth.type to bearer and provide the token:

{
  "auth.type": "bearer",
  "bearer.token": "<your-static-bearer-token>"
}

The connector sends Authorization: Bearer <token> with each request.

Unlike OAuth, the token is not automatically refreshed. If your token expires, you must manually update the connector configuration.

For API keys that use custom header names (for example, X-API-Key or Api-Token), use auth.type=none and set custom headers:

{
  "auth.type": "none",
  "http.request.headers": "X-API-Key: <your-api-key>|Content-Type: application/json"
}

Curl works but the connector gets 401 Unauthorized. Why?

There are several common causes:

  • Authentication type mismatch: Ensure auth.type matches your API’s expected authentication scheme (basic, bearer, oauth2, or none with custom headers).

  • OAuth2 client mode mismatch: If using OAuth, verify oauth2.client.mode matches how your IdP expects credentials. Default is header; some providers require url.

  • Custom header format: For non-standard authentication headers (for example, Api-Token), use auth.type=none and custom headers instead of bearer.

  • Missing retry configuration: If your endpoint returns 401 after token expiration, ensure 401 is included in retry.on.status.codes so the connector can retry with a refreshed token:

    "retry.on.status.codes": "401,429,500-"
    

SSL/TLS configuration

How do I configure SSL/TLS certificates for HTTPS endpoints?

For HTTPS endpoints, you must supply the keystore or truststore file contents when creating the connector configuration. The truststore and keystore files are binary files and must be encoded in base64.

For the https.ssl.keystorefile and https.ssl.truststorefile properties, encode the file in base64, add the data:text/plain;base64, prefix, and use the entire string as the property value.

Example:

{
  "https.ssl.keystorefile": "data:text/plain;base64,/u3+7QAAAAIAAAACAAAAAQAGY2xpZ...==",
  "https.ssl.keystore.password": "<password>",
  "https.ssl.key.password": "<private-key-password>",
  "https.ssl.truststorefile": "data:text/plain;base64,MIIFazCCA1OgAwIBAgIRAI...==",
  "https.ssl.truststore.password": "<truststore-password>"
}

Note

  • The fully-managed connector supports JKS and PKCS12 formats.

  • If you are connecting to a publicly trusted HTTPS endpoint (for example, with a valid certificate from a known CA), you typically do not need to provide a truststore.

  • For mutual TLS (mTLS), you must provide both keystore (client certificate) and truststore (CA certificates).
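The encoding step can be scripted; for example (a helper sketch, with placeholder bytes standing in for a real JKS/PKCS12 store):

```python
import base64
import tempfile

def to_data_uri(path):
    """Base64-encode a binary keystore/truststore file and add the prefix the
    https.ssl.keystorefile / https.ssl.truststorefile properties expect."""
    with open(path, "rb") as f:
        return "data:text/plain;base64," + base64.b64encode(f.read()).decode("ascii")

# Demo with a placeholder file standing in for a real keystore.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"demo")
print(to_data_uri(tmp.name))  # data:text/plain;base64,ZGVtbw==
```

Use the resulting string, including the data:text/plain;base64, prefix, as the property value.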

Why am I getting SSLHandshakeException or certificate_required errors?

This error indicates an SSL/TLS handshake failure. Common causes include:

  • Missing truststore: The endpoint uses a certificate not signed by a known CA, and you have not provided a truststore.

  • Mutual TLS required: The server requires a client certificate (mTLS), but the connector is not configured with a keystore.

  • Certificate mismatch: The truststore does not contain the correct CA certificate for the endpoint.

  • Expired certificates: The server certificate or CA certificate has expired.

Resolution:

  1. Verify endpoint certificate: Use openssl s_client -connect <host>:<port> -showcerts to inspect the server certificate.

  2. Check truststore: Ensure the truststore contains the CA certificate that signed the server’s certificate.

  3. Configure mTLS: If the server requires client authentication, provide both https.ssl.keystorefile and https.ssl.truststorefile.

  4. Validate passwords: Ensure keystore and truststore passwords are correct.

Offset modes and pagination

What is the difference between the three offset modes?

The connector supports three offset modes:

  • SIMPLE_INCREMENTING: The connector increments the offset by the number of records in the previous response. Use this for APIs that support simple numeric pagination (for example, ?start=0&limit=100, then ?start=100&limit=100).

  • CHAINING: The connector extracts the next offset value from the API response using a JSON pointer. Use this for APIs where each response includes the next offset value (for example, Elasticsearch search_after).

  • CURSOR_PAGINATION: The connector uses a next-page token from the response. Use this for APIs that return a nextPageToken or similar cursor in the response body.

For detailed examples of each mode, see Offset Mode Use Case Examples.

Why is the connector repeatedly fetching the same records?

Common causes:

  • Static parameters: Check if http.request.parameters is hard-coded (for example, start=0). Use the ${offset} template variable instead.

  • Incorrect offset mode: Ensure the offset mode matches your API’s pagination style.

  • API does not support parameter: Verify the downstream API supports the query parameter you are sending.

  • Missing JSON pointer: For CHAINING or CURSOR_PAGINATION modes, ensure the JSON pointer (http.offset.json.pointer or http.next.page.json.pointer) correctly points to the offset value in the response.

What happens when there are no more records to fetch?

When the API returns an empty response (for example, empty array at http.response.data.json.pointer), the connector pauses and retries the same request after the configured request.interval.ms. The offset does not advance until new records are found.

Connector creation and permissions

Why does my connector fail with Connector startup failed due to invalid Kafka API Key?

This error indicates the Kafka API key used by the connector is invalid or lacks required permissions.

Resolution:

  1. Verify API key: Ensure the API key is valid and associated with an active service account.

  2. Check ACLs: Confirm the service account has required permissions. For source connectors, the service account needs:

    • CREATE and WRITE on the target topics

    • READ and WRITE on internal topics (connect-configs, connect-offsets, connect-status)

  3. Check expiration: API keys are automatically deleted when the associated service account is deleted.

Next Steps

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud for Apache Flink, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.
