Manage Connectors

Confluent for Kubernetes (CFK) supports declaratively creating and managing connectors as Connector custom resources (CRs) in Kubernetes.

Connectors are managed by Kafka Connect. Install connector plugins as described in Install connector plugin before you create connector CRs.

Create connectors

Each Connector CR maps to one connector running in Kafka Connect.

  1. Create a connector as a Connector CR:

    apiVersion: platform.confluent.io/v1beta1
    kind: Connector
    metadata:
      name:                --- [1]
    spec:
      name:                --- [2]
      taskMax:             --- [3]
      class:               --- [4]
      configs:             --- [5]
      restartPolicy:
        type:              --- [6]
        maxRetry:          --- [7]
      connectClusterRef:   --- [8]
      connectRest:         --- [9]

    • [1] Required. The name of this Connector CR.

      To create a new connector by copying the settings of an existing one, change this value to the name of the new connector.

    • [2] Optional. Typically, this is the same as the CR name in [1]. If you need a connector name that Kubernetes does not support, specify that name here.

    • [3] Required. The maximum number of tasks for the connector.

      When you set this value, consider resource consumption. If you plan to use a large number, test it in a development environment first.

    • [4] Required. The class name of the connector.

    • [5] Connector-specific configuration settings as key-value maps. Consult the connector documentation for the required settings.

      If the configuration requires sensitive data or certificates, see Connector configs.

    • [6] Required. The policy for restarting failed tasks of the connector. Set to OnFailure or Never. The default value is OnFailure, which means a failed task is automatically restarted as long as maxRetry has not been reached.

    • [7] The maximum number of restart attempts when the restartPolicy type is OnFailure. The default value is 10.

    • [8] See Discover Connect using Connect cluster name.

    • [9] See Discover Connect using Connect endpoint.

  2. Apply the configuration to create the connector:

    kubectl apply -f <connector CR>
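
As a rough illustration, the fields described above might be combined into a complete Connector CR like the one below. This is a sketch under stated assumptions, not a definitive configuration: the CR name file-source-example, the namespace confluent, the Connect cluster name connect, the file path, and the topic name are all hypothetical, and it assumes the FileStreamSource connector class that ships with Apache Kafka is installed on the Connect workers.

```yaml
apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
  name: file-source-example        # [1] hypothetical CR name
  namespace: confluent             # hypothetical namespace
spec:
  class: "org.apache.kafka.connect.file.FileStreamSourceConnector"  # [4]
  taskMax: 1                       # [3] start small, raise after testing
  configs:                         # [5] connector-specific key-value settings
    file: "/tmp/example-input.txt" # hypothetical input file
    topic: "example-topic"         # hypothetical topic
  restartPolicy:
    type: OnFailure                # [6] restart failed tasks automatically
    maxRetry: 5                    # [7] stop retrying after 5 attempts
  connectClusterRef:
    name: connect                  # [8] hypothetical Connect cluster CR name
```

spec.name is omitted here, so the connector would take its name from the CR name.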

Connector configs

In addition to connector-specific settings, you can use connector configs (spec.configs) for the following categories of settings:

Mounted secrets for credentials

Mounted secrets allow you to protect sensitive data, such as passwords, in the connector configuration.

To use mounted secrets for connectors:

  1. Create a mounted secret and specify the secret name in the Connect CR as described in Provide Connect credentials using mounted secrets.

  2. In the connector CR, specify the locations of the secret as variables. CFK dynamically resolves the variables when the connector starts.

    For example, with a secret named my-credential:

        connection.url: "${file:/mnt/secrets/my-credential/}"
        connection.user: "${file:/mnt/secrets/my-credential/}"
        connection.password: "${file:/mnt/secrets/my-credential/}"
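
The variables use the Kafka config provider pattern ${file:<path to file>:<key>}. As a minimal sketch, assume the secret my-credential is mounted with a properties file named credentials.properties that defines the keys url, username, and password. The file name and keys are hypothetical, so substitute the ones from your own secret:

```yaml
spec:
  configs:
    # Pattern: ${file:<mounted file path>:<key inside that properties file>}
    # File name and keys below are hypothetical placeholders.
    connection.url: "${file:/mnt/secrets/my-credential/credentials.properties:url}"
    connection.user: "${file:/mnt/secrets/my-credential/credentials.properties:username}"
    connection.password: "${file:/mnt/secrets/my-credential/credentials.properties:password}"
```

Keeping all three values in one mounted file means the connector CR never contains the credentials themselves, only references to them.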

Connector TLS certificates

When the required connector TLS certificates are not present in the Connect cluster, follow these steps to make the certificates available to the connectors in the Connect cluster.

  1. Create a secret and specify the secret name in the Connect CR as described in Provide connector certificates.

  2. Get the details of the certificates, such as the keystore path, truststore path, and JKS password, using the following command:

    kubectl get connect -oyaml

    The paths are in status.connectorTLSFilePaths.

  3. In the connector CR, specify the information retrieved in the previous step.

    For example:

        ssl.truststore.location: "/mnt/sslcerts/truststore.jks"
        ssl.truststore.password: "${file:/mnt/sslcerts/jksPassword.txt:jksPassword}"
        ssl.keystore.location: "/mnt/sslcerts/keystore.jks"
        ssl.keystore.password: "${file:/mnt/sslcerts/jksPassword.txt:jksPassword}"

Discover Connect cluster

A connector can discover which Connect cluster it belongs to using one of the following methods:

  • Provide the name of the Connect cluster in connectClusterRef.

    Use this option when the Connect cluster is deployed by CFK as a custom resource.

  • Provide the connection information for the Connect cluster in connectRest.

    Use this option when the Connect cluster is deployed outside of CFK.

  • If neither connectClusterRef nor connectRest is provided in the connector CR, CFK tries to discover the Connect cluster in the same namespace. If there are multiple Connect clusters in the same namespace, CFK logs errors.

Discover Connect using Connect cluster name

To have the connector discover its Connect cluster by the Connect cluster name, use connectClusterRef in the Connector CR:

    spec:
      connectClusterRef:
        name:                 --- [1]
        namespace:            --- [2]

  • [1] Required. The name of the Connect cluster.
  • [2] Optional. The namespace where the Connect cluster is deployed. If omitted, the same namespace as this connector is assumed.
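
For instance, a connector that belongs to a Connect cluster CR in another namespace might reference it as sketched below. The cluster name connect and the namespace confluent are hypothetical placeholders.

```yaml
spec:
  connectClusterRef:
    name: connect          # hypothetical name of the Connect cluster CR
    namespace: confluent   # omit when the connector is in the same namespace
```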

Discover Connect using Connect endpoint

To point the connector at a specific Connect endpoint, provide the connection information for its Connect cluster in connectRest in the Connector CR.

Kafka Connect endpoint

    spec:
      connectRest:
        endpoint:             --- [1]

  • [1] The Connect URL, including the port.

For example:

    endpoint: https://connect.operator.svc.cluster.local:8083

Basic authentication to Connect

    spec:
      connectRest:
        authentication:
          type: basic                 --- [1]
          basic:
            secretRef:                --- [2]
            directoryPathInContainer: --- [3]
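
As a hedged sketch, a connector that uses basic authentication might look like the following. It assumes the settings nest under spec.connectRest.authentication.basic and that credentials come from a Kubernetes secret. The secret name connect-basic-credential is hypothetical, and the endpoint is the example value used earlier on this page. Running kubectl explain connector.spec.connectRest shows the exact schema for your CFK version.

```yaml
spec:
  connectRest:
    endpoint: https://connect.operator.svc.cluster.local:8083
    authentication:
      type: basic
      basic:
        # Hypothetical secret holding the basic credentials.
        # Assumption: secretRef is used instead of directoryPathInContainer.
        secretRef: connect-basic-credential
```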

mTLS authentication to Connect

    spec:
      connectRest:
        authentication:
          type: mtls                 --- [1]
        tls:
          secretRef:                 --- [2]
          directoryPathInContainer:  --- [3]
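
For mTLS, the certificate settings are assumed to sit in a tls block next to authentication rather than inside it. The sketch below rests on that assumption. The secret name connect-client-tls is hypothetical, and the endpoint is the example value used earlier on this page.

```yaml
spec:
  connectRest:
    endpoint: https://connect.operator.svc.cluster.local:8083
    authentication:
      type: mtls
    tls:
      # Hypothetical secret holding the client certificates.
      # Assumption: secretRef is used instead of directoryPathInContainer.
      secretRef: connect-client-tls
```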

Bearer authentication to Connect (for RBAC)

When RBAC is enabled, you can configure bearer authentication as follows:

    spec:
      connectRest:
        authentication:
          type: bearer                --- [1]
          bearer:
            secretRef:                --- [2]
            directoryPathInContainer: --- [3]

Update connectors

To update a connector:

  1. Edit the Connector CR configuration.

    If you change the connector name, a new connector is created.

  2. Apply the changes using the kubectl apply command.

Delete connectors

To delete a connector using the connector CR:

    kubectl delete -f <connector CR>

To delete a connector using the connector name:

    kubectl delete connector <connector name>

View state of connectors

To see the current status of a connector, run the following command:

    kubectl describe connector <connector-CR-name>

The output contains status fields that report the following:

  • The latest observations of the connector.
  • The REST endpoint of the Connect cluster.
  • The actual status of the connector instance.
  • The connector tasks with state FAILED.
  • The number of failed tasks.
  • The ID of the Kafka cluster the connector belongs to.
  • The policy to restart failed tasks of the connector.
  • The CR state of the connector.
  • The number of running tasks based on taskMax.
  • The error trace message for the connector instance.
  • The worker ID for the connector instance.

To get the up-to-date list of these status fields, run the following command:

    kubectl explain connector.status