Confluent Cloud API for Connect Usage Examples¶
The Confluent Cloud API lets you interact with your fully-managed and custom connectors. It is a queryable HTTP API: for instance, you can POST a query written in JSON and get back the connector information specified by the query. Use the following examples to learn more about using the Confluent Cloud API for fully-managed and custom connectors.
Note
See the Confluent Cloud Connect API specification for the most recent APIs, a complete list of Connect API requests, all response codes, and other details.
For a hands-on course showing how this works for fully-managed connectors, see Hands On: Confluent Cloud Managed Connector API.
Prerequisites¶
Be sure to have the following prerequisites completed before running any of the examples.
Authorized access to Confluent Cloud.
The Confluent CLI installed and configured.
cURL and jq installed to use the API request examples in this document.
A Confluent Cloud API key to authenticate with the Confluent Cloud API. Every API request must include the resource key and secret encoded as Base64. Complete the following steps to create the key and secret and encode them in Base64.
Log in to your account.
confluent login
Create the API key and secret for the cloud resource.
confluent api-key create --resource cloud
Important
You must create a Confluent Cloud API key for --resource cloud to interact with the Confluent Cloud resource management API. Using the API key created for your Confluent Cloud cluster (that is, --resource <cluster-ID>) results in an authentication error when running the API request.
The Confluent Cloud API uses Basic access authentication. To use the API key and secret, you send them as a header in the form of Authorization: Basic <Base64-credentials>. This form is shown in the curl examples. Enter the following command to display the Base64 encoding for the API key pair.
echo -n "<api-key>:<secret>" | base64
For example:
echo -n "ABCDEFGPZROVP:z6yyH3LEEWdrAAamfue9mIeTAyocCMjO/oSKzg0UMoXA0x3CXjVglPJHYC/" | base64
HIJKLMNOPYRlBaUk9TNjVWUDp6Nnl5SDNMRUVXQmtQN1lkckFBYW1mdWU5bUllVEF5b2NDTWpPL29TS3pnMFVNb1hBMHgzQ1hq
You use the Base64 encoded result in the examples.
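If you would rather compute the header value in a script, the same encoding can be sketched in Python using only the standard library. The function name basic_auth_header and the placeholder credentials are illustrative assumptions and are not part of any Confluent tooling.

```python
import base64


def basic_auth_header(api_key, api_secret):
    """Return the value for an HTTP Basic Authorization header.

    Mirrors: echo -n "<api-key>:<secret>" | base64
    """
    credentials = f"{api_key}:{api_secret}".encode("utf-8")
    # b64encode does not insert line breaks, so the result fits on one header line.
    return "Basic " + base64.b64encode(credentials).decode("ascii")


# Placeholder values; substitute the key pair created with --resource cloud.
print(basic_auth_header("<api-key>", "<secret>"))
```

The returned string is what goes after "authorization:" in the curl examples.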
Managed and Custom Connector API Examples¶
The following examples show how to interact with your fully-managed and custom connectors using the Confluent Cloud Connect API. For plugin examples, see Fully-Managed and Custom Connector Plugin API Examples.
Before you run any examples, be sure to complete the prerequisites.
Note
- Unless specified, the examples show how to use the API for both fully-managed and custom connectors.
- The examples use curl commands to work with the Confluent Cloud API.
- For a hands-on course showing how this works, see Hands On: Confluent Cloud Managed Connector API.
Get a list of connectors¶
To return a list of all connectors in the cluster, use the following API request. Successful completion returns a list of fully-managed and custom connectors.
curl --request GET 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' | jq
For example:
curl --request GET 'https://api.confluent.cloud/connect/v1/environments/env-*****/clusters/lkc-*****/connectors' \
--header 'authorization: Basic HIJKLMNOPYRlB9TNjVWUDp6Nnl5SDNMRUVXQmtQN1dWU5bUllVEF5b2NDTWpPL29TS3pnMFVNb1hBMHgzQ1hqVmdsUEpIWUMv' | jq
The output displays a list of connectors. For example:
[
"DatagenSourceConnector_0",
"S3_SINKConnector_0",
"custom-datagen_0"
]
Create a connector¶
When using the Confluent Cloud API to create or update a connector, you can either create a connector configuration JSON file to use as the payload or use the connector configuration JSON in the curl command itself. Successful completion returns the connector configuration.
The following curl command examples show two ways to provide the payload connector configuration. Note the following additional details when using these commands:
- For configuration payload differences between fully-managed and custom connectors, see Custom connector configuration payload.
- You use the Confluent Cloud Kafka cluster API key and secret in your JSON connector configuration. You use the Base64-encoded --resource cloud API key and secret in the API request.
- You must provide all required connector configuration properties.
- Adhere to the connector naming conventions:
  - Do not exceed 64 characters.
  - A connector name can contain Unicode letters, numbers, marks, and the following special characters: . , & _ + | [] -. Note that you can use spaces, but using dashes (-) makes it easier to reference the connector in a programmatic way.
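The naming conventions can be checked locally before a request is sent. The following is a minimal sketch, assuming that "letters, numbers, marks" maps to the Unicode general categories L, N, and M, that length is counted in code points, and that an empty name is invalid. The helper is_valid_connector_name is hypothetical, and the API remains the authority on which names it accepts.

```python
import unicodedata

MAX_NAME_LENGTH = 64
# Special characters listed in the naming conventions, plus the space.
ALLOWED_SPECIALS = set(".,&_+|[]- ")


def is_valid_connector_name(name):
    """Local check of a connector name against the documented conventions."""
    if not name or len(name) > MAX_NAME_LENGTH:
        return False
    for ch in name:
        # Unicode general categories: L* letters, N* numbers, M* marks.
        if unicodedata.category(ch)[0] in "LNM":
            continue
        if ch in ALLOWED_SPECIALS:
            continue
        return False
    return True


for candidate in ("S3_SINKConnector_0", "custom-datagen_0", "bad/name"):
    print(candidate, is_valid_connector_name(candidate))
```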
Custom connector configuration payload¶
There are a few configuration properties for custom connectors that are not required for fully-managed connector configurations. Note the following configuration property differences in the configuration payload when creating or updating custom connectors using the Confluent Cloud API.
- confluent.connector.type (string): This property defines the type of connector. The property accepts two values: CUSTOM or MANAGED. The default is MANAGED. If creating a custom connector, this property must be set to CUSTOM.
- confluent.custom.plugin.id (string): This property provides the plugin ID for the custom connector. This is a unique ID assigned to the custom connector plugin uploaded to Confluent Cloud. This property is required for custom connectors. To get the plugin ID for a custom connector, see View a custom connector plugin ID.
- confluent.custom.connection.endpoints (string): This property provides an allowed static endpoint (or endpoints) for the custom connector to use when securely attaching to a sink or source data system. This is optional for a custom connector configuration. For more information and endpoint requirements, see egress endpoints.
- confluent.custom.schema.registry.auto (string): When set to TRUE, this property automatically adds the required Schema Registry properties to the custom connector configuration, if Schema Registry is enabled for the Confluent Cloud cluster. Defaults to FALSE. For more information about Schema Registry integration with custom connectors, see Schema Registry integration.
Note
The following two configuration properties are not used when creating a custom connector configuration:
- kafka.auth.mode: The current mode for authentication defaults to API key and secret. You do not need to set this configuration property.
- connector.class: The connector class is derived from the plugin used. You do not need to set this configuration property.
The following example configuration shows how these and other required properties are used to create a custom connector.
curl --request POST 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "custom-datagen_0",
"config": {
"name": "custom-datagen_0",
"confluent.connector.type": "CUSTOM",
"confluent.custom.plugin.id": "custom-plugin-123456",
"kafka.api.key": "<my-kafka-api-key>",
"kafka.api.secret": "<my-kafka-api-secret>",
"interval.ms": "10000",
"kafka.topic": "pageviews",
"tasks.max": "1"
}
}' | jq
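As a rough pre-flight check, the property rules described for custom connectors can be encoded in a few lines of Python before the payload is sent. This is a sketch under stated assumptions: check_custom_payload is a hypothetical helper, it covers only the rules stated in this section, and the payload is modeled on the curl example with placeholder credentials.

```python
import json

# Properties this section says you do not set for custom connectors.
UNUSED_FOR_CUSTOM = ("kafka.auth.mode", "connector.class")


def check_custom_payload(payload):
    """Return a list of problems found in a custom connector payload."""
    config = payload.get("config", {})
    problems = []
    if config.get("confluent.connector.type") != "CUSTOM":
        problems.append("confluent.connector.type must be CUSTOM")
    if not config.get("confluent.custom.plugin.id"):
        problems.append("confluent.custom.plugin.id is required")
    for key in UNUSED_FOR_CUSTOM:
        if key in config:
            problems.append(f"{key} is not used for custom connectors")
    return problems


payload = {
    "name": "custom-datagen_0",
    "config": {
        "name": "custom-datagen_0",
        "confluent.connector.type": "CUSTOM",
        "confluent.custom.plugin.id": "custom-plugin-123456",
        "kafka.api.key": "<my-kafka-api-key>",
        "kafka.api.secret": "<my-kafka-api-secret>",
        "interval.ms": "10000",
        "kafka.topic": "pageviews",
        "tasks.max": "1",
    },
}

print(check_custom_payload(payload) or "no problems found")
print(json.dumps(payload, indent=2))
```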
In the curl example above, the optional properties confluent.custom.connection.endpoints and confluent.custom.schema.registry.auto are not used. An example of how these properties would look in a configuration is shown below:
"confluent.custom.schema.registry.auto": "true",
"confluent.custom.connection.endpoints": "mydatabase.abc123ecs2.us-west-2.rds.amazonaws.com:1433"
Raw JSON payload example¶
The following command uses the connector configuration in the curl command. The example shows an Amazon S3 Sink connector configuration.
curl --request POST 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "S3_SINKConnector_0",
"config": {
"topics": "pageviews",
"input.data.format": "AVRO",
"connector.class": "S3_SINK",
"name": "S3_SINKConnector_0",
"kafka.api.key": "<my-kafka-api-key>",
"kafka.api.secret" : "<my-kafka-api-secret>",
"aws.access.key.id" : "<my-aws-access-key>",
"aws.secret.access.key": "<my-aws-secret>",
"s3.bucket.name": "<my-s3-bucket-name>",
"output.data.format": "AVRO",
"time.interval": "HOURLY",
"flush.size": "1000",
"tasks.max": "1"
}
}' | jq
JSON file payload example¶
The following command uploads the connector configuration in a JSON file.
curl --request POST 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' \
--header 'Content-Type: application/json' \
--data "@<my-connector-config>.json" | jq
Note
The CLI section in each of the fully-managed connector docs provides an example of correctly formatted JSON to use in the payload file. For example, here is the JSON example provided in the Amazon S3 Sink connector documentation.
The following example command uploads a JSON file named s3-connector-config.json, which is then used to create the Amazon S3 Sink connector:
curl --request POST 'https://api.confluent.cloud/connect/v1/environments/env-*****/clusters/lkc-*****/connectors' \
--header 'authorization: Basic RUVSTEVYRlBaUk9TNjVWUDp6Nnl51lkckFBYW1mdWU5bUllVEF5b2NDTWpPL29TS3pnMFVNb1hBMHgzQ1hqVmdsUEpIWUMv' \
--header 'Content-Type: application/json' \
--data "@s3-connector-config.json" | jq
The output displays the connector configuration. For example:
{
"name": "S3_SINKConnector_0",
"type": "sink",
"config": {
"aws.access.key.id": "****************",
"aws.secret.access.key": "****************",
"cloud.environment": "prod",
"cloud.provider": "aws",
"connector.class": "S3_SINK",
"flush.size": "1000",
"input.data.format": "AVRO",
"kafka.api.key": "****************",
"kafka.api.secret": "****************",
"kafka.dedicated": "false",
"kafka.endpoint": "SASL_SSL://pkc-*****.us-west-2.aws.confluent.cloud:9092",
"kafka.region": "us-west-2",
"name": "S3_SINKConnector_0",
"output.data.format": "AVRO",
"s3.bucket.name": "datagen-to-s3",
"schema.registry.url": "https://psrc-*****.us-east-2.aws.confluent.cloud",
"tasks.max": "1",
"time.interval": "HOURLY",
"topics": "pageviews"
},
"tasks": []
}
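The create request can also be assembled in Python with the standard library. The sketch below only builds the request object; sending it is a separate, explicit step so that nothing is created by accident. The helper names build_create_request and send are hypothetical, the environment and cluster IDs are placeholders, and the payload is abbreviated (a real request needs all required connector properties).

```python
import json
import urllib.request

API_BASE = "https://api.confluent.cloud/connect/v1"


def build_create_request(env_id, cluster_id, auth_header, payload):
    """Build (but do not send) the POST request that creates a connector."""
    url = f"{API_BASE}/environments/{env_id}/clusters/{cluster_id}/connectors"
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Authorization": auth_header, "Content-Type": "application/json"},
        method="POST",
    )


def send(request):
    """Send a prepared request and return the decoded JSON response."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)


# Abbreviated payload for illustration only.
example_payload = {
    "name": "S3_SINKConnector_0",
    "config": {"connector.class": "S3_SINK", "name": "S3_SINKConnector_0"},
}
request = build_create_request(
    "env-a12b34", "lkc-67abc8", "Basic <base64-encoded-key-and-secret>", example_payload
)
print(request.get_method(), request.full_url)
```

To load the payload from a file instead, read it with json.load and pass the resulting dictionary as payload.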
Update (or create) a connector¶
When using the Confluent Cloud API to update a connector configuration, you can either update a connector configuration JSON file to use as the payload or use the updated connector configuration JSON in the curl command itself. For examples showing how to construct the two types of curl commands, see Create a connector.
If the connector exists, the API request will update the existing connector configuration with the replacement configuration payload. If there is no connector at the location in the requested route, a new connector is created.
Note the following additional details when using these commands:
- For configuration payload differences between fully-managed and custom connectors, see Custom connector configuration payload.
- You use the Confluent Cloud Kafka cluster API key and secret in your JSON connector configuration. You use the Base64-encoded --resource cloud API key and secret in the API request.
- You must provide all required connector configuration properties.
- Adhere to the connector naming conventions:
  - Do not exceed 64 characters.
  - A connector name can contain Unicode letters, numbers, marks, and the following special characters: . , & _ + | [] -. Note that you can use spaces, but using dashes (-) makes it easier to reference the connector in a programmatic way.
The following curl command example shows how to update the connector flush time interval property from HOURLY to DAILY. Successful completion returns the updated connector configuration.
The connector immediately begins using the new configuration.
curl --request PUT 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors/<my-connector-name>/config' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' \
--header 'Content-Type: application/json' \
--data-raw '{
"topics": "pageviews",
"input.data.format": "AVRO",
"connector.class": "S3_SINK",
"name": "<my-connector-name>",
"kafka.api.key": "<my-kafka-api-key>",
"kafka.api.secret" : "<my-kafka-api-secret>",
"aws.access.key.id" : "<my-aws-access-key>",
"aws.secret.access.key": "<my-aws-secret>",
"s3.bucket.name": "<my-s3-bucket-name>",
"output.data.format": "AVRO",
"time.interval": "DAILY",
"flush.size": "1000",
"tasks.max": "1"
}' | jq
You can also update the connector configuration by supplying a JSON file and using the following command:
curl --request PUT 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors/<my-connector-name>/config' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' \
--header 'Content-Type: application/json' \
--data "@s3-sink-update.json" | jq
Note
The CLI section in each of the fully-managed connector docs provides an example of correctly formatted JSON to use in the payload file. For example, here is the JSON example provided in the Amazon S3 Sink connector documentation.
For example:
curl --request PUT 'https://api.confluent.cloud/connect/v1/environments/env-*****/clusters/lkc-*****/connectors/S3_SINKConnector_5/config' \
--header 'authorization: Basic ABCDEFGZaNjNPV0QzSlRNUjpHVll3UjmUwK3JRUk5HQi82YkdtZlRCb1lYMDZaSTJEMHNMSnBwalBocjNLN1JORWQ2VWo2' \
--header 'Content-Type: application/json' \
--data "@s3-sink-update.json" | jq
The output displays the updated connector configuration. For example:
{
"name": "S3_SINKConnector_5",
"type": "sink",
"config": {
"aws.access.key.id": "****************",
"aws.secret.access.key": "****************",
"cloud.environment": "prod",
"cloud.provider": "aws",
"connector.class": "S3_SINK",
"flush.size": "1000",
"input.data.format": "AVRO",
"kafka.api.key": "****************",
"kafka.api.secret": "****************",
"kafka.dedicated": "false",
"kafka.endpoint": "SASL_SSL://pkc-*****.us-west-2.aws.confluent.cloud:9092",
"kafka.region": "us-west-2",
"name": "S3_SINKConnector_5",
"output.data.format": "AVRO",
"s3.bucket.name": "datagen-to-s3",
"schema.registry.url": "https://psrc-*****.us-east-2.aws.confluent.cloud",
"tasks.max": "1",
"time.interval": "DAILY",
"topics": "pageviews"
},
"tasks": []
}
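Because the PUT request carries a replacement configuration, one way to prepare an update is to start from the complete configuration and overlay only the properties that change. The sketch below does that and also flags values that consist solely of asterisks, on the assumption (not confirmed by this document) that masked values returned by the API are not usable as real secrets and must be supplied again. The helper merge_config is hypothetical.

```python
MASK_CHAR = "*"


def merge_config(current, changes):
    """Return (merged_config, masked_keys) without modifying the inputs."""
    merged = {**current, **changes}
    # Assumption: a value made only of asterisks is a masked secret.
    masked = sorted(
        key for key, value in merged.items() if value and set(value) == {MASK_CHAR}
    )
    return merged, masked


current = {
    "connector.class": "S3_SINK",
    "kafka.api.secret": "****************",
    "time.interval": "HOURLY",
    "tasks.max": "1",
}
merged, masked = merge_config(current, {"time.interval": "DAILY"})
print(merged["time.interval"])
print("re-enter before sending:", masked)
```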
Read a connector configuration¶
Use the following API request to read a connector configuration. Successful completion returns the connector configuration.
curl --request GET 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors/<my-connector-name>/config' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' | jq
For example:
curl --request GET 'https://api.confluent.cloud/connect/v1/environments/env-****/clusters/lkc-*****/connectors/S3_SINKConnector_0/config' \
--header 'authorization: Basic HIJKLMNOPYRlB9TNjVWUVXQmtQN1lkckFBYW1m5b2NDTWpPL29TS3pnMFVNb1hBMHgzQ1hqVmdsUEpIWUMv' | jq
The output displays the connector configuration. For example:
{
"aws.access.key.id": "****************",
"aws.secret.access.key": "****************",
"cloud.environment": "prod",
"cloud.provider": "aws",
"connector.class": "S3_SINK",
"flush.size": "1000",
"input.data.format": "AVRO",
"kafka.api.key": "****************",
"kafka.api.secret": "****************",
"kafka.dedicated": "false",
"kafka.endpoint": "SASL_SSL://pkc-****.us-west-2.aws.confluent.cloud:9092",
"kafka.region": "us-west-2",
"name": "S3_SINKConnector_0",
"output.data.format": "AVRO",
"s3.bucket.name": "datagen-to-s3",
"schema.registry.url": "https://psrc-*****.us-east-2.aws.confluent.cloud",
"tasks.max": "1",
"time.interval": "HOURLY",
"topics": "pageviews"
}
Query a sink connector for metrics¶
Complete the following steps to query a sink connector for metrics.
Create a JSON file named query-connector-metrics.json to use as the payload for the API request. You can copy and paste the following example to get the number of records the connector received in a specific time interval. Be sure to enter the correct connector resource ID for value and a valid time interval for intervals.
{
  "aggregations": [
    {
      "metric": "io.confluent.kafka.connect/received_records"
    }
  ],
  "filter": {
    "field": "resource.connector.id",
    "op": "EQ",
    "value": "lcc-k2q7v"
  },
  "granularity": "PT1H",
  "intervals": [
    "2021-03-02T00:00:00/2021-03-02T23:00:00"
  ]
}
Enter the following POST query command:
curl --silent --request POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' \
--header 'Content-Type: application/json' \
--data "@<my-json-filename>.json" | jq
For example:
curl --silent --request POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' \
--header 'authorization: Basic ABCDEFGZaNjNPV0QzSeEZCemUwK3JRUk5HQi82YkdtZlRCb1lYMDZaSTJEMHNMSnBwalBocjNLN1JORWQ2VWo2' \
--header 'Content-Type: application/json' \
--data "@query-connector-metrics.json" | jq
The request returns the number of records that connector lcc-k2q7v received in the time interval specified. For example:
{
  "data": [
    {
      "timestamp": "2021-03-02T18:00:00Z",
      "value": 44027
    },
    {
      "timestamp": "2021-03-02T19:00:00Z",
      "value": 7227
    },
    {
      "timestamp": "2021-03-02T20:00:00Z",
      "value": 7222
    },
    {
      "timestamp": "2021-03-02T21:00:00Z",
      "value": 7253
    },
    {
      "timestamp": "2021-03-02T22:00:00Z",
      "value": 7258
    }
  ]
}
For additional information about the Confluent Cloud Metrics API, see the Confluent Cloud Metrics documentation.
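Writing the interval by hand is error-prone, so the query payload can be generated instead. The following sketch builds the same payload shown in this section from a start time and a duration. The helper build_metrics_query is hypothetical, and the timestamps are written without a time zone only because the example in this section is written that way.

```python
import json
from datetime import datetime, timedelta


def build_metrics_query(connector_id, start, hours, granularity="PT1H"):
    """Build the received_records query for the window start .. start + hours."""
    end = start + timedelta(hours=hours)
    # ISO 8601 interval in the form <start>/<end>.
    interval = f"{start.isoformat()}/{end.isoformat()}"
    return {
        "aggregations": [{"metric": "io.confluent.kafka.connect/received_records"}],
        "filter": {"field": "resource.connector.id", "op": "EQ", "value": connector_id},
        "granularity": granularity,
        "intervals": [interval],
    }


query = build_metrics_query("lcc-k2q7v", datetime(2021, 3, 2, 0, 0, 0), hours=23)
print(json.dumps(query, indent=2))
```

Saving the printed JSON to a file gives a payload you can pass to curl with --data "@<file>".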
Delete a connector¶
Use the following API request to delete a connector.
curl --request DELETE 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connectors/<my-connector-name>' \
--header 'authorization: Basic <base64-encoded-key-and-secret>' | jq
For example:
curl --request DELETE 'https://api.confluent.cloud/connect/v1/environments/env-*****/clusters/lkc-*****/connectors/S3_SINKConnector_1' \
--header 'authorization: Basic HIJKLMNOPQCSEFUNUJWNjdONjpOc3RyWE5kamlzZE05VTdOSk05T3FuSTcyQzlIb2ZRaWhURWtiOWlkVTFtdTB6' | jq
The following shows an output example where the connector was successfully deleted.
{
"error": null
}
Fully-Managed and Custom Connector Plugin API Examples¶
The following examples show how to interact with your fully-managed and custom connector plugins using the Confluent Cloud Connect API. Before you run any examples, be sure to complete the prerequisites.
Fully-managed connector plugin examples¶
The following examples show API requests for fully-managed connector plugins.
List fully-managed plugins¶
Use the following API request to return a list of all connector plugins in the cluster. Successful completion returns a list of fully-managed connector plugins.
curl --request GET 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connector-plugins' --header 'authorization: Basic <base64-encoded-key-and-secret>' | jq
For example:
curl --request GET 'https://api.confluent.cloud/connect/v1/environments/env-a12b34/clusters/lkc-67abc8/connector-plugins' --header 'authorization: Basic Uk1IWFE1MjNBRVFTRUJKVDphOFBn******************SlptcXd5aXNnBINnR3em1GVUw1bDVXQVNX' | jq
The output displays a list of connector plugins. For example:
[
{
"class": "MicrosoftSqlServerSink",
"type": "sink",
"version": "0.1.0"
},
{
"class": "MySqlCdcSource",
"type": "source",
"version": "0.1.0"
},
{
"class": "DataScienceBigQuerySink",
"type": "sink",
"version": "1"
},
{
"class": "PubSubSource",
"type": "source",
"version": "0.1.0"
},
{...
}
]
Validate a fully-managed plugin¶
Use the following API request to validate configuration values against the plugin configuration definition. The API validates the configuration and returns suggested values and validation error messages.
curl --request PUT 'https://api.confluent.cloud/connect/v1/environments/<my-environment-ID>/clusters/<my-cluster-ID>/connector-plugins/<plugin-name>/config/validate' --header 'authorization: Basic <base64-encoded-key-and-secret>' --header 'Content-Type: application/json' --data '{
"<config1>": "<value1>",
"<config2>": "<value2>"
...
}' | jq
For example:
curl --request PUT 'https://api.confluent.cloud/connect/v1/environments/env-a12b34/clusters/lkc-67abc8/connector-plugins/DatagenSource/config/validate' --header 'authorization: Basic Uk1IWFE1MjNBRVFTRUJKVDphOFBn******************SlptcXd5aXNnBINnR3em1GVUw1bDVXQVNX' --header 'Content-Type: application/json' --data '{
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"kafka.topic": "pageviews",
"quickstart": "pageviews",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"max.interval": "100",
"iterations": "10000000",
"tasks.max": "1"
}' | jq
The output returns suggested values and error messages. For example:
{
"name": "io.confluent.kafka.connect.datagen.DatagenConnector",
"groups": [
"How should we connect to your data?",
"Kafka Cluster credentials",
"Which topic do you want to send data to?",
"Schema Config",
"Output messages",
"Datagen Details",
"Number of tasks for this connector",
"Transforms",
"Predicates"
],
"error_count": 4,
"configs": [
{
"definition": {
"name": "connector.class",
"type": "STRING",
"required": true,
"default_value": "",
"importance": "HIGH",
"documentation": "",
"group": "How should we connect to your data?",
"width": "NONE",
"display_name": "Connector class",
"dependents": [],
"order": 1,
"alias": ""
},
"value": {
"name": "connector.class",
"value": "DatagenSource",
"recommended_values": [],
"errors": [],
"visible": true
},
"metadata": {}
},
{
"definition": {
"name": "name",
"type": "STRING",
"required": true,
"default_value": "",
"importance": "HIGH",
"documentation": "Sets a name for your connector.",
"group": "How should we connect to your data?",
"width": "NONE",
"display_name": "Connector name",
"dependents": [],
"order": 2,
"alias": ""
},
"value": {
"name": "name",
"value": "{{.logicalClusterId}}",
"recommended_values": [],
"errors": [
"\"name\" is required"
],
"visible": true
},
"metadata": {}
},
{ ...
},
{ ...
},
{ ...
},
{
"definition": {
"name": "predicates",
"type": "LIST",
"required": false,
"default_value": "",
"importance": "LOW",
"documentation": "Aliases for the predicates used by transformations.",
"group": "Predicates",
"width": "LONG",
"display_name": "Predicates",
"dependents": [],
"order": 8,
"alias": ""
},
"value": {
"name": "predicates",
"value": "",
"recommended_values": [],
"errors": [],
"visible": true
},
"metadata": {}
}
]
}
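The validation response is long, so it can help to reduce it to just the properties that reported errors. The sketch below walks the configs array and collects the errors lists. The helper collect_validation_errors is hypothetical, and the sample response is a trimmed copy of the output shown in this section.

```python
def collect_validation_errors(response):
    """Map each configuration name to its error messages, skipping clean ones."""
    errors = {}
    for entry in response.get("configs", []):
        value = entry.get("value", {})
        if value.get("errors"):
            errors[value["name"]] = list(value["errors"])
    return errors


# Trimmed sample of the validate output.
sample_response = {
    "error_count": 4,
    "configs": [
        {"value": {"name": "connector.class", "value": "DatagenSource", "errors": []}},
        {"value": {"name": "name", "value": "", "errors": ['"name" is required']}},
    ],
}

for config_name, messages in collect_validation_errors(sample_response).items():
    print(config_name, messages)
```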
Custom connector plugin examples¶
The following examples show API requests for custom connector plugins.
Important
Custom connector plugins are organization-level resources. The following admin roles can interact with custom connectors and plugins:
- OrganizationAdmin
- EnvironmentAdmin
- CloudClusterAdmin
For more information, see RBAC role mappings.
List custom plugins¶
Depending on the activity in your environment, this command may return many pages of plugin results. The API supports pagination, so you can set a page size and use the "next" URL in "metadata" to move forward. For example, to list the first 20 plugins:
curl --request GET 'https://api.confluent.cloud/connect/v1/custom-connector-plugins?page_size=20' --header 'authorization: Basic <base64-encoded-key-and-secret>' | jq
The output of this command includes the following:
"metadata": {
"first": "http://api.confluent.cloud/connect/v1/custom-connector-plugins",
"next": "http://api.confluent.cloud/connect/v1/custom-connector-plugins?page_token=eyJpZCI6ImN1c3RvbS1wbHVnaW4tbDlvanZsIiwiY3JlYXRlZCI6IjIwMjMtMDQtMjFUMTY6MTk6MjEuNzY2NTAyWiIsInNpemUiOjR9"
}
Use the "next" URL to go to the next set of results. Note that the default page_size value is 10 and the maximum allowed is 100.
Use the following API request to list all custom connector plugins. Successful completion returns the custom connector plugin details.
curl --request GET 'https://api.confluent.cloud/connect/v1/custom-connector-plugins' --header 'authorization: Basic <base64-encoded-key-and-secret>' | jq
For example:
curl --request GET 'https://api.confluent.cloud/connect/v1/custom-connector-plugins' --header 'authorization: Basic Uk1IWFE1MjNBRVFTRUJKVDphOFBn******************SlptcXd5aXNnBINnR3em1GVUw1bDVXQVNX' | jq
The output displays custom connector plugin details.
{
"api_version": "connect/v1",
"data": [
{
"api_version": "connect/v1",
"connector_class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"connector_type": "source",
"content_format": "ZIP",
"description": "Datagen Custom Connector",
"display_name": "custom-datagen",
"documentation_link": "https://github.com/confluentinc/kafka-connect-datagen/blob/master/README.md",
"id": "custom-plugin-abc01e",
"kind": "CustomConnectorPlugin",
"metadata": {
"created_at": "2023-05-03T18:17:26.024602Z",
"resource_name": "crn://confluent.cloud/organization=12345abc-6def-4843-93a9-6ba27bd2fe5c/custom-connector-plugins=custom-plugin-abc01e",
"self": "http://api.confluent.cloud/connect/v1/custom-connector-plugins/custom-plugin-abc01e",
"updated_at": "2023-08-14T20:54:21.229971Z"
},
"sensitive_config_properties": []
}
],
"kind": "CustomConnectorPlugin",
"metadata": {
"first": "http://api.confluent.cloud/connect/v1/custom-connector-plugins",
"next": ""
}
}
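Following the "next" URL by hand gets tedious when there are many pages. The loop below is a minimal sketch of the pagination pattern: it keeps requesting pages until "next" is empty, as in the last page of the output above. The function iter_custom_plugins and the fetch_page callable are hypothetical; fetch_page stands in for whatever code performs the authenticated GET and returns the decoded JSON, and the demo uses in-memory pages instead of real requests.

```python
def iter_custom_plugins(fetch_page, first_url):
    """Yield every plugin by following metadata.next until it is empty."""
    url = first_url
    while url:
        page = fetch_page(url)
        yield from page.get("data", [])
        # An empty or missing "next" value ends the loop.
        url = page.get("metadata", {}).get("next") or None


# In-memory stand-in for the API: two pages keyed by made-up URLs.
fake_pages = {
    "page-1": {"data": [{"id": "custom-plugin-a"}], "metadata": {"next": "page-2"}},
    "page-2": {"data": [{"id": "custom-plugin-b"}], "metadata": {"next": ""}},
}

plugin_ids = [p["id"] for p in iter_custom_plugins(fake_pages.__getitem__, "page-1")]
print(plugin_ids)
```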
Request a presigned URL¶
Before you can upload a custom connector plugin to Confluent Cloud, you need to get a presigned URL and ID to use in the configuration. The presigned URL and ID include the correct bucket policy and security token for uploading the plugin. Note that the URL policy expires in one hour. If the policy expires, you can request a new presigned upload URL.
Use the following API request to get the upload URL and ID. Note that the request can be made for both ZIP and JAR file formats, depending on the plugin format. The example below is for a ZIP upload file.
curl --request POST 'https://api.confluent.cloud/connect/v1/presigned-upload-url' --header 'authorization: Basic <base64-encoded-key-and-secret>' --header 'content-type: application/json' --data '{"content_format":"ZIP"}' | jq
For example:
curl --request POST 'https://api.confluent.cloud/connect/v1/presigned-upload-url' --header 'Authorization: Basic Uk1IWFE1MjNBRVFTRUJKVDphOFBn******************SlptcXd5aXNnBINnR3em1GVUw1bDVXQVNX' --header 'content-type: application/json' --data '{"content_format":"ZIP"}' | jq
The output displays the details for the presigned URL and upload ID. For example:
{
"api_version": "connect/v1",
"content_format": "ZIP",
"kind": "PresignedUrl",
"upload_form_data": {
"bucket": "confluent-custom-connectors-prod-us-west-2",
"key": "staging/ccp/v1/.../custom-plugins/.../plugin.zip",
"policy": "...",
"x-amz-algorithm": "AWS4-HMAC-SHA256",
"x-amz-credential": "ASIA3UFUYVKN2TJBCYPQ/20230911/us-west-2/s3/aws4_request",
"x-amz-date": "20230911T210830Z",
"x-amz-security-token": "..."
},
"upload_id": "...",
"upload_url": "https://confluent-custom-connectors-prod-us-west-2.s3.dualstack.us-west-2.amazonaws.com/"
}
You will need to use the returned details above when uploading the custom connector plugin.
Upload a custom plugin¶
Once you have the presigned URL, ID, bucket policy and other security details, you upload your plugin to the bucket. The following example provides the curl command you can use to upload your plugin ZIP or JAR file.
Note
When specifying the upload file, you must use the @ symbol at the start of the file path. For example, -F file=@</path/to/upload/file>. If the @ symbol is not used, you may see an error stating that Your proposed upload is smaller than the minimum allowed size.
curl -X POST "https://confluent-custom-connectors-prod-us-west-2.s3.dualstack.us-west-2.amazonaws.com/" \
-F "bucket=confluent-custom-connectors-prod-us-west-2" \
-F "key=staging/ccp/v1/.../custom-plugins/.../plugin.zip" \
-F "policy=eyJleHBpcmF0...TA3Mzc0MTgyNF1dfQ==" \
-F "x-amz-algorithm=AWS4-HMAC-SHA256" \
-F "x-amz-credential=.../20230912/us-west-2/s3/aws4_request" \
-F "x-amz-date=20230912T151736Z" \
-F "x-amz-security-token=IQoJ...SmyuK0=" \
-F "x-amz-signature=2a6e8e5...a4d5bef0e" \
-F file=@/Users/<username>/Downloads/confluentinc-kafka-connect-datagen-0.6.0.zip
A successful command returns the command prompt.
Create a custom plugin¶
Use the following API request to create a custom connector plugin. The following are required configuration properties:
- display_name: A meaningful plugin name to be displayed in the UI and CLI.
- connector_class: The Java class or alias for the connector. You can get the connector class from the connector documentation provided by the developer.
- connector_type: SOURCE or SINK.
- upload_source: For this property, you use the value "PRESIGNED_URL_LOCATION" and the upload ID.
Optional configuration properties include the following:
- description: A meaningful description for the plugin.
- documentation_link: A URL for documentation users need to reference for the connector. This URL is displayed for users configuring a connector using the plugin.
- sensitive_config_properties: A sensitive property is a connector configuration property that must be hidden after a user enters the property value when setting up the connector. For more information about sensitive properties, see Uploading and launching the connector.
curl --request POST \
--url https://api.confluent.cloud/connect/v1/custom-connector-plugins \
--header 'Authorization: Basic REPLACE_BASIC_AUTH' \
--header 'content-type: application/json' \
--data '{
"display_name":"<PLUGIN_NAME>",
"documentation_link":"<DOC_URL>",
"connector_class":"<CONNECTOR_CLASS>",
"connector_type":"SOURCE",
"sensitive_config_properties": ["<property-1>", "<property-2>"],
"upload_source":{"location":"PRESIGNED_URL_LOCATION","upload_id":"<UPLOAD_ID>"}
}' | jq
For example:
curl --request POST \
--url https://api.confluent.cloud/connect/v1/custom-connector-plugins \
--header 'Authorization: Basic Uk1IWFE1MjNBRVFTRUJKVDphOFBn******************SlptcXd5aXNnBINnR3em1GVUw1bDVXQVNX' \
--header 'content-type: application/json' \
--data '{
"display_name":"custom-datagen-2",
"description":"A test connector providing mock source data.",
"documentation_link":"https://github.com/confluentinc/kafka-connect-datagen",
"connector_class":"io.confluent.kafka.connect.datagen.DatagenConnector",
"connector_type":"SOURCE",
"upload_source":{"location":"PRESIGNED_URL_LOCATION","upload_id":"........-cf18-42e9-....-5c41a4e32f49"}
}' | jq
The output displays the details for the plugin:
{
"api_version": "connect/v1",
"connector_class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"connector_type": "SOURCE",
"content_format": "ZIP",
"description": "A test connector providing mock source data.",
"display_name": "custom-datagen-2",
"documentation_link": "https://github.com/confluentinc/kafka-connect-datagen",
"id": "ccp-abc123",
"kind": "CustomConnectorPlugin",
"metadata": {
"created_at": "2023-09-12T16:31:05.746028Z",
"resource_name": "crn://confluent.cloud/organization=........-423f-43e6-....-........../custom-connector-plugins=ccp-abc123",
"self": "http://api.confluent.cloud/connect/v1/custom-connector-plugins/ccp-abc123",
"updated_at": "2023-09-12T16:31:05.746028Z"
},
"sensitive_config_properties": []
}
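The id value in the response identifies the plugin in the read, update, and delete requests that follow. As a minimal sketch, assuming the response has been captured in a shell variable, the following uses jq to pull out the ID. The function name extract_plugin_id and the trimmed sample response are illustrative only and are not part of the API.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the "id" field of a plugin response read from stdin.
extract_plugin_id() {
  jq --raw-output '.id'
}

# Trimmed sample based on the response shown above (not a full response).
sample_response='{"api_version":"connect/v1","id":"ccp-abc123","kind":"CustomConnectorPlugin"}'

# Store the ID so later requests can reuse it.
plugin_id="$(printf '%s' "$sample_response" | extract_plugin_id)"
echo "$plugin_id"
```

In practice, the curl output from the create request could be piped to the same helper in place of the sample variable.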
Read a custom plugin¶
Use the following API request to read a custom connector plugin.
curl --request GET \
--url 'https://api.confluent.cloud/connect/v1/custom-connector-plugins/{id}' \
--header 'Authorization: Basic REPLACE_BASIC_AUTH'
For example:
curl --request GET \
--url 'https://api.confluent.cloud/connect/v1/custom-connector-plugins/ccp-abcdef' \
--header 'Authorization: Basic Uk1IWFE1MjNBRVFTRUJKVDphOFBn******************SlptcXd5aXNnBINnR3em1GVUw1bDVXQVNX' | jq
The output displays the details for the plugin:
{
"api_version": "connect/v1",
"connector_class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"connector_type": "SOURCE",
"content_format": "ZIP",
"description": "A source connector for dev testing.",
"display_name": "custom datagen 2",
"documentation_link": "https://github.com/confluentinc/kafka-connect-datagen",
"id": "ccp-abcdef",
"kind": "CustomConnectorPlugin",
"metadata": {
"created_at": "2023-09-12T17:38:04.867797Z",
"resource_name": "crn://confluent.cloud/organization=14383aee-1bc5-.............../custom-connector-plugins=ccp-abcdef",
"self": "http://api.confluent.cloud/connect/v1/custom-connector-plugins/ccp-abcdef",
"updated_at": "2023-09-12T17:38:05.812088Z"
},
"sensitive_config_properties": []
}
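Every request in this document sends the credentials as Authorization: Basic <Base64-credentials>, and omitting the Basic scheme prefix is an easy mistake to make. The following is a minimal sketch of shell helpers that build the header and the plugin URL in one place. The function names (auth_header, plugin_url, read_plugin) and the environment variables CLOUD_API_KEY and CLOUD_API_SECRET are assumptions made for illustration.

```shell
#!/usr/bin/env bash
# Base URL taken from the requests in this document.
API_BASE='https://api.confluent.cloud/connect/v1/custom-connector-plugins'

# Hypothetical helper: build the Basic auth header from an API key and secret.
auth_header() {
  local credentials
  # tr removes the line wrapping that some base64 implementations add.
  credentials="$(printf '%s:%s' "$1" "$2" | base64 | tr -d '\n')"
  printf 'Authorization: Basic %s\n' "$credentials"
}

# Hypothetical helper: build the URL for a single plugin ID.
plugin_url() {
  printf '%s/%s\n' "$API_BASE" "$1"
}

# Hypothetical wrapper: read one plugin. Assumes CLOUD_API_KEY and
# CLOUD_API_SECRET are exported in the environment.
read_plugin() {
  curl --silent --request GET \
    --url "$(plugin_url "$1")" \
    --header "$(auth_header "$CLOUD_API_KEY" "$CLOUD_API_SECRET")"
}

# Usage (requires network access and valid credentials):
#   read_plugin ccp-abcdef | jq
```

Keeping the header construction in a single function means the scheme prefix and the encoding are written once rather than repeated in each request.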
Update a custom plugin¶
Use the following PATCH API request to update a custom connector plugin. In the --data section, include only the properties that need to be updated.
curl --request PATCH \
--url 'https://api.confluent.cloud/connect/v1/custom-connector-plugins/{id}' \
--header 'Authorization: Basic REPLACE_BASIC_AUTH' \
--header 'content-type: application/json' \
--data '{
"display_name":"string",
"description":"string",
"documentation_link":"https://github.com/confluentinc/kafka-connect-datagen",
"connector_class":"io.confluent.kafka.connect.datagen.DatagenConnector",
"connector_type":"SOURCE",
"sensitive_config_properties":["passwords","keys","tokens"],
"upload_source":{"location":"PRESIGNED_URL_LOCATION","upload_id":"e53bb2e8-8de3-49fa-9fb1-4e3fd9a16b66"}
}' | jq
For example, to change the connector plugin display name:
curl --request PATCH \
--url 'https://api.confluent.cloud/connect/v1/custom-connector-plugins/ccp-abcdef' \
--header 'Authorization: Basic Uk1IWFE1MjNBRVFTRUJKVDphOFBn******************SlptcXd5aXNnBINnR3em1GVUw1bDVXQVNX' \
--header 'content-type: application/json' \
--data '{"display_name":"custom-datagen-1"}' | jq
The output displays the details for the updated plugin:
{
"api_version": "connect/v1",
"connector_class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"connector_type": "SOURCE",
"content_format": "ZIP",
"description": "A source connector for dev testing.",
"display_name": "custom-datagen-1",
"documentation_link": "https://github.com/confluentinc/kafka-connect-datagen",
"id": "ccp-abcdef",
"kind": "CustomConnectorPlugin",
"metadata": {
"created_at": "2023-09-12T17:38:04.867797Z",
"resource_name": "crn://confluent.cloud/organization=14383aee-1bc5-.............../custom-connector-plugins=ccp-abcdef",
"self": "http://api.confluent.cloud/connect/v1/custom-connector-plugins/ccp-abcdef",
"updated_at": "2023-09-12T17:38:05.812088Z"
},
"sensitive_config_properties": []
}
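Because a PATCH body carries only the properties that change, it can be convenient to generate the body rather than edit JSON by hand. The sketch below assumes jq is installed (it is listed in the prerequisites) and builds a one-property body with correct JSON quoting. The function name patch_body is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: build a PATCH body that changes a single string property.
# --null-input tells jq not to read stdin; --arg passes shell values in as
# JSON strings, so quotes and backslashes in the value are escaped correctly.
patch_body() {
  jq --compact-output --null-input \
    --arg key "$1" --arg value "$2" \
    '{($key): $value}'
}

# Print the body for a display name change.
patch_body display_name 'custom-datagen-1'
```

The result can then be passed to curl, for example with --data "$(patch_body display_name 'custom-datagen-1')".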
Delete a custom plugin¶
Use the following API request to delete a custom connector plugin.
curl --request DELETE \
--url 'https://api.confluent.cloud/connect/v1/custom-connector-plugins/{id}' \
--header 'Authorization: Basic REPLACE_BASIC_AUTH'
For example:
curl --request DELETE \
--url 'https://api.confluent.cloud/connect/v1/custom-connector-plugins/ccp-abcdef' \
--header 'Authorization: Basic Uk1IWFE1MjNBRVFTRUJKVDphOFBn******************SlptcXd5aXNnBINnR3em1GVUw1bDVXQVNX' | jq
A successful command returns the command prompt.
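Because a successful delete returns only the command prompt, a script cannot tell success from failure by looking at the output. One option, sketched here under assumptions, is to capture the HTTP status code with curl's --write-out option and treat any 2xx code as success. The function names and the BASIC_AUTH variable are hypothetical, and the exact status code the API returns is not stated in this document, so see the API specification for the response codes.

```shell
#!/usr/bin/env bash
# Hypothetical helper: succeed only for a 2xx HTTP status code.
is_success_status() {
  case "$1" in
    2[0-9][0-9]) return 0 ;;
    *) return 1 ;;
  esac
}

# Hypothetical wrapper: delete a plugin and report the outcome.
# Assumes BASIC_AUTH holds the Base64-encoded "<api-key>:<secret>" pair.
delete_plugin() {
  local status
  # Discard the response body and print only the status code.
  status="$(curl --silent --output /dev/null --write-out '%{http_code}' \
    --request DELETE \
    --url "https://api.confluent.cloud/connect/v1/custom-connector-plugins/$1" \
    --header "Authorization: Basic $BASIC_AUTH")"
  if is_success_status "$status"; then
    echo "Deleted $1 (HTTP $status)"
  else
    echo "Delete failed for $1 (HTTP $status)" >&2
    return 1
  fi
}

# Usage (requires network access and valid credentials):
#   delete_plugin ccp-abcdef
```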
Note
See the Confluent Cloud Connect API specification for the most recent APIs, a complete list of Connect API requests, all response codes, and other details.
Next Steps¶
For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use the Confluent CLI to manage your resources in Confluent Cloud.