Kafka Connect の使用方法 - スタートガイド

This page includes the following topics and will help you in getting started with Kafka Connect. Confluent recommends you read and understand Kafka Connect Concepts before proceeding. Kafka Connect 101 is also a free course you can check out before moving ahead.

デプロイに関する考慮事項

To get started with Kafka Connect, you must have a set of Kafka brokers. The Kafka brokers can be an earlier broker version, or the latest version. For more details, see Cross-Component Compatibility.

In addition to Kafka brokers, there are a few deployment options to consider as well. Understanding and acting on these deployment options ensures your Kafka Connect deployment will scale and support the long-term needs of your data pipeline.

Confluent Schema Registry

Schema Registry は Kafka Connect の必須のサービスではありませんが、Schema Registry により、コネクターが読み書きする Kafka レコードの共通データフォーマットとして Avro、Protobuf、および JSON スキーマを簡単に使用できるようになります。これにより、カスタムコードを記述する必要性を最小限に抑え、柔軟なフォーマットでデータを標準化することができます。また、スキーマの進化および互換性ルールの適用により、さらなる利点が得られます。詳細については、「Kafka Connect と Schema Registry の使用」および「キーコンバーターと値コンバーターの構成」を参照してください。

スタンドアロンと分散モード

コネクターとタスクは論理作業単位であり、プロセスとして実行されます。Kafka Connect では、このプロセスを ワーカー と呼びます。ワーカーの実行モードには、"スタンドアロンモード" と "分散モード" の 2 つがあります。はじめに、ご使用の環境に最適なモードを確認しておく必要があります。

  • スタンドアロンモード は、ローカルマシンで Kafka Connect の開発やテストを行う場合に便利です。これは、通常、単一エージェントを使用する環境(たとえば、ウェブサーバーのログを Kafka に送信する場合)にも使用できます。

  • Distributed mode: Runs Connect workers on multiple machines (nodes), which form a Connect cluster. Kafka Connect distributes running connectors across the cluster. You can add or remove nodes as your needs evolve.

    This mode is also more fault tolerant. For example, if a node unexpectedly leaves the cluster, Kafka Connect distributes the work of that node to other nodes in the cluster. And, because Kafka Connect stores connector configurations, status, and offset information inside the Kafka cluster where it is safely replicated, losing the node where a Connect worker runs does not result in any loss of data.

    重要

    分散モードには拡張性、高可用性、および管理面での利点があるため、本稼働環境には分散モードが推奨されています。

Try to identify which mode works best for your environment before getting started. For more information about the different deployment modes for Kafka Connect workers check out this video.

運用環境

Connect ワーカーは、コンテナー、および Kubernetes、Apache Mesos、Docker Swarm、Yarn などの管理環境で適切に動作します。分散ワーカーではすべてのステートが Kafka に保管されるため、クラスターの管理が容易になります。また、Kafka Connect はワーカーの再起動やスケーリングを自動的に処理するようには設計されていません。これは、既存のクラスター管理ソリューションを透過的に使用し続けることができることを意味します。スタンドアロンワーカーのステートは、ローカルファイルシステムに保管されることに注意してください。

参考

Kafka Connect ワーカーは、十分なリソースがある共有マシン上で実行することができる JVM プロセスです。Connect ワーカーのハードウェア要件は、標準の Java プロデューサーおよびコンシューマーの要件と同様です。リソースの要件は主に、ワーカーで実行されるコネクターのタイプによって異なります。送信されるメッセージのサイズが大きい環境では、必要なメモリー容量も大きくなります。また、多数のメッセージがバッファに格納され、集約されてから外部システムに書き込まれる環境でも、より多くのメモリーが必要になります。圧縮を継続的に使用するには、より強力な CPU が必要です。

1 台のマシンで複数のワーカーを同時に実行する場合は、リソースの制限(CPU とメモリー)を知っておく必要があります。デフォルトのヒープサイズの設定で開始し、 内部メトリクスのモニタリング とシステムのモニタリングを行います。負荷に対して十分な CPU、メモリー、およびネットワーク(10GbE 以上)が割り当てられていることを確認してください。

Connect プラグインのインストール

Kafka Connect は拡張可能な設計であるため、開発者はカスタムのコネクターや変換、コンバーターを作成することができ、ユーザーはそれらをインストールして実行することができます。

Defining plugins

Kafka Connect プラグインは、1 つ以上のコネクター、変換、またはコンバーターの実装を含む JAR ファイルのセットです。Connect では各プラグインが互いに分離されるため、1 つのプラグインのライブラリが他のプラグインのライブラリから影響を受けることはありません。これは、複数のプロバイダーのコネクターを組み合わせて使用する場合に非常に重要なポイントです。

注意

一般的には、Connect のデプロイには、多数のプラグインがインストールされます。各プラグインで 1 つのバージョン だけがインストールされるようにしてください。

Kafka Connect プラグインには、次のいずれかを使用できます。

  • プラグインに必要なすべての JAR ファイルとサードパーティの依存関係が含まれている、ファイルシステム上の ディレクトリ。これが最も一般的であり、推奨されます。
  • プラグインのクラスファイルとサードパーティの依存関係がすべて含まれている、単一の uber JAR

Kafka Connect では、plugin.path ワーカーの構成プロパティ でディレクトリパスのコンマ区切りリスト形式で指定された "プラグインパス" を使用して、プラグインが検索されます。plugin.path ワーカーの構成プロパティの例を以下に示します。

plugin.path=/usr/local/share/kafka/plugins

Connect プラグインのインストール

プラグインをインストールするには、プラグインパスのリストに既に含まれているディレクトリに、プラグインディレクトリまたは uber JAR(またはこのどちらかに解決されるシンボリックリンク)を配置します。または、プラグインパスを更新して、プラグインが含まれているディレクトリの絶対パスを追加することもできます。上記のプラグインパスの例では、Connect が実行されている 各マシン上/usr/local/share/kafka/plugins ディレクトリを作成し、そのディレクトリにプラグインディレクトリ(または uber JAR)を配置しています。

Connect ワーカーを起動すると、各ワーカーにより、プラグインパス上のディレクトリ内にあるすべてのコネクター、変換、およびコンバーターのプラグインが検出されます。コネクター、変換、またはコンバーターを使用する場合、Connect ワーカーにより、最初にそれぞれのプラグインからクラスが読み込まれ、その後 Kafka Connect ランタイムと Java ライブラリが読み込まれます。Connect では、他のプラグインに含まれているライブラリはすべて明示的に回避されます。これにより競合が防止され、さまざまなプロバイダーが開発したコネクターや変換の追加および使用が非常に簡単になります。

To find the components that fit your needs, check out the Confluent Hub page–it has an ecosystem of connectors, transforms, and converters. From Confluent Hub, you can easily install the components into your local Confluent Platform environment. For Confluent Hub Client installation instructions, see Confluent Hub Client. For a full list of supported connectors, see Supported Connectors.

Kafka Connect の以前のバージョンでコネクター、変換、およびコンバーターをインストールする場合は、別のアプローチが必要でした。Connect を実行するすべてのスクリプトで、環境変数 CLASSPATH が認識されていました。ユーザーはこの変数をエクスポートして、コネクターの JAR ファイルへのパスのリストを指定していました。以前の CLASSPATH エクスポート変数のメカニズムの例を以下に示します。

export CLASSPATH=/path/to/my/connectors/*
bin/connect-standalone standalone.properties new-custom-connector.properties

CLASSPATH のエクスポートは、推奨されていません。このメカニズムを使用してプラグインへのパスを作成すると、ライブラリの競合が発生し、Kafka Connect とコネクターが失敗する可能性があります。plugin.path 構成プロパティを使用してください。このプロパティを使用すると、各プラグインが他のプラグインやライブラリから適切に分離されます。

注釈

As described in Installing Connect Plugins, connector plugin JAR files are placed in the plugin path (Connect worker property: plugin.path). However, a few connectors may require that you additionally export the CLASSPATH to the plugin JAR files when starting the connector (export CLASSPATH=<path-to-jar-files>). While not recommended, CLASSPATH is required for these connectors because Kafka Connect uses classloading isolation to distinguish between system classes and regular classes, and some plugins load system classes (for example, javax.naming and others in the package javax). An example error message showing this issue is provided below. If you see an error that resembles the example below, in addition to adding the plugin path, you must also export CLASSPATH=<path-to-jar-files> when starting the connector.

Caused by: javax.naming.NoInitialContextException:
Cannot instantiate class: com.tibco.tibjms.naming.TibjmsInitialContextFactory
[Root exception is java.lang.ClassNotFoundException: com.tibco.tibjms.naming.TibjmsInitialContextFactory]

ちなみに

デフォルトで、コネクターは、Kafka トピックで使用されているパーティショナーを継承します。コネクターのカスタムパーティショナーを作成することもできます。カスタムパーティショナーは、コネクターの /lib フォルダーに配置する必要があります。

You can also put partitioners in a common location of choice. If you choose this option, you must add a symlink to the location from each connector’s /lib folder. For example, you would place a custom partitioner in the path share/confluent-hub-components/partitioners and then add the symlink share/confluent-hub-components/kafka-connect-s3/lib/partitioners -> ../../partitioners.

ワーカーの構成と実行

以下のセクションでは、スタンドアロンモードまたは分散モードでのワーカーの実行について説明します。ワーカーの構成プロパティのリストについては、「Kafka Connect ワーカーの構成プロパティ」を参照してください。

このビデオ では、Kafka Connect ワーカーのさまざまなデプロイモードを説明しています。

スタンドアロンモード

スタンドアロンモードは通常、開発やテスト、軽量な単一エージェント環境(たとえば、ウェブサーバーログの Kafka への送信)に使用されます。スタンドアロンモードでワーカーを起動するコマンドの例を以下に示します。

bin/connect-standalone worker.properties connector1.properties [connector2.properties connector3.properties ...]

最初のパラメーター(worker.properties)は、 ワーカーの構成プロパティファイル です。worker.properties は、ファイル名の 1 つの例であることに注意してください。ワーカーの構成ファイルには、任意の有効なファイル名を使用できます。このファイルにより、使用する Kafka クラスターやシリアル化のフォーマットなどの設定を制御することができます。スタンドアロンモードで AvroSchema Registry を使用するサンプル構成ファイルを参照するには、etc/schema-registry/connect-avro-standalone.properties にあるファイルを開きます。このファイルをコピーして変更し、スタンドアロンワーカーのプロパティファイルとして使用することができます。

2 番目のパラメーター(connector1.properties)は、コネクター構成プロパティファイルです。すべてのコネクターには、ワーカーで読み込まれる構成プロパティがあります。例に示されているように、このコマンドを使用して複数のコネクターを起動することができます。

同じホストマシン上で複数のスタンドアロンワーカーを実行する場合は、以下の 2 つの構成プロパティを各ワーカーで一意にする必要があります。

  • offset.storage.file.filename: コネクターオフセット用のストレージファイル名。スタンドアロンモードでは、このファイルはローカルファイルシステムに保管されます。2 つのワーカーで同じファイル名を使用すると、オフセットデータの削除や、別の値による上書きが生じる可能性があります。

  • listeners: REST API がリッスンする URI のリスト。形式は protocol://host:port,protocol2://host2:port (protocol は HTTP または HTTPS)。ホスト名に 0.0.0.0 を指定してすべてのインターフェイスにバインドすることも、ホスト名を空のままにしてデフォルトインターフェイスにバインドすることもできます。

    注釈

    You update the etc/schema-registry/connect-avro-standalone.properties file if you need to apply a change to Connect when starting Confluent Platform services using the Confluent CLI.

分散モード

分散モードには、ワーカー構成ファイルの読み込み以外の追加のコマンドラインパラメーターはありません。新しいワーカーは、新しいグループを開始するか、group.id が一致する既存のグループに参加します。その後、ワーカーはコンシューマーグループと調整し、行うべき作業を分散させます。

分散モードでワーカーを起動するコマンドの例を以下に示します。

bin/connect-distributed worker.properties

Avro と Schema Registry を使用する分散モードの構成ファイルの例を参照するには、etc/schema-registry/connect-avro-distributed.properties にあるファイルを開きます。このファイルのコピーを作成して変更を加え、新しい worker.properties ファイルとして使用することができます。worker.properties は、ファイル名の 1 つの例であることに注意してください。プロパティファイルには、任意の有効なファイル名を使用できます。

スタンドアロンモードでは、コネクター構成プロパティファイルがコマンドラインパラメーターとして追加されます。一方、分散モードでは、コネクターは REST API リクエストを使用してデプロイされ、管理されます。コネクターを作成するには、ワーカーを起動してから、コネクターを作成する REST リクエストを実行します。REST リクエストの例については、多くの サポートされているコネクター のドキュメントに記載されています。たとえば、Azure Blob Storage Source Connector の REST ベースの例 では、1 つの例が紹介されています。

開発とテストのために複数の分散ワーカーを 1 台のホストマシン上で実行する場合、listeners 構成プロパティは各ワーカーで一意でなければなりません。これは、HTTP リクエストを受け取るために REST インターフェイスがリッスンするポートです。

注釈

You update the etc/schema-registry/connect-avro-distributed.properties file if you need to apply a change to Connect when starting Confluent Platform services using the Confluent CLI.

Connect の内部トピック

Connect では、コネクターとタスクの構成、オフセット、およびステータスがいくつかの Kafka トピックに保管されます。それらは、Kafka Connect の内部トピックと呼ばれます。この内部トピックには、高いレプリケーション係数、圧縮クリーンアップポリシー、および適切な数のパーティションを持たせることが重要です。

Kafka Connect では、起動時に内部トピックが自動的に作成されます。Connect の ワーカーの構成プロパティ を使用して、それらのトピックの名前、レプリケーション係数、およびパーティションの数を指定できます。Connect では、プロパティが要件を満たしていることが検証され、すべてのトピックが圧縮クリーンアップポリシーが有効な状態で作成されます。

複数の分散ワーカーに同じ group.id 値が構成されている場合は、自動的に互いを検出し、1 つの Kafka Connect クラスターを形成します。クラスター内のワーカーはすべて、コネクター構成、オフセットデータ、およびステータスのアップデートを共有するために同じ 3 つの内部トピックを使用します。そのため、同じ Connect クラスター内の すべての分散ワーカーの構成で、config.storage.topicoffset.storage.topic、および status.storage.topic プロパティが一致している必要があります。

分散ワーカーの構成では、3 つの必須の内部トピック名に加え、以下のプロパティにも同じ値を設定する必要があります。この設定により、クラスター内のどのワーカーでも、不足している内部トピックが、適切なプロパティ値を使用して作成されます。これらの構成プロパティには、現実的なデフォルト値 が設定されていることに注意してください。

  • config.storage.replication.factor
  • offset.storage.replication.factor
  • offset.storage.partitions
  • status.storage.replication.factor
  • status.storage.partitions

注釈

Confluent Platform バージョン 6.0 以降の Kafka Connect では、Kafka ブローカーの default.replication.factor 値と num.partitions 値を使用して、内部トピックを作成できるようになりました。詳細については、「Kafka ブローカーのデフォルトトピック設定の使用」を参照してください。

各分散ワーカーの起動時に既に Kafka の内部トピックが存在している場合は、そのトピックが使用されます。存在しない場合、ワーカーにより、ワーカーの構成プロパティを使用してトピックの作成が試行されます。これにより、トピック固有の設定が必要な場合や、トピックを作成するために必要な権限が Kafka Connect にない場合に、Kafka Connect を起動する前に手動でトピックを作成することができます。トピックを手動で作成する場合は、必ず 構成プロパティ のリストに記載されているガイドラインに従ってください。

既存の Connect クラスターから独立した分散ワーカーを作成する必要がある場合は、新しいワーカー構成プロパティを作成する必要があります。以下の構成プロパティには、既存のクラスターで使用されているワーカー構成と異なるものを設定する必要があります。

  • group.id
  • config.storage.topic
  • offset.storage.topic
  • status.storage.topic

Connect クラスターでは、グループ ID や内部トピックを共有することができません。ただ group.id を変更するだけでは、既存の Connect クラスターに属さない新しいワーカーを作成することはできません。新しい group.id には、一意の内部トピックを関連付ける必要があります。そのためには、新しい group.id に一意の config.storage.topicoffset.storage.topic、および status.storage.topic 構成プロパティを設定する必要があります。

また、コンシューマーグループはコネクター名に基づいて作成されるため、既存の Connect クラスターで使用されているコネクター名とは異なるコネクター名を使用する必要があります。Connect クラスター内の各コネクターは、同じコンシューマーグループを共有します。

Kafka ブローカーのデフォルトトピック設定の使用

Connect ワーカーでは、レプリケーション係数とパーティションの数に Kafka ブローカーのデフォルトを使用して内部トピックを作成することができます。Kafka ブローカーのデフォルトをレプリケーション係数とパーティションの数に使用するには、内部トピックのワーカーの構成プロパティに -1 を使用します。スニペットの例を以下に示します。

# Use the broker default properties
config.storage.replication.factor=-1
offset.storage.replication.factor=-1
status.storage.replication.factor=-1
offset.storage.partitions=-1
status.storage.partitions=-1

config.storage 内部トピックには、常に厳密に 1 つのパーティションを指定する必要があります。そのため、このプロパティには、Kafka ブローカーのデフォルトを使用することができません。

トピック固有のプロパティのオーバーライド

内部トピックが作成される Kafka ブローカーのバージョンに対して有効な トピックプロパティ をオーバーライドすることができます。以下の例は、ワーカーの構成でこれらのプロパティを入力する方法を示しています。

config.storage.<topic-specific-property>
offset.storage.<topic-specific-property>
status.storage.<topic-specific-property>

以下の例では、レプリケーション係数に Kafka ブローカーのデフォルトのプロパティを使用し、デフォルトの同期レプリカの最小数(min.insync.replicas)プロパティをオーバーライドしています(1 が Kafka ブローカーのデフォルトです)。

# Use the broker default properties
config.storage.replication.factor=-1
offset.storage.replication.factor=-1
status.storage.replication.factor=-1
# Override the broker default properties
config.storage.min.insync.replicas=3
offset.storage.min.insync.replicas=3
status.storage.min.insync.replicas=3

Kafka ブローカーのバージョンでトピックプロパティが有効でない場合、Connect ワーカーは起動時に失敗します。

内部トピックの手動作成

Kafka Connect でこれらの内部トピックの自動作成を許可することをお勧めします。ただし、手動でトピックを作成することもできます。これらのトピックを手動で作成する場合として、2 つの例を以下に示します。

  • セキュリティ上の理由で、Connect などのクライアントによる Kafka トピックの作成を許可しないようにブローカーが構成されている場合があります。
  • Connect によって自動では設定されない、または自動的に作成される設定とは異なる、他の高度なトピック固有の設定が必要になる場合があります。

以下のコマンド例は、Connect を起動する前に、圧縮およびレプリケートされる Kafka トピックを手動で作成する方法を示しています。パラメーターを入力する際には、 分散ワーカー のガイドラインに従ってください。

# config.storage.topic=connect-configs
bin/kafka-topics --create --bootstrap-server localhost:9092 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
# offset.storage.topic=connect-offsets
bin/kafka-topics --create --bootstrap-server localhost:9092 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
# status.storage.topic=connect-status
bin/kafka-topics --create --bootstrap-server localhost:9092 --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact

Connect クラスター内のすべてのワーカーで、同じ内部トピックが使用されます。クラスターが異なるワーカーは、それぞれ異なる内部トピックを使用する必要があります。詳細については、「分散ワーカーの構成」を参照してください。

ワーカーの構成プロパティファイル

使用するモードに関係なく、Kafka Connect ワーカーは、ワーカーの構成プロパティファイルを最初のパラメーターとして渡すことによって構成されます。以下に例を示します。

bin/connect-distributed worker.properties

Confluent Platform には、作業を開始するうえで役立つワーカーの構成プロパティファイルのサンプルが含まれています。Avro のサンプルファイルの場所は、次のとおりです。

  • etc/schema-registry/connect-avro-distributed.properties
  • etc/schema-registry/connect-avro-standalone.properties

これらのファイルのいずれかを開始点として使用します。これらのファイルには、Schema Registry と統合された Avro コンバーターを使用するために必要な構成プロパティが含まれています。これらは、ローカルで実行されている Kafka サービスおよび Schema Registry サービスと適切に連動するように構成されています。複数のブローカーを実行する必要がないため、Kafka Connect をローカルで簡単にテストすることができます。

サンプルの構成ファイルを本稼働環境のデプロイ用に編集することもできます。それには、Kafka および Schema Registry 用の正しいホスト名を使用し、内部トピックのレプリケーション係数に許容可能な値(またはデフォルト値)を使用します。

ワーカーの構成プロパティのリストについては、「Kafka Connect ワーカーの構成プロパティ」を参照してください。

キーコンバーターと値コンバーターの構成

Schema Registry では、以下のコンバーターは使用されません。

  • AvroConverter io.confluent.connect.avro.AvroConverter: use with Schema Registry
  • ProtobufConverter io.confluent.connect.protobuf.ProtobufConverter: use with Schema Registry
  • JsonSchemaConverter io.confluent.connect.json.JsonSchemaConverter: use with Schema Registry
  • JsonConverter org.apache.kafka.connect.json.JsonConverter (Schema Registry なし): 構造化データに使用
  • StringConverter org.apache.kafka.connect.storage.StringConverter: シンプルな文字列フォーマット
  • ByteArrayConverter org.apache.kafka.connect.converters.ByteArrayConverter: 変換を行わない "パススルー" のオプションを提供

Kafka Connect 101 コース では、コンバーターについて詳しく説明しています。

key.converter プロパティと value.converter プロパティには、使用する コンバーター のタイプを指定します。すべてのコネクターのデフォルトコンバーターは、 ワーカーの構成 で指定します。ただし、どのコネクターでも、キー、値、およびヘッダーのコンバーターを完全に定義することで、デフォルトのコンバーターをオーバーライドすることができます。ほとんどのコネクターで使用できるデフォルトのキー、値、およびヘッダーのコンバーターをワーカーで定義し、別のコンバーターが必要なコネクターがある場合には、そのコネクターの構成を定義することをお勧めします。ワーカーで定義されているデフォルトの header.converter は、StringConverter を使用してヘッダーの値を文字列としてシリアル化し、ヘッダーの値を最も適切な数値、ブール値、配列、またはマップ表現に逆シリアル化します。スキーマはシリアル化されませんが、可能な場合は逆シリアル化するときに推測されます。

コネクターの構成にコンバーターが追加されていない場合、 ワーカーの構成 にあるコンバーターの構成プロパティが、ワーカー上で実行されるすべてのコネクターで使用されます。

コネクターの構成にコンバーターを追加した場合、ワーカーの構成にあるコンバータータイプのプレフィックス(key.converter.*value.converter.*)が付加されたコンバータープロパティはすべて使用されません。コネクターの構成にコンバーターを追加する場合は注意してください。たとえば、ワーカーの構成に以下のような値コンバータープロパティが存在するとします。

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

このとき、コネクターの構成に以下のプロパティを追加します。

{
 "value.converter": "AvroConverter",
 "value.converter.basic.auth.credentials.source": "USER_INFO",
 "value.converter.basic.auth.user.info": "<username>:<password>"
}

この場合、必要な Schema Registry URL プロパティ value.converter.schema.registry.url=http://localhost:8081 がコンバーターに指定されていないため、コネクターの起動時にエラーが発生します。

以下のセクションでは、コンバーターの説明と例を示します。これらのコンバーターの Schema Registry での動作については、「Kafka Connect と Schema Registry の使用」を参照してください。

Avro

Schema Registry と一緒に AvroConverter を使用するには、ワーカーの構成で key.converter プロパティと value.converter プロパティを指定します。Schema Registry の URL を指定するコンバータープロパティも追加する必要があります。以下の例は、構成に追加される AvroConverter のキープロパティと値プロパティを示しています。

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

Avro のキーコンバーターと値コンバーターは、相互に独立して使用できます。たとえば、キーには StringConverter を使用し、値には AvroConverter または JsonConverter を使用することができます。独立しているキーと値のプロパティの例を次に示します。

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

JSON スキーマと Protobuf

JSON スキーマのコンバーターと Protobuf のコンバーターはどちらも、Avro コンバーターと同じように実装されています。以下に、値コンバーターに ProtobufConverter または JsonSchemaConverter、キーコンバーターに StringConverter を使用した構成例をいくつか示します。

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=http://localhost:8081

Avro と JSON スキーマのどちらもスキーマを JSON で表現しており、認識されていないプロパティが検出された場合にも寛容です。そのため、コンバーターはカスタム JSON プロパティを使用して、Avro および JSON スキーマには相当するものがない Kafka Connect スキーマオブジェクトをキャプチャすることができます。

一方、Protobuf には JSON と異なる独自のインターフェイス定義言語(IDL)があり、カスタムのアドホックなプロパティは許容されません。そのため、Protobuf には直接相当するものがない場合に Kafka Connect スキーマから Protobuf に変換すると、データの損失や不整合が生じる可能性があります。

たとえば、Kafka Connect のスキーマでは、int8、int16、および int32 のデータ型がサポートされています。Protobuf では、int32 および int64 がサポートされています。Connect のデータが Protobuf に変換されると、int8 と int16 のフィールドは int32 または int64 に対応付けられ、ソースが int8 または int16 であったことは示されません。

JSON スキーマでは、数値型と整数型のフィールドのみがサポートされています。ただし、JSON スキーマコンバーター( JsonSchemaConverter)は、JSON スキーマにはそれに相当するものがないデータを connect.type という名前のプロパティに保管します。このプロパティは JSON スキーマパーサーでは無視されるため、フィールドはダウンストリームのコンポーネントによって適切な型に復元されます。

エンコーディングの詳細については、『JSON encoding for Avro 』および『JSON encoding for Protobuf 』を参照してください。さらに、JSON スキーマでは、allOf、anyOf、および oneOf という 3 つの方法でスキーマを結合することができます。しかし、JSON スキーマコンバーターでは oneOf のみがサポートされており、Avro コンバーターにおける union の処理方法および Protobuf コンバーターにおける oneof の処理方法と同様に処理されます。

ロールベースアクセス制御(RBAC)が構成された環境で Avro、Protobuf、または JSON スキーマのコンバーターを構成する場合は、 RBAC を使用する場合のキーコンバーターと値コンバーターの構成 を参照してください。

Schema Registry では、以下のコンバーターは使用されません。

JSON(Schema Registry を使用しない)

Connect データに対して Schema Registry を使わずに JSON を使用する必要がある場合は、Kafka でサポートされている JsonConverter を使用することができます。以下の例は、構成に追加される JsonConverter のキープロパティと値プロパティを示しています。

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

プロパティ key.converter.schemas.enable および value.converter.schemas.enabletrue に設定されている場合、キーまたは値はプレーンな JSON として扱われず、内部スキーマとデータの両方を含む複合 JSON オブジェクトとして扱われます。それらをソースコネクターで有効にすると、スキーマとデータの両方が複合 JSON オブジェクトに含められます。それらをシンクコネクターで有効にすると、スキーマとデータが複合 JSON オブジェクトから抽出されます。この実装では、Schema Registry が使用されないことに注意してください。

プロパティ key.converter.schemas.enable および value.converter.schemas.enablefalse に設定されている場合(デフォルト)、データだけが渡され、スキーマは渡されません。これにより、スキーマを必要としないアプリケーションのペイロードのオーバーヘッドが減少します。

文字列フォーマットと未加工バイト

org.apache.kafka.connect.storage.StringConverter は、内部 Connect フォーマットをシンプルな文字列フォーマットに変換するために使用されます。Connect のデータをバイトに変換する場合、スキーマは無視され、データはシンプルな文字列に変換されます。バイトを Connect データフォーマットに変換する場合、コンバーターはオプションの文字列スキーマと文字列(または null)を返します。

org.apache.kafka.connect.converters.ByteArrayConverter では、データが変換されません。バイトは変換されずにそのままコネクターを通過します。

コンバーターの詳細については、「コンバーターとシリアル化について」を参照してください。

Connect プロデューサーおよびコンシューマー

Kafka Connect では、内部的に Java 標準のプロデューサーとコンシューマーを使用して Kafka と通信します。Connect によって、これらのプロデューサーおよびコンシューマーのインスタンスのデフォルト設定が構成されます。これらの設定には、データが失われることなく順番に Kafka に送信されることを保証するプロパティが含まれています。

Connect プロデューサーのデフォルトのプロパティ

デフォルトでは、Connect により、以下の重要なプロパティを使用してソースコネクターの Kafka プロデューサーが構成されます。

  • プロデューサーのブートストラップサーバーで、Connect クラスターで使用されているのと同じ Kafka クラスターを参照します。
  • コネクターのキーおよび値のコンバーターで動作するキーおよび値のシリアライザーを構成します。
  • パターン connector-producer-<connectorName>-<taskId> を使用して、コネクターとタスクに基づいてプロデューサーの client.id を生成します。
  • acks=all を設定して、生成されたメッセージがすべての同期レプリカ(ISR)に適切に書き込まれるようにします。
  • 再試行可能な例外については、無限の再試行でデータが重複する可能性を低減するため、Connect により以下のプロパティでプロデューサーが構成されます。
    • request.timeout.ms=<max>
    • max.block.ms=<max>
    • max.in.flight.requests.per.connection=1
    • delivery.timeout.ms=<max>

ワーカーの構成で producer.* プロパティを使用するか、コネクターの構成で producer.override.* プロパティを使用することでこれらのデフォルトのプロパティをオーバーライドできますが、これらのデフォルトのプロパティを変更すると、Connect によるデリバリーの保証に影響が出る可能性があります。

プロデューサーとコンシューマーのオーバーライド

前のセクションで説明した以外のデフォルト設定をオーバーライドする必要が生じる場合があります。そのような必要が生じる場合の例を 2 つ、以下に示します。

ワーカーのオーバーライドの例

ログファイルのコネクターを実行するスタンドアロンプロセスを考えてみましょう。ログ収集では、低レイテンシでベストエフォート型のデリバリーが適している場合があります。これは、クライアントでのデータのバッファリングを避けるため、接続の問題があっても、小規模なデータの損失をアプリケーションが許容できる場合です。これにより、ログ収集を可能な限り軽量化しておくことができます。

ワーカーによって制御されるすべてのコネクターで、 プロデューサーの構成プロパティコンシューマーの構成プロパティ をオーバーライドするには、以下の例に示すようにワーカーの構成プロパティに producer. または consumer. をプレフィックスとして付加します。

producer.retries=1
consumer.max.partition.fetch.bytes=10485760

上記の例では、メッセージの送信を一度だけ再試行するように、デフォルトのプロデューサーの retries プロパティをオーバーライドしています。コンシューマーのオーバーライドでは、1 回のリクエストでパーティションからフェッチするデフォルトのデータ量を 10 MB に増やしています。

コネクターごとのオーバーライドの例

デフォルトでは、コネクターで使用されるプロデューサーとコンシューマーは、Connect 自体の内部トピックに使用されるプロパティと同じプロパティを使用して作成されます。そのため、同じ Kafka プリンシパルで、すべての内部トピック、およびコネクターで使用されるすべてのトピックの読み書きが可能である必要があります。

コネクターで使用されるプロデューサーとコンシューマーで、異なる Kafka プリンシパルを使用することもできます。コネクターの構成では、プロデューサーとコンシューマーの作成に使用されたワーカーのプロパティをオーバーライドすることができます。これらのプロパティには、producer.override. および consumer.override. のプレフィックスを付加します。コネクターごとのオーバーライドの詳細については、「ワーカーの構成のオーバーライド」を参照してください。

プロデューサーとコンシューマーの詳細については、「Kafka プロデューサー」と「Kafka コンシューマー」を参照してください。構成プロパティのリストについては、「プロデューサーの構成プロパティ」と「コンシューマーの構成プロパティ」を参照してください。

ソースコネクターのトピックの自動作成の構成

Beginning with Confluent Platform version 6.0, Kafka Connect can create topics for source connectors if the topics do not exist on the Apache Kafka® broker. To use auto topic creation for source connectors, you must set the Connect worker property to true for all workers in the Connect cluster. In addition, you must create supporting properties in each source connector configuration.

重要

  • この機能は、シンクコネクターやその構成には影響しません。シンクコネクターに追加されたトピック作成プロパティは無視され、ログに警告が生成されます。
  • 不足しているトピックがソースコネクターで作成されないようにするには、Connect ワーカーで topic.creation.enable=false と設定して、この機能を無効にする必要があります。

ワーカープロパティ

ソースコネクターのトピックの自動作成を有効または無効にするには、次のワーカープロパティを使用します。

topic.creation.enable

デフォルトは true です。この機能は、関連する ソースコネクターのプロパティ がソースコネクターの構成に含まれている場合にのみ有効になります。

  • 型: ブール値
  • デフォルト: true
  • 重要度: 低

ソースコネクターのプロパティ

ソースコネクタープロパティの中には、ワーカープロパティ topic.creation.enable と関連付けられているものがあります。それらは、デフォルトのレプリケーション係数、パーティションの数、トピックが存在しない場合にトピックを作成するために Kafka Connect で使用されるその他のトピック固有の設定を設定するためのプロパティです。これらのプロパティにはいずれもデフォルト値がありません。

このとき、コネクターの構成に以下のプロパティを追加します。

  • topic.creation.$alias.replication.factor
  • topic.creation.$alias.partitions

ソースコネクターのトピックの自動作成機能は、ワーカーの構成で機能が有効になっており、かつ、ソースコネクターの構成で必須の レプリケーション係数 と 1 つのグループの パーティションの数 が指定されている場合にのみ有効になります。ユーザーは、レプリケーション係数またはパーティションの数を -1 に設定することで、Kafka ブローカーで指定されているデフォルト値を使用することができます。

You can define more connector properties using configuration property groups. Configuration property groups are added using the property topic.creation.groups. The hierarchy of groups is built on top of a single foundational group called the default configuration property group. The default group always exists and does not need to be listed in the topic.creation.groups property in the connector configuration. Including default in topic.creation.groups results in a warning.

The following source connector configuration properties are used in association with the topic.creation.enable=true worker property. For example properties, see 構成の例.

注釈

構成プロパティには、Java regex として定義されている正規表現(regex)を使用できます。

topic.creation.groups

一致するトピックにグループ別のトピック構成を定義するために使用するグループ別名のリスト default グループは常に存在し、すべてのトピックに一致します。

  • 型: String 型のリスト
  • デフォルト: 空
  • 指定可能な値: このプロパティの値には、任意の追加グループを指定できます。トピックの構成には常に default グループが定義されています。
topic.creation.$alias.replication.factor

コネクターで作成する新規トピックのレプリケーション係数。この値は、Kafka クラスターのブローカーの数を超えてはなりません。この値が Kafka ブローカーの数よりも大きい場合、コネクターがトピックの作成を試行するとエラーが発生します。default グループの場合、これは 必須のプロパティ です。topic.creation.groups で定義されている他のグループの場合、このプロパティは省略可能です。他のグループでは、Kafka ブローカーのデフォルト値が使用されます。

  • 型: int
  • デフォルト: なし
  • 指定可能な値 : 具体的な有効値を指定する場合は >= 1 で指定し、Kafka ブローカーのデフォルト値を使用する場合は -1 を指定します。
topic.creation.$alias.partitions

このコネクターによって作成されたトピックのパーティションの数。default グループの場合、これは 必須のプロパティ です。topic.creation.groups で定義されている他のグループの場合、このプロパティは任意です。他のグループは、Kafka ブローカーのデフォルト値を使用します。

  • 型: int
  • デフォルト: なし
  • 指定可能な値 : 具体的な有効値を指定する場合は >= 1 で指定し、Kafka ブローカーのデフォルト値を使用する場合は -1 を指定します。
topic.creation.$alias.include

トピック名に一致する正規表現を表す文字列のリスト。このリストは、一致する値を持つトピックを対象に含め、このグループの特定の構成を一致するトピックに適用するために使用します。topic.creation.groups で定義されているすべてのグループに、$alias が適用されます。default グループには、このプロパティは適用されません。

  • 型: String 型のリスト
  • デフォルト: 空
  • 指定可能な値: 正確なトピック名または正規表現のコンマ区切りのリスト。
topic.creation.$alias.exclude

トピック名に一致する正規表現を表す文字列のリスト。このリストは、一致する値を持つトピックを、グループの特定の構成の適用から除外するために使用します。topic.creation.groups で定義されているすべてのグループに、$alias が適用されます。default グループには、このプロパティは適用されません。トピックの除外ルールは、すべての包含ルールをオーバーライドすることに注意してください。

  • 型: String 型のリスト
  • デフォルト: 空
  • 指定可能な値: 正確なトピック名または正規表現のコンマ区切りのリスト。
topic.creation.$alias.${kafkaTopicSpecificConfigName}

Any of the topic-level configurations for the version of the Kafka broker where the records will be written. The broker's topic-level configuration value is used if the configuration is not specified for the rule. $alias applies to the default group as well as any group defined in topic.creation.groups.

  • 型: プロパティ値
  • デフォルト : Kafka ブローカー値

構成の例

以下の構成スニペットの例は、Connect ワーカーの構成で topic.creation.enable が有効になっている場合に、ソースコネクターの構成プロパティの入力がどのようになるかを示しています。

Example 1

例 1: Connect によって作成されるすべての新しいトピックには、レプリケーション係数 3 と、5 つのパーティションが設定されています。default は唯一のグループであるため、config topic.creation.groups は使用されません。

...omitted

topic.creation.default.replication.factor=3
topic.creation.default.partitions=5

...omitted

Example 2

例 2: Connect によって作成される新しいトピックには、レプリケーション係数 3 と、5 つのパーティションが設定されています。この構成設定の例外は inorder グループの包含リストに一致するトピックで、1 つのパーティションが設定されています。

...omitted

topic.creation.groups=inorder
topic.creation.default.replication.factor=3
topic.creation.default.partitions=5

topic.creation.inorder.include=status, orders.*
topic.creation.inorder.partitions=1

...omitted

Example 3

例 3: Connect によって作成される新しいトピックには、レプリケーション係数 3 と、5 つのパーティションが設定されています。プレフィックス configurations で始まる key_value_topic トピックと another.compacted.topic トピックは圧縮され、レプリケーション係数は 5、パーティションは 1 つとなります。

...omitted

topic.creation.groups=compacted
topic.creation.default.replication.factor=3
topic.creation.default.partitions=5

topic.creation.compacted.include=key_value_topic, another.compacted.topic, configurations.*
topic.creation.compacted.replication.factor=5
topic.creation.compacted.partitions=1
topic.creation.compacted.cleanup.policy=compact

...omitted

Example 4

例 4: Connect によって作成される新しいトピックには、レプリケーション係数 3 と、5 つのパーティションが設定されています。プレフィックス configurations で始まるトピックは圧縮されます。highly_parallel の包含リストに一致し、その除外リストに一致しないトピックは、レプリケーション係数が 1、パーティションが 1 つとなります。

...omitted

topic.creation.groups=compacted, highly_parallel
topic.creation.default.replication.factor=3
topic.creation.default.partitions=5

topic.creation.highly_parallel.include=hpc.*,parallel.*
topic.creation.highly_parallel.exclude=.*internal, .*metadata, .*config.*
topic.creation.highly_parallel.replication.factor=1
topic.creation.highly_parallel.partitions=100

topic.creation.compacted.include=configurations.*
topic.creation.compacted.cleanup.policy=compact

...omitted

セキュリティ構成の例

セキュリティが構成されている場合、Connect ワーカーには、トピックの情報表示(DESCRIBE)および作成(CREATE)を行えるように構成されているプリンシパルが必要です。このプリンシパルは、ワーカー内のすべてのコネクターに継承されます。Connect ワーカー構成で指定されているものとは 異なるセキュリティ設定 が必要な場合は、次の例に示すように、producer.overrride プロパティをソースコネクター構成に追加してセキュリティ認証情報を提供することができます。

...omitted

producer.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="alice" \
    password="alice-secret";

...omitted
...omitted

producer.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="alice" \
    password="alice-secret";

admin.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="bob" \
    password="bob-secret";

...omitted

topic.creation.enable=true が構成されており、ワーカーのプロパティでもコネクターのオーバーライドでも新しいトピックの作成が許可されていない場合は、エラーがログに記録され、タスクは失敗します。

Connect Reporter

Kafka Connect Reporter は、シンク操作の結果をレポータートピックに送信します。レコードのシンクに成功した後、またはエラー条件に従って、結果レポートを提出するために Connect Reporter が呼び出されます。このレポートは、シンクイベントに関する追加情報とともに、元のレコードの処理方法に関する詳細が含められるように構成されています。それらのレコードは、消費できるように、構成可能な成功トピックおよびエラートピックに書き込まれます。シンクコネクター構成に追加されている基本的な Connect Reporter 構成プロパティの例を以下に示します。

reporter.bootstrap.servers=localhost:9092
reporter.result.topic.name=success-responses
reporter.result.topic.replication.factor=1
reporter.error.topic.name=error-responses
reporter.error.topic.replication.factor=1

To completely disable Connect Reporter, see Disabling Connect Reporter.

環境がセキュアな場合は、管理クライアントとプロデューサーの両方に対して構成ブロックを使用します。プロデューサーは、レポータートピックにレコードを送信するように構築されています。管理クライアントは、トピックを作成します。認証情報は、セキュアな環境で追加する必要があります。管理プロパティとプロデューサープロパティの例を以下に示します。

reporter.admin.bootstrap.servers=localhost:9092
reporter.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="<username>" password="<password>";
reporter.admin.security.protocol=SASL_SSL
reporter.admin.sasl.mechanism=PLAIN

reporter.producer.bootstrap.servers=localhost:9092
reporter.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="<username>" password="<password>";
reporter.producer.security.protocol=SASL_SSL
reporter.producer.sasl.mechanism=PLAIN

Additional Reporter configuration property examples are provided in each applicable Kafka Connect sink connector document. For an example, see the Reporter properties in the HTTP Sink connector.

Reporter と Kerberos のセキュリティ

The following configuration example shows a sink connector with all the necessary configuration properties for Reporter and Kerberos security. This example shows the Prometheus Metrics Sink connector, but can be modified for any applicable sink connector.

{

  "name" : "prometheus-connector",
  "config" : {
    "topics":"prediction-metrics",
    "connector.class" : "io.confluent.connect.prometheus.PrometheusMetricsSinkConnector",
    "tasks.max" : "1",
    "confluent.topic.bootstrap.servers":"localhost:9092",
    "confluent.topic.ssl.truststore.location":"/etc/pki/hadoop/kafkalab.jks",
    "confluent.topic.ssl.truststore.password":"xxxx",
    "confluent.topic.ssl.keystore.location":"/etc/pki/hadoop/kafkalab.jks",
    "confluent.topic.ssl.keystore.password":"xxxx",
    "confluent.topic.ssl.key.password":"xxxx",
    "confluent.topic.security.protocol":"SASL_SSL",
    "confluent.topic.replication.factor": "3",
    "confluent.topic.sasl.kerberos.service.name":"kafka",
    "confluent.topic.sasl.jaas.config":"com.sun.security.auth.module.Krb5LoginModule required \nuseKeyTab=true \nstoreKey=true \nkeyTab=\"/etc/security/keytabs/svc.kfkconnect.lab.keytab\" \nprincipal=\"svc.kfkconnect.lab@DS.DTVENG.NET\";",
    "prometheus.scrape.url": "http://localhost:8889/metrics",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "behavior.on.error": "LOG",
    "reporter.result.topic.replication.factor": "3",
    "reporter.error.topic.replication.factor": "3",
    "reporter.bootstrap.servers":"localhost:9092",
    "reporter.producer.ssl.truststore.location":"/etc/pki/hadoop/kafkalab.jks",
    "reporter.producer.ssl.truststore.password":"xxxx",
    "reporter.producer.ssl.keystore.location":"/etc/pki/hadoop/kafkalab.jks",
    "reporter.producer.ssl.keystore.password":"xxxx",
    "reporter.producer.ssl.key.password":"xxxx",
    "reporter.producer.security.protocol":"SASL_SSL",
    "reporter.producer.sasl.kerberos.service.name":"kafka",
    "reporter.producer.sasl.jaas.config":"com.sun.security.auth.module.Krb5LoginModule required \nuseKeyTab=true \nstoreKey=true \nkeyTab=\"/etc/security/keytabs/svc.kfkconnect.lab.keytab\" \nprincipal=\"svc.kfkconnect.lab@DS.DTVENG.NET\";",
    "reporter.admin.ssl.truststore.location":"/etc/pki/hadoop/kafkalab.jks",
    "reporter.admin.ssl.truststore.password":"xxxx",
    "reporter.admin.ssl.keystore.location":"/etc/pki/hadoop/kafkalab.jks",
    "reporter.admin.ssl.keystore.password":"xxxx",
    "reporter.admin.ssl.key.password":"xxxx",
    "reporter.admin.security.protocol":"SASL_SSL",
    "reporter.admin.sasl.kerberos.service.name":"kafka",
    "reporter.admin.sasl.jaas.config":"com.sun.security.auth.module.Krb5LoginModule required \nuseKeyTab=true \nstoreKey=true \nkeyTab=\"/etc/security/keytabs/svc.kfkconnect.lab.keytab\" \nprincipal=\"svc.kfkconnect.lab@DS.DTVENG.NET\";",
    "confluent.license":"eyJ0eXAiOiJK ...omitted"

  }

Connect Reporter の無効化

Connect Reporter を無効にするには、reporter.error.topic.name 構成プロパティと reporter.result.topic.name 構成プロパティに空の文字列を設定します。

注釈

reporter.bootstrap.servers プロパティは必須ではなく、デフォルト値は空です。しかし、これらのトピック名の構成プロパティは、デフォルトで空ではないため、reporter.bootstrap.servers が必須になります。

  • reporter.error.topic.name = ${connector}-error
  • reporter.result.topic.name = ${connector}-success

Metrics Reporter のメタデータ

Confluent Platform 6.0 のリリースに伴い、Kafka Connect では、コネクターがメトリクスのコンテキストメタデータを組み込みのメトリクスクライアントライブラリに渡すための構成プレフィックスが利用可能になりました。Connect ワーカーは、インスタンス化されたすべてのコネクターにコンテキストの構成を渡します。これらの ワーカーの構成プロパティ に関する情報を以下に示します。

metrics.context.<key>=<value>

構成されたキー/値ペアは、Connect ワーカー内のコネクターによって、MetricsContext インターフェイスを使用して、構成された Confluent Metrics Reporter に渡されます。たとえば、metrics.context.foo.bar=baz と設定して Connect ワーカーを構成すると、MetricsContext メタデータの Metrics Reporter の値 baz にマップされたフィールド foo.bar が追加されます。

Connect ワーカーは以下のフィールドを MetricsContext に渡すことができます。

  • connect.kafka.cluster.id 。Apache Kafka® をサポートしているクラスターを示します。
  • connect.group.id 。Connect ワーカーの調整に使用されるグループ ID を示します。connect.group.id は、 分散モード でのみ有効です。詳細については、「Connect の内部トピック」を参照してください。

ConfigProvider インターフェイス

The ConfigProvider class interface allows you to use variables in your worker configuration that are dynamically resolved upon startup. It also allows you to use variables in your connector configurations that are dynamically resolved when the connector is (re)started. You can use variables within configuration property values in place of secrets, or in place of any information that should be resolved dynamically at runtime.

注釈

コネクターの構成は、Connect REST API の変数で保持され、共有されます。コネクターの起動時にのみ、メモリー内の変数の一時的な解決や置き換えが行われます。シークレットは、コネクター構成、ログ、または REST API のリクエストや応答の中には 保持されません

Connect ワーカーは、変数の解決に、ワーカー構成で定義されている名前付きの ConfigProviders に依存します。各変数では、使用するべき ConfigProvider の名前と、ConfigProvider が変数を置換文字列に解決するために使用する情報を指定します。

All ConfigProvider implementations are discovered using the standard Java ServiceLoader mechanism. To create a custom implementation of ConfigProvider, implement the ConfigProvider interface. Package the implementation class(es) and a file named META-INF/services/org.apache.kafka.common.config.provider.ConfigProvider containing the fully qualified name of the ConfigProvider implementation class into a JAR file. Note that the JAR file can use third-party libraries other than those provided by the Connect framework, but they must be installed with the JAR file as described below.

ConfigProvider のカスタム実装をインストールするには、JAR ファイルを含む新しいサブディレクトリを Connect の plugin.path にあるディレクトリに追加し、Connect ワーカーを(再)起動します。Connect ワーカーが起動すると、ワーカー構成で指定された ConfigProvider のすべての実装がインスタンス化されます。config.providers.[provider].param. のプレフィックスが付加されているプロパティはすべて、ConfigProviderconfigure() メソッドに渡されます。Connect ワーカーがシャットダウンされると、ConfigProviderclose() メソッドが呼び出されます。

重要

Connect クラスター内のすべてのワーカーは、ワーカー構成内のすべての変数と、すべてのコネクター構成で使用されるすべての変数を解決できなければなりません。そのためには、以下の要件を満たしている必要があります。

  • Connect クラスター内のすべてのワーカーに、同じ一連の名前付き構成プロバイダーが必要です。
  • 各ワーカーの各プロバイダーには、ワーカー構成またはコネクター構成で使用される変数を解決するために必要なリソースへのアクセス権限が付与されている必要があります。

これを実現するため、分散ワーカーの構成には以下の構成プロパティが追加されています。

  • config.providers: プロバイダーの名前のコンマ区切りリスト。
  • config.providers.{name}.class: プロバイダーの Java クラス名。
  • config.providers.{name}.param.{param-name}: 初期化時に上記の Java クラスに渡すパラメーター。

FileConfigProvider

Kafka は、変数の参照を各ワーカーのローカルファイルの値に置き換えることができる、FileConfigProvider と呼ばれる ConfigProvider の実装を提供しています。たとえば、構成プロパティにシークレットを設定するのではなく、ローカルファイルにシークレットを保存して、コネクター構成の変数を使用することができます。コネクターが起動されると、Connect は、ファイル構成プロバイダーを使用して変数を解決し、実際のシークレットに置き換えます。このようにすることで、コネクターの構成が Connect REST API で保持され、共有される場合に、コネクターの構成にシークレットを含めないようにすることができます。

重要

Connect クラスター内の すべてのワーカー は、構成プロバイダーを参照するすべての変数によって参照されるファイルにアクセスできる必要があります。

Variables that use FileConfigProvider should be in the form ${provider:[path:]key}. The path is the fully-qualified path of the property file on a Connect worker; the key is the name of the key within that property file. Note that the FileConfigProvider supports reading any file, where the path (and property key in that file) is specified in each variable. When Connect resolves one of these variables, it will read the properties file, extract the value for the corresponding key, and replace the whole variable with that value.

For example:

The following shows a JDBC connector configuration with properties that you do not want exposed:

connection.url=jdbc:oracle:thin:@myhost:1521:orcl
connection.user=scott
connection.password=<my-secret-password>

Using FileConfigProvider, you store these secrets in a separate file accessible to each Connect worker. For this example, the separate file you create is named /opt/connect-secrets.properties. Example properties you include in this file are shown below. Note the additional properties for another connector (other-connector).

connector-url=jdbc:oracle:thin:@myhost:1521:orcl
connector-username=<username>
connector-password=<jdbc-password>
other-connector-url=jdbc:oracle:thin:@myhost:1521:orcl
other-connector-username=<username>
other-connector-password=<other-password>

Then, you add the properties config.providers=file and FileConfigProvider to all Connect workers with connectors that require these secrets. The following example worker configuration shows these entries:

# Additional properties added to the worker configuration

config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

You can now add variables in the JDBC connector configuration that point to /opt/connect-secrets.properties. For example:

# Additional properties added to the connector configuration

connection.url=${file:/opt/connect-secrets.properties:connector-url}
connection.user=${file:/opt/connect-secrets.properties:connector-username}
connection.password=${file:/opt/connect-secrets.properties:connector-password}

You can also add variables to the other connector configuration that point to the same /opt/connect-secrets.properties file.

# Additional properties added to another connector configuration

connection.url=${file:/opt/connect-secrets.properties:other-connector-url}
connection.user=${file:/opt/connect-secrets.properties:other-connector-username}
connection.password=${file:/opt/connect-secrets.properties:other-connector-password}

InternalSecretConfigProvider

Confluent Platform provides another implementation of ConfigProvider named InternalSecretConfigProvider which is used with the Connect Secret Registry. The Secret Registry is a secret serving layer that enables Connect to store encrypted Connect credentials in a topic exposed through a REST API. This eliminates any unencrypted credentials being located in the actual connector configuration. The following example shows how InternalSecretConfigProvider is configured in the worker configuration file:

### Secret Provider

config.providers=secret
config.providers.secret.class=io.confluent.connect.secretregistry.rbac.config.provider.InternalSecretConfigProvider

config.providers.secret.param.master.encryption.key=<encryption key>
config.providers.secret.param.kafkastore.bootstrap.servers=SASL_PLAINTEXT://<Kafka broker URLs>
config.providers.secret.param.kafkastore.security.protocol=SASL_PLAINTEXT
config.providers.secret.param.kafkastore.sasl.mechanism=OAUTHBEARER
config.providers.secret.param.kafkastore.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler
config.providers.secret.param.kafkastore.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  username="<service-principal-username>" \
  password="<service-principal-password>" \
  metadataServerUrls="<metadata server URLs>";

Kafka Connect のシャットダウン

Confluent Platform スタンドアロンモードで Connect を実行している場合は、以下のローカル Confluent CLI コマンドを使用できます。

confluent local services connect stop

詳細については、「confluent local services connect stop」を参照してください。

Confluent CLI を使用していない場合や、Connect を分散モードで実行している場合は、以下の手順を使用できます。

  1. Connect ノードのいずれかでターミナルセッションを開始します。

  2. 実行中の Connect プロセスを検索します。例を次に示します。

    ps aux | grep ConnectDistributed
    
  3. 出力で PID を特定します。

    <node-hostname> <worker-pid>   0.2  2.1  8414400 351944 s003  S    12:42PM   2:52.62 /path/to/your/bin/java ...omitted... org.apache.kafka.connect.cli.ConnectDistributed /path/to/your/connect.properties
    
  4. プロセスを停止します。例を次に示します。

    kill <worker-pid>
    
  5. 残りのすべての |kconnect|ノードで、この実行中のプロセスを停止します。

重要

プロセスを停止するために kill -9 を使用しないでください。

次のステップ

デプロイを開始した後については、以下の追加の Kafka Connect のドキュメントをご覧ください。

ちなみに

オンプレミスの Kafka Connect、Confluent Cloud、および Confluent for Kubernetes に対応した、エンドツーエンドのデモ をお試しください。