Kafka Connect の構成

Kafka Connect は、Kafka と他のデータシステムとの間でデータのストリーミングを行うためのツールです。これは、コネクターを使用して Kafka との間でデータのストリーミングを行います。Kafka Connect では、コネクターによってデータのコピー先とコピー元が決まります。

コネクタープラグインは、コネクターのクラスや抽象化を実装するバイナリまたは JAR です。コネクタープラグインは Connect ワーカーにインストールされます。

Connect を構成するときに、Connect をデプロイするためのコネクタープラグインのインストール方法を指定する必要があります。

Confluent for Kubernetes (CFK)でコネクタープラグインをインストールするには以下の方法があります。

コネクター情報を設定し、その他の必要な設定を行った後、kubectl apply を使用して Connect をデプロイします。

Confluent for Kubernetes (CFK)2.1.0 以降、コネクターのカスタムリソース定義(CRD)を使用して、Kubernetes でコネクターを宣言的に管理できます。このトピックの説明に従ってコネクタープラグインをインストールした後に、「コネクターの管理」を参照してコネクターの管理について確認してください。

コネクタープラグインのインストール

コネクタープラグインの自動ダウンロードおよびインストール

CFK は、Confluent Hub またはカスタムのアーティファクトの場所の URL からコネクタープラグインまたは JAR を自動的にダウンロードし、インストールすることができます。

現在、CFK では、confluentHuburl の両方に設定した 2 つの異なるロケーションタイプ(locationType)が指定された Connect ワーカーのデプロイをサポートしていません。

コネクタープラグインを保管するには、指定されたサイズのノードボリュームを使用可能な状態にしておく必要があります。デフォルトのサイズは 4 GB ですが、storageLimit を使用して Connect CR で別のサイズを指定できます。

ダウンロード情報を Connect CR に次のように指定します。

To download from Confluent Hub:

kind: Connect
metadata:
  annotations:
    platform.confluent.io/confluent-hub-install-extra-args:    --- [1]
spec:
  build:
    type: onDemand                                             --- [2]
    onDemand:
      plugins:
        locationType: confluentHub                             --- [3]
        confluentHub:                                          --- [4]
          - name:                                              --- [5]
            owner:                                             --- [6]
            version:                                           --- [7]
      storageLimit:                                            --- [8]
  • [1] Optional. An annotation for the additional arguments to be used when the Connect starts up and downloads plugins from Confluent Hub. For example:

    platform.confluent.io/confluent-hub-install-extra-args: “--worker-configs /dev/null --component-dir /mnt/plugins”
    
  • [2] CFK でコネクタープラグインを自動的にダウンロードする場合に必須です。

  • [3](必須)``locationType: confluentHub`` に設定して、このプラグインを Confluent Hub からダウンロードします。

  • [4] [3] で locationType: confluentHub が設定されている場合に必須です。ダウンロード対象のプラグインの配列を指定します。

  • [5](必須)このコネクタープラグインの名前。

  • [6](必須)このプラグインを提供する個人または組織(confluentinc など)。

  • [7](必須)このプラグインのバージョン。このプラグインのバージョンまたは latest に設定します。

  • [8](省略可)コネクタープラグインを保管するために使用できるノードボリュームの最大サイズ。デフォルト値は 4G です。

To download from a custom URL:

kind: Connect
spec:
  build:
    type: onDemand                                             --- [1]
    onDemand:                                                  --- [2]
      plugins:
        locationType: url                                      --- [3]
        url:                                                   --- [4]
        - name:                                                --- [5]
          archivePath:                                         --- [6]
          checksum:                                            --- [7]
      storageLimit:                                            --- [8]
  • [1] Required to have CFK automatically download connector plugins.
  • [2] Required when type: onDemand set in [1].
  • [3] Required. Set to locationType: url to download this plugin from a custom location.
  • [4] Required when locationType: url set in [3]. Provide an array of plugins to be downloaded.
  • [5](必須)このコネクタープラグインの名前。
  • [6] Required. The archive path of the zip file that contains this plugin.
  • [7] Required. Defines the sha512sum checksum of the plugin's remote file. It is used to verify the remote file after download.
  • [8](省略可)コネクタープラグインを保管するために使用できるノードボリュームの最大サイズ。デフォルト値は 4G です。

注釈

FileStream コネクターを使用する場合などに、カスタムの plugin.path プロパティを spec.configOverrides に設定する場合、/mnt/pluginsplugin.path に含める必要があります。例を次に示します。

spec:
  configOverrides:
    server:
    - plugin.path=/usr/share/java,/mnt/plugins

Connect の CR の例については、GitHub の CFK サンプルリポジトリ を参照してください。

Connect init コンテナーログの表示

コネクタープラグインのインストール時に発生した問題をトラブルシューティングするには、kubectl logs コマンドを実行して、Connect ポッドの init コンテナーのログを表示します。以下に例を示します。

kubectl logs -f connect-0 -c config-init-container

コネクタープラグインを使用して Connect Docker イメージを拡張する

このセクションでは、コネクタープラグインを使用して Connect イメージを拡張する方法について説明します。

以下の Connect イメージのいずれかに新しいコネクターを追加します。

  • Confluent Platform 6.2.x 以降の場合は、cp-server-connect イメージを使用します。

    このドキュメントの以降の部分では、このイメージを使用します。

  • Confluent Platform 6.1.x 以前の場合、cp-server-connect-operator イメージを使用します。

イメージには、Connect およびそのすべての依存関係が含まれます。コネクターの JAR は一切含まれていません。

Connect イメージに新しいコネクターを追加するには、新しいコネクターがインストールされた新しい Docker イメージをビルドする必要があります。

  1. 1 つまたは複数のコネクターを cp-server-connect イメージに追加するには、<dockerfile-dir>Dockerfile を作成します。

    以下のいずれかを行います。

    • Confluent Hub からコネクターをプルします。
    • Docker ビルドを実行しているマシンにダウンロードしたコネクター JAR を使用します。

    Confluent Hub からコネクターをプルする場合

    以下のように Dockerfile を作成します。

    FROM confluentinc/cp-server-connect:<Confluent Platform release>
    USER root
    RUN confluent-hub install --no-prompt <connector1>:<connector1-version> \
      && confluent-hub install --no-prompt <connector2>:<connector2-version> \
      && ...
    USER 1001
    

    この Dockerfile の例では、Docker イメージの作成に Confluent Hub の data-gen コネクターを使用しています。

    FROM confluentinc/cp-server-connect:7.1.0
    USER root
    RUN  confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.3.3
    USER 1001
    

    Docker ビルドを実行しているマシンにダウンロードしたコネクター JAR を使用する場合

    以下のように Dockerfile を作成します。

    FROM confluentinc/cp-server-connect:<Confluent Platform release>
    ADD <local-connector1-path> /usr/share/java/<connector1> \
      && <local-connector2-path> /usr/share/java/<connector2> \
      && ...
    USER 1001
    

    この例の Dockerfile では、ローカルマシンの <connector-dir> ディレクトリにある data-gen コネクターを使用します。

    FROM confluentinc/cp-server-connect:7.1.0
    ADD​ my-connector-dir/confluentinc-kafka-connect-datagen /usr/share/java/confluentinc-kafka-connect-datagen
    USER 1001
    
  2. 以下のコマンドを実行して、イメージをビルドしてプッシュします。

    docker build <dockerfile-dir> -t <someregistry>/<somerepository>:<sometag>
    
    docker push <someregistry>/<somerepository>:<sometag>
    
  3. Docker イメージの詳細を上記のプロセスの出力から入手し、Connect の CR でリポジトリとタグを指定します。

    spec:
      image:
        application: <someregistry>/<somerepository>:<sometag>
    

コネクターの証明書の指定

必要なコネクター TLS 証明書が Connect クラスターに存在しない場合は、以下の手順に従って、Connect クラスター内のコネクターで証明書を使用できるようにします。

  1. kubectl create secret コマンドを使用してシークレットを作成します。

  2. Connect CR で、次のようにシークレット名を指定します。

    spec:
      connectorTLSCerts:          --- [1]
      - directoryPathInContainer: --- [2]
        jksPassword:
          secretRef:              --- [3]
        secretRef:                --- [4]
    
    • [1] コネクターで使用するために Connect ポッドに挿入されたコネクターの TLS 証明書参照のリスト。

    • [2] keystore.jkstruststore.jksjksPassword.txt の各キーがマウントされているコンテナー内のディレクトリパス。

    • [3] JKS パスワードとして参照されるシークレット名。jksPassword.txt=jksPassword=<user_provided_password> という形式のキーと値が想定されます。

      省略した場合、CFK はデフォルトパスワードの mystorepassword を使用します。詳細については、「Java キーストア形式の TLS キーおよび証明書の提供」を参照してください。

    • [4] コネクターの TLS 証明書を格納するシークレット名。

  3. コネクターの CR で、場所を指定します。

    例については、「コネクターの TLS 証明書」を参照してください。

マウントされたシークレットを使用した Connect 認証情報の指定

マウントされたシークレット を使用して、コネクター構成内のパスワードなどの機密データを保護できます。

  1. カスタム Kubernetes シークレットのマウント」の説明に従って、シークレットを作成します。以下に例を示します。

    kubectl create secret generic my-credential \
      --from-file=my-credential.txt=/my-dir/my-credential.txt
    

    シークレット参照は、デフォルトパスの /mnt/secrets/<secret-name> にマウントされます。

  2. Connect CR で、上記のシークレット名を次のように指定します。

    spec:
      mountedSecrets:
        - secretRef:  # The name of the secret that contains the credentials.
    
  3. コネクター CR で、シークレットの場所を変数として指定すると、コネクターの起動時にそれらの変数が CFK によって動的に解決されます。

    例については、「認証情報のマウントされたシークレット」を参照してください。