Configure Kafka Connect

Kafka Connect is a tool for streaming data between Kafka and other data systems. It uses connectors to stream data in to or out of Kafka. Connectors in Kafka Connect define where data should be copied to and from.

Connector plugins are the binaries or JARs that implement the classes/abstractions of connectors. Connector plugins are installed in Connect workers.

When configuring Connect, you need to specify how to install connector plugins for the Connect deployment.

You have the following options to install connector plugins in Confluent for Kubernetes (CFK):

After setting the connector info and other required settings, deploy Connect with kubectl apply.

Starting in Confluent for Kubernetes (CFK) 2.1.0, you can declaratively manage connectors in Kubernetes using the Connector custom resource definition (CRD). After you install the connector plugins as described in this topic, see Manage Connectors using Confluent for Kubernetes for managing connectors.

Install connector plugin

Automatically download and install connector plugins

CFK can automatically download and install connector plugins/JARs from Confluent Hub or a custom artifacts location URL. By specifying both options in the same Connect CR, you can download multiple connectors from Confluent Hub and from custom URLs.

To store the connector plugins, you need to have a specified size of node volume available. The default size is 4 GB, but you can specify a different size in the Connect CR using storageLimit.

Download connector plugins from Confluent Hub

Provide the download info in the Connect CR as below:

kind: Connect
metadata:
  annotations:
    platform.confluent.io/confluent-hub-install-extra-args:    --- [1]
spec:
  build:
    type: onDemand                                             --- [2]
    onDemand:
      plugins:
        confluentHub:                                          --- [3]
          - name:                                              --- [4]
            owner:                                             --- [5]
            version:                                           --- [6]
      storageLimit:                                            --- [7]
  • [1] Optional. An annotation for the additional arguments to be used when the Connect starts up and downloads plugins from Confluent Hub. For example:

    platform.confluent.io/confluent-hub-install-extra-args: “--worker-configs /dev/null --component-dir /mnt/plugins”
    
  • [2] Required to have CFK automatically download connector plugins.

  • [3] Provide an array of plugins to be downloaded.

  • [4] Required. The name of this connector plugin.

  • [5] Required. The individual or organization that provides the plugin, for example, confluentinc.

  • [6] Required. The version of this plugin. Set to the version of the plugin or latest.

  • [7] Optional. The max amount of node volume that can be used to store the connector plugins. The default value is 4G.

Download connector plugins from a custom URL

Provide the download info in the Connect CR as below:

kind: Connect
metadata:
  annotations:
  platform.confluent.io/connector-wget-extra-args:   --- [1]
spec:
  build:
    type: onDemand                                   --- [2]
    onDemand:                                        --- [3]
      plugins:
        url:                                         --- [4]
        - name:                                      --- [5]
          archivePath:                               --- [6]
          checksum:                                  --- [7]
      storageLimit:                                  --- [8]
  • [1] Optional. An annotation for the additional arguments to be used when the Connect starts up and downloads plugins from a custom URL.

    Set the annotation to "--no-check-certificate" to ignore SSL verification when downloading plugins from a site with SSL enabled but when the CA is not available to CFK:

    Caution

    This annotation is not recommended as it could pose a security risk:

  • [2] Required to have CFK automatically download connector plugins.

  • [3] Required when type: onDemand set in [1].

  • [4] Provide an array of plugins to be downloaded.

  • [5] Required. The name of this connector plugin.

  • [6] Required. The archive path of the zip file that contains this plugin.

  • [7] Required. Defines the sha512sum checksum of the plugin’s remote file. It is used to verify the remote file after download.

  • [8] Optional. The max amount of node volume that can be used to store the connector plugins. The default value is 4G.

Note

If you are setting the custom plugin.path property in spec.configOverrides, such as, to use the FileStream connectors, you must include /mnt/plugins in plugin.path. For example:

spec:
  configOverrides:
    server:
    - plugin.path=/usr/share/java,/mnt/plugins

For an example Connect CR, see CFK GitHub examples repo.

View Connect init container logs

To troubleshoot issues while installing connector plugins, run the kubectl logs command to view the logs from the init container of the Connect pod. For example:

kubectl logs -f connect-0 -c config-init-container

Extend the Connect Docker image with connector plugins

This section describes how to extend the Connect image with connector plugins.

Add a new connector to one of the following Connect images:

  • For Confluent Platform 6.2.x and higher, use the cp-server-connect image.

    The remainder of this document uses this image.

  • For Confluent Platform 6.1.x and below, use the cp-server-connect-operator image.

The image contains Connect and all of its dependencies. It does not contain any Connector JARs.

To add new connectors to the Connect image, you need to build a new Docker image that has the new connectors installed.

  1. Create a Dockerfile in <dockerfile-dir> to add one or more connectors to the cp-server-connect image.

    You can either:

    • Pull connectors from Confluent Hub.
    • Use the connector JAR downloaded to the machine you are running the Docker build from.

    To pull connectors from Confluent Hub:

    Create a Dockerfile as follows:

    FROM confluentinc/cp-server-connect:<Confluent Platform release>
    USER root
    RUN confluent-hub install --no-prompt <connector1>:<connector1-version> \
      && confluent-hub install --no-prompt <connector2>:<connector2-version> \
      && ...
    USER 1001
    

    An example Dockerfile to create a Docker image with the data-gen connector from Confluent Hub:

    FROM confluentinc/cp-server-connect:7.4.5
    USER root
    RUN  confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.3.3
    USER 1001
    

    To use the connector JAR downloaded to the machine you are running the Docker build from:

    Create a Dockerfile as follows:

    FROM confluentinc/cp-server-connect:<Confluent Platform release>
    ADD <local-connector1-path> /usr/share/java/<connector1> \
      && <local-connector2-path> /usr/share/java/<connector2> \
      && ...
    USER 1001
    

    An example Dockerfile to use the data-gen connector existing on your local machine in the <connector-dir> directory:

    FROM confluentinc/cp-server-connect:7.4.5
    ADD​ my-connector-dir/confluentinc-kafka-connect-datagen /usr/share/java/confluentinc-kafka-connect-datagen
    USER 1001
    
  2. Build and push the image with the following commands;

    docker build <dockerfile-dir> -t <someregistry>/<somerepository>:<sometag>
    
    docker push <someregistry>/<somerepository>:<sometag>
    
  3. Get the Docker image details from the output of the above process and specify the repository and tag in the Connect CR.

    spec:
      image:
        application: <someregistry>/<somerepository>:<sometag>
    

Provide connector certificates

Follow the steps to have the required certificates available to the connectors in the Connect cluster.

  1. If using a Kubernetes secret for the certificates, create the secret using the kubectl create secret command, and pass the secret name in the next step in secretRef ([4]).

  2. If using Vault for the certificates, inject the certificates as described in Provide secrets in Vault, and pass the directory path in the next step in directoryPathInContainer ([2]).

    For example use cases, see CFK examples for using Vault.

  3. Specify the secret name in the Connect CR:

    kind: Connect
    spec:
      connectorTLSCerts:          --- [1]
      - directoryPathInContainer: --- [2]
        jksPassword:
          secretRef:              --- [3]
        secretRef:                --- [4]
    
    • [1] A list of connector TLS certificates reference injected in the Connect pod for the connector use.

      Use one of directoryPathInContainer, jksPassword, or secretRef for each set of certificate information.

    • [2] [4] CFK supports the secretRef and directoryPathInContainer methods to inject the connector TLS certificates. Specify either directoryPathInContainer ([2]) or secretRef ([4]).

    • [2] The directory path in container where keystore.jks, truststore.jks, jksPassword.txt keys are mounted. For example:

      spec:
        connectorTLSCerts:
        - directoryPathInContainer: /vault/secrets
      

      When you use the Vault directory path (directoryPathInContainer), CFK does not automate the creation of keystore and truststore. You need to create the keystore and truststore first, and those must be present in the Vault directory path.

    • [3] The secret name referenced for the JKS password. Expects the key/value in the following format: jksPassword.txt=jksPassword=<user_provided_password>

      If omitted, CFK will use the default password, mystorepassword. For more information, see Provide TLS keys and certificates in Java KeyStore format.

    • [4] The Kubernetes secret name that contains the connector TLS certificates.

  4. In the connector CR, specify the locations.

    See Connector TLS certificates for an example.

For an example configuration, see Replicator Connector Configuration.

Provide Connect credentials using mounted secrets

You can use mounted secrets to protect sensitive data, such as passwords, in connector configuration.

  1. Create a secret as described in Mount custom Kubernetes secrets. For example:

    kubectl create secret generic my-credential \
      --from-file=my-credential.txt=/my-dir/my-credential.txt
    

    The secret reference is mounted in the default path, /mnt/secrets/<secret-name>.

  2. Specify the above secret name in the Connect CR:

    spec:
      mountedSecrets:
        - secretRef:  # The name of the secret that contains the credentials.
    
  3. In the connector CR, you specify the locations of the secret as variables, and CFK dynamically resolves the variables when the connector starts.

    See Mounted secrets for credentials for an example.