Kafka Connect の構成¶
Kafka Connect は、Kafka と他のデータシステムとの間でデータのストリーミングを行うためのツールです。これは、コネクターを使用して Kafka との間でデータのストリーミングを行います。Kafka Connect では、コネクターによってデータのコピー先とコピー元が決まります。
コネクタープラグインは、コネクターのクラスや抽象化を実装するバイナリまたは JAR です。コネクタープラグインは Connect ワーカーにインストールされます。
Connect を構成するときに、Connect をデプロイするためのコネクタープラグインのインストール方法を指定する必要があります。
Confluent for Kubernetes (CFK)でコネクタープラグインをインストールするには以下の方法があります。
コネクター情報を設定し、その他の必要な設定を行った後、kubectl apply
を使用して Connect をデプロイします。
Confluent for Kubernetes (CFK)2.1.0 以降、コネクターのカスタムリソース定義(CRD)を使用して、Kubernetes でコネクターを宣言的に管理できます。このトピックの説明に従ってコネクタープラグインをインストールした後に、「コネクターの管理」を参照してコネクターの管理について確認してください。
コネクタープラグインのインストール¶
コネクタープラグインの自動ダウンロードおよびインストール¶
CFK は、Confluent Hub またはカスタムのアーティファクトの場所の URL からコネクタープラグインまたは JAR を自動的にダウンロードし、インストールすることができます。
現在、CFK では、confluentHub
と url
の両方に設定した 2 つの異なるロケーションタイプ(locationType
)が指定された Connect ワーカーのデプロイをサポートしていません。
コネクタープラグインを保管するには、指定されたサイズのノードボリュームを使用可能な状態にしておく必要があります。デフォルトのサイズは 4 GB ですが、storageLimit
を使用して Connect CR で別のサイズを指定できます。
ダウンロード情報を Connect CR に次のように指定します。
Confluent Hub からダウンロードするには、次のように指定します。
spec:
build:
type: onDemand --- [1]
onDemand:
plugins:
locationType: confluentHub --- [2]
confluentHub: --- [3]
- name: --- [4]
owner: --- [5]
version: --- [6]
storageLimit: --- [7]
- [1] CFK でコネクタープラグインを自動的にダウンロードする場合に必須です。
- [2](必須)
locationType: confluentHub
に設定して、このプラグインを Confluent Hub からダウンロードします。 - [3]
locationType: confluentHub
が [4] で設定されている場合に必須。ダウンロードするプラグインの配列を指定します。 - [4](必須)このコネクタープラグインの名前。
- [5](必須)このプラグインを提供する個人または組織。たとえば、
confluentinc
などです。 - [6](必須)このプラグインのバージョン。このプラグインのバージョンまたは
latest
に設定します。 - [7](省略可)コネクタープラグインを保管するために使用できるノードボリュームの最大サイズ。デフォルト値は 4G です。
カスタム URL からダウンロードするには、次のように指定します。
spec:
build:
type: onDemand --- [1]
onDemand: --- [2]
plugins:
locationType: url --- [3]
url: --- [4]
- name: --- [5]
archivePath: --- [6]
checksum: --- [7]
storageLimit: --- [8]
- [1] CFK でコネクタープラグインを自動的にダウンロードする場合に必須です。
- [2]
type: onDemand
が [1] で設定されている場合に必須。 - [3](必須)``locationType: url`` に設定して、このプラグインをカスタムの場所からダウンロードします。
- [4]
locationType: url
が [4] で設定されている場合に必須。ダウンロード対象のプラグインの配列を指定します。 - [5](必須)このコネクタープラグインの名前。
- [6](必須)このプラグインを含む
zip
ファイルのアーカイブパス。 - [7](必須)プラグインのリモートファイルの sha512sum チェックサムを定義します。これは、ダウンロード後にリモートファイルを検証するために使用します。
- [8](省略可)コネクタープラグインを保管するために使用できるノードボリュームの最大サイズ。デフォルト値は 4G です。
注釈
FileStream コネクターを使用する場合などに、カスタムの plugin.path
プロパティを spec.configOverrides
に設定する場合、/mnt/plugins
を plugin.path
に含める必要があります。例を次に示します。
spec:
configOverrides:
server:
- plugin.path=/usr/share/java,/mnt/plugins
Connect の CR の例については、GitHub の CFK サンプルリポジトリ を参照してください。
Connect init コンテナーログの表示¶
コネクタープラグインのインストール時に発生した問題をトラブルシューティングするには、kubectl logs
コマンドを実行して、Connect ポッドの init コンテナーのログを表示します。以下に例を示します。
kubectl logs -f connect-0 -c config-init-container
コネクタープラグインを使用して Connect Docker イメージを拡張する¶
このセクションでは、コネクタープラグインを使用して Connect イメージを拡張する方法について説明します。
以下の Connect イメージのいずれかに新しいコネクターを追加します。
Confluent Platform 6.2.x 以降の場合は、
cp-server-connect
イメージを使用します。このドキュメントの以降の部分では、このイメージを使用します。
Confluent Platform 6.1.x 以前の場合、
cp-server-connect-operator
イメージを使用します。
イメージには、Connect およびそのすべての依存関係が含まれます。コネクターの JAR は一切含まれていません。
Connect イメージに新しいコネクターを追加するには、新しいコネクターがインストールされた新しい Docker イメージをビルドする必要があります。
1 つまたは複数のコネクターを
cp-server-connect
イメージに追加するには、<dockerfile-dir>
にDockerfile
を作成します。以下のいずれかを行います。
- Confluent Hub からコネクターをプルします。
- Docker ビルドを実行しているマシンにダウンロードしたコネクター JAR を使用します。
Confluent Hub からコネクターをプルする場合
以下のように
Dockerfile
を作成します。FROM confluentinc/cp-server-connect:<Confluent Platform release> USER root RUN confluent-hub install --no-prompt <connector1>:<connector1-version> \ && confluent-hub install --no-prompt <connector2>:<connector2-version> \ && ... USER 1001
この
Dockerfile
の例では、Docker イメージの作成に Confluent Hub の data-gen コネクターを使用しています。FROM confluentinc/cp-server-connect:7.0.1 USER root RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.3.3 USER 1001
Docker ビルドを実行しているマシンにダウンロードしたコネクター JAR を使用する場合
以下のように
Dockerfile
を作成します。FROM confluentinc/cp-server-connect:<Confluent Platform release> ADD <local-connector1-path> /usr/share/java/<connector1> \ && <local-connector2-path> /usr/share/java/<connector2> \ && ... USER 1001
この例の
Dockerfile
では、ローカルマシンの<connector-dir>
ディレクトリにある data-gen コネクターを使用します。FROM confluentinc/cp-server-connect:7.0.1 ADD my-connector-dir/confluentinc-kafka-connect-datagen /usr/share/java/confluentinc-kafka-connect-datagen USER 1001
以下のコマンドを実行して、イメージをビルドしてプッシュします。
docker build <dockerfile-dir> -t <someregistry>/<somerepository>:<sometag> docker push <someregistry>/<somerepository>:<sometag>
Docker イメージの詳細を上記のプロセスの出力から入手し、Connect の CR でリポジトリとタグを指定します。
spec: image: application: <someregistry>/<somerepository>:<sometag>
コネクターの証明書の指定¶
必要なコネクター TLS 証明書が Connect クラスターに存在しない場合は、以下の手順に従って、Connect クラスター内のコネクターで証明書を使用できるようにします。
kubectl create secret
コマンドを使用してシークレットを作成します。Connect CR で、次のようにシークレット名を指定します。
spec: connectorTLSCerts: --- [1] - directoryPathInContainer: --- [2] jksPassword: secretRef: --- [3] secretRef: --- [4]
[1] コネクターで使用するために Connect ポッドに挿入されたコネクターの TLS 証明書参照のリスト。
[2]
keystore.jks
、truststore.jks
、jksPassword.txt
の各キーがマウントされているコンテナー内のディレクトリパス。[3] JKS パスワードとして参照されるシークレット名。
jksPassword.txt=jksPassword=<user_provided_password>
という形式のキーと値が想定されます。省略した場合、CFK はデフォルトパスワードの
mystorepassword
を使用します。詳細については、「Java キーストア形式の TLS キーおよび証明書の提供」を参照してください。[4] コネクターの TLS 証明書を格納するシークレット名。
コネクターの CR で、場所を指定します。
例については、「コネクターの TLS 証明書」を参照してください。
マウントされたシークレットを使用した Connect 認証情報の指定¶
マウントされたシークレット を使用して、コネクター構成内のパスワードなどの機密データを保護できます。
「マウントされたシークレットの指定」の説明に従って、シークレットを作成します。以下に例を示します。
kubectl create secret generic my-credential \ --from-file=my-credential.txt=/my-dir/my-credential.txt
シークレット参照は、デフォルトパスの
/mnt/secrets/<secret-name>
にマウントされます。Connect CR で、上記のシークレット名を次のように指定します。
spec: mountedSecrets: - secretRef: # The name of the secret that contains the credentials.
コネクター CR で、シークレットの場所を変数として指定すると、コネクターの起動時にそれらの変数が CFK によって動的に解決されます。
例については、「認証情報のマウントされたシークレット」を参照してください。