Confluent Cloud API for Connect¶
The Confluent Cloud API lets you interact with your fully-managed connectors programmatically. It is a queryable HTTP API: for instance, you can POST a query written in JSON and get back the connector information specified by the query. Use the following examples to learn more about working with the Confluent Cloud API.
Tip
The example commands in this document show Version 1 of the Confluent Cloud API. See the Confluent Cloud API developer docs for the most recent API version, a complete list of Connect API requests, all response codes, and other details.
For a hands-on course showing how this works, see Hands On: Confluent Cloud Managed Connector API.
Examples¶
- Prerequisites
- Authorized access to Confluent Cloud.
- The Confluent CLI installed and configured. The example commands use Confluent CLI version 2. For more information, see Confluent CLI v2.
- cURL and jq installed to use the API request examples in this document.
A Confluent Cloud API key to authenticate with the Confluent Cloud API. Every API request must include the resource key and secret encoded as Base64. Complete the following steps to create the keys and encode them in Base64.
Log in to your account.
confluent login
Create the API key and secret for the cloud resource.

confluent api-key create --resource cloud
Important
You must create a Confluent Cloud API key for --resource cloud to interact with the Confluent Cloud API. Using the Kafka cluster API key created for your Confluent Cloud cluster (that is, --resource <cluster-ID>) results in an authentication error when running the API request.

The Confluent Cloud API uses Basic access authentication. To use the API key and secret, you send them as a header in the form Authorization: Basic <Base64-credentials>. This form is shown in the curl examples. Enter the following command to display the Base64 encoding for the API key pair.

echo -n "<api-key>:<secret>" | base64
For example:

echo -n "ABCDEFGPZROVP:z6yyH3LEEWdrAAamfue9mIeTAyocCMjO/oSKzg0UMoXA0x3CXjVglPJHYC/" | base64

HIJKLMNOPYRlBaUk9TNjVWUDp6Nnl5SDNMRUVXQmtQN1lkckFBYW1mdWU5bUllVEF5b2NDTWpPL29TS3pnMFVNb1hBMHgzQ1hq
You use the Base64 encoded result in the following examples.
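As a convenience, you can capture the encoded pair in a shell variable and reuse it across the requests below. This is a minimal sketch, not part of the documented workflow; the variable name CONFLUENT_BASIC_AUTH is arbitrary.

# Store the Base64-encoded key pair for reuse in request headers.
# Note: GNU base64 (Linux) wraps long output at 76 columns; add -w 0 there
# so the credentials stay on one line. macOS base64 does not wrap.
export CONFLUENT_BASIC_AUTH=$(echo -n "<api-key>:<secret>" | base64)

# Reuse it in any request below:
#   --header "authorization: Basic ${CONFLUENT_BASIC_AUTH}"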
Note

The examples in this document use curl commands to work with the Confluent Cloud API.
Get a list of connectors¶
To list the connectors in a cluster, use the following API request. A successful request returns the list of connector names.
curl --request GET 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' | jq
For example:
curl --request GET 'https://api.confluent.cloud/connect/v1/environments/env-*****/clusters/lkc-*****/connectors' \
--header 'authorization: Basic HIJKLMNOPYRlB9TNjVWUDp6Nnl5SDNMRUVXQmtQN1dWU5bUllVEF5b2NDTWpPL29TS3pnMFVNb1hBMHgzQ1hqVmdsUEpIWUMv' | jq
The output displays a list of connectors. For example:
[
"DatagenSourceConnector_0",
"S3_SINKConnector_0"
]
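Because the response is a plain JSON array of names, you can feed it into a loop. The following sketch is an illustration, not from the official examples: it reads each connector's status using the per-connector status request; confirm the exact path in the Confluent Cloud API developer docs.

# List the connectors, then read each one's status. Placeholders as above.
BASE='https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>'
AUTH='authorization: Basic <base64-encoded-key-and-secret>'

for connector in $(curl --silent --request GET "${BASE}/connectors" --header "${AUTH}" | jq -r '.[]'); do
  echo "--- ${connector} ---"
  curl --silent --request GET "${BASE}/connectors/${connector}/status" --header "${AUTH}" | jq
done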
Create a connector¶
When using the Confluent Cloud API to create a connector, you can either supply a connector configuration JSON file as the payload or include the configuration JSON inline in the curl command itself. Successful completion returns the connector configuration.
Note
Make sure to consider the following:
- Use the Confluent Cloud Kafka cluster API key and secret in your JSON connector configuration. You use the --resource cloud API key and secret in the API request.
- You must provide all required connector configuration properties. See the individual cloud connector documentation for the required connector properties for each connector.
The following curl command examples show two ways to provide the payload connector configuration.
Raw JSON payload example
The following command uses the connector configuration in the curl command. The example shows an Amazon S3 Sink connector configuration.
curl --request POST 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "S3_SINKConnector_0",
"config": {
"topics": "pageviews",
"input.data.format": "AVRO",
"connector.class": "S3_SINK",
"name": "S3_SINKConnector_0",
"kafka.api.key": "<my-kafka-api-key>",
"kafka.api.secret" : "<my-kafka-api-secret>",
"aws.access.key.id" : "<my-aws-access-key>",
"aws.secret.access.key": "<my-aws-secret>",
"s3.bucket.name": "<my-s3-bucket-name>",
"output.data.format": "AVRO",
"time.interval": "HOURLY",
"flush.size": "1000",
"tasks.max": "1"
}
}' | jq
JSON file payload example
The following command uploads the connector configuration in a JSON file.
curl --request POST 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' \
--header 'Content-Type: application/json' \
--data "@<my-connector-config>.json" | jq
Note
The CLI section in each of the fully-managed connector docs provides an example of correctly formatted JSON to use in the payload file. For example, see the JSON example provided in the Amazon S3 Sink connector documentation.
The following example command uploads a JSON file named s3-connector-config.json, which is then used to create the Amazon S3 Sink connector:
curl --request POST 'https://api.confluent.cloud/connect/v1/environments/env-*****/clusters/lkc-*****/connectors' \
--header 'authorization: Basic RUVSTEVYRlBaUk9TNjVWUDp6Nnl51lkckFBYW1mdWU5bUllVEF5b2NDTWpPL29TS3pnMFVNb1hBMHgzQ1hqVmdsUEpIWUMv' \
--header 'Content-Type: application/json' \
--data "@s3-connector-config.json" | jq
The output displays the connector configuration. For example:
{
"name": "S3_SINKConnector_0",
"type": "sink",
"config": {
"aws.access.key.id": "****************",
"aws.secret.access.key": "****************",
"cloud.environment": "prod",
"cloud.provider": "aws",
"connector.class": "S3_SINK",
"flush.size": "1000",
"input.data.format": "AVRO",
"kafka.api.key": "****************",
"kafka.api.secret": "****************",
"kafka.dedicated": "false",
"kafka.endpoint": "SASL_SSL://pkc-*****.us-west-2.aws.confluent.cloud:9092",
"kafka.region": "us-west-2",
"name": "S3_SINKConnector_0",
"output.data.format": "AVRO",
"s3.bucket.name": "datagen-to-s3",
"schema.registry.url": "https://psrc-*****.us-east-2.aws.confluent.cloud",
"tasks.max": "1",
"time.interval": "HOURLY",
"topics": "pageviews"
},
"tasks": []
}
Read a connector configuration¶
Use the following API request to read a connector configuration. Successful completion returns the connector configuration.
curl --request GET 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors/<my-connector-name>/config' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' | jq
For example:
curl --request GET 'https://api.confluent.cloud/connect/v1/environments/env-****/clusters/lkc-*****/connectors/S3_SINKConnector_0/config' \
--header 'authorization: Basic HIJKLMNOPYRlB9TNjVWUVXQmtQN1lkckFBYW1m5b2NDTWpPL29TS3pnMFVNb1hBMHgzQ1hqVmdsUEpIWUMv' | jq
The output displays the connector configuration. For example:
{
"aws.access.key.id": "****************",
"aws.secret.access.key": "****************",
"cloud.environment": "prod",
"cloud.provider": "aws",
"connector.class": "S3_SINK",
"flush.size": "1000",
"input.data.format": "AVRO",
"kafka.api.key": "****************",
"kafka.api.secret": "****************",
"kafka.dedicated": "false",
"kafka.endpoint": "SASL_SSL://pkc-****.us-west-2.aws.confluent.cloud:9092",
"kafka.region": "us-west-2",
"name": "S3_SINKConnector_0",
"output.data.format": "AVRO",
"s3.bucket.name": "datagen-to-s3",
"schema.registry.url": "https://psrc-*****.us-east-2.aws.confluent.cloud",
"tasks.max": "1",
"time.interval": "HOURLY",
"topics": "pageviews"
}
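If you only need a single property from the configuration, jq can extract it directly from the response. A small sketch using the time.interval property from the example above; note that keys containing dots must be quoted in jq.

# Read one property from the connector configuration.
curl --silent --request GET 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors/<my-connector-name>/config' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' | jq -r '."time.interval"'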
Update a connector configuration¶
When using the Confluent Cloud API to update a connector configuration, you can either update a connector configuration JSON file to use as the payload or use the updated connector configuration JSON in the curl command itself. For examples showing how to construct the two types of curl commands, see Create a connector.
Note
Make sure to consider the following:
- Use the Confluent Cloud Kafka cluster API key and secret in your JSON connector configuration. You use the --resource cloud API key and secret in the API request.
- You must provide all required connector configuration properties. See the individual cloud connector documentation for the required connector properties for each connector.
The following curl command example shows how to update the connector time.interval property from HOURLY to DAILY. Successful completion returns the updated connector configuration. The connector immediately begins using the new configuration.
curl --request PUT 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors/<my-connector-name>/config' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' \
--header 'Content-Type: application/json' \
--data-raw '{
"topics": "pageviews",
"input.data.format": "AVRO",
"connector.class": "S3_SINK",
"name": "<my-connector-name>",
"kafka.api.key": "<my-kafka-api-key>",
"kafka.api.secret" : "<my-kafka-api-secret>",
"aws.access.key.id" : "<my-aws-access-key>",
"aws.secret.access.key": "<my-aws-secret>",
"s3.bucket.name": "<my-s3-bucket-name>",
"output.data.format": "AVRO",
"time.interval": "DAILY",
"flush.size": "1000",
"tasks.max": "1"
}' | jq
You can also update the connector configuration by supplying a JSON file and using the following command:
curl --request PUT 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors/<my-connector-name>/config' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' \
--header 'Content-Type: application/json' \
--data "@s3-sink-update.json" | jq
Note
The CLI section in each of the fully-managed connector docs provides an example of correctly formatted JSON to use in the payload file. For example, see the JSON example provided in the Amazon S3 Sink connector documentation.
For example:
curl --request PUT https://api.confluent.cloud/connect/v1/environments/env-*****/clusters/lkc-*****/connectors/S3_SINKConnector_5/config \
--header 'authorization: Basic ABCDEFGZaNjNPV0QzSlRNUjpHVll3UjmUwK3JRUk5HQi82YkdtZlRCb1lYMDZaSTJEMHNMSnBwalBocjNLN1JORWQ2VWo2' \
--header 'Content-Type: application/json' \
--data "@s3-sink-update.json" | jq
The output displays the updated connector configuration. For example:
{
"name": "S3_SINKConnector_5",
"type": "sink",
"config": {
"aws.access.key.id": "****************",
"aws.secret.access.key": "****************",
"cloud.environment": "prod",
"cloud.provider": "aws",
"connector.class": "S3_SINK",
"flush.size": "1000",
"input.data.format": "AVRO",
"kafka.api.key": "****************",
"kafka.api.secret": "****************",
"kafka.dedicated": "false",
"kafka.endpoint": "SASL_SSL://pkc-*****.us-west-2.aws.confluent.cloud:9092",
"kafka.region": "us-west-2",
"name": "S3_SINKConnector_1",
"output.data.format": "AVRO",
"s3.bucket.name": "datagen-to-s3",
"schema.registry.url": "https://psrc-*****.us-east-2.aws.confluent.cloud",
"tasks.max": "1",
"time.interval": "DAILY",
"topics": "pageviews"
},
"tasks": []
}
Query a sink connector for metrics¶
Complete the following steps to query a sink connector for metrics.
Create a JSON file named query-connector-metrics.json to use as the payload for the API request. You can copy and paste the following example to get the number of records the connector received in a specific time interval. Be sure to enter the correct connector resource ID for value and a valid time interval for intervals.

{
  "aggregations": [
    {
      "metric": "io.confluent.kafka.connect/received_records"
    }
  ],
  "filter": {
    "field": "resource.connector.id",
    "op": "EQ",
    "value": "lcc-k2q7v"
  },
  "granularity": "PT1H",
  "intervals": [
    "2021-03-02T00:00:00/2021-03-02T23:00:00"
  ]
}
Enter the following POST query command:
curl --silent --request POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' \
--header 'Content-Type: application/json' \
--data "@<my-json-filename>.json" | jq
For example:
curl --silent --request POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' \
--header 'authorization: Basic ABCDEFGZaNjNPV0QzSeEZCemUwK3JRUk5HQi82YkdtZlRCb1lYMDZaSTJEMHNMSnBwalBocjNLN1JORWQ2VWo2' \
--header 'Content-Type: application/json' \
--data "@query-connector-metrics.json" | jq
The request returns the number of records that connector lcc-k2q7v received in the specified time interval. For example:

{
  "data": [
    {
      "timestamp": "2021-03-02T18:00:00Z",
      "value": 44027
    },
    {
      "timestamp": "2021-03-02T19:00:00Z",
      "value": 7227
    },
    {
      "timestamp": "2021-03-02T20:00:00Z",
      "value": 7222
    },
    {
      "timestamp": "2021-03-02T21:00:00Z",
      "value": 7253
    },
    {
      "timestamp": "2021-03-02T22:00:00Z",
      "value": 7258
    }
  ]
}
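To get a single total for the whole interval rather than hourly buckets, you can sum the values with jq. A minimal sketch, assuming the response shape shown above:

# Sum the hourly values from the metrics response.
curl --silent --request POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' \
--header 'Content-Type: application/json' \
--data "@query-connector-metrics.json" | jq '[.data[].value] | add'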
For additional information about the Confluent Cloud Metrics API, see the Confluent Cloud Metrics documentation.
Delete a connector¶
Use the following API request to delete a connector.
curl --request DELETE https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors/<my-connector-name> \
--header 'authorization: Basic <base64-encoded-key-and-secret>' | jq
For example:
curl --request DELETE https://api.confluent.cloud/connect/v1/environments/env-*****/clusters/lkc-*****/connectors/S3_SINKConnector_1 \
--header 'authorization: Basic HIJKLMNOPQCSEFUNUJWNjdONjpOc3RyWE5kamlzZE05VTdOSk05T3FuSTcyQzlIb2ZRaWhURWtiOWlkVTFtdTB6' | jq
The following example output shows that the connector was successfully deleted.
{
"error": null
}
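To confirm the deletion, you can list the connectors again and check that the name is gone. This follow-up check is a sketch, not part of the documented workflow; jq's index returns null when the name is no longer present in the array.

# Verify the connector no longer appears in the connector list.
curl --silent --request GET 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' | jq 'index("S3_SINKConnector_1")'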
Next Steps¶
See the Confluent Cloud API developer docs for the most recent API version, a complete list of Connect API requests, all response codes, and other details.
For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use the Confluent CLI to manage your resources in Confluent Cloud.