Manage Connectors for Confluent Platform Using Confluent for Kubernetes

Confluent for Kubernetes (CFK) supports declaratively creating and managing connectors as Connector custom resources (CRs) in Kubernetes.

Connectors are managed by Kafka Connect. Install connector plugins as described in Install connector plugin before you create connector CRs.

Create connectors

Each Connector CR is mapped to a connector in Kafka Connect worker.

Note

When RBAC is enabled in this Confluent Platform environment, the super user you configured for Kafka (kafka.spec.authorization.superUsers) does not have access to resources in the Connect cluster. If you want the super user to be able to create connectors, grant the super user the permission on the Connect cluster.

  1. Create a connector as a Connector CR:

    metadata:
      name:                --- [1]
    kind: Connector
    spec:
      name:                --- [2]
      taskMax:             --- [3]
      class:               --- [4]
      configs:             --- [5]
      restartPolicy:
        type:              --- [6]
        maxRetry:          --- [7]
      connectClusterRef:   --- [8]
      connectRest:         --- [9]
    
    • [1] Required. The name of this Connector CR.

      To copy the settings of a connector to create a new connector, change this value with the name of the new connector.

    • [2] Optional. Typically, this is the same as metadata.name. If you need to have a name which Kubernetes cannot support, specify that name here.

    • [3] Required. The max number of tasks for the connector.

      When you set this, consider resource consumption. You may need to test in a development environment, first, if setting this to a big number.

    • [4] Required. The class name of the connector.

    • [5] Connector-specific configuration settings as key-value maps. Consult the connector documentation for the required settings.

      See Connector configs for more information, if configuration requires sensitive data or certificates.

    • [6] Required. The policy to restart failed tasks of the connector. Set to OnFailure or Never. The default value is OnFailure, which means it will automatically restart the task when a task failed if maxRetry is not reached.

    • [7] The max retry times to restart when restartPolicy type is OnFailure. The default value is 10.

    • [8] See Discover Connect using Connect cluster name.

    • [9] See Discover Connect using Connect endpoint.

  2. Apply the configuration to create the connector:

    kubectl apply -f <connector CR>
    

For a list of Connector CR examples, see the sample connector CRs in CFK Example Rep.

Connector configs

You can use connector configs (spec.configs) for categories of settings:

Mounted secrets for credentials

Mounted secrets allows you to protect sensitive data, such as passwords, in connector configuration.

To use mounted secrets for connectors:

  1. Create a mounted secret and specify the secret name in the Connect CR as described in Provide Connect credentials using mounted secrets.

  2. In the connector CR, specify the locations of the secret as variables, and CFK dynamically resolves the variables when the connector starts.

    For example, with a secret named, my-credential:

    kind: Connector
    spec:
      configs:
        connection.url: "${file:/mnt/secrets/my-credential/custom.properties:connector-url}"
        connection.user: "${file:/mnt/secrets/my-credential/custom.properties:connector-username}"
        connection.password: "${file:/mnt/secrets/my-credential/custom.properties:connector-password}"
    

Connector TLS certificates

When required connector TLS certificates are not present in the Connect cluster, follow the steps to have the certificates available to the connectors in the Connect cluster.

  1. Create a secret and specify the secret name in the Connect CR as described in Provide connector certificates.

  2. Get the details of the certificates, such as keystore path, truststore path, JKS password, using the following command:

    kubectl get connect -oyaml
    

    The paths are in status.connectorTLSFilePaths.

  3. In the Connector CR, specify the information retrieved in the previous step.

    For example:

    spec:
      configs:
        ssl.truststore.location: "/mnt/sslcerts/truststore.jks"
        ssl.truststore.password: "${file:/mnt/sslcerts/jksPassword.txt:jksPassword}"
        ssl.keystore.location: "/mnt/sslcerts/keystore.jks"
        ssl.keystore.password: "${file:/mnt/sslcerts/jksPassword.txt:jksPassword}"
    

For an example configuration, see Replicator Connector Configuration.

Discover Connect cluster

A connector can discover which Connect cluster it belongs to using one of the following methods:

  • Provide the name of the Connect cluster in connectClusterRef.

    Use this option when the Connect cluster is deployed by CFK as a custom resource.

  • Provide the connection information for the Connect cluster in connectRest.

    Use this option when the Connect cluster is deployed outside of CFK.

  • If neither connectClusterRef or connectRest is provided in the connector CR, CFK tries to discover the Connect cluster in the same namespace. If there are multiple Connect clusters in the same namespace, CFK will logs errors.

Discover Connect using Connect cluster name

To have the connector discover its Connect cluster by the Connect cluster name, use connectClusterRef in the Connector CR:

spec:
  connectClusterRef:
    name:                 --- [1]
    namespace:            --- [2]
  • [1] Required. The name of the Connect cluster.
  • [2] Optional. The namespace where the Connect cluster is deployed. If omitted, the same namespace as this connector is assumed.

Discover Connect using Connect endpoint

You can provide information of the Connect cluster that the connector belongs to in the Connector CR.

Kafka Connect endpoint

spec:
  connectRest:
    endpoint:             --- [1]
  • [1] Connect URL and the port.

For example:

spec:
  connectRest:
    endpoint: https://connect.operator.svc.cluster.local:8083

Basic authentication to Connect

spec:
  connectRest:
    authentication:
      type: basic                 --- [1]
      basic:
        secretRef:                --- [2]
        directoryPathInContainer: --- [3]

mTLS authentication to Connect

spec:
  connectRest:
    authentication:
      type: mtls                 --- [1]
    tls:
      secretRef:                 --- [2]
      directoryPathInContainer:  --- [3]

Bearer authentication to Connect (for RBAC)

When RBAC is enabled, you can configure bearer authentication as below:

spec:
  connectRest:
    authentication:
      type: bearer                --- [1]
      bearer:
        secretRef:                --- [2]
        directoryPathInContainer: --- [3]

OAuth authorization and authentication to Connect

spec:
  connectRest:
    authentication:
      type: oauth                 --- [1]
      oauth:
        secretRef:                --- [2]
        directoryPathInContainer: --- [3]
        configuration:            --- [4]

Update connectors

To update a connector:

  1. Edit the Connector CR configuration.

    If you change metadata.name, a new connector will be created.

  2. Apply the changes using the kubectl apply command.

Delete Connectors

To delete a connector, using the connector CR:

kubectl delete -f <connector CR>

To delete a connector, using the connector name:

kubectl delete connector <connector name>

View state of connectors

To see the current status of a connector, run the following command:

kubectl describe connector <connector-CR-name>

The output contains the following fields:

conditions
The latest observations of the connector.
connectRestEndpoint
The rest endpoint of the Connect cluster.
connectorState
The actual status of the connector instance.
failedTasks
The connector tasks with state FAILED.
failedTasksCount
The number of failed tasks.
kafkaClusterId
The Kafka cluster Id the connector belongs to.
restartPolicy
The policy to restart failed tasks of the connector.
state
The CR state of the connector.
tasksReady
The number of running tasks based on taskMax.
trace
The error trace message for the connector instance.
workerId
The worker Id for the connector instance.

You can run the following command to get the up-to-date list of the above status fields:

kubectl explain connector.status