Kafka コンシューマー

Confluent Platform には、Apache Kafka® に同梱されている Java コンシューマーが含まれています。

このセクションでは、コンシューマーの仕組みの大まかな概要を説明し、調整に使用する構成設定を紹介します。

さまざまな言語で記述されたコンシューマーの例については、個別の言語のセクションを参照してください。Confluent Cloud の使用など、その他の例については、「Apache Kafka® のサンプルコード」を参照してください。

概念

コンシューマーグループ

コンシューマーグループ は、連携して複数のトピックからデータを消費する一連のコンシューマーです。すべてのトピックのパーティションを、グループ内のコンシューマーで分け合います。グループにメンバーの増減があると、パーティションの再割り当てが行われ、各メンバーに一定の割合でパーティションが割り当てられます。これは、グループのバランス調整と呼ばれます。

従来の "上位レベル" のコンシューマーと新しいコンシューマーの主な違いは、前者では ZooKeeper を利用してグループ管理を行っていたのに対し、後者では Kafka 自体に組み込まれているグループプロトコルが使用される点です。このプロトコルでは、1 つのブローカーが コーディネーター に指定され、グループのメンバーおよびパーティション割り当ての管理を担います。

各グループのコーディネーターは、コミットされたオフセットを保存するために使用される、内部オフセットトピック __consumer_offsets のリーダーから選ばれます。基本的には、グループの ID がこのトピックの 1 つのパーティションにハッシュ化され、そのパーティションのリーダーがコーディネーターとして選ばれます。このようにして、コンシューマーグループの管理は、クラスター内のすべてのブローカーにほぼ均等に分散されます。そのため、グループ数が増えても、ブローカー数を増やすことで対応できます。

コンシューマーは、起動されると、グループのコーディネーターを見つけて、グループに参加するためのリクエストを送信します。コーディネーターは、新しいメンバーにもグループのパーティションが均等に割り当てられるように、グループのバランス調整を開始します。バランス調整が行われるたびに、グループの 世代 が新しくなります。

グループ内の各メンバーがグループのメンバーに留まるには、ハートビートをコーディネーターに送信する必要があります。構成されている セッションタイムアウト の期限までにハートビートを受信しなかった場合、コーディネーターはそのメンバーをグループから除外し、そのメンバーのパーティションを別のメンバーに割り当て直します。

オフセット管理

コンシューマーは、コーディネーターから割り当てを受けた後、割り当てられた各パーティションの初期位置を確定する必要があります。グループが初めて作成されると、メッセージが消費される前に、構成可能なオフセットのリセットポリシー(auto.offset.reset )に従って位置が設定されます。通常、最も早いオフセットまたは最新のオフセットで消費が開始されます。

グループ内のコンシューマーが、コーディネーターによって割り当てられたパーティションからメッセージを読み取る際には、読み取ったメッセージに対応するオフセットをコミットする必要があります。コンシューマーがクラッシュした場合やシャットダウンされた場合、そのコンシューマーのパーティションは別のメンバーに再割り当てされます。割り当てを受けたメンバーは、各パーティションの最後にコミットされたオフセットから消費を開始します。オフセットをコミットする前にコンシューマーがクラッシュした場合、そのパーティションを引き継いだコンシューマーはリセットポリシーを使用します。

オフセットコミットポリシーは、アプリケーションで求められるメッセージ配信保証を実現するうえで非常に重要です。デフォルトでは、コンシューマーは自動コミットポリシーを使用するように構成されます。自動コミットポリシーでは、一定の間隔でコミットが行われます。コンシューマーでは、コミット API もサポートされており、手動でのオフセット管理に使用できます。デリバリーセマンティクス に影響を与えるため、正しいオフセット管理が重要になります。

デフォルトでは、コンシューマーはオフセットを自動コミットするように構成されます。自動コミットを使用すると、"少なくとも 1 回" のデリバリーを実現できます。メッセージが失われないことが Kafka で保証されますが、重複が生じる可能性があります。自動コミットは、基本的には、auto.commit.interval.ms 構成プロパティで間隔が設定され、cron で実行されます。コンシューマーがクラッシュした場合、再起動またはバランス調整が行われた後、クラッシュしたコンシューマーが所有していたすべてのパーティションの位置は、最後にコミットされたオフセットにリセットされます。この場合、最後にコミットされた位置は、自動コミットの間隔自体と同じ古さになる可能性があります。最後のコミット以降に届いたメッセージは、再度読み取る必要が生じます。

重複の幅を狭めたい場合は、自動コミットの間隔を小さくすることができますが、場合によっては、よりきめ細かいオフセットの制御が求められる場合があります。そのため、コンシューマーではコミット API がサポートされており、オフセットを細かく制御できます。コミット API を直接使用する場合は、まず、enable.auto.commit プロパティを false に設定して、構成で自動コミットを無効にする必要があることに注意してください。

コミット API を呼び出すたびに、オフセットコミットリクエストがブローカーに送信されます。同期 API を使用すると、リクエストが正常に完了するまで、コンシューマーはブロックされます。これにより、全体のスループットが低下する可能性があります。ブロックされなければ、コミットの処理を待つ間にコンシューマーがレコードの処理を進められるためです。

この問題に対処する方法の 1 つは、ポーリングで返されるデータ量を大きくすることです。コンシューマーには fetch.min.bytes という構成設定があり、1 回のフェッチで返されるデータ量を制御できます。ブローカーは、十分なデータが蓄積されるまで(または fetch.max.wait.ms の時間が経過するまで)フェッチの処理を保留します。ただし、障害が発生した場合の最悪のケースでは、処理すべき重複の量が増えるというトレードオフがあります。

2 つ目の方法としては、非同期コミットを使用します。リクエストが完了するまで待つのではなく、コンシューマーで非同期コミットを使用すると、リクエストを送信してすぐに復帰できます。

非同期コミットでパフォーマンスが向上するのであれば、常に非同期コミットを使用しないのはなぜでしょうか。主な理由としては、コミットが失敗した場合にコンシューマーがリクエストを再試行しないことが挙げられます。同期コミットであればこの問題はありません。コミットが成功するか回復不可能なエラーが発生するまで無限に再試行が行われます。非同期コミットでは、コミットの順番の処理が問題となります。コミットが失敗したことをコンシューマーが認識した時点で、既に次のバッチのメッセージが処理されている可能性や、次のコミットが送信されている可能性があります。この場合、古いコミットを再試行すると、消費が重複する可能性があります。

この問題に適切に対処するためにコンシューマーの内部処理を複雑化させなように、API では、コミットが成功または失敗したときに呼び出されるコールバックが提供されます。必要に応じて、このコールバックを利用してコミットを再試行できますが、その場合でも同様に順序の問題に対処する必要があります。

オフセットのコミットが失敗しても、後続のコミットが成功すれば、大きな問題にはなりません。実際に読み取りの重複にはつながらないためです。ただし、バランス調整が行われる前、またはコンシューマーがシャットダウンされる前の最後のコミットが失敗した場合、オフセットは最後のコミットにリセットされ、重複が生じる可能性が高くなります。そのため、一般的なパターンとしては、ポーリングループでの非同期コミットと、バランス調整やシャットダウン時の同期コミットを組み合わせます。終了時にコミットするのは容易ですが、バランス調整をフックする方法が必要です。

バランス調整には、パーティションの撤回とパーティションの割り当てという 2 つの段階があります。撤回のメソッドは必ずバランス調整の前に呼び出されます。これは、パーティションの再割り当てが行われる前にオフセットをコミットする最後のチャンスになります。割り当てのメソッドは必ずバランス調整の後に呼び出されます。これを利用して、割り当てられたパーティションの初期位置を設定できます。この場合、撤回のフックを使用して、現在のオフセットを同期的にコミットします。

一般に、非同期コミットは、同期コミットよりも安全性が劣ると考える必要があります。クラッシュする前にコミットが連続して失敗していた場合、処理の重複が増加します。コールバックでコミットの失敗に対処するロジックを追加するか、場合に応じて同期コミットを組み合わせることで、このリスクを軽減することができます。ただし、テストで必要性が確認された場合を除き、複雑にしすぎないようにする必要があります。信頼性を高める必要がある場合は、同期コミットを使用できます。トピックパーティションの数およびグループ内のコンシューマーの数を増やすことで、拡張性も確保できます。ただし、スループットを最大限に高める必要があり、重複の数がある程度増えても構わない場合は、非同期コミットが適している可能性があります。

明白なことですが注意しておきたいのは、非同期コミットを利用できるのは、"少なくとも 1 回" のメッセージデリバリーの場合のみだということです。"最大 1 回" を実現するには、メッセージを消費する前にコミットが成功したかどうかを確認できる必要があります。コミットが失敗していたことが明らかになったらメッセージを "未読に戻す" 機能がない限り、これには必然的に同期コミットが必要になります。

例では、コミット API の具体的な事例をいくつか紹介し、パフォーマンスと信頼性のトレードオフについて説明します。

外部システムに書き込む場合は、コンシューマーの位置を、出力として保存された位置に合わせて調整する必要があります。そのため、コンシューマーは、出力と同じ場所にオフセットを保存します。たとえば、 Kafka Connect コネクターはデータを、読み取ったデータのオフセットとともに HDFS に入力します。それにより、データとオフセットは同時に更新されるか、両方とも更新されないかのどちらかになることが保証されます。同様のパターンは、このように強力なセマンティクスが求められ、重複排除に使用できるプライマリキーがメッセージにない、他の多くのデータシステムでも採用されています。

Kafka ではこのようにして Kafka Streams で 厳密に 1 回の処理 がサポートされており、通常は、トランザクションのプロデューサーまたはコンシューマーを使用して、Kafka のトピック間でデータの転送や処理を行う場合の厳密に 1 回のデリバリーを実現できます。それ以外の場合について、Kafka ではデフォルトで、少なくとも 1 回のデリバリーが保証されます。プロデューサーの再試行を無効にし、メッセージのバッチを処理する前にコンシューマーでオフセットをコミットすることで、最大 1 回のデリバリーを実装できます。

ちなみに

フォロワーからフェッチする構成を使用している場合、コンシューマーは非同期のフォロワーレプリカをフェッチおよび消費できます。詳細については、「Multi-Region Clusters」を参照してください。

Kafka コンシューマーの構成

すべての構成設定については、「コンシューマーの構成」に一覧が掲載されています。主な構成設定の一部と、その設定によりコンシューマーの動作にどのような影響があるかを以下にまとめました。

コアの構成

必須の設定は bootstrap.servers のみですが、client.id を設定することをお勧めします。この設定により、ブローカーに対するリクエストと、そのリクエストを行ったクライアントインスタンスの関連付けが容易になります。クライアントのクォータ を適用するため、通常、同じグループ内のすべてのコンシューマーで同じクライアント ID を使用します。

グループの構成

シンプルな割り当ての API を使用していて、Kafka にオフセットを保存する必要がない場合を除き、group.id は必ず構成する必要があります。

session.timeout.ms の値をオーバーライドすることにより、セッションのタイムアウトを制御できます。C/C++ および Java のクライアントのデフォルトは 10 秒ですが、ネットワーク接続が低速な場合や GC の停止時間が長い場合などに、バランス調整が過剰に行われないように、この時間を長くすることができます。

セッションタイムアウトに大きな数字を指定した場合の主な問題は、コンシューマーインスタンスのクラッシュをコーディネーターが検出するまでに時間がかかることです。この場合、グループ内の別のコンシューマーがパーティションを引き継ぐまでに時間がかかることになります。ただし、通常のシャットダウンの場合は、コンシューマーがグループを離脱するリクエストを明示的にコーディネーターに送信するため、即座にバランス調整が行われます。

バランス調整の動作に影響するその他の設定としては、heartbeat.interval.ms があります。この設定では、コンシューマーがコーディネーターにハートビートを送信する頻度を指定します。これは、バランス調整が必要なときにコンシューマーが検出するための手段ともなるため、ハートビートの間隔を小さくしておくと、通常はバランス調整が迅速に行われるようになります。デフォルトの設定は、3 秒です。グループの規模が大きい場合は、この設定を大きくしておくとよいでしょう。

過剰なバランス調整につながる可能性のあるもう 1 つのプロパティが、max.poll.interval.ms です。このプロパティでは、コンシューマーの poll メソッド(.NET の Consume メソッド)を呼び出す間隔の最大値を指定します。この時間が経過すると、コンシューマープロセスは失敗したと見なされます。デフォルトは 300 秒です。アプリケーションでのメッセージ処理の所要時間が長い場合は、この数値を大きくしても問題ありません。Java のコンシューマーを使用している場合は、max.poll.records の値を変更して、ループの実行ごとに処理されるレコード数を調整することもできます。

オフセット管理

オフセット管理に影響する 2 つの主な設定は、自動コミットの有効/無効と、オフセットのリセットポリシーです。1 つ目として、enable.auto.commit (デフォルトの設定)を設定した場合、コンシューマーは自動的に、auto.commit.interval.ms に設定されている間隔で定期的にオフセットをコミットします。デフォルトは 5 秒です。

2 つ目として、auto.offset.reset を使用して、コミットされた位置がない場合(グループが最初に初期化された場合)またはオフセットが範囲外の場合のコンシューマーの動作を指定します。"最も早い" オフセットと "最新の" オフセット(デフォルト)のどちらに位置をリセットするかを選択できます。"none" を選択して、最初のオフセットを自分で設定し、範囲外のエラーに手動で対応することもできます。

メッセージの処理

Java コンシューマーがすべての IO および処理をフォアグラウンドスレッドで実行している間、librdkafka ベースのクライアント(C/C++、Python、Go、C#)はバックグラウンドスレッドを使用します。その結果、複数のスレッドからポーリングが行われても、ポーリングの安全性を確保できます。これを利用して、複数のスレッドでメッセージを並列処理できます。大まかに言うと、ポーリングによりキューからメッセージが取り出され、キューへの書き込みはバックグラウンドで行われます。

バックグラウンドスレッドを使用することで得られるもう 1 つの結果として、ハートビートとバランス調整がすべてバックグラウンドで行われることが挙げられます。これにより、メッセージ処理によって、コンシューマーのバランス調整が行われなくなることを心配する必要がなくなる利点があります。ただし、メッセージプロセッサーが終了していても、バックグラウンドスレッドのハードビートが継続されるという欠点があります。この状態になった場合、コンシューマーのパーティションがそのまま保持され、プロセスがシャットダウンされるまで、読み取りのラグが蓄積されます。

クライアントによって、内部的にさまざまなアプローチが採用されていますが、実際にはそれほど大きな違いはありません。Java クライアントで同様の抽象化を実現するには、ポーリングループとメッセージプロセッサーの間にキューを配置します。ポーリングループでキューへの入力を行い、プロセッサーがキューからメッセージを取り出します。

Kafka コンシューマーグループのコマンドツール

Kafka には、コンシューマーグループのステータスを表示する管理用ユーティリティが含まれています。

グループの一覧表示

クラスター内のアクティブなグループのリストを表示するには、Kafka ディストリビューションに含まれている kafka-consumer-groups ユーティリティを使用します。クラスター内の各ブローカーを調査してリストを作成するため、大規模なクラスターでは、この処理に少し時間がかかる場合があります。

bin/kafka-consumer-groups --bootstrap-server host:9092 --list

グループの情報表示

kafka-consumer-groups ユーティリティを使用して、現在のグループの情報を収集することもできます。たとえば、foo グループの現在の割り当てを確認するには、次のコマンドを使用します。

bin/kafka-consumer-groups --bootstrap-server host:9092 --describe --group foo

バランス調整の実行中にこのコマンドを呼び出した場合、エラーが報告されます。再度実行すると、現在の世代のすべてのメンバーの割り当てを確認できます。

コードの例

Java を含む、さまざまなプログラミング言語での Kafka クライアントの基本的なコード例については、「Apache Kafka® のサンプルコード」を参照してください。これらのコード例にはすべて、オンプレミスまたは Confluent Cloud で実行される Kafka クラスターに接続できるプロデューサーとコンシューマーが含まれています。また、Schema Registry を使用して Avro データを生成および消費する方法の例も示されています。

おすすめの関連情報