重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

スループットのための最適化

スループットのために最適化するには、プロデューサーとコンシューマーが特定の時間内にできる限り多くのデータを移動する必要があります。高スループットを実現するには、データの移動速度を最大化してください。データ速度をできる限り高速にする必要があります。

このページの構成パラメーターには、広範囲の値が指定できるものがあります。設定できる値は、必要な要件や、メッセージの平均サイズ、パーティション数といったその他の要因、さらには環境内で生じる他のさまざまな差によって影響を受けます。したがって、ベンチマークテスト を行うとアプリケーションおよび環境の構成の検証に役立ちます。

参考

ステップバイステップのチュートリアル「How to optimize your Kafka producer for throughput」に従ってください。このチュートリアルでは、kafka-producer-perf-test を使用してベースラインのパフォーマンスを測定し、大量のデータに対応するようプロデューサーを調整する方法が説明されています。

パーティションの数

トピックのパーティションは、Kafka の並列処理の単位です。プロデューサーは、さまざまなブローカーのさまざまなパーティションにメッセージを並行して送信できます。また、それらのメッセージをさまざまなコンシューマーが並行して読み取ることができます。一般に、トピックパーティションの数が多いほどスループットが高くなるため、スループットを最大化するには、Confluent Cloud クラスター内のブローカー全体に分散させるのに十分な数のパーティションが必要になります。

パーティション数を増やすことにはいくつかのトレードオフがあります。ガイドライン を参照して、パーティションの数を選択する方法を確認してください。プロデューサーのスループットとコンシューマーのスループットに基づいてパーティション数を選択し、環境のパフォーマンスのベンチマークテストを行うようにしてください。また、できる限り均等にメッセージがトピックパーティションに分散するように、データパターンとキー割り当ての設計を検討してください。これによって、特定のトピックのパーティションが他より過負荷になることを防止できます。

メッセージのバッチ処理

Kafka プロデューサーのバッチ処理方法を使用すると、同じパーティションに送信されるメッセージをバッチ化することができます。つまり、複数のメッセージを収集して単一のリクエストでまとめて送信することができます。スループットを最適化するために取れる最も重要な手段は、バッチサイズ、およびバッチのメッセージが溜まるまで待機する時間を増やすようにプロデューサーのバッチ処理をチューニングすることです。バッチサイズが大きいほど、Confluent Cloud へのリクエストが少なくなるので、プロデューサーの負荷と各リクエストを処理するブローカーの CPU オーバーヘッドが削減されます。Java クライアントでは、batch.size パラメーターを構成して、各メッセージバッチのバイト数の最大サイズを増やすことができます。バッチが満杯になるまでの時間を増やすためには、linger.ms パラメーターを構成して、メッセージの送信前のプロデューサーの待機時間を長くします。この待機時間の間、プロデューサーは、構成された batch.size にバッチが達するまで待つことができます。トレードオフとして、高いレイテンシが許容されます。送信できるメッセージでもすぐには送信されないからです。

圧縮

スループットを最適化するために、プロデューサーで圧縮を有効にすることもできます。圧縮すると、送信するビット数を削減できます。圧縮を有効にするには、compression.type パラメーターを以下のいずれかの標準圧縮コーデックに設定します。

  • lz4 (パフォーマンスのために推奨)
  • snappy
  • zstd
  • gzip
  • none (デフォルトは none、つまり圧縮なし)

パフォーマンスのためには gzip ではなく lz4 を使用してください。gzip は、計算処理が多く、アプリケーションのパフォーマンスも低下する場合があります。圧縮はデータのバッチ全体に適用されるため、バッチ処理が適切であるほど、圧縮率が高くなります。

Confluent Platform とは異なり、compression.type を Confluent Cloud トピックに対して構成することはできません。

プロデューサーに compression.type を構成する方法の詳細については、「プロデューサーの構成」を参照してください。

構成可能なトピックパラメーターと構成できないトピックパラメーターの網羅的な一覧については、「トピックの作成、編集、削除」の「トピックの編集」の手順の最後にある表を参照してください。

プロデューサーの受領応答

プロデューサーがメッセージを Confluent Cloud に送信すると、メッセージはターゲットパーティションのリーダーブローカーに送信されます。その後、プロデューサーは、次のメッセージの送信に進む前に、メッセージがコミットされたことを確認するために、リーダーブローカーからの応答を待機します(acks0 に設定されていないと想定しています。0 に設定されている場合は、プロデューサーはブローカーからの受領応答を待機しません)。まだコミットされていないメッセージをコンシューマーで読み取れないようにするために、自動チェックが適用されています。リーダーブローカーがこの応答を送信することが、プロデューサーのスループットに影響を与える可能性があります。プロデューサーが応答を受信するのが早いほど、そのプロデューサーは次のメッセージをより早く送信できるため、通常はスループットが高くなります。そのため、プロデューサーは構成パラメーター acks を設定して、リーダーブローカーが受領応答をプロデューサーに返す前に受け取っておく必要がある受領応答の数を指定できます。acks=1 を設定した場合、リーダーブローカーはローカルログにレコードを書き込むと、すべてのフォロワーからの受領応答を待機することなくリクエストの受領応答を返します。他のブローカーへのメッセージの複製が完了するのをプロデューサーが待機しなくてもよくなるので、トレードオフとして、永続性が低くなることを許容する必要があります。

メモリーの割り当て

Kafka プロデューサーは、Java クライアントが未送信メッセージを格納するためのメモリーを自動的に割り当てます。このメモリーの上限に達すると、プロデューサーは、メモリーが解放されるか max.block.ms 時間が経過するまで、それ以上の送信をブロックします。構成パラメーター buffer.memory を使用して、メモリーの割り当て量を調整できます。パーティション数が多くなければ、この調整は必要ありません。しかし、パーティション数が多い場合は、メッセージサイズ、データ蓄積待機時間、パーティション数を考慮しながら buffer.memory をチューニングすることで、より多くのパーティションでパイプラインを維持することができます。さらに、これによって、より多くのブローカーで帯域幅を有効に使用することが可能になります。

コンシューマーのフェッチ

スループットを最適化するもう 1 つの方法は、コンシューマーが Confluent Cloud のリーダーブローカーからフェッチごとに受け取るデータ量を調整することです。構成パラメーター fetch.min.bytes を増やすと、コンシューマーがフェッチリクエストごとにリーダーから受け取るデータの量を増加できます。このパラメーターは、コンシューマーからのフェッチ応答に対して想定される最小バイト数を設定します。この値を増やすと、Confluent Cloud に対して行われるフェッチリクエストの数が減り、各フェッチを処理するためのブローカーの CPU オーバーヘッドも減るため、スループットも改善します。プロデューサーのバッチ処理を増やす場合の結果と同様に、コンシューマーのこのパラメーターを増やすと、レイテンシが高くなるというトレードオフが生じることがあります。この理由は、フェッチリクエストのメッセージ数がフェッチリクエストサイズ(fetch.min.bytes)に達するか、または待機時間(構成パラメーター fetch.max.wait.ms)の期限が切れるまで、ブローカーが新しいメッセージをコンシューマーに送信しないからです。

アプリケーションで許容できるのであれば、複数のコンシューマーから成るコンシューマーグループを使用して、消費を並列化してください。消費を並列化すると、複数のコンシューマーに負荷分散して、複数のパーティションを同時に処理できるので、スループットが向上する可能性があります。この並列化の上限は、トピックのパーティションの数です。

スループットを最適化するための構成のまとめ

プロデューサー

  • batch.size: 100000 ~ 200000 に増やします(デフォルトは 16384
  • linger.ms: 10 ~ 100 に増やします(デフォルトは 0
  • compression.type=lz4 (デフォルトは none、たとえば、圧縮なしの場合)
  • acks=1 (デフォルト: all - Kafka 3.0 より前のデフォルト: 1
  • buffer.memory: パーティション数が多い場合に増やします(デフォルトは 33554432

コンシューマー

  • fetch.min.bytes: 最大で 100000 に増やします(デフォルトは 1
  • fetch.max.wait.ms=500 (デフォルトは 500