Configure Kafka Connect

Kafka Connect is a tool for streaming data between Kafka and other data systems. It uses connectors to stream data into or out of Kafka. Connectors in Kafka Connect define where data should be copied to and from.

Connector plugins are the binaries or JARs that implement the classes/abstractions of connectors. Connector plugins are installed in Connect workers.

When configuring Connect, you need to specify how to install connector plugins for the Connect deployment.

You have the following options to install connector plugins in Confluent for Kubernetes (CFK):

  • Have CFK automatically download and install the connector plugins.
  • Extend the Connect Docker image with the connector plugins pre-installed.

After setting the connector info and other required settings, deploy Connect with kubectl apply.

Starting in CFK 2.1.0, you can declaratively manage connectors in Kubernetes using the Connector custom resource definition (CRD). After you install the connector plugins as described in this topic, see Manage Connectors.

Install connector plugins

Automatically download and install connector plugins

CFK can automatically download and install connector plugins (JARs) from Confluent Hub or from a custom artifact location URL.

Currently, CFK does not support mixing the two location types in one Connect deployment; a Connect worker cannot have plugins with locationType set to both confluentHub and url.

Storing the connector plugins requires available node volume space. The default size is 4 GB; you can specify a different size in the Connect CR using storageLimit.

Provide the download info in the Connect CR as follows:

To download from Confluent Hub:

spec:
  build:
    type: onDemand                  --- [1]
    onDemand:
      plugins:
        locationType: confluentHub  --- [2]
        confluentHub:               --- [3]
          - name:                   --- [4]
            owner:                  --- [5]
            version:                --- [6]
      storageLimit:                 --- [7]
  • [1] Required to have CFK automatically download connector plugins.
  • [2] Required. Set to locationType: confluentHub to download this plugin from Confluent Hub.
  • [3] Required when locationType: confluentHub is set in [2]. Provide an array of plugins to be downloaded.
  • [4] Required. The name of this connector plugin.
  • [5] Required. The individual or organization that provides the plugin, for example, confluentinc.
  • [6] Required. The version of this plugin. Set to the version of the plugin or latest.
  • [7] Optional. The max amount of node volume that can be used to store the connector plugins. The default value is 4G.
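For illustration, a filled-in build section that pulls the data-gen connector from Confluent Hub might look like the following (the plugin and version shown are examples; substitute the plugins you need):

```yaml
spec:
  build:
    type: onDemand
    onDemand:
      plugins:
        locationType: confluentHub
        confluentHub:
          - name: kafka-connect-datagen   # plugin name as listed on Confluent Hub
            owner: confluentinc           # organization that provides the plugin
            version: latest               # a specific version string also works
      storageLimit: 4G                    # optional; 4G is the default
```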

To download from a custom URL:

spec:
  build:
    type: onDemand                  --- [1]
    onDemand:                       --- [2]
      plugins:
        locationType: url           --- [3]
        url:                        --- [4]
        - name:                     --- [5]
          archivePath:              --- [6]
          checksum:                 --- [7]
      storageLimit:                 --- [8]
  • [1] Required to have CFK automatically download connector plugins.
  • [2] Required when type: onDemand is set in [1].
  • [3] Required. Set to locationType: url to download this plugin from a custom location.
  • [4] Required when locationType: url is set in [3]. Provide an array of plugins to be downloaded.
  • [5] Required. The name of this connector plugin.
  • [6] Required. The archive path of the zip file that contains this plugin.
  • [7] Required. Defines the sha512sum checksum of the plugin’s remote file. It is used to verify the remote file after download.
  • [8] Optional. The max amount of node volume that can be used to store the connector plugins. The default value is 4G.
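As a sketch, a build section that downloads a plugin archive from a custom location could look like this; the server URL and archive name are hypothetical, and the checksum placeholder must be replaced with the real sha512sum of the zip file:

```yaml
spec:
  build:
    type: onDemand
    onDemand:
      plugins:
        locationType: url
        url:
          - name: kafka-connect-datagen                                   # hypothetical plugin name
            archivePath: https://artifacts.example.com/kafka-connect-datagen.zip  # hypothetical URL
            checksum: <sha512sum-of-the-zip-file>                         # used to verify the download
      storageLimit: 4G
```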

Note

If you set a custom plugin.path property in spec.configOverrides, such as to use the FileStream connectors, you must include /mnt/plugins in plugin.path. For example:

spec:
  configOverrides:
    server:
    - plugin.path=/usr/share/java,/mnt/plugins

For an example Connect CR, see CFK GitHub examples repo.

View Connect init container logs

To troubleshoot issues while installing connector plugins, run the kubectl logs command to view the logs from the init container of the Connect pod. For example:

kubectl logs -f connect-0 -c config-init-container

Extend the Connect Docker image with connector plugins

This section describes how to extend the Connect image with connector plugins.

Add a new connector to one of the following Connect images:

  • For Confluent Platform 6.2.x and higher, use the cp-server-connect image.

    The remainder of this document uses this image.

  • For Confluent Platform 6.1.x and below, use the cp-server-connect-operator image.

The image contains Connect and all of its dependencies. It does not contain any connector JARs.

To add new connectors to the Connect image, you need to build a new Docker image that has the new connectors installed.

  1. Create a Dockerfile in <dockerfile-dir> to add one or more connectors to the cp-server-connect image.

    You can either:

    • Pull connectors from Confluent Hub.
    • Use the connector JAR downloaded to the machine you are running the Docker build from.

    To pull connectors from Confluent Hub:

    Create a Dockerfile as follows:

    FROM confluentinc/cp-server-connect:<Confluent Platform release>
    USER root
    RUN confluent-hub install --no-prompt <connector1>:<connector1-version> \
      && confluent-hub install --no-prompt <connector2>:<connector2-version> \
      && ...
    USER 1001
    

    An example Dockerfile to create a Docker image with the data-gen connector from Confluent Hub:

    FROM confluentinc/cp-server-connect:7.0.1
    USER root
    RUN  confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.3.3
    USER 1001
    

    To use the connector JAR downloaded to the machine you are running the Docker build from:

    Create a Dockerfile as follows:

    FROM confluentinc/cp-server-connect:<Confluent Platform release>
    ADD <local-connector1-path> /usr/share/java/<connector1>
    ADD <local-connector2-path> /usr/share/java/<connector2>
    ...
    USER 1001
    

    An example Dockerfile that adds the data-gen connector from the my-connector-dir directory on your local machine:

    FROM confluentinc/cp-server-connect:7.0.1
    ADD my-connector-dir/confluentinc-kafka-connect-datagen /usr/share/java/confluentinc-kafka-connect-datagen
    USER 1001
    
  2. Build and push the image with the following commands:

    docker build <dockerfile-dir> -t <someregistry>/<somerepository>:<sometag>
    
    docker push <someregistry>/<somerepository>:<sometag>
    
  3. Get the Docker image details from the output of the above process and specify the repository and tag in the Connect CR.

    spec:
      image:
        application: <someregistry>/<somerepository>:<sometag>
    

Provide connector certificates

When the TLS certificates that connectors require are not present in the Connect cluster, follow these steps to make the certificates available to the connectors in the Connect cluster:

  1. Create the secret using the kubectl create secret command.

  2. Specify the secret name in the Connect CR:

    spec:
      connectorTLSCerts:          --- [1]
      - directoryPathInContainer: --- [2]
        jksPassword:
          secretRef:              --- [3]
        secretRef:                --- [4]
    
    • [1] A list of connector TLS certificate references to be injected into the Connect pod for connectors to use.

    • [2] The directory path in the container where the keystore.jks, truststore.jks, and jksPassword.txt files are mounted.

    • [3] The secret name referenced for the JKS password. Expects the key/value in the following format: jksPassword.txt=jksPassword=<user_provided_password>

      If omitted, CFK will use the default password, mystorepassword. See Provide TLS keys and certificates in Java Keystore format for more information.

    • [4] The secret name that contains the connector TLS certificates.

  3. In the connector CR, specify the locations.

    See Connector TLS certificates for an example.
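Putting the fields above together, a connectorTLSCerts entry with hypothetical secret names and mount path might look like:

```yaml
spec:
  connectorTLSCerts:
    - directoryPathInContainer: /mnt/sslcerts/my-connector-tls  # hypothetical mount path
      jksPassword:
        secretRef: my-jks-password    # secret with key jksPassword.txt=jksPassword=<password>
      secretRef: my-connector-tls     # secret containing keystore.jks and truststore.jks
```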

Provide Connect credentials using mounted secrets

You can use mounted secrets to protect sensitive data, such as passwords, in connector configuration.

  1. Create a secret as described in Provide mounted secrets. For example:

    kubectl create secret generic my-credential \
      --from-file=my-credential.txt=/my-dir/my-credential.txt
    

    The secret reference is mounted in the default path, /mnt/secrets/<secret-name>.

  2. Specify the above secret name in the Connect CR:

    spec:
      mountedSecrets:
        - secretRef:  # The name of the secret that contains the credentials.
    
  3. In the connector CR, you specify the locations of the secret as variables, and CFK dynamically resolves the variables when the connector starts.

    See Mounted secrets for credentials for an example.
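As an illustrative sketch (the connector name, class, and property key are hypothetical), a connector CR could reference the mounted secret with the file config provider variable syntax, assuming my-credential.txt contains a properties-style entry such as password=<value>:

```yaml
apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
  name: my-connector
spec:
  class: "io.confluent.connect.jdbc.JdbcSourceConnector"  # example connector class
  taskMax: 1
  configs:
    # CFK resolves the variable from the mounted secret file when the connector starts
    connection.password: "${file:/mnt/secrets/my-credential/my-credential.txt:password}"
```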