Skip to main content

Create a Connector

POST 

/connect/v1/environments/:environment_id/clusters/:kafka_cluster_id/connectors

General Availability

Create a new connector. Returns the new connector information if successful.

Request

Responses

Created

Response Headers
    OpenAPI definition (YAML)
    paths:
      /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors:
        post:
          x-lifecycle-stage: General Availability
          operationId: createConnectv1Connector
          description: '[![General Availability](https://img.shields.io/badge/Lifecycle%20Stage-General%20Availability-%2345c6e8)](#section/Versioning/API-Lifecycle-Policy)
    
    
            Create a new connector. Returns the new connector information if successful.'
          tags:
          - Connectors (connect/v1)
          security:
          - cloud-api-key: []
          - confluent-sts-access-token: []
          responses:
            '201':
              description: Created
              content:
                application/json:
                  schema:
                    type: object
                    properties:
                      name:
                        type: string
                        description: Name of the connector
                      config:
                        type: object
                        description: 'Configuration parameters for the connector. These configurations
    
                          are the minimum set of key-value pairs which can be used to
    
                          define how the connector connects Kafka to the external system.
    
                          Some of these key-value pairs are common to all the connectors, such as
    
                          connection parameters to Kafka, connector metadata, etc. The list
    
                          of common connector configurations is as follows
    
                          - cloud.environment
    
                          - cloud.provider
    
                          - connector.class
    
                          - kafka.api.key
    
                          - kafka.api.secret
    
                          - kafka.endpoint
    
                          - kafka.region
    
                          - name
    
                          A specific connector such as `GcsSink` would have additional
    
                          parameters such as `gcs.bucket.name`, `flush.size`, etc.'
                        required:
                        - cloud.environment
                        - cloud.provider
                        - connector.class
                        - name
                        - kafka.endpoint
                        - kafka.region
                        - kafka.api.key
                        - kafka.api.secret
                        properties:
                          cloud.environment:
                            type: string
                            description: The cloud environment type.
                          cloud.provider:
                            type: string
                            description: The cloud service provider, e.g. aws, azure, etc.
                            enum:
                            - aws
                            - azure
                            - gcp
                          connector.class:
                            type: string
                            description: The connector class name. E.g. BigQuerySink, GcsSink, etc.
                          name:
                            type: string
                            description: Name or alias of the class (plugin) for this connector.
                          kafka.endpoint:
                            type: string
                            description: The Kafka cluster endpoint.
                          kafka.region:
                            type: string
                            description: The Kafka cluster region.
                          kafka.api.key:
                            type: string
                            description: The Kafka cluster API key.
                          kafka.api.secret:
                            type: string
                            description: The Kafka cluster API secret.
                            x-redact: true
                        additionalProperties:
                          type: string
                      tasks:
                        type: array
                        description: List of active tasks generated by the connector
                        items:
                          type: object
                          properties:
                            connector:
                              type: string
                              description: The name of the connector the task belongs to
                            task:
                              type: integer
                              description: Task ID within the connector
                          required:
                          - connector
                          - task
                      type:
                        type: string
                        description: Type of connector, sink or source
                        enum:
                        - sink
                        - source
                      offsets:
                        type: array
                        description: Array of offsets which are categorised into partitions.
                        items:
                          type: object
                          properties:
                            partition:
                              type: object
                              additionalProperties: true
                              description: "The partition information. For sink connectors this is the kafka\
                                \ topic and \npartition. For source connectors this is depends on the partitions\
                                \ defined by the \nsource connector. For example, the table which this task\
                                \ is pulling data from in a\nJDBC based MySQL source connector.\nPlease refer\
                                \ to the [documentation](https://docs.confluent.io/cloud/current/connectors/offsets.html#manage-offsets-for-fully-managed-connectors-in-ccloud)\
                                \ for \nmore information."
                            offset:
                              type: object
                              additionalProperties: true
                              description: "The offset of the partition. For sink connectors this is the kafka\
                                \ offset. For \nsource connectors this is depends on the offset defined by\
                                \ the source connector. \nFor example, the timestamp and incrementing column\
                                \ info in a table, for a JDBC based \nMySQL source connector.\nPlease refer\
                                \ to the [documentation](https://docs.confluent.io/cloud/current/connectors/offsets.html#manage-offsets-for-fully-managed-connectors-in-ccloud)\
                                \ for \nmore information."
                        title: connect.v1.Offsets
                    required:
                    - name
                    - config
                    title: connect.v1.ConnectorWithOffsets
                  example:
                    name: MyGcsLogsBucketConnector
                    config:
                      cloud.environment: prod
                      cloud.provider: aws
                      connector.class: GcsSink
                      data.format: BYTES
                      flush.size: '1000'
                      gcs.bucket.name: APILogsBucket
                      gcs.credentials.config: '****************'
                      kafka.api.key: '****************'
                      kafka.api.secret: '****************'
                      kafka.endpoint: SASL_SSL://pkc-xxxxx.us-west-2.aws.confluent.cloud:9092
                      kafka.region: us-west-2
                      name: MyGcsLogsBucketConnector
                      tasks.max: '1'
                      time.interval: DAILY
                      topics: APILogsTopic
                    tasks:
                    - connector: MyGcsLogsBucketConnector
                      task: 0
                    type: sink
                    offsets:
                    - partition:
                        kafka_partition: 0
                        kafka_topic: APILogsTopic
                      offset:
                        kafka_offset: 1000
              headers: {}
            '400':
              description: Bad Request
              content:
                application/json:
                  schema:
                    type: object
                    properties:
                      code:
                        type: integer
                      message:
                        type: string
                  example:
                    error:
                      code: 400
                      message: Unauthorized
            '401':
              description: Unauthorized
              content:
                application/json:
                  schema:
                    type: object
                    properties:
                      error:
                        type: object
                        description: Connector Error with error code and message.
                        properties:
                          code:
                            type: integer
                            description: Error code for the type of error
                          message:
                            type: string
                            description: Human readable error message
                    title: connect.v1.ConnectorError
                  example:
                    error:
                      code: 401
                      message: Unauthorized
            '429':
              description: Rate Limit Exceeded
              headers:
                X-Request-Id:
                  schema:
                    type: string
                  description: The unique identifier for the API request.
                X-RateLimit-Limit:
                  schema:
                    type: integer
                  description: The maximum number of requests you're permitted to make per time period.
                X-RateLimit-Remaining:
                  schema:
                    type: integer
                  description: The number of requests remaining in the current rate limit window.
                X-RateLimit-Reset:
                  schema:
                    type: integer
                  description: "The relative time in seconds until the current rate-limit window resets. \
                    \ \n  \n**Important:** This differs from Github and Twitter's same-named header which\
                    \ uses UTC epoch seconds. We use relative time to avoid client/server time synchronization\
                    \ issues."
                Retry-After:
                  schema:
                    type: integer
                  description: The number of seconds to wait until the rate limit window resets. Only sent
                    when the rate limit is reached.
            '500':
              description: Internal Server Error
              content:
                application/json:
                  schema:
                    type: object
                    properties:
                      error_code:
                        type: integer
                      message:
                        type: string
                  example:
                    error_code: 500
                    message: Failed to find any class that implements Connector and which name matches io.confluent.connect.<connector-class>...
          requestBody:
            content:
              application/json:
                schema:
                  type: object
                  properties:
                    name:
                      type: string
                      description: Name of the connector to create.
                    config:
                      type: object
                      description: Configuration parameters for the connector. All values should be strings.
                      x-redact: true
                      required:
                      - connector.class
                      - name
                      - kafka.api.key
                      - kafka.api.secret
                      properties:
                        connector.class:
                          type: string
                          description: \[Required for Managed Connector, Ignored for Custom Connector\] The
                            connector class name, e.g., BigQuerySink, GcsSink, etc.
                        name:
                          type: string
                          description: Name or alias of the class (plugin) for this connector. For custom
                            connector, it must be the same as the name of the connector to create.
                        kafka.api.key:
                          type: string
                          description: The kafka cluster api key.
                        kafka.api.secret:
                          type: string
                          description: The kafka cluster api secret key.
                          x-redact: true
                        confluent.connector.type:
                          type: string
                          description: '\[Required for Custom Connector\] The connector type.
    
                            '
                          example: CUSTOM
                          default: MANAGED
                          enum:
                          - CUSTOM
                          - MANAGED
                        confluent.custom.plugin.id:
                          type: string
                          example: ccp-lq5m06
                          description: '\[Required for Custom Connector\] The custom plugin id of custom connector,
                            e.g., `ccp-lq5m06`
    
                            '
                        confluent.custom.connection.endpoints:
                          type: string
                          description: '\[Optional for Custom Connector\] Egress endpoint(s) for the connector
                            to use when attaching to the sink or source data system.
    
                            '
                        confluent.custom.schema.registry.auto:
                          type: string
                          description: '\[Optional for Custom Connector\] Automatically add the required schema
                            registry properties in a custom connector config if schema registry is enabled.
    
                            '
                          example: 'FALSE'
                          default: 'FALSE'
                          enum:
                          - 'TRUE'
                          - 'FALSE'
                        confluent.custom.connect.plugin.runtime:
                          type: string
                          description: '\[Optional for Custom Connector\] The runtime of the custom connector
                            plugin.
    
                            '
                          example: 3.9.0
                        confluent.custom.connect.java.version:
                          type: string
                          description: '\[Optional for Custom Connector\] The Java version of the custom connector
                            plugin.
    
                            '
                          example: '17'
                      additionalProperties:
                        type: string
                        description: Other configuration parameters for the connector. All values should be
                          strings. See the connector's docs for details.
                    offsets:
                      type: array
                      description: Array of offsets which are categorised into partitions.
                      items:
                        type: object
                        properties:
                          partition:
                            type: object
                            additionalProperties: true
                            description: "The partition information. For sink connectors this is the kafka\
                              \ topic and \npartition. For source connectors this is depends on the partitions\
                              \ defined by the \nsource connector. For example, the table which this task\
                              \ is pulling data from in a\nJDBC based MySQL source connector.\nPlease refer\
                              \ to the [documentation](https://docs.confluent.io/cloud/current/connectors/offsets.html#manage-offsets-for-fully-managed-connectors-in-ccloud)\
                              \ for \nmore information."
                          offset:
                            type: object
                            additionalProperties: true
                            description: "The offset of the partition. For sink connectors this is the kafka\
                              \ offset. For \nsource connectors this is depends on the offset defined by the\
                              \ source connector. \nFor example, the timestamp and incrementing column info\
                              \ in a table, for a JDBC based \nMySQL source connector.\nPlease refer to the\
                              \ [documentation](https://docs.confluent.io/cloud/current/connectors/offsets.html#manage-offsets-for-fully-managed-connectors-in-ccloud)\
                              \ for \nmore information."
                      title: connect.v1.Offsets
                examples:
                  sink:
                    value:
                      name: MyGcsLogsBucketConnector
                      config:
                        connector.class: GcsSink
                        data.format: BYTES
                        flush.size: '1000'
                        gcs.bucket.name: APILogsBucket
                        gcs.credentials.config: '****************'
                        kafka.api.key: '****************'
                        kafka.api.secret: '****************'
                        name: MyGcsLogsBucketConnector
                        tasks.max: '2'
                        time.interval: DAILY
                        topics: APILogsTopic
                      offsets:
                      - partition:
                          kafka_partition: 0
                          kafka_topic: APILogsTopic
                        offset:
                          kafka_offset: 1000
                  source:
                    value:
                      name: MySqlCdcSourceV2Connector_0
                      config:
                        connector.class: MySqlCdcSourceV2
                        output.data.format: JSON
                        flush.size: '1000'
                        database.hostname: 12.34.567.98
                        database.password: '****************'
                        database.port: '1234'
                        database.user: '****'
                        kafka.api.key: '****************'
                        kafka.api.secret: '****************'
                        name: MySqlCdcSourceV2Connector_0
                        tasks.max: '1'
                        time.interval: DAILY
                        topic.prefix: test
                      offsets:
                      - partition:
                          server: test
                        offset:
                          file: mysql-bin.000123
                          pos: 154
                          ts_sec: 1712907333
            description: ''
          parameters:
          - name: environment_id
            in: path
            schema:
              type: string
            required: true
            description: The unique identifier of the environment this resource belongs to.
          - name: kafka_cluster_id
            in: path
            schema:
              type: string
            required: true
            description: The unique identifier for the Kafka cluster.
          servers:
          - url: https://api.confluent.cloud
            description: Confluent Cloud API
          jsonRequestBodyExample:
            name: string
            config:
              connector.class: string
              name: string
              kafka.api.key: string
              kafka.api.secret: string
              confluent.connector.type: CUSTOM
              confluent.custom.plugin.id: ccp-lq5m06
              confluent.custom.connection.endpoints: string
              confluent.custom.schema.registry.auto: 'FALSE'
              confluent.custom.connect.plugin.runtime: 3.9.0
              confluent.custom.connect.java.version: '17'
            offsets:
            - partition: {}
              offset: {}