Kafka Connect のアーキテクチャ

システム間でデータをコピーするという Kafka Connect の目的を達成するために、さまざまなフレームワークの取り組みが進められてきました。現在でも活発に開発や保守が行われているものが多数あります。このセクションでは、Kafka Connect の目的、どのような設計に適しているか、独自の機能、設計方針について説明します。

目的

既にさまざまな選択肢が存在するにもかかわらず、フレームワークが開発されるのはなぜでしょうか。多大な労力をかけて構築された、さまざまなシステムに対応するコネクターがあるなら、それを再利用すべきではないでしょうか。

結局のところ、こうしたソリューションの多くは、ストリームデータプラットフォーム との最適な連携ができません。ストリームデータプラットフォームでは、イベントベースのストリーミングデータが共通言語となり、Apache Kafka® が共通の手段として、あらゆるデータのハブの役割を担います。他のシステムのデータの送信先、データの抽出元となる集約型のハブがあれば、そのハブ(Kafka)と他のそれぞれのシステムとの間の個々の接続を最適化できれば理想的なツールとなります。

既存のフレームワークがこのようなユースケースに適していない理由を確認するために、想定されているユースケースと機能に基づいて、いくつかのカテゴリーに分類してみます。

  1. ログおよびメトリックの収集、処理、集約

    例: FlumeLogstashFluentdHeka

    これらのシステムの目的は、アプリケーションサーバーおよびインフラストラクチャサーバーからの大量のログやメトリックデータを収集し、処理するというニーズに対応することです。これらのシステムに共通するのは、各ノードで "エージェント" を使用する設計です。エージェントでログデータを収集し、障害に備えてバッファに蓄積してから、送信先ストレージシステムに転送するか、集約エージェントに転送してそこでさらに処理を行ってから転送します。ソースフォーマットのデータを、送信先に適したフォーマットにするため、これらのシステムには、イベントのデコード、フィルター処理、エンコードを行うフレームワークが用意されています。

    このモデルは、必然的に非常に多くのホストにデータが分散していて、各ホストで実行されているエージェントからでないとデータにアクセスできない環境で、最初のログ収集を行うのに、非常に適しています。ただし、他のさまざまなユースケースには拡張しづらくなっています。たとえば、これらのシステムでは、HDFS などのバッチシステムとの連携はうまく処理されません。各イベントの処理は適切に行われ、ほとんどのエラー処理はユーザーに任されることを想定して、システムが設計されているためです。

    また、これらのシステムは、大規模なデータパイプラインでは運用が複雑になります。いずれにしても、ログを収集するには、サーバーごとにエージェントが必要になります。ただし、Hadoop のようなシステムに対するデータのコピー処理をスケールアウトするには、多数のサーバーにまたがる多数の独立したエージェントプロセスを手動で管理し、処理の分配も手動で行う必要があります。また、新しいタスクを追加するには、標準化されたストレージレイヤーがないため、アップストリームタスクの再構成も必要になる場合があります。

  2. データウェアハウスの ETL

    例: GobblinChukwaSuroMorphlinesHIHO

    これらのシステムは、種類の異なるさまざまなシステムとデータウェアハウス(一般には HDFS など)の間のギャップを埋めることを目的とするものです。これらのシステムでは、データウェアハウスに着目しているため、共通のパターンがあります。最も明確なのは、主にバッチジョブが中心になっていることです。システムによっては、このようなバッチの規模を非常に小さくすることができますが、ストリーム処理アプリケーションで求められる低レイテンシを実現できるようには設計されていません。この設計は、データウェアハウスへのデータの読み込みには適していますが、ストリームデータプラットフォームで必要とされるさまざまなデータレプリケーションジョブへの拡張はできません。

    もう 1 つの共通する機能は、柔軟性のあるプラグ可能なデータ処理パイプラインです。データパイプラインの早い段階で処理を実行できない場合、これは、データウェアハウス向けの ETL においては必須です。データを HDFS に格納する前に、長期的な保存、クエリ、分析に適した形態にデータを変換する必要があります。しかし、それにより、ツールは使用と実装の両面で非常に複雑になり、ユーザーは既に使い慣れた他の既存のツールを使用するのではなく、ETL フレームワークでデータを処理する方法を学習する必要に迫られます。

    最後に、ユースケースが非常に特化しているため、これらのシステムは一般に、単一のシンク(HDFS)または非常に似通った少数のシンク(HDFS と S3 など)のみに対応します。この場合も、特定のアプリケーションドメインが決まっていれば、これは合理的な設計上のトレードオフですが、他のタイプのデータコピージョブについては、システムの利用が制限されます。

  3. データパイプライン管理

    例: NiFi

    これらのシステムは、可能な限り簡単にデータパイプラインを構築することを目指すものです。2 つのシステム間でデータをコピーする個々のジョブの構成と実行に注目するのではなく、オペレーターがパイプライン全体を確認できるようにし、GUI を提供することで使いやすさを高めています。これらのシステムのコア部分では、同じ基本的なコンポーネント(個々のコピータスク、データソースとシンク、中間のキューなど)を必要としますが、これらのシステムのデフォルトビューは、パイプライン全体です。

    これらのシステムではデータパイプライン全体を 1 つのまとまりとして管理するため、パイプラインの各部分をチームごとに管理する必要がある組織の場合、組織全体で導入するには適さない可能性があります。大規模な組織では、1 つの大きなデータパイプラインを管理するのではなく、複数の小規模なデータパイプラインを用意して、それぞれをこのようなツールで管理している場合があります。ただし、このように包括的なビューがあれば、処理のエラーにグローバルに対応しやすくなり、データパイプライン全体の総合的なモニタリングとメトリクスを実現できます。

    さらに、これらのシステムは汎用的なプロセッサーコンポーネントを中心に設計されており、コンポーネントを任意に接続して、データパイプラインを構築できます。そのため柔軟性に優れていますが、信頼性とデリバリーセマンティクスの保証度は高くありません。これらのシステムでは多くの場合、ステージ間のキューイングがサポートされていますが、フォールトトレランスは通常、限定的であり、ログやメトリックの処理システムのようなものです。

関連するシステムを分類し、それぞれについて長所と短所を見てきましたが、Kafka Connect の設計には、主に次のような特性があります。

  • デフォルトでの幅広いコピー -- システム間で大量のデータをコピーするコネクターを簡単に定義でき、構成のオーバーヘッドを最小限に抑えることができます。個々のテーブルをコピーするコネクターを定義することも可能ですが、デフォルトの作業単位はデータベース全体とする必要があります。
  • ストリーミングとバッチ -- コピー元およびコピー先としてストリーミング指向とバッチ指向のどちらのシステムもサポートします。
  • アプリケーションに対するスケーラビリティ -- 開発、テスト、小規模な本稼働環境の 1 つのコネクターで実行される単一のプロセスへのスケールダウン、幅広い種類の大規模システム間でのデータのコピーを行う組織全体規模のサービスへのスケールアップができます。
  • データのコピーのみに対応 -- 信頼性の高いスケーラブルなデータのコピーに対応しています。データの変換、拡張、その他の変更は、その機能専用のフレームワークに委ねられます。それに応じて、Kafka Connect によってコピーされたデータとストリーム処理フレームワークの連携が十分である必要があります。
  • 並列 -- フレームワークでスケーラビリティの自動化を実現できるように、並列処理がコアの抽象化に組み込まれている必要があります。
  • 使いやすいコネクター API -- 新しいコネクターの開発が容易である必要があります。新しいコネクターを実装するための API およびランタイムモデルは、ジョブに最適なライブラリをシンプルに使用することができ、簡単にデータをシステム間で転送できるものである必要があります。障害からの復旧など、フレームワークがコネクターのサポートを必要とする場合については、必要なすべてのツールが Kafka Connect API に含まれている必要があります。

アーキテクチャ

Kafka Connect の設計には、主に 3 つのモデルがあります。

  • コネクターモデル: コネクターを定義するには、Connector クラスと、コピーの対象となるデータおよびそのデータのフォーマット方法を制御する構成オプションを指定します。実際にデータをコピーする一連の Tasks の定義と更新は、各 Connector インスタンスで行います。Kafka Connect が Tasks を管理します。 Connector は、一連の Tasks の生成と、タスクの更新が必要になった場合のフレームワークへの通知のみを行います。Source および SinkConnectors/Tasks は、API では区別されます。これは、両者にとって可能な限りシンプルな API を実現するためです。
  • ワーカーモデル: Kafka Connect クラスターは、Connectors および Tasks を実行するコンテナーとなっている一連の Worker プロセスで構成されます。Workers は自動的に相互の調整を行って、負荷を分散し、拡張性とフォールトトレランスを実現します。Workers は、使用可能なプロセスの間で負荷を分散しますが、プロセスの管理は行いません。 Workers については、任意のプロセス管理戦略を使用できます(YARN や Mesos などのクラスター管理ツール、Chef や Puppet などの構成管理ツール、プロセスライフサイクルの直接管理など)。
  • データモデル: コネクターはパーティション分割された入力ストリームからパーティション分割された出力ストリームにメッセージのストリームをコピーします。この入力または出力のうち少なくとも 1 つは必ず Kafka となります。これらのストリームは、ぞれぞれが順序のある一連のメッセージであり、メッセージごとに関連付けられたオフセットがあります。幅広いシステムとの連携をサポートするため、これらのオフセットのフォーマットとセマンティクスは、コネクターで定義されます。ただし、障害発生時に一定のデリバリーセマンティクスを実現するには、オフセットがストリーム内で一意であること、およびストリームで任意のオフセットのシークができることが求められます。メッセージのコンテンツは、Connectors によってシリアル化に依存しないフォーマットで表現されます。Kafka Connect は、さまざまなシリアル化フォーマットでこのデータを保存できるように、プラグ可能な Converters をサポートします。スキーマが組み込まれているため、複雑なデータパイプラインでも、メッセージのフォーマットに関する重要なメタデータを伝達することができます。ただし、スキーマが使用できない場合は、スキーマなしのデータも使用することができます。

コネクターモデルは、3 つの重要なユーザー要件に対応しています。1 つ目として、Kafka Connect は、デフォルトで幅広いコピー を実行できます。ユーザーが Connectors のレベルでジョブを定義すると、ジョブが小規模な Tasks に分割されます。この 2 段階のスキームにより、コネクターはジョブを小規模なタスクに分割するのに十分な入力が得られるため、幅広いデータのコピーを可能にする構成を使用しやすくなります。また、Connectors では即座にジョブを小規模なサブタスクに分割する方法が検討され、そのための適切な粒度が選択されるため、一定の 並列化 を実現できます。最後に、Kafka Connect では、ソースとシンクのインターフェイスが分化されているため、使いやすいコネクター API を提供できます。そのため、幅広いシステム向けのコネクターを非常に簡単に実装できます。

ワーカーモデルにより、Kafka Connect は アプリケーションに対するスケーラビリティ を実現しています。スケールダウンして、単一のワーカープロセスで実行し、ワーカープロセスをそれ自体のコーディネーターとして使用することも、クラスターモードで実行し、ワーカーでコネクターとタスクのスケジューリングを動的に行うこともできます。ただし、ワーカーの "プロセス管理" については前提事項が非常に少ないため、さまざまなクラスターマネージャーで実行することや、従来型のサービス管理を使用することが簡単にできます。このアーキテクチャではスケールアップ/スケールダウンができますが、Kafka Connect の実装では、両方のモードをサポートするユーティリティも追加されます。ジョブの管理および監視用の REST インターフェイスにより、多数のユーザーのジョブを実行する、組織全体規模のサービスとして Kafka Connect を実行することが容易になります。アドホックなジョブに特化したコマンドラインユーティリティにより、エージェントベースのアプローチが必要な開発環境、テスト環境、本稼働環境での設定と実行が容易になります。

残りの要件には、データモデルが対応しています。Kafka との緊密な連携により、さまざまな利点があります。Kafka は、ストリーミングとバッチ の両方のシステムで自然なバッファの役割を果たし、データ管理の負担が大幅に軽減され、コネクター開発者のデリバリーが保証されます。また、必ずエンドポイントの 1 つとして Kafka が必要となっているため、大規模なデータパイプラインでは、Kafka との連携に優れたさまざまなツールを利用できます。これにより、Kafka Connect は データのコピーのみに対応 できます。データをさらに処理するには、幅広いストリーム処理ツールを使用できるため、概念上も実装の面でも Kafka Connect はシンプルな状態を維持できます。これは、シンクへの送信前に ETL を実行する必要がある他のシステムと大きく異なる点です。他のツールとは対照的に、Kafka Connect では、ETL 処理を行わず、すべての変換はその目的に特化したツールに任せることができます。最後に、Kafka では、コアの抽象化にパーティションが含まれており、さらなる 並列化 が可能となっています。

内部的な Connect のオフセット

コネクターが実行されると、Kafka Connect はそれぞれの オフセット を追跡します。それにより、障害が発生した場合や、メンテナンスのための正常な再起動が行われた場合に、コネクターが前回の位置から再開することができます。これらのオフセットは、コピーされたデータのストリームにおける現在の位置を追跡するためのものであること、また、各コネクターで複数の パーティション のストリームに対応する複数のオフセットを追跡する必要がある場合がある点で、Kafka のオフセットと似ています。ただし、データの読み込み元のシステムでオフセットのフォーマットが定義されるため、Kafka トピックのように単純に long にすることができない点が異なります。たとえば、データベースからデータを読み込む場合、データベースの changelog 内の位置を示すトランザクション ID がオフセットになることがあります。

オフセットのフォーマットはコネクターによって異なるため、通常はユーザーがオフセットのフォーマットを考慮する必要はありません。ただし、Kafka Connect では、障害発生時の復旧のため、構成、オフセット、ステータスの更新用に永続ストレージが必要です。また、必要なトピックが存在しない場合、Kafka Connect によって作成が試行されますが、このストレージで使用されるトピックをユーザーが手動で作成することもできます。これらの設定は、Kafka Connect をどのように実行するかによって異なります。これについては、「コネクター開発者ガイド」で説明しています。