Manage Connectors¶
Confluent for Kubernetes (CFK) supports declaratively creating and managing connectors as Connector custom resources (CRs) in Kubernetes.
Connectors are managed by Kafka Connect. Install connector plugins as described in Install connector plugin before you create connector CRs.
Create connectors¶
Each Connector CR maps to one connector in Kafka Connect.
Create a connector as a Connector CR:
kind: Connector
metadata:
  name:                   --- [1]
spec:
  name:                   --- [2]
  taskMax:                --- [3]
  class:                  --- [4]
  configs:                --- [5]
  restartPolicy:
    type:                 --- [6]
    maxRetry:             --- [7]
  connectClusterRef:      --- [8]
  connectRest:            --- [9]
[1] Required. The name of this Connector CR.
To create a new connector from the settings of an existing connector, change this value to the name of the new connector.
[2] Optional. Typically, this is the same as metadata.name. If you need a connector name that Kubernetes does not support, specify that name here.
[3] Required. The maximum number of tasks for the connector.
Consider resource consumption when you set this value. If you plan to set a large number, test it in a development environment first.
[4] Required. The class name of the connector.
[5] Connector-specific configuration settings as key-value pairs. Consult the connector documentation for the required settings.
If the configuration requires sensitive data or certificates, see Connector configs.
[6] Required. The policy for restarting failed tasks of the connector. Set to OnFailure or Never. The default value is OnFailure, which means that a failed task is automatically restarted as long as maxRetry has not been reached.
[7] The maximum number of restart attempts when the restartPolicy type is OnFailure. The default value is 10.
[8] [9] The Connect cluster that this connector belongs to. See Discover Connect cluster.
Apply the configuration to create the connector:
kubectl apply -f <connector CR>
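As an illustration of the template above, a filled-in Connector CR might look like the following minimal sketch. The apiVersion, the names my-file-sink and connect, the namespace, the topic, and the file path are assumptions made for this example and do not come from this page. The FileStreamSinkConnector class is part of Apache Kafka, but verify that the plugin is installed in your Connect cluster before using it.

```yaml
apiVersion: platform.confluent.io/v1beta1   # assumed CFK API group and version
kind: Connector
metadata:
  name: my-file-sink            # hypothetical Connector CR name
  namespace: confluent          # hypothetical namespace
spec:
  taskMax: 2                    # keep small until resource usage is known
  class: "org.apache.kafka.connect.file.FileStreamSinkConnector"
  configs:
    topics: "orders"            # hypothetical topic to read from
    file: "/tmp/orders.txt"     # hypothetical output file on the worker
  restartPolicy:
    type: OnFailure             # restart failed tasks automatically
    maxRetry: 5                 # stop retrying after five attempts
  connectClusterRef:
    name: connect               # hypothetical name of the Connect CR
```

Because spec.name is omitted here, the connector name follows metadata.name, as described in [2].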
Connector configs¶
You can use connector configs (spec.configs) for the following categories of settings:
- Connector-specific settings
- Sensitive information, such as user credentials
- Connector TLS certificates
Mounted secrets for credentials¶
Mounted secrets allow you to protect sensitive data, such as passwords, in connector configuration.
To use mounted secrets for connectors:
Create a mounted secret and specify the secret name in the Connect CR as described in Provide Connect credentials using mounted secrets.
In the connector CR, specify the locations of the secret as variables, and CFK dynamically resolves the variables when the connector starts.
For example, with a secret named my-credential:
spec:
  configs:
    connection.url: "${file:/mnt/secrets/my-credential/custom.properties:connector-url}"
    connection.user: "${file:/mnt/secrets/my-credential/custom.properties:connector-username}"
    connection.password: "${file:/mnt/secrets/my-credential/custom.properties:connector-password}"
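For the variables in the example above to resolve, the mounted file custom.properties needs to contain the referenced keys. The following is a minimal sketch of a Kubernetes Secret that would provide such a file. All values are placeholders, and the linked procedure for mounted secrets remains the authoritative description of how to create the secret and reference it from the Connect CR.

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: my-credential
type: Opaque
stringData:
  # One properties file; each key matches a ${file:...:<key>} variable above.
  custom.properties: |
    connector-url=https://db.example.com:5432
    connector-username=placeholder-user
    connector-password=placeholder-password
```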
Connector TLS certificates¶
When the required connector TLS certificates are not present in the Connect cluster, follow these steps to make the certificates available to the connectors in the Connect cluster.
Create a secret and specify the secret name in the Connect CR as described in Provide connector certificates.
Get the details of the certificates, such as the keystore path, truststore path, and JKS password, using the following command:
kubectl get connect -oyaml
The paths are in status.connectorTLSFilePaths.
In the connector CR, specify the information retrieved in the previous step.
For example:
spec:
  configs:
    ssl.truststore.location: "/mnt/sslcerts/truststore.jks"
    ssl.truststore.password: "${file:/mnt/sslcerts/jksPassword.txt:jksPassword}"
    ssl.keystore.location: "/mnt/sslcerts/keystore.jks"
    ssl.keystore.password: "${file:/mnt/sslcerts/jksPassword.txt:jksPassword}"
Discover Connect cluster¶
A connector can discover which Connect cluster it belongs to using one of the following methods:
- Provide the name of the Connect cluster in connectClusterRef.
  Use this option when the Connect cluster is deployed by CFK as a custom resource.
- Provide the connection information for the Connect cluster in connectRest.
  Use this option when the Connect cluster is deployed outside of CFK.
If neither connectClusterRef nor connectRest is provided in the connector CR, CFK tries to discover the Connect cluster in the same namespace. If there are multiple Connect clusters in the same namespace, CFK logs errors.
Discover Connect using Connect cluster name¶
To have the connector discover its Connect cluster by the Connect cluster name, use connectClusterRef in the Connector CR:
spec:
  connectClusterRef:
    name:                 --- [1]
    namespace:            --- [2]
- [1] Required. The name of the Connect cluster.
- [2] Optional. The namespace where the Connect cluster is deployed. If omitted, the same namespace as this connector is assumed.
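As a short illustration, a connector that belongs to a Connect cluster deployed in another namespace could reference it as follows. The names connect and confluent are hypothetical.

```yaml
spec:
  connectClusterRef:
    name: connect          # hypothetical name of the Connect CR
    namespace: confluent   # hypothetical namespace; omit if it matches the connector's
```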
Discover Connect using Connect endpoint¶
For a Connect cluster deployed outside of CFK, provide the connection information of the Connect cluster that the connector belongs to in connectRest in the Connector CR.
Kafka Connect endpoint
spec:
  connectRest:
    endpoint:             --- [1]
- [1] The Connect URL, including the port.
For example:
spec:
  connectRest:
    endpoint: https://connect.operator.svc.cluster.local:8083
Basic authentication to Connect
spec:
  connectRest:
    authentication:
      type: basic                     --- [1]
      basic:
        secretRef:                    --- [2]
        directoryPathInContainer:     --- [3]
[1] Required.
[2] or [3] is required.
[2] The name of the secret that contains the credentials. See Basic authentication for the required format.
[3] The directory path in the container where required authentication credentials are injected by Vault.
See Basic authentication for the required format.
See Provide secrets for Confluent Platform application CR for providing the credential and required annotations when using Vault.
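Putting the endpoint and basic authentication settings together, a connectRest block might look like the following sketch. The secret name connect-basic-credential is hypothetical, and the secret must follow the format described in Basic authentication. The nesting shown assumes that the basic block sits under authentication.

```yaml
spec:
  connectRest:
    endpoint: https://connect.operator.svc.cluster.local:8083
    authentication:
      type: basic
      basic:
        secretRef: connect-basic-credential   # hypothetical secret name
```

To use Vault-injected credentials instead, replace secretRef with directoryPathInContainer, since only one of the two is required.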
mTLS authentication to Connect
spec:
  connectRest:
    authentication:
      type: mtls                      --- [1]
    tls:
      secretRef:                      --- [2]
      directoryPathInContainer:       --- [3]
[1] Required.
[2] or [3] is required.
[2] The name of the secret that contains the certificates. See Configure Network Encryption with Confluent for Kubernetes for more information on certificates.
[3] The directory path in the container where the required certificates are injected by Vault.
See Configure Network Encryption with Confluent for Kubernetes for more information on certificates.
See Provide secrets for Confluent Platform application CR for providing the certificates and required annotations when using Vault.
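A filled-in mTLS configuration could look like the sketch below. The secret name connector-client-tls is hypothetical, and the layout assumes that tls is a sibling of authentication under connectRest.

```yaml
spec:
  connectRest:
    endpoint: https://connect.operator.svc.cluster.local:8083
    authentication:
      type: mtls
    tls:
      secretRef: connector-client-tls   # hypothetical secret holding the certificates
```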
Bearer authentication to Connect (for RBAC)
When RBAC is enabled, you can configure bearer authentication as follows:
spec:
  connectRest:
    authentication:
      type: bearer                    --- [1]
      bearer:
        secretRef:                    --- [2]
        directoryPathInContainer:     --- [3]
[1] Required.
[2] Required. The name of the secret that contains the bearer credentials. See Bearer authentication for the required format.
[3] The directory path in the container where bearer credentials are injected by Vault.
See Bearer authentication for the required format.
See Provide secrets for Confluent Platform application CR for providing the credential and required annotations when using Vault.
Update connectors¶
To update a connector:
Edit the Connector CR configuration.
If you change metadata.name, a new connector will be created.
Apply the changes using the kubectl apply command.
Delete connectors¶
To delete a connector using the Connector CR file:
kubectl delete -f <connector CR>
To delete a connector using the connector name:
kubectl delete connector <connector name>
View state of connectors¶
To see the current status of a connector, run the following command:
kubectl describe connector <connector-CR-name>
The output contains the following fields:
conditions
- The latest observations of the connector.
connectRestEndpoint
- The REST endpoint of the Connect cluster.
connectorState
- The actual status of the connector instance.
failedTasks
- The connector tasks with state FAILED.
failedTasksCount
- The number of failed tasks.
kafkaClusterId
- The ID of the Kafka cluster that the connector belongs to.
restartPolicy
- The policy to restart failed tasks of the connector.
state
- The CR state of the connector.
tasksReady
- The number of running tasks based on taskMax.
trace
- The error trace message for the connector instance.
workerId
- The worker ID for the connector instance.
You can run the following command to get the up-to-date list of the above status fields:
kubectl explain connector.status