Kafka Connect の構成

Kafka Connect は、Kafka と他のデータシステムとの間でデータのストリーミングを行うためのツールです。これは、コネクターを使用して Kafka との間でデータのストリーミングを行います。Kafka Connect では、コネクターによってデータのコピー先とコピー元が決まります。

コネクタープラグインは、コネクターのクラスや抽象化を実装するバイナリまたは JAR です。コネクタープラグインは Connect ワーカーにインストールされます。

Connect を構成するときに、Connect をデプロイするためのコネクタープラグインのインストール方法を指定する必要があります。

Confluent for Kubernetes (CFK)でコネクタープラグインをインストールするには以下の方法があります。

コネクター情報を設定し、その他の必要な設定を行った後、kubectl apply を使用して Connect をデプロイします。

Confluent for Kubernetes (CFK)2.1.0 以降、コネクターのカスタムリソース定義(CRD)を使用して、Kubernetes でコネクターを宣言的に管理できます。このトピックの説明に従ってコネクタープラグインをインストールした後に、「コネクターの管理」を参照してコネクターの管理について確認してください。

コネクタープラグインのインストール

コネクタープラグインの自動ダウンロードおよびインストール

CFK は、Confluent Hub またはカスタムのアーティファクトの場所の URL からコネクタープラグインまたは JAR を自動的にダウンロードし、インストールすることができます。

現在、CFK では、confluentHuburl の両方に設定した 2 つの異なるロケーションタイプ(locationType)が指定された Connect ワーカーのデプロイをサポートしていません。

コネクタープラグインを保管するには、指定されたサイズのノードボリュームを使用可能な状態にしておく必要があります。デフォルトのサイズは 4 GB ですが、storageLimit を使用して Connect CR で別のサイズを指定できます。

ダウンロード情報を Connect CR に次のように指定します。

Confluent Hub からダウンロードするには、次のように指定します。

spec:
  build:
    type: onDemand                  --- [1]
    onDemand:
      plugins:
        locationType: confluentHub  --- [2]
        confluentHub:               --- [3]
          - name:                   --- [4]
            owner:                  --- [5]
            version:                --- [6]
      storageLimit:                 --- [7]
Copy
  • [1] CFK でコネクタープラグインを自動的にダウンロードする場合に必須です。
  • [2](必須) locationType: confluentHub に設定して、このプラグインを Confluent Hub からダウンロードします。
  • [3] locationType: confluentHub が [4] で設定されている場合に必須。ダウンロードするプラグインの配列を指定します。
  • [4](必須)このコネクタープラグインの名前。
  • [5](必須)このプラグインを提供する個人または組織。たとえば、 confluentinc などです。
  • [6](必須)このプラグインのバージョン。このプラグインのバージョンまたは latest に設定します。
  • [7](省略可)コネクタープラグインを保管するために使用できるノードボリュームの最大サイズ。デフォルト値は 4G です。

カスタム URL からダウンロードするには、次のように指定します。

spec:
  build:
    type: onDemand                  --- [1]
    onDemand:                       --- [2]
      plugins:
        locationType: url           --- [3]
        url:                        --- [4]
        - name:                     --- [5]
          archivePath:              --- [6]
          checksum:                 --- [7]
      storageLimit:                 --- [8]
Copy
  • [1] CFK でコネクタープラグインを自動的にダウンロードする場合に必須です。
  • [2] type: onDemand が [1] で設定されている場合に必須。
  • [3](必須)``locationType: url`` に設定して、このプラグインをカスタムの場所からダウンロードします。
  • [4] locationType: url が [4] で設定されている場合に必須。ダウンロード対象のプラグインの配列を指定します。
  • [5](必須)このコネクタープラグインの名前。
  • [6](必須)このプラグインを含む zip ファイルのアーカイブパス。
  • [7](必須)プラグインのリモートファイルの sha512sum チェックサムを定義します。これは、ダウンロード後にリモートファイルを検証するために使用します。
  • [8](省略可)コネクタープラグインを保管するために使用できるノードボリュームの最大サイズ。デフォルト値は 4G です。

注釈

FileStream コネクターを使用する場合などに、カスタムの plugin.path プロパティを spec.configOverrides に設定する場合、/mnt/pluginsplugin.path に含める必要があります。例を次に示します。

spec:
  configOverrides:
    server:
    - plugin.path=/usr/share/java,/mnt/plugins
Copy

Connect の CR の例については、GitHub の CFK サンプルリポジトリ を参照してください。

Connect init コンテナーログの表示

コネクタープラグインのインストール時に発生した問題をトラブルシューティングするには、kubectl logs コマンドを実行して、Connect ポッドの init コンテナーのログを表示します。以下に例を示します。

kubectl logs -f connect-0 -c config-init-container
Copy

コネクタープラグインを使用して Connect Docker イメージを拡張する

このセクションでは、コネクタープラグインを使用して Connect イメージを拡張する方法について説明します。

以下の Connect イメージのいずれかに新しいコネクターを追加します。

  • Confluent Platform 6.2.x 以降の場合は、cp-server-connect イメージを使用します。

    このドキュメントの以降の部分では、このイメージを使用します。

  • Confluent Platform 6.1.x 以前の場合、cp-server-connect-operator イメージを使用します。

イメージには、Connect およびそのすべての依存関係が含まれます。コネクターの JAR は一切含まれていません。

Connect イメージに新しいコネクターを追加するには、新しいコネクターがインストールされた新しい Docker イメージをビルドする必要があります。

  1. 1 つまたは複数のコネクターを cp-server-connect イメージに追加するには、<dockerfile-dir>Dockerfile を作成します。

    以下のいずれかを行います。

    • Confluent Hub からコネクターをプルします。
    • Docker ビルドを実行しているマシンにダウンロードしたコネクター JAR を使用します。

    Confluent Hub からコネクターをプルする場合

    以下のように Dockerfile を作成します。

    FROM confluentinc/cp-server-connect:<Confluent Platform release>
    USER root
    RUN confluent-hub install --no-prompt <connector1>:<connector1-version> \
      && confluent-hub install --no-prompt <connector2>:<connector2-version> \
      && ...
    USER 1001
    
    Copy

    この Dockerfile の例では、Docker イメージの作成に Confluent Hub の data-gen コネクターを使用しています。

    FROM confluentinc/cp-server-connect:7.0.1
    USER root
    RUN  confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.3.3
    USER 1001
    
    Copy

    Docker ビルドを実行しているマシンにダウンロードしたコネクター JAR を使用する場合

    以下のように Dockerfile を作成します。

    FROM confluentinc/cp-server-connect:<Confluent Platform release>
    ADD <local-connector1-path> /usr/share/java/<connector1> \
      && <local-connector2-path> /usr/share/java/<connector2> \
      && ...
    USER 1001
    
    Copy

    この例の Dockerfile では、ローカルマシンの <connector-dir> ディレクトリにある data-gen コネクターを使用します。

    FROM confluentinc/cp-server-connect:7.0.1
    ADD​ my-connector-dir/confluentinc-kafka-connect-datagen /usr/share/java/confluentinc-kafka-connect-datagen
    USER 1001
    
    Copy
  2. 以下のコマンドを実行して、イメージをビルドしてプッシュします。

    docker build <dockerfile-dir> -t <someregistry>/<somerepository>:<sometag>
    
    docker push <someregistry>/<somerepository>:<sometag>
    
    Copy
  3. Docker イメージの詳細を上記のプロセスの出力から入手し、Connect の CR でリポジトリとタグを指定します。

    spec:
      image:
        application: <someregistry>/<somerepository>:<sometag>
    
    Copy

コネクターの証明書の指定

必要なコネクター TLS 証明書が Connect クラスターに存在しない場合は、以下の手順に従って、Connect クラスター内のコネクターで証明書を使用できるようにします。

  1. kubectl create secret コマンドを使用してシークレットを作成します。

  2. Connect CR で、次のようにシークレット名を指定します。

    spec:
      connectorTLSCerts:          --- [1]
      - directoryPathInContainer: --- [2]
        jksPassword:
          secretRef:              --- [3]
        secretRef:                --- [4]
    
    Copy
    • [1] コネクターで使用するために Connect ポッドに挿入されたコネクターの TLS 証明書参照のリスト。

    • [2] keystore.jkstruststore.jksjksPassword.txt の各キーがマウントされているコンテナー内のディレクトリパス。

    • [3] JKS パスワードとして参照されるシークレット名。jksPassword.txt=jksPassword=<user_provided_password> という形式のキーと値が想定されます。

      省略した場合、CFK はデフォルトパスワードの mystorepassword を使用します。詳細については、「Java キーストア形式の TLS キーおよび証明書の提供」を参照してください。

    • [4] コネクターの TLS 証明書を格納するシークレット名。

  3. コネクターの CR で、場所を指定します。

    例については、「コネクターの TLS 証明書」を参照してください。

マウントされたシークレットを使用した Connect 認証情報の指定

マウントされたシークレット を使用して、コネクター構成内のパスワードなどの機密データを保護できます。

  1. マウントされたシークレットの指定」の説明に従って、シークレットを作成します。以下に例を示します。

    kubectl create secret generic my-credential \
      --from-file=my-credential.txt=/my-dir/my-credential.txt
    
    Copy

    シークレット参照は、デフォルトパスの /mnt/secrets/<secret-name> にマウントされます。

  2. Connect CR で、上記のシークレット名を次のように指定します。

    spec:
      mountedSecrets:
        - secretRef:  # The name of the secret that contains the credentials.
    
    Copy
  3. コネクター CR で、シークレットの場所を変数として指定すると、コネクターの起動時にそれらの変数が CFK によって動的に解決されます。

    例については、「認証情報のマウントされたシークレット」を参照してください。