Kafka Connect の概念¶
Kafka Connect は、Apache Kafka® との間でデータのストリーミングを行うためのフレームワークです。Confluent Platform にはいくつかの 組み込みのコネクター が付属しており、リレーショナルデータベースや HDFS などの一般的に使用されているシステムとの間でデータのストリーミングを行うために使用することができます。Kafka Connect の内部的な仕組みについて説明するうえで役立つため、ここでは、いくつかの主要な概念について説明します。
- コネクター -- タスクを管理して、データストリーミングを調整する機能の高度な抽象化
- タスク -- Kafka からデータをコピーする方法または Kafka にデータをコピーする方法の実装
- ワーカー -- コネクターおよびタスクを実行するために実行されるプロセス
- コンバーター -- Connect とデータを送受信するシステムとの間でやり取りされるデータを変換するために使用されるコード
- 変換 -- コネクターで生成されたメッセージやコネクターに送信されたメッセージに変更を加えるシンプルなロジック
- デッドレターキュー -- Connect でコネクターのエラーを処理する方法
コネクター¶
Kafka Connect では、コネクターによってデータのコピー先とコピー元が決まります。コネクターインスタンス は、Kafka と他のシステムとの間のデータのコピー処理を管理するための論理ジョブです。コネクターを実装するクラスやコネクターによって使用されるクラスはすべて コネクタープラグイン で定義されます。コネクターインスタンスとコネクタープラグインのどちらも "コネクター" と呼ばれることがありますが、文脈からどちらを指しているかは判断できます(たとえば、"コネクターのインストール" はプラグインのことであり、"コネクターのステータスのチェック" ではコネクターインスタンスを指しています)。
既存のコネクター を使用することが推奨されています。ただし、新しいコネクタープラグインをゼロから作成することもできます。開発者が新しいコネクタープラグインを作成する場合の大まかなワークフローは次のとおりです。詳細については、「開発者ガイド」を参照してください。

タスク¶
タスクは、Connect のデータモデルにおける主役です。実際にデータをコピーする一連の タスク の調整を各コネクターインスタンスが行います。コネクターで 1 つのジョブを複数のタスクに分割できるため、Kafka Connect では、並列処理とスケーラブルなデータのコピーが組み込みでサポートされており、必要な構成作業もほとんどありません。これらのタスクでは、タスク自体に状態(ステート)は保存されません。タスクのステートは Kafka の特別なトピックである config.storage.topic
および status.storage.topic
に保存され、関連するコネクターによって管理されます。そのため、タスクはいつでも開始、終了、再開することができ、回復性と拡張性を備えたデータパイプラインを実現できます。

Connect ソースタスクから Kafka に渡されるデータの大まかな流れ。内部オフセットはタスク自体ではなく、Kafka またはディスク上に保存されることに注意してください。¶
タスクのバランス調整¶
コネクターが初めてクラスターに送信されると、ワーカーは、各ワーカーの負荷がほぼ等しくなるように、クラスター内のすべてのコネクターおよびそれらのタスクのバランス調整を行います。コネクターで必要なタスクの数に増減があった場合や、コネクターの構成に変更があった場合にも、同様のバランス調整の手順が使用されます。ワーカーに障害が発生した場合は、アクティブなワーカー間でタスクのバランス調整が行われます。タスクが失敗した場合は、バランス調整は行われません。タスクの失敗は例外的なケースと見なされるためです。そのため、失敗したタスクがフレームワークによって自動的に再開されることはありません。REST API を使用して再開する必要があります。

ワーカーの障害発生時にタスクのバランス調整がどのように行われるかを示す、タスクのフェイルオーバーの例¶
ワーカー¶
コネクターとタスクは論理作業単位であり、プロセスで実行されるようにスケジュールが設定される必要があります。Kafka Connect ではこれらのプロセスを ワーカー と呼び、ワーカーにはスタンドアロンと分散の 2 種類があります。
スタンドアロンワーカー¶
スタンドアロンモードは最もシンプルなモードで、すべてのコネクターおよびタスクの実行を 1 つのプロセスで行います。
プロセスが単一であるため、必要な構成が最小限で済みます。スタンドアロンモードは、初期段階や開発中、また、ホストからのログの収集など、1 つのプロセスで十分な場合に役立ちます。ただし、プロセスが 1 つだけであるため、機能も制限されます。拡張性が 1 つのプロセスに限定され、1 つのプロセスに対してモニタリングを行う以上のフォールトトレランスがありません。
分散ワーカー¶
分散モードは、Kafka Connect に拡張性と自動フォールトトレランスをもたらします。分散モードでは、同じ group.id
を使用して複数のワーカープロセスを開始します。ワーカープロセスは自動的に調整を行い、使用可能なすべてのワーカーでコネクターおよびタスクの実行のスケジューリングを行います。ワーカーを追加した場合、ワーカーをシャットダウンした場合、またはワーカーで予期せず障害が発生した場合、残りのワーカーによってその状態が検出され、自動的に調整が行われ、その時点で使用可能なワーカー間でコネクターおよびタスクが再分配されます。コンシューマーグループのバランス調整と似ていることに注意してください。内部的には、Connect ワーカーではコンシューマーグループを使用して調整やバランス調整が行われます。
重要
同じ group.id
を持つワーカーはすべて同じ Connect クラスターに属します。たとえば、worker-a で group.id=connect-cluster-a
と設定されていて、worker-b の group.id
も同じである場合、worker-a と worker-b は、connect-cluster-a
というクラスターを構成します。

3 ノードの Kafka Connect 分散モードクラスター。コネクター(タスクの再構成が必要な変更の有無に関する、ソースシステムやシンクシステムのモニタリング)とタスク(コネクターのデータのサブセットのコピー)については、アクティブなワーカー間で自動的にバランス調整が行われます。パーティションごとに各タスクが割り当てられており、複数のタスクの負荷が分散されている様子を示しています。¶
コンバーター¶
Kafka Connect のデプロイで、Kafka への書き込みや Kafka からの読み取りの際に特定のデータフォーマットをサポートするために、コンバーターが必要になります。タスクではコンバーターを使用して、データのフォーマットをバイト型から Connect の内部データフォーマットに、またはその逆に変換します。
デフォルトでは、Confluent Platform には以下のコンバーターが用意されています。
- AvroConverter
io.confluent.connect.avro.AvroConverter
: Schema Registry に使用 - ProtobufConverter
io.confluent.connect.protobuf.ProtobufConverter
: Schema Registry に使用 - JsonSchemaConverter
io.confluent.connect.json.JsonSchemaConverter
: Schema Registry に使用 - JsonConverter
org.apache.kafka.connect.json.JsonConverter
(スキーマレジストリ なし): 構造化データに使用 - StringConverter
org.apache.kafka.connect.storage.StringConverter
: シンプルな文字列フォーマット - ByteArrayConverter
org.apache.kafka.connect.converters.ByteArrayConverter
: 変換を行わない "パススルー" のオプションを提供
コンバーターはコネクター自体とは切り離されているため、さまざまなコネクターでコンバーターを無理なく再利用できます。たとえば、同じ Avro コンバーターを使用して、JDBC Source Connector による Kafka への Avro データの書き込みと、HDFS Sink Connector による Kafka からの Avro データの読み取りを実現できます。つまり、たとえば JDBC ソースから返された ResultSet
が Parquet ファイルとして HDFS に書き込まれる場合などにも、同じコンバーターを使用することができます。
次の図は、JDBC Source Connector を使用したデータベースからの読み取り、Kafka への書き込み、さらに HDFS Sink Connector を使用した HDFS への書き込みで、コンバーターがどのように使用されるかを示したものです。
コンバーターの詳細については、「キーコンバーターと値コンバーターの構成」を参照してください。コンバーターと スキーマレジストリ の動作の詳細については、「Kafka Connect と スキーマレジストリ の使用」を参照してください。
ちなみに
コンバーターの詳細については、「コンバーターとシリアル化について 」を参照してください。
変換¶
変換を使用するようコネクターを構成して、個々のメッセージに簡単で軽微な変更を加えることができます。これは、小規模なデータの調整やイベントのルーティングなどに役立つ場合があります。コネクターの構成で、複数の変換を連続して使用することもできます。ただし、複数のメッセージに複雑な変換や操作を適用する場合は、 ksqlDB 概要 や Kafka Streams の概要 で実装するのが適しています。
変換は、1 件のレコードを入力に取り、加工済みのレコードを出力として返すシンプルな関数です。Kafka Connect で用意されている変換はいずれもシンプルですが、よく使用される便利な変更を実行できます。独自のカスタムロジックを構築して 変換 のインターフェイスを実装し、Kafka Connect プラグイン としてパッケージ化して、任意のコネクターで使用することもできます。
ソースコネクターで変換が使用される場合、Kafka Connect がコネクターで生成された各ソースレコードを最初の変換に渡すと、変更が加えられ、新しいソースレコードが出力されます。この更新済みのソースレコードがチェーン内の次の変換に渡され、新しく変更が加えられたソースレコードが生成されます。このようにして残りの変換も実行されます。最終的に更新されたソースレコードが バイナリ形式に変換 され、Kafka に書き込まれます。
変換は、シンクコネクターで使用することもできます。Kafka Connect は Kafka からメッセージを読み取り、 バイナリ表現をシンクレコードに変換 します。変換がある場合、Kafka Connect がレコードを最初の変換に渡すと、変更が加えられ、新しい更新済みのシンクレコードが出力されます。この更新済みのシンクレコードがチェーン内の次の変換に渡され、新しいシンクレコードが生成されます。このようにして残りの変換も実行され、最終的に更新されたシンクレコードがシンクコネクターに渡されて、処理されます。
詳細については、「Confluent Platform における Single Message Transform」を参照してください。
変換 | 説明 |
---|---|
Cast | フィールドや、キーまたは値の全体を特定の型に変換(キャスト)します(整数のフィールドを狭範囲の整数のフィールドに変換するなど)。 |
Drop | レコードの 1 つのキーまたは値をドロップ(破棄)して null を設定します。 |
DropHeaders | 現在、マネージド型コネクターでは利用できません。 各レコードから 1 つ以上のヘッダーをドロップします。 |
ExtractField | スキーマが存在する場合は Struct から、スキーマのないデータの場合は Map から、指定されたフィールドを抽出します。null 値はすべてそのまま渡されます。 |
ExtractTopic | レコードトピックを、キーまたは値から派生された新しいトピックに置き換えます。 |
Filter(Apache Kafka) | すべてのレコードをドロップします。述語 と組み合わせて使用します。 |
Filter(Confluent) | 構成可能な filter.condition と一致するレコードを含めるか、ドロップします。 |
Flatten | ネストしたデータ構造をフラット化します。これにより、階層ごとにフィールド名が区切り文字で連結されて、それぞれのフィールド名が生成されます(区切り文字は変更することができます)。 |
GzipDecompress | 現在、マネージド型コネクターでは利用できません。 レコードヘッダーとしてリテラル値を挿入します。 |
Drop | 現在、マネージド型コネクターでは利用できません。 レコードヘッダーとしてリテラル値を挿入します。 |
HoistField | スキーマが存在する場合は Struct の、スキーマのないデータの場合は Map の、指定されたフィールド名を使用してデータをラップします。 |
InsertField | レコードメタデータの属性または構成された静的な値を使用してフィールドを挿入します。 |
InsertHeader | 現在、マネージド型コネクターでは利用できません。 レコードヘッダーとしてリテラル値を挿入します。 |
MaskField | 指定されたフィールドを、そのフィールドの型に対して有効な null 値でマスクします。 |
MessageTimeStampRouter | 元のトピックの値とレコードのタイムスタンプフィールドに応じて、レコードのトピックフィールドを更新します。 |
RegexRouter | 現在、マネージド型コネクターでは利用できません。 構成されている正規表現と置換文字列を使用して、レコードのトピックを更新します。 |
ReplaceField | フィールドにフィルターをかけるか、フィールドの名前を変更します。 |
SetSchemaMetadata | レコードのキーまたは値のスキーマに対して、スキーマ名とバージョンのいずれか、または両方を設定します。 |
TimestampConverter | Unix エポック、文字列、Connect の Date 型および Timestamp 型など、タイムスタンプのフォーマットを別のフォーマットに変換します。 |
TimestampRouter | 元のトピックの値とレコードのタイムスタンプに応じて、レコードのトピックフィールドを更新します。 |
TombstoneHandler | tombstone レコードを管理します。tombstone レコードとは、ValueSchema の有無に関係なく、値フィールド全体が null になっているレコードと定義されています。 |
TopicRegexRouter | マネージド型ソースコネクターのみで利用できます。 構成されている正規表現と置換文字列を使用して、レコードのトピックをアップデートします。 |
ValueToKey | レコードキーを、レコード値のフィールドのサブセットから形成された新しいキーに置き換えます。 |
デッドレターキュー¶
さまざまな理由から、無効なレコードが生じることがあります。1 例として、シンクコネクターの構成で Avro フォーマットが想定されているにもかかわらず、そのシンクコネクターに JSON フォーマットでシリアル化されたレコードが届いた場合が挙げられます。無効なレコードをシンクコネクターで処理できない場合、コネクターの構成プロパティ errors.tolerance
に基づいてエラーが処理されます。
デッドレターキューは、シンクコネクターにのみ適用されます。
注釈
デッドレターキューのトピックは、Confluent Cloud シンクコネクター用に自動生成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
この構成プロパティの有効な値としては、none
(デフォルト)と all
の 2 つがあります。
errors.tolerance
が none
に設定されている場合、エラーが発生するか無効なレコードが見つかると、その時点でコネクタータスクは失敗となり、コネクターのステータスが Failed になります。この問題を解決するには、Kafka Connect ワーカーのログを確認して障害が発生した原因を特定し、問題を修正して、コネクターを再起動する必要があります。
errors.tolerance
が all
に設定されている場合、エラーや無効なレコードはすべて無視され、処理が続行されます。Connect ワーカーのログには、エラーは書き込まれません。失敗したレコードがあるかどうかを判断するには、 内部メトリクス を使用するか、ソースのレコード数を計算して、処理されたレコード数と比較する必要があります。
エラー処理機能を使用することができ、その場合、無効なレコードはすべて特別なトピックにルーティングされ、エラーがレポートされます。このトピックには、シンクコネクターで処理できなかったレコードの デッドレターキュー が含まれます。
デッドレターキューのトピックの作成¶
デッドレターキューを作成するには、以下の構成プロパティをシンクコネクターの構成に追加します。
errors.tolerance = all
errors.deadletterqueue.topic.name = <dead-letter-topic-name>
デッドレターキューが有効になっている、GCS のシンクコネクターの構成の例を以下に示します。
{
"name": "gcs-sink-01",
"config": {
"connector.class": "io.confluent.connect.gcs.GcsSinkConnector",
"tasks.max": "1",
"topics": "gcs_topic",
"gcs.bucket.name": "<my-gcs-bucket>",
"gcs.part.size": "5242880",
"flush.size": "3",
"storage.class": "io.confluent.connect.gcs.storage.GcsStorage",
"format.class": "io.confluent.connect.gcs.format.avro.AvroFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"schema.compatibility": "NONE",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-gcs-sink-01"
}
}
デッドレタートピックには失敗したレコードが含まれていますが、それでは、理由はわかりません。さらに次の構成プロパティを追加すると、失敗したレコードのヘッダー情報を含めることができます。
errors.deadletterqueue.context.headers.enable = true
このパラメーターが true
に設定されている場合(デフォルトでは false)、レコードのヘッダーがデッドレターキューに追加されます。これで、kcat (旧 kafkacat)ユーティリティ を使用してレコードのヘッダーを表示し、レコードが失敗した理由を確認することができます。また、エラーが Connect Reporter に送信されます。
注釈
元のレコードヘッダーとの競合を回避するため、デッドレターキューのコンテキストヘッダーキーは先頭が _connect.errors
になります。
先ほどの例で、ヘッダーを有効にした構成がこちらです。
{
"name": "gcs-sink-01",
"config": {
"connector.class": "io.confluent.connect.gcs.GcsSinkConnector",
"tasks.max": "1",
"topics": "gcs_topic",
"gcs.bucket.name": "<my-gcs-bucket>",
"gcs.part.size": "5242880",
"flush.size": "3",
"storage.class": "io.confluent.connect.gcs.storage.GcsStorage",
"format.class": "io.confluent.connect.gcs.format.avro.AvroFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"schema.compatibility": "NONE",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-gcs-sink-01",
"errors.deadletterqueue.context.headers.enable":true
}
}
参考
このトピックの詳細については、「Kafka Connect の詳説 – エラー処理とデッドレターキュー 」を参照してください。
セキュリティを有効にしたデッドレターキューの使用¶
セキュリティを有効にして Confluent Platform を使用すると、Confluent Platform 管理クライアント によってデッドレターキューのトピックが作成されます。無効なレコードは、まず、それらのレコードを送信するように構成されている内部のプロデューサーに渡されます。その後、管理クライアントによってデッドレターキューのトピックが作成されます。
セキュアな Confluent Platform 環境でデッドレターキューを使用するには、追加の管理クライアントの構成プロパティ(.admin
というプレフィックス付き)を Connect ワーカーの構成に追加する必要があります。追加の Connect ワーカーの構成プロパティを指定する SASL/PLAIN の例を以下に示します。
admin.ssl.endpoint.identification.algorithm=https
admin.sasl.mechanism=PLAIN
admin.security.protocol=SASL_SSL
admin.request.timeout.ms=20000
admin.retry.backoff.ms=500
admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="<user>" \
password="<secret>";
参考
ロールベースアクセス制御(RBAC)環境における Connect ワーカー、シンクコネクター、デッドレターキュートピックの構成の詳細については、「 Kafka Connect と RBAC」を参照してください。