Create a Connector
POST/connect/v1/environments/:environment_id/clusters/:kafka_cluster_id/connectors
Create a new connector. Returns the new connector information if successful.
Request
Responses
- 201
- 400
- 401
- 429
- 500
Created
Response Headers
Bad Request
Unauthorized
Rate Limit Exceeded
Response Headers
X-Request-Id
The unique identifier for the API request.
X-RateLimit-Limit
The maximum number of requests you're permitted to make per time period.
X-RateLimit-Remaining
The number of requests remaining in the current rate limit window.
X-RateLimit-Reset
The relative time in seconds until the current rate-limit window resets.
Important: This differs from Github and Twitter's same-named header which uses UTC epoch seconds. We use relative time to avoid client/server time synchronization issues.
Retry-After
The number of seconds to wait until the rate limit window resets. Only sent when the rate limit is reached.
Internal Server Error
OpenAPI definition (YAML)
paths:
/connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors:
post:
x-lifecycle-stage: General Availability
operationId: createConnectv1Connector
description: '[](#section/Versioning/API-Lifecycle-Policy)
Create a new connector. Returns the new connector information if successful.'
tags:
- Connectors (connect/v1)
security:
- cloud-api-key: []
- confluent-sts-access-token: []
responses:
'201':
description: Created
content:
application/json:
schema:
type: object
properties:
name:
type: string
description: Name of the connector
config:
type: object
description: 'Configuration parameters for the connector. These configurations
are the minimum set of key-value pairs which can be used to
define how the connector connects Kafka to the external system.
Some of these key-value pairs are common to all the connectors, such as
connection parameters to Kafka, connector metadata, etc. The list
of common connector configurations is as follows
- cloud.environment
- cloud.provider
- connector.class
- kafka.api.key
- kafka.api.secret
- kafka.endpoint
- kafka.region
- name
A specific connector such as `GcsSink` would have additional
parameters such as `gcs.bucket.name`, `flush.size`, etc.'
required:
- cloud.environment
- cloud.provider
- connector.class
- name
- kafka.endpoint
- kafka.region
- kafka.api.key
- kafka.api.secret
properties:
cloud.environment:
type: string
description: The cloud environment type.
cloud.provider:
type: string
description: The cloud service provider, e.g. aws, azure, etc.
enum:
- aws
- azure
- gcp
connector.class:
type: string
description: The connector class name. E.g. BigQuerySink, GcsSink, etc.
name:
type: string
description: Name or alias of the class (plugin) for this connector.
kafka.endpoint:
type: string
description: The Kafka cluster endpoint.
kafka.region:
type: string
description: The Kafka cluster region.
kafka.api.key:
type: string
description: The Kafka cluster API key.
kafka.api.secret:
type: string
description: The Kafka cluster API secret.
x-redact: true
additionalProperties:
type: string
tasks:
type: array
description: List of active tasks generated by the connector
items:
type: object
properties:
connector:
type: string
description: The name of the connector the task belongs to
task:
type: integer
description: Task ID within the connector
required:
- connector
- task
type:
type: string
description: Type of connector, sink or source
enum:
- sink
- source
offsets:
type: array
description: Array of offsets which are categorised into partitions.
items:
type: object
properties:
partition:
type: object
additionalProperties: true
description: "The partition information. For sink connectors this is the kafka\
\ topic and \npartition. For source connectors this is depends on the partitions\
\ defined by the \nsource connector. For example, the table which this task\
\ is pulling data from in a\nJDBC based MySQL source connector.\nPlease refer\
\ to the [documentation](https://docs.confluent.io/cloud/current/connectors/offsets.html#manage-offsets-for-fully-managed-connectors-in-ccloud)\
\ for \nmore information."
offset:
type: object
additionalProperties: true
description: "The offset of the partition. For sink connectors this is the kafka\
\ offset. For \nsource connectors this is depends on the offset defined by\
\ the source connector. \nFor example, the timestamp and incrementing column\
\ info in a table, for a JDBC based \nMySQL source connector.\nPlease refer\
\ to the [documentation](https://docs.confluent.io/cloud/current/connectors/offsets.html#manage-offsets-for-fully-managed-connectors-in-ccloud)\
\ for \nmore information."
title: connect.v1.Offsets
required:
- name
- config
title: connect.v1.ConnectorWithOffsets
example:
name: MyGcsLogsBucketConnector
config:
cloud.environment: prod
cloud.provider: aws
connector.class: GcsSink
data.format: BYTES
flush.size: '1000'
gcs.bucket.name: APILogsBucket
gcs.credentials.config: '****************'
kafka.api.key: '****************'
kafka.api.secret: '****************'
kafka.endpoint: SASL_SSL://pkc-xxxxx.us-west-2.aws.confluent.cloud:9092
kafka.region: us-west-2
name: MyGcsLogsBucketConnector
tasks.max: '1'
time.interval: DAILY
topics: APILogsTopic
tasks:
- connector: MyGcsLogsBucketConnector
task: 0
type: sink
offsets:
- partition:
kafka_partition: 0
kafka_topic: APILogsTopic
offset:
kafka_offset: 1000
headers: {}
'400':
description: Bad Request
content:
application/json:
schema:
type: object
properties:
code:
type: integer
message:
type: string
example:
error:
code: 400
message: Unauthorized
'401':
description: Unauthorized
content:
application/json:
schema:
type: object
properties:
error:
type: object
description: Connector Error with error code and message.
properties:
code:
type: integer
description: Error code for the type of error
message:
type: string
description: Human readable error message
title: connect.v1.ConnectorError
example:
error:
code: 401
message: Unauthorized
'429':
description: Rate Limit Exceeded
headers:
X-Request-Id:
schema:
type: string
description: The unique identifier for the API request.
X-RateLimit-Limit:
schema:
type: integer
description: The maximum number of requests you're permitted to make per time period.
X-RateLimit-Remaining:
schema:
type: integer
description: The number of requests remaining in the current rate limit window.
X-RateLimit-Reset:
schema:
type: integer
description: "The relative time in seconds until the current rate-limit window resets. \
\ \n \n**Important:** This differs from Github and Twitter's same-named header which\
\ uses UTC epoch seconds. We use relative time to avoid client/server time synchronization\
\ issues."
Retry-After:
schema:
type: integer
description: The number of seconds to wait until the rate limit window resets. Only sent
when the rate limit is reached.
'500':
description: Internal Server Error
content:
application/json:
schema:
type: object
properties:
error_code:
type: integer
message:
type: string
example:
error_code: 500
message: Failed to find any class that implements Connector and which name matches io.confluent.connect.<connector-class>...
requestBody:
content:
application/json:
schema:
type: object
properties:
name:
type: string
description: Name of the connector to create.
config:
type: object
description: Configuration parameters for the connector. All values should be strings.
x-redact: true
required:
- connector.class
- name
- kafka.api.key
- kafka.api.secret
properties:
connector.class:
type: string
description: \[Required for Managed Connector, Ignored for Custom Connector\] The
connector class name, e.g., BigQuerySink, GcsSink, etc.
name:
type: string
description: Name or alias of the class (plugin) for this connector. For custom
connector, it must be the same as the name of the connector to create.
kafka.api.key:
type: string
description: The kafka cluster api key.
kafka.api.secret:
type: string
description: The kafka cluster api secret key.
x-redact: true
confluent.connector.type:
type: string
description: '\[Required for Custom Connector\] The connector type.
'
example: CUSTOM
default: MANAGED
enum:
- CUSTOM
- MANAGED
confluent.custom.plugin.id:
type: string
example: ccp-lq5m06
description: '\[Required for Custom Connector\] The custom plugin id of custom connector,
e.g., `ccp-lq5m06`
'
confluent.custom.connection.endpoints:
type: string
description: '\[Optional for Custom Connector\] Egress endpoint(s) for the connector
to use when attaching to the sink or source data system.
'
confluent.custom.schema.registry.auto:
type: string
description: '\[Optional for Custom Connector\] Automatically add the required schema
registry properties in a custom connector config if schema registry is enabled.
'
example: 'FALSE'
default: 'FALSE'
enum:
- 'TRUE'
- 'FALSE'
confluent.custom.connect.plugin.runtime:
type: string
description: '\[Optional for Custom Connector\] The runtime of the custom connector
plugin.
'
example: 3.9.0
confluent.custom.connect.java.version:
type: string
description: '\[Optional for Custom Connector\] The Java version of the custom connector
plugin.
'
example: '17'
additionalProperties:
type: string
description: Other configuration parameters for the connector. All values should be
strings. See the connector's docs for details.
offsets:
type: array
description: Array of offsets which are categorised into partitions.
items:
type: object
properties:
partition:
type: object
additionalProperties: true
description: "The partition information. For sink connectors this is the kafka\
\ topic and \npartition. For source connectors this is depends on the partitions\
\ defined by the \nsource connector. For example, the table which this task\
\ is pulling data from in a\nJDBC based MySQL source connector.\nPlease refer\
\ to the [documentation](https://docs.confluent.io/cloud/current/connectors/offsets.html#manage-offsets-for-fully-managed-connectors-in-ccloud)\
\ for \nmore information."
offset:
type: object
additionalProperties: true
description: "The offset of the partition. For sink connectors this is the kafka\
\ offset. For \nsource connectors this is depends on the offset defined by the\
\ source connector. \nFor example, the timestamp and incrementing column info\
\ in a table, for a JDBC based \nMySQL source connector.\nPlease refer to the\
\ [documentation](https://docs.confluent.io/cloud/current/connectors/offsets.html#manage-offsets-for-fully-managed-connectors-in-ccloud)\
\ for \nmore information."
title: connect.v1.Offsets
examples:
sink:
value:
name: MyGcsLogsBucketConnector
config:
connector.class: GcsSink
data.format: BYTES
flush.size: '1000'
gcs.bucket.name: APILogsBucket
gcs.credentials.config: '****************'
kafka.api.key: '****************'
kafka.api.secret: '****************'
name: MyGcsLogsBucketConnector
tasks.max: '2'
time.interval: DAILY
topics: APILogsTopic
offsets:
- partition:
kafka_partition: 0
kafka_topic: APILogsTopic
offset:
kafka_offset: 1000
source:
value:
name: MySqlCdcSourceV2Connector_0
config:
connector.class: MySqlCdcSourceV2
output.data.format: JSON
flush.size: '1000'
database.hostname: 12.34.567.98
database.password: '****************'
database.port: '1234'
database.user: '****'
kafka.api.key: '****************'
kafka.api.secret: '****************'
name: MySqlCdcSourceV2Connector_0
tasks.max: '1'
time.interval: DAILY
topic.prefix: test
offsets:
- partition:
server: test
offset:
file: mysql-bin.000123
pos: 154
ts_sec: 1712907333
description: ''
parameters:
- name: environment_id
in: path
schema:
type: string
required: true
description: The unique identifier of the environment this resource belongs to.
- name: kafka_cluster_id
in: path
schema:
type: string
required: true
description: The unique identifier for the Kafka cluster.
servers:
- url: https://api.confluent.cloud
description: Confluent Cloud API
jsonRequestBodyExample:
name: string
config:
connector.class: string
name: string
kafka.api.key: string
kafka.api.secret: string
confluent.connector.type: CUSTOM
confluent.custom.plugin.id: ccp-lq5m06
confluent.custom.connection.endpoints: string
confluent.custom.schema.registry.auto: 'FALSE'
confluent.custom.connect.plugin.runtime: 3.9.0
confluent.custom.connect.java.version: '17'
offsets:
- partition: {}
offset: {}