Connect API for Confluent Cloud

The Confluent Cloud API for fully-managed connectors allows you to interact with your connector using the Confluent Cloud API. This is a queryable HTTP API. For instance, you can POST a query written in JSON and get back connector information specified by the query. Use the following examples to learn more about using the Confluent Cloud API.

Tip

The example commands in this document show Version 1 of the Confluent Cloud API. See the Confluent Cloud API developer docs for the most recent API version, a complete list of Connect API requests, all response codes, and other details.

Examples

Prerequsites
  • Authorized access to Confluent Cloud.

  • The Confluent Cloud CLI installed and configured.

  • cURL and jq installed to use the API request examples in this document.

  • A Confluent Cloud API key to authenticate with the Confluent Cloud API. Every API request must include the resource key and secret encoded as Base64. Complete the following steps to create the keys and encode them in Base64.

    1. Log in to your Confluent Cloud account.

      ccloud login
      
    2. Create the API key and secret for the cloud resource.

      ccloud api-key create --resource cloud
      

      Important

      You must create a Confluent Cloud API key for --resource cloud to interact with the Confluent Cloud API. Using the Kafka cluster API key created for your Confluent Cloud cluster (that is, --resource <cluster-ID>) results in an authentication error when running the API request.

    3. The Confluent Cloud API uses Basic access authentication. To use the API key and secret, you send them as a header in the form of Authorization: Basic <Base64-credentials>. This form is shown in the curl examples. Enter the following command to display the Base64 encoding for the API key pair.

      echo -n "<api-key>:<secret>" | base64
      

      For example:

      echo -n "EERLEXFPZROVP:z6yyH3LEEWdrAAamfue9mIeTAyocCMjO/oSKzg0UMoXA0x3CXjVglPJHYC/" | base64
      RUVSTEVYRlBaUk9TNjVWUDp6Nnl5SDNMRUVXQmtQN1lkckFBYW1mdWU5bUllVEF5b2NDTWpPL29TS3pnMFVNb1hBMHgzQ1hq
      

      You use the Base64 encoded result in the following examples.

Note

The examples show using curl commands to work with the Confluent Cloud API.

Get a list of connectors

To return a list of connectors in the cluster, use the following API request. Successful completion returns a list of connectors.

curl --request GET 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors' /
--header 'authorization: Basic <base64-encoded-key-and-secret>' | jq

For example:

curl --request GET 'https://api.confluent.cloud/connect/v1/environments/env-*****/clusters/lkc-*****/connectors' \
--header 'authorization: Basic RUVSTEVYRlBaUk9TNjVWUDp6Nnl5SDNMRUVXQmtQN1dWU5bUllVEF5b2NDTWpPL29TS3pnMFVNb1hBMHgzQ1hqVmdsUEpIWUMv' | jq

The output displays a list of connectors. For example:

[
  "DatagenSourceConnector_0",
  "S3_SINKConnector_0"
]

Create a connector

When using the Confluent Cloud API to create a connector, you can either create a connector configuration JSON file to use as the payload or use the connector configuration JSON in the curl command itself. Successful completion returns the connector configuration.

Note

Make sure to consider the following:

  • Use the Confluent Cloud Kafka cluster API key and secret in your JSON connector configuration. You use the --resource cloud API key and secret in the API request.
  • You must provide all required connector configuration properties. See the individual cloud connector documentation for the required connector properties for each connector.

The following curl command examples show two ways to provide the payload connector configuration.

Raw JSON payload example

The following command uses the connector configuration in the curl command. The example shows an Amazon S3 Sink connector configuration.

curl --request POST 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' \
--header 'Content-Type: application/json' \
--data-raw '{
  "name": "S3_SINKConnector_0",
  "config": {
    "topics": "pageviews",
    "input.data.format": "AVRO",
    "connector.class": "S3_SINK",
    "name": "S3_SINKConnector_0",
    "kafka.api.key": "<my-kafka-api-key>",
    "kafka.api.secret" : "<my-kafka-api-secret>",
    "aws.access.key.id" : "<my-aws-access-key>",
    "aws.secret.access.key": "<my-aws-secret>",
    "s3.bucket.name": "<my-s3-bucket-name>",
    "output.data.format": "AVRO",
    "time.interval": "HOURLY",
    "flush.size": "1000",
    "tasks.max": "1"
  }
}' | jq

JSON file payload example

The following command uploads the connector configuration in a JSON file.

curl --request POST 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' \
--header 'Content-Type: application/json' \
--data "@<my-connector-config>.json" | jq

Note

The CLI section in each of the fully-managed connector docs provides an example of correctly formatted JSON to use in the payload file. For example, here is the JSON example provided in the Amazon S3 Sink connector documentation.

The following example command uploads a JSON file named my-s3-connector.json which is then used to create the Amazon S3 connector:

curl --request POST 'https://api.confluent.cloud/connect/v1/environments/env-*****/clusters/lkc-*****/connectors' \
--header 'authorization: Basic RUVSTEVYRlBaUk9TNjVWUDp6Nnl51lkckFBYW1mdWU5bUllVEF5b2NDTWpPL29TS3pnMFVNb1hBMHgzQ1hqVmdsUEpIWUMv' \
--header 'Content-Type: application/json' \
--data "@s3-connector-config.json" | jq

The output displays the connector configuration. For example:

% Total    % Received % Xferd    Average Speed   Time    Time     Time  Current
                                 Dload   Upload  Total   Spent    Left  Speed
100  1467  100   878  100   589     35     23  0:00:25  0:00:24  0:00:01  214
{
  "name": "S3_SINKConnector_0",
  "type": "sink",
  "config": {
    "aws.access.key.id": "****************",
    "aws.secret.access.key": "****************",
    "cloud.environment": "prod",
    "cloud.provider": "aws",
    "connector.class": "S3_SINK",
    "flush.size": "1000",
    "input.data.format": "AVRO",
    "kafka.api.key": "****************",
    "kafka.api.secret": "****************",
    "kafka.dedicated": "false",
    "kafka.endpoint": "SASL_SSL://pkc-*****.us-west-2.aws.confluent.cloud:9092",
    "kafka.region": "us-west-2",
    "name": "S3_SINKConnector_0",
    "output.data.format": "AVRO",
    "s3.bucket.name": "datagen-to-s3",
    "schema.registry.url": "https://psrc-*****.us-east-2.aws.confluent.cloud",
    "tasks.max": "1",
    "time.interval": "HOURLY",
    "topics": "pageviews"
  },
  "tasks": []
}

Tip

The example commands in this document show Version 1 of the Confluent Cloud API. See the Confluent Cloud API developer docs for the most recent API version, a complete list of Connect API requests, all response codes, and other details.

Read a connector configuration

Use the following API request to read a connector configuration. Successful completion returns the connector configuration.

curl --request GET 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors/<my-connector-name>/config' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' | jq

For example:

curl --request GET 'https://api.confluent.cloud/connect/v1/environments/env-****/clusters/lkc-*****/connectors/S3_SINKConnector_0/config' \
--header 'authorization: Basic RUVSTEVYRlBaUk9TNjVWUVXQmtQN1lkckFBYW1m5b2NDTWpPL29TS3pnMFVNb1hBMHgzQ1hqVmdsUEpIWUMv' | jq

The output displays the connector configuration. For example:

 % Total    % Received % Xferd  Average Speed   Time    Time     Time Current
                                Dload  Upload   Total   Spent    Left  Speed
100   814  100   814    0     0   1477      0 --:--:-- --:--:-- --:--:-- 1477
{
  "aws.access.key.id": "****************",
  "aws.secret.access.key": "****************",
  "cloud.environment": "prod",
  "cloud.provider": "aws",
  "connector.class": "S3_SINK",
  "flush.size": "1000",
  "input.data.format": "AVRO",
  "kafka.api.key": "****************",
  "kafka.api.secret": "****************",
  "kafka.dedicated": "false",
  "kafka.endpoint": "SASL_SSL://pkc-****.us-west-2.aws.confluent.cloud:9092",
  "kafka.region": "us-west-2",
  "name": "S3_SINKConnector_0",
  "output.data.format": "AVRO",
  "s3.bucket.name": "datagen-to-s3",
  "schema.registry.url": "https://psrc-*****.us-east-2.aws.confluent.cloud",
  "tasks.max": "1",
  "time.interval": "HOURLY",
  "topics": "pageviews"
}

Tip

The example commands in this document show Version 1 of the Confluent Cloud API. See the Confluent Cloud API developer docs for the most recent API version, a complete list of Connect API requests, all response codes, and other details.

Update a connector configuration

When using the Confluent Cloud API to update a connector configuration, you can either update a connector configuration JSON file to use as the payload or use the updated connector configuration JSON in the curl command itself. For examples showing how to construct the two types of curl commands, see Create a connector.

Note

Make sure to consider the following:

  • Use the Confluent Cloud Kafka cluster API key and secret in your JSON connector configuration. You use the --resource cloud API key and secret in the API request.
  • You must provide all required connector configuration properties. See the individual cloud connector documentation for the required connector properties for each connector.

The following curl command example shows how to update the connector flush time interval property from HOURLY to DAILY. Successful completion returns the updated connector configuration.

The connector immediately begins using the new configuration.

curl --request PUT 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors/<my-connector-name>/config' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' \
--header 'Content-Type: application/json' \
--data-raw '{
    "topics": "pageviews",
    "input.data.format": "AVRO",
    "connector.class": "S3_SINK",
    "name": "<my-connector-name>",
    "kafka.api.key": "<my-kafka-api-key>",
    "kafka.api.secret" : "<my-kafka-api-secret>",
    "aws.access.key.id" : "<my-aws-access-key>",
    "aws.secret.access.key": "<my-aws-secret>",
    "s3.bucket.name": "<my-s3-bucket-name>",
    "output.data.format": "AVRO",
    "time.interval": "DAILY",
    "flush.size": "1000",
    "tasks.max": "1"
}' | jq

You can also update the connector configuration by supplying a JSON file and using the following command:

curl --request PUT https://api.confluent.cloud/connect/v1/environments/<my-environment>/clusters/<my-cluster>/connectors/<my-connector-name>/config \
--header 'authorization: Basic <base64-encoded-key-and-secret>' \
--header 'Content-Type: application/json' \
--data "@s3-sink-update.json" | jq

Note

The CLI section in each of the fully-managed connector docs provides an example of correctly formatted JSON to use in the payload file. For example, here is the JSON example provided in the Amazon S3 Sink connector documentation.

For example:

curl --request PUT https://api.confluent.cloud/connect/v1/environments/env-*****/clusters/lkc-*****/connectors/S3_SINKConnector_5/config \
--header 'authorization: Basic VTZRN1ZaNjNPV0QzSlRNUjpHVll3UjmUwK3JRUk5HQi82YkdtZlRCb1lYMDZaSTJEMHNMSnBwalBocjNLN1JORWQ2VWo2' \
--header 'Content-Type: application/json' \
--data "@s3-sink-update.json" | jq

The output displays the updated connector configuration. For example:

% Total    % Received % Xferd  Average Speed   Time    Time     Time Current
                               Dload   Upload  Total   Spent    Left  Speed
100  1415  100   877  100   538     36  22  0:00:24  0:00:23  0:00:01   205
   {
     "name": "S3_SINKConnector_5",
     "type": "sink",
     "config": {
       "aws.access.key.id": "****************",
       "aws.secret.access.key": "****************",
       "cloud.environment": "prod",
       "cloud.provider": "aws",
       "connector.class": "S3_SINK",
       "flush.size": "1000",
       "input.data.format": "AVRO",
       "kafka.api.key": "****************",
       "kafka.api.secret": "****************",
       "kafka.dedicated": "false",
       "kafka.endpoint": "SASL_SSL://pkc-*****.us-west-2.aws.confluent.cloud:9092",
       "kafka.region": "us-west-2",
       "name": "S3_SINKConnector_1",
       "output.data.format": "AVRO",
       "s3.bucket.name": "datagen-to-s3",
       "schema.registry.url": "https://psrc-*****.us-east-2.aws.confluent.cloud",
       "tasks.max": "1",
       "time.interval": "DAILY",
       "topics": "pageviews"
     },
     "tasks": []
   }

Tip

The example commands in this document show Version 1 of the Confluent Cloud API. See the Confluent Cloud API developer docs for the most recent API version, a complete list of Connect API requests, all response codes, and other details.

Query a sink connector for metrics

Complete the following steps to query a sink connector for metrics.

  1. Create a JSON file named query-connector-metrics.json to use as the payload for the API request. You can copy and paste the following example to get the number of records the connector recieved in a specific time interval. Make sure to enter the correct connector resource ID for value and a valid time interval for intervals.

    {
        "aggregations": [
            {
                "metric": "io.confluent.kafka.connect/received_records"
            }
        ],
        "filter":
        {
            "field": "resource.connector.id",
            "op": "EQ",
            "value": "lcc-k2q7v"
        },
        "granularity": "PT1H",
        "intervals": [
            "2021-03-2T0:00:00/2021-03-2T23:00:00"
        ],
        "group_by": [
            "resource.connector.id"
        ]
    }
    
  2. Enter the following POST query command:

    curl --request POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' \
    --header 'authorization: Basic <base64-encoded-key-and-secret>' \
    --header 'Content-Type: application/json' \
    --data "<my-json-filename>.json" | jq
    

    For example:

    curl --request POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' \
    --header 'authorization: Basic VTZRN1ZaNjNPV0QzSeEZCemUwK3JRUk5HQi82YkdtZlRCb1lYMDZaSTJEMHNMSnBwalBocjNLN1JORWQ2VWo2' \
    --header 'Content-Type: application/json' \
    --data "@query-metrics.json" | jq
    

    The request returns the number of records that connector lcc-k2q7v received in the time interval specfied. For example:

    % Total    % Received % Xferd  Average  Speed   Time    Time       Time   Current
                                   Dload    Upload   Total  Spent      Left   Speed
    100   826  100   451  100   375    457    380 --:--:-- --:--:-- --:--:--   837
    {
      "data": [
        {
          "timestamp": "2021-03-02T18:00:00Z",
          "value": 44027,
          "resource.connector.id": "lcc-k2q7v"
        },
        {
          "timestamp": "2021-03-02T19:00:00Z",
          "value": 7227,
          "resource.connector.id": "lcc-k2q7v"
        },
        {
          "timestamp": "2021-03-02T20:00:00Z",
          "value": 7222,
          "resource.connector.id": "lcc-k2q7v"
        },
        {
          "timestamp": "2021-03-02T21:00:00Z",
          "value": 7253,
          "resource.connector.id": "lcc-k2q7v"
        },
        {
          "timestamp": "2021-03-02T22:00:00Z",
          "value": 7258,
          "resource.connector.id": "lcc-k2q7v"
        }
      ]
    }
    

For additional information about the Confluent Cloud Metrics API, see the Confluent Cloud Metrics API documentation.

Delete a connector

Use the following API request to delete a connector.

curl --request DELETE https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors/<my-connector-name> \
--header 'authorization: Basic <base64-encoded-key-and-secret>' | jq

For example:

curl --request DELETE https://api.confluent.cloud/connect/v1/environments/env-*****/clusters/lkc-*****/connectors/S3_SINKConnector_1 \
--header 'authorization: Basic TE9PMk5CSEFUNUJWNjdONjpOc3RyWE5kamlzZE05VTdOSk05T3FuSTcyQzlIb2ZRaWhURWtiOWlkVTFtdTB6' | jq

The following shows an output example where the connector was successfully deleted.

  % Total    % Received % Xferd  Average Speed   Time    Time     Time    Current
                                 Dload   Upload  Total   Spent    Left    Speed
100    14    0    14    0     0      0      0 --:--:--  0:00:15 --:--:--     3
{
  "error": null
}

Next Steps

See the Confluent Cloud API developer docs for the most recent API version, a complete list of Connect API requests, all response codes, and other details.

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent Cloud CLI to manage your resources in Confluent Cloud.

../_images/topology.png