キャパシティプランニングとサイジング

Kafka Streams は、Apache Kafka® を基盤として構築されたシンプルで強力なストリーミングライブラリです。内部的には、Kafka Streams アプリケーションを実行するリソースをプロビジョニングするときに検討するべき重要な考慮事項があります。このセクションでは、次のような疑問に答えます。

  • Kafka Streams アプリケーションで 1 秒あたりに 100 万件のレコードを処理するには、どれだけの数の Kafka ブローカーが必要か。
  • Kafka Streams アプリケーションでそれらのレコードを処理するには、何台のクライアントマシンが必要か。
  • Kafka Streams アプリケーションには、一般にどのくらいのメモリー、CPU、ネットワークが必要か。

背景とコンテキスト

ここでは、以降のセクションを理解するために役立つ基礎的な情報についてまとめます。大部分は、Kafka Streams の アーキテクチャ のうち、パフォーマンスに影響する部分の要約になります。まずは、 本稼働環境へのデプロイ に関する情報と、トピックやパーティションの数の決定方法 を確認して、 Kafka のサイジングについての理解を見直すことをお勧めします。

Kafka Streams では Kafka のプロデューサーおよびコンシューマー API を使用: Kafka Streams アプリケーションは、通常の Kafka クライアントと同様に、内部に Kafka のプロデューサーとコンシューマーを含んでいます。このため、Kafka Streams インスタンスを追加することは、アプリにプロデューサーとコンシューマーを追加することだと認識する必要があります。

並列処理の単位はタスク: Kafka Streams では、並列処理の基本単位はストリームタスクです。タスクは、トピックごとに 1 つの Kafka パーティションから消費し、プロセッサーノードのグラフを通じてレコードを処理します。処理がステートフルである場合、タスクはステートストアに書き込み、1 つ以上の Kafka パーティションに結果を返します。並列処理の上限を高める手順は簡単で、トピックの パーティション の数を大きくするだけです。タスクの数は、これに比例して自動的に増加します。

タスクの配置が重要: パーティションやタスクの数を増やすと並列処理の上限が高まりますが、それらのタスクを物理的にどこに配置するかを決定する必要があります。これには 2 つの方法があり、1 つ目は、単一サーバーにすべてのタスクを配置してスケールアップすることです。これは、アプリが CPU に依存していて、1 台のサーバーに多数の CPU が搭載されている場合に適しています。これを実現するには、1 つのアプリで多数のスレッドを実行するか(num.stream.threads 構成オプション、デフォルトは 1)、アプリの複数のクローンを同じマシン上で起動して、クローンごとに 1 つのスレッドを実行します。これらの 2 つの間にパフォーマンスの違いはありません。2 つ目の方法は、1 台以上のマシンにタスクを分散してスケールアウトすることです。これは、アプリがネットワーク、メモリー、またはディスクに依存している場合や、単一サーバーの CPU コアの数が限られている場合に役立ちます。

自動的な負荷分散: 必要なパーティションの数と Kafka Streams インスタンスの起動場所を決定したら、後は自動で実行されます。Kafka に備わっているコンシューマーグループ管理 機能 により、ユーザーが関与しなくても、タスク間で負荷分散が行われます。既に説明したとおり、このコンテキストにおいて Kafka Streams は Kafka のクライアントであるため、この機能のメリットを受けることができます。

これらの情報をふまえて、いくつかの主なシナリオを見ていきましょう。

ステートレスとステートフル

ステートレスアプリケーション

この場合は、CPU とネットワークリソースが鍵となります。これらのアプリケーションでは、ステートを保持しておく必要がありません。たとえば、ストリーミングデータがプロセッサーノードを通過するときに、アプリケーションでデータをフィルター処理したり、データ変換などのロジックを実行したりできます。これらのアプリでは、必要に応じて Kafka トピックに最終的な出力を書き戻すことができますが、ステートストアはありません(集約や結合は行いません)。ストリーム変換のセクション には、ステートレスアプリケーションの例がいくつか掲載されています。

ステートレスアプリケーションのサイジングは、ある意味、Kafka でクライアントを追加してサイジングする場合と同様です。唯一の違いは、追加するクライアントが Kafka Streams インスタンスであるという点です。これらのインスタンスでは、各レコードの(消費だけでなく)処理も行うため、CPU リソースの消費量が増える可能性があります。したがって、それまでは主にネットワークに注意を払っていたのに対して、今度は CPU についても考慮する必要があります。

このようなアプリでは、まず OS のパフォーマンスカウンターをモニタリングして、CPU とネットワークのどちらがボトルネックであるかを判断します。CPU がボトルネックである場合は、アプリにスレッドを追加して(または、同じマシン上でいくつかのクローンインスタンスを起動して)、CPU がより多く使用されるようにします。ネットワークがボトルネックである場合は、別のマシンを追加して、そこで Kafka Streams アプリのクローンを実行する必要があります。Kafka Streams は、すべてのマシンで実行されているすべてのタスク間で自動的に負荷を分散します。

ステートフルアプリケーション

この場合は、ローカルディスクとメモリーという別の 2 つのリソースをモニタリングする必要があります。これらは集約や結合を実行するアプリケーションです。さらに、Kafka Streams の対話型クエリ を通じて、外部アプリケーションから任意のタイミングでローカルステートに対するクエリが実行される可能性があります。ストリーム変換のセクション には、ステートフルアプリケーションの例がいくつか掲載されています。

  • ローカルストレージ領域(クエリの高速化用): Kafka Streams では、組み込みデータベースを使用してローカルデータを格納します。デフォルトのストレージエンジンは RocksDB です。ローカルストレージのディスク領域が不足した場合は、アプリケーションを別のマシンにスケールアウトする必要があることを示します。この場合はスレッドを増やしても解決になりません。
  • グローバルストレージ領域(フォールトトレランス用): データはローカルに書き込まれるだけでなく、ローカルステートに障害が発生した場合にデータを回復できるように、changelog と呼ばれるトピックにもバックアップされます。このトピックではログの圧縮が有効になります。したがって、合計 S 個のステートストアからのトラフィックを考慮して Kafka をプロビジョニングする必要があります。各ステートストアは 1 つのトピックを Kafka に追加します(レプリケーション係数が replication.factor の設定によって制御される R-way レプリケーション)。よって Kafka のプロビジョニングは、クライアントが S 個の新しいトピックを書き込む場合のプロビジョニングと同じように行う必要があります。これには、トラフィックを維持するためにブローカーの追加が必要になることがあります(後述の例を参照)。
  • グローバルストレージ領域(内部操作用): Kafka Streams は、操作に重要な内部トピックを保存する場合があります。このような内部トピックの一例として、(たとえばトピックの自動再パーティション化の結果として)中間データを保持する repartition topics と呼ばれるものがあります。トピックのキーを再マップしたり、再マップされたキーを基に結合を実行したりするたびに、キーの再マップ元のトピックとほぼ同じサイズで、新しい再パーティショントピックが作成されます。したがって、再パーティショントピックからのトラフィックを考慮して Kafka をプロビジョニングする必要があります。replication.factor の設定によって、再パーティショントピックもレプリケートされる可能性があることに注意してください。
  • メモリー(パフォーマンス用): 各ステートストアには、バッファリングのためのメモリーオーバーヘッドがあります(「メモリー管理」を参照)。さらに RocksDB は、圧縮や独自のバッファリングのためにオフヒープメモリーも利用します。いずれかのマシンでメモリーが不足したら、そのクライアントマシンで利用できる RAM を増設するか、アプリケーションを他のマシンにスケールアウトすることが必要になる場合があります。この場合はスレッドを増やしても解決になりません。
  • スタンバイレプリカ(高可用性用): 障害発生時の可用性を高めるために、num.standby.replicas を設定すると(デフォルトは 0)、Kafka Streams タスクをレプリケートすることができます。スタンバイレプリカ では、処理は何も実行されません。元のタスクが利用しているステートストアの changelog トピックから消費するだけです。スタンバイレプリカがあると、システムの負荷が増加します。スタンバイレプリカは、クライアント側のネットワークリソースとストレージリソース、およびブローカーのネットワークリソースを利用します。Kafka Streams の対話型クエリ を使用している場合は、スタンバイレプリカの有効化を検討することをお勧めします。アプリでは、タスクの移行中、移行されているタスクとそこで管理されているステートへの対話型クエリに応答できなくなるためです。スタンバイレプリカは、このような状況でのフェールオーバー時間を最小化するために役立ちます。

ステートレスアプリケーションとステートフルアプリケーションのサイジングに関して、いくつかの具体的な疑問を見ていきましょう。

シナリオ 1: メッセージのストリームに対して、述語に基づいてフィルターを適用する(文字列が有効な IP アドレスかどうかをチェックするなど)必要があるとします。各メッセージは 100 バイトです。フィルター処理はステートレスな変換です。このとき、1 秒あたり 1,000 万件のメッセージ、つまり約 1 GBps のフィルター処理速度をサポートする必要があるとします(このシナリオでは、Kafka が適切にプロビジョニングされていて目的の速度を維持できると想定し、Kafka Streams アプリケーションに集中します)。各クライアントマシンのネットワーク速度が 500 MBps に制限されるとすると、2 台のクライアントマシンを用意して、Kafka Streams アプリケーションのインスタンスを 1 つずつ実行する必要があります。2 つのインスタンス間では、Kafka Streams によって自動的に処理の負荷分散が行われます。

シナリオ 2: シナリオ 1 とほとんど同じですが、異なる点として、Kafka Streams アプリケーションで .to() 演算子を使用して、フィルター処理されたすべてのメッセージを matchTopic という新しいトピックに出力します。元のメッセージの約半数がフィルター処理されると仮定します。つまり、1 秒あたり 500 万件のメッセージを matchTopic に出力する必要があります。このシナリオでは、アプリケーションに 1 GBps の取り込み速度と 0.5 GBps の出力速度が要求されるため、ネットワーク負荷の合計は 1.5 GBps になります。このシナリオでは、3 つの Kafka Streams インスタンスを 3 台の別々のクライアントマシンで実行する必要があります。

シナリオ 3: 100 万個の別個のキーを処理する必要があるとします。各キーのサイズは 8 バイト(long)、値は文字列(平均 92 バイト)です。このとき、1 つのメッセージは約 100 バイトになります。100 万件のメッセージがある場合、ステートを保持するには 1 億バイト、つまり約 100 MB が必要です。

ウィンドウ操作については、ウィンドウの数と保持時間も考慮する必要があります。たとえば、1 分ずつ進むホッピング時間ウィンドウを想定し、保持時間が 1 時間であるとします。この場合、キーごとに 60 ウィンドウを保持する必要があります。上記のデータでは 100 MB の 60 倍、つまり約 6 GB になります。

このようなストアが 20 個あるとすれば、約 120 GB が必要です。パーティション数が 100 の場合、データがパーティション間で均等に分散されるとすると、各パーティションは平均 1.2 GB になります。メモリーが 16 GB のマシンでメインメモリーにステートを保持する場合、他にメモリーが使用される余地を残して控えめに見積もると、1 台のマシンで対応できるパーティションは約 10 個です。したがって、100 個のパーティションをすべて処理するには、約 10 台のマシンが必要です。

このシナリオではすべてのデータをメモリー内に読み込む想定でしたが、データを(RocksDB などに) 永続化 する場合も、同様の計算を行います。この場合に関連するメトリックは、ディスクのストレージ領域になります。

シナリオ 4: 1 秒あたり 100 万件のリクエスト(集約や結合など)を Kafka Streams アプリケーションで処理するには、いくつの Kafka ブローカーを追加する必要があるでしょうか。これはアプリケーションの実際のロジックと、現在のブローカーの負荷によって異なります。現在のブローカーの数を B、1 秒あたり 100 万件のリクエストを収集して(未処理のまま)格納するために B 個のブローカーにかかる負荷を L とします。この負荷 L は、アプリケーションにより、L(増加なし、アプリがステートレスの場合など)から 2 * L(アプリがステートフルな処理を実行する場合など)の間のどこかになると見積もるのが妥当です。追加で必要となるブローカーの数は、既存のブローカーの現在の利用状況に依存します。

  • 既存のブローカーの 50% が利用されている場合、同じブローカーで新しい負荷に対応でき、追加のブローカーは必要ありません(ただし実際には、ストレージの 100% を利用するのではなく、ある程度の空き領域を残しておく方が安全です)。
  • 負荷 L を処理するために既存のブローカーの 100% が利用されている場合、つまり既存の Kafka ブローカーにネットワークとストレージ容量の余裕がまったくない場合、ステートストアからの新しいデータを格納するには、0(L -> L)から B(L -> 2*L)の追加のブローカーが必要になる可能性があります。

シナリオ 5: これらのレコードを Kafka Streams アプリケーションで処理するには、何台のクライアントマシンが必要でしょうか。最適な並列処理のためには、Kafka Streams インスタンスの数が、処理されるトピックのパーティション数と一致する必要があります。この数を P とします。各マシンに C 個のコアが搭載されているとすると、コアごとに 1 つのインスタンスを実行できるため、必要なマシンは P/C 台です。ただし、ネットワークがボトルネックになる可能性があるため、ユーザーがネットワークのセットアップをアップグレードできる立場にあるなどの状況でない限り、Kafka で現在の P 個のパーティションをホストする Kafka ブローカーと同数のクライアントマシンが必要です。したがって、P/C と、P を保持する現在のブローカー数のうち、大きい方が上限になると考えられます。

シナリオ 6: ストリーミングのユースケース(典型的な "高速データルックアップ" のシナリオである不正の摘出や、ステートの追跡が必要となる "集約" のユースケースなど)について考えます。現状では、従来のデータベースクラスターで 10 TB のデータを管理していて、3 回レプリケートされます(必要な領域は合計で 30 TB になります)。代わりに Kafka Streams の対話型クエリ を使用する場合、アプリケーションインスタンスはいくつ必要でしょうか。

まず、アプリケーションでは、合計 10 TB のステートをアプリインスタンス間でローカルに管理することになります(スタンバイレプリカ はないと想定します)。各クライアントマシンのストレージ領域が 1 TB だとすると、少なくとも 10 台のクライアントマシンが必要です。障害が発生しやすい場合は、さらに多く必要になる可能性があります。これは、いずれかのクライアントマシンで障害が起きると、その負荷を別のアプリケーションインスタンスが自動的に引き継ぐためです。したがって、1 台の障害を許容するには、11 台のクライアントマシンが必要です(または、ローカルストレージ領域をアップグレードして 1 TB より大きいディスクを使用します)。

ローカルストレージに加えて、データは Kafka にも格納され、元のデータベースクラスターと同じレプリケーション係数を使用してレプリケートされます。つまり、Kafka ブローカーに 30 TB のストレージが必要です。

対話型クエリでは、一見するとリモートデータベースよりも多くのストレージが使用されるように見えます(データベースのステートが 30 TB であるのに対して、対話型クエリでは、ローカルクライアント側のステートが 10 TB + サーバー側の Kafka ステートが 30 TB = 40 TB)。しかし、従来のデータベースクラスターでは、Kafka がインジェストエンジンとしても使用されている可能性に注意が必要です(最悪のケースでは、Kafka のステートに追加で 30 TB が使用されます)。それに加えて、内部のデータベースログも使用されます(最悪のケースでは、さらに 30 TB が追加されます)。このため、実際には、対話型クエリで格納されたデータの方が、従来のデータベースよりもフットプリントが小さくなる傾向にあります。

トラブルシューティング

ストア/RocksDB のパフォーマンスが低い: ワークロードが IO に依存している可能性があります。これは特に、SSD ではなくハードディスクドライブを使用している場合に発生します。ただし、SSD しか使用していない場合は、クライアント側の CPU の使用率を確認してください。使用率がきわめて高い場合、パフォーマンスの改善には、より多くのコアが必要となる可能性があります。ステートストアはデフォルトで changelog トピックを使用するため、ステートストアの使用は、RocksDB への書き込みと Kafka への生成の両方を意味します。

RocksDB のファイルサイズが想定よりも大きい: RocksDB はスパースファイルを割り当てることが多いため、ファイルサイズが大きく見えても、ストレージの実際の消費量は低い場合があります。ストレージの実際の消費量を確認してください(Linux では du コマンドを使用します)。ストレージの消費量が RocksDB に書き込まれたデータ量よりも本当に大きい場合は、書き込み増幅が発生している可能性があります。書き込み増幅を測定して調整する方法の詳細については、「RocksDB Tuning Guide」を参照してください。

アプリのメモリー使用量が多い: トポロジーに多数のストアがあると、ストアごとに一定のメモリーコストが生じます。たとえば、RocksDB がデフォルトのストアである場合、ストアごとにオフヒープメモリーが使用されます。アプリのインスタンスを複数のマシンに分散するか、RocksDBConfigSetter クラス を使用して RocksDB のメモリー使用量を減らすことを検討してください。

注釈

ウィンドウ化されたストアでは、単一の RocksDB インスタンスが使用されるのではなく、Kafka Streams は複数のインスタンスを異なる期間に使用します。これらのインスタンスは "セグメント" と呼ばれます。

RocksDBConfigSetter クラス を使用して RocksDB のメモリー使用量を減らす場合は、「その他のメモリー使用量」のセクションで最初に説明したように、RocksDB にはいくつかの重要なメモリー構成があることに注意してください。具体的な設定として、block_cache_size (デフォルトは 50 MB)、write_buffer_size (デフォルトは 16 MB)、write_buffer_count (デフォルトは 3)があります。これらのデフォルト値を使用すると、RocksDB ストアあたりの推定使用量(estimate per store とします)は、(write_buffer_size_mb * write_buffer_count ) + block_cache_size_mb (デフォルトで 98 MB)になります。

40 個のパーティションがあり、ウィンドウ化されたストアを 1 つ使用している場合(パーティションごとのセグメントはデフォルトの 3 つ)、メモリー消費量の合計は 40 * 3 * estimate per store (この例では 13440 MB)になります。

RocksDB によるディスク使用量を減らすには、RocksDBConfigSetter クラス を使用して圧縮を有効にします。詳細については、RocksDB の「Compression」を参照してください。