Kafka Streams アプリケーションのモニタリング

メトリクス

メトリクスへのアクセス

JMX と Reporter によるメトリクスへのアクセス

Kafka Streams ライブラリは、JMX を通じてさまざまなメトリクスを報告します。また、metrics.reporters 構成オプションを使用して、プラグ可能な他の統計情報レポーターを通じて統計情報を報告するように構成することもできます。利用可能なメトリクスを確認するには、JMX MBeans を参照できる JConsole などのツールを使用するのが最も簡単です。

プログラムによるメトリクスヘのアクセス

KafkaStreams#metrics() メソッドを使用すると、KafkaStreams インスタンスのメトリクスレジストリ全体に読み取り専用でアクセスできます。メトリクスレジストリには、以下に列挙する利用可能なメトリクスがすべて含まれています。詳細については、 Kafka Streams の JavadocKafkaStreams のドキュメントを参照してください。

メトリクスの粒度の構成

Kafka Streams のメトリクスには、デフォルトで infodebugtrace という 3 つの記録レベルがあります。debug レベルではほとんどのメトリクスが記録され、info レベルでは一部だけが記録されます。trace レベルではすべてのメトリクスが記録されます。収集するメトリクスを指定するには、metrics.recording.level 構成オプションを使用します。「オプションの構成パラメーター」を参照してください。

組み込みのメトリクス

クライアントのメトリクス

以下のメトリクスの記録レベルはすべて info です。

MBean: kafka.streams:type=stream-metrics,client-id=[clientId]

version
Kafka Streams クライアントのバージョン。
commit-id
Kafka Streams クライアントのバージョン管理コミット ID。
application-id
Kafka Streams クライアントのアプリケーション ID。
topology-description
Kafka Streams クライアントで実行されているトポロジーの説明。
state
Kafka Streams クライアントのステート。
alive-stream-threads
実行中またはバランス調整に参加中の、存続しているストリームスレッドの現在の数。
failed-stream-threads
Kafka Streams クライアントの起動以降に失敗したストリームスレッドの数。

スレッドのメトリクス

以下のメトリクスの記録レベルはすべて info です。

MBean: kafka.streams:type=stream-thread-metrics,thread-id=[threadId]

[commit | poll | process | punctuate]-latency-[avg | max]
このスレッドで実行中のすべてのタスクにおける、それぞれの操作の [平均 | 最大] 実行時間(ミリ秒)。
[commit | poll | process | punctuate]-rate
すべてのタスクにおける、それぞれの操作の 1 秒あたりの平均数。
[commit | poll | process | punctuate]-total
すべてのタスクにおける、それぞれの操作の合計数。
[commit | poll | process | punctuate]-ratio
スレッドにおいて、アクティブタスクに対するそれぞれの操作に費やされた時間の割合。
poll-records-[avg | max]
1 回のイテレーションの間にコンシューマーからポーリングされたレコードの [平均 | 最大] 数。
process-records-[avg | max]
1 回のイテレーションの間に処理されたレコードの [平均 | 最大] 数。
task-created-rate
新しく作成されたタスクの 1 秒あたりの平均数。
task-created-total
新しく作成されたタスクの合計数。
task-closed-rate
終了したタスクの 1 秒あたりの平均数。
task-closed-total
終了したタスクの合計数。

タスクのメトリクス

以下のメトリクスはすべて、記録レベルが debug です。ただし、dropped-records-*active-process-ratiorecord-e2e-latency-* は例外で、これらのメトリクスの記録レベルは info です。

MBean: kafka.streams:type=stream-task-metrics,thread-id=[threadId],task-id=[taskId]

[commit | process]-latency-[avg | max]
このタスクにおける、それぞれの操作の [平均 | 最大] 実行時間(ナノ秒)。
[commit | process]-rate
このタスクにおける、それぞれの操作の 1 秒あたりの平均数。
[commit | process]-total
このタスクにおける、それぞれの操作の合計数。
record-lateness-[avg | max]
このタスクにおける [平均 | 最大] 観測遅延時間(ストリーム時刻 - レコードのタイムスタンプ)。順序外のレコードの詳細については、「順序外の処理」を参照してください。
enforced-processing-rate
このタスクで実行された処理の 1 秒あたりの平均数。
enforced-processing-total
このタスクで実行された処理の合計数。
dropped-records-rate
このタスク内で破棄されたレコードの平均数。
dropped-records-total
このタスク内で破棄されたレコードの合計数。
active-process-ratio
スレッドに割り当てられたすべてのアクティブタスクのうち、このアクティブタスクの処理に費やされた時間の割合。

record-e2e-latency-[avg | min | max] レコードのタイムスタンプとノードでの処理が完了したときのシステム時刻を比較して算出される、レコードのエンドツーエンドのレイテンシの [平均 | 最小 | 最大] 値。

プロセッサーノードのメトリクス

以下のメトリクスは、特定の種類のノードでのみ利用できます。process-rate および process-total メトリクスはソースプロセッサーノードでのみ利用でき、suppression-emit-rate および suppression-emit-total メトリクスは抑制操作ノードでのみ利用できます。これらのメトリクスの記録レベルはすべて debug です。

MBean: kafka.streams:type=stream-processor-node-metrics,thread-id=[threadId],task-id=[taskId],processor-node-id=[processorNodeId]

process-rate
ソースノードによって処理されたレコードの 1 秒あたりの平均数。
process-total
ソースノードによって処理されたレコードの合計数。
suppression-emit-rate
抑制操作ノードからダウンストリームに送信されたレコードの割合。process-rate メトリックと比較すると、抑制された更新の数を確認できます。
suppression-emit-total
抑制操作ノードからダウンストリームに送信されたレコードの合計数。process-total メトリックと比較すると、抑制された更新の数を確認できます。

ステートストアのメトリクス

以下のメトリクスはすべて、記録レベルが debug です。ただし、record-e2e-latency-* は例外で、このメトリクスの記録レベルは trace です。カスタマイズされたユーザーのステートストアの場合、store-scope の値は StoreSupplier#metricsScope() で指定されます。組み込みのステートストアについては、現在、次の値が用意されています。

  • in-memory-state
  • in-memory-lru-state
  • in-memory-window-state
  • in-memory-suppression (抑制バッファ)
  • rocksdb-state (RocksDB ベースのキーと値のストア)
  • rocksdb-window-state (RocksDB ベースのウィンドウストア)
  • rocksdb-session-state (RocksDB ベースのセッションストア)

suppression-buffer-size-avgsuppression-buffer-size-maxsuppression-buffer-count-avgsuppression-buffer-count-max の各メトリクス は、抑制バッファでのみ利用できます。その他のすべてのメトリクスは、抑制バッファでは利用できません。

MBean: kafka.streams:type=stream-state-metrics,thread-id=[threadId],task-id=[taskId],[storeType]-id=[storeName]

[put | put-if-absent | get | delete | put-all | all | range | flush | restore]-latency-[avg | max]
それぞれの操作の平均実行時間(ナノ秒)。
[put | put-if-absent | get | delete | put-all | all | range | flush | restore]-rate
このストアに対するそれぞれの操作の 1 秒あたりの平均レート。
[put | put-if-absent | get | delete | put-all | all | range | flush | restore]-total
このストアに対するそれぞれの操作の合計数。
suppression-buffer-size-[avg | max]
バッファに格納されたデータの平均サイズまたは最大サイズ(バイト数)。必要に応じて、BufferConfig.maxBytes(...) の値を決定するために使用できます。
suppression-buffer-count-[avg | max]

バッファ内のレコードの平均数または最大数。必要に応じて、BufferConfig.maxRecords(...) の値を決定するために使用できます。

record-e2e-latency-[avg | min | max] レコードのタイムスタンプとノードでの処理が完了したときのシステム時刻を比較して算出される、レコードのエンドツーエンドのレイテンシの [平均 | 最小 | 最大] 値。

RocksDB のメトリクス

RocksDB のメトリクスは、統計ベースのメトリクスとプロパティベースのメトリクスに分類されます。統計ベースのメトリクスは、RocksDB のステートストアで収集される統計情報から記録されます。プロパティベースのメトリクスは、RocksDB で公開されるプロパティから記録されます。RocksDB で収集される統計情報は、たとえば、ステートストアに書き込まれたバイト数など、経時的に累積された計測値です。RocksDB で公開されるプロパティは、たとえば、現在使用されているメモリー量など、現在の計測値です。

以下のメトリクスの記録レベルはすべて debug です。これらのメトリクスは、RocksDB のステートストアから毎分収集されます。複数のセッションウィンドウで長時間にわたって集約が行われる状況では、ステートストアは複数の RocksDB インスタンスで構成されます。この場合、各メトリックは、ステートストアの複数の RocksDB インスタンスに対する集約について報告します。組み込みの RocksDB ステートストアに対応する storeType の値は次のとおりです。

  • rocksdb-state (RocksDB ベースのキーと値のストア)
  • rocksdb-window-state (RocksDB ベースのウィンドウストア)
  • rocksdb-session-state (RocksDB ベースのセッションストア)

MBean: kafka.streams:type=stream-state-metrics,thread-id=[threadId],task-id=[taskId],[storeType]-id=[storeName]

RocksDB の統計ベースのメトリクス: 以下の統計ベースのメトリクスはすべて、記録レベルは debug です。RocksDB での統計情報の収集は、パフォーマンスに影響する場合がある ためです。統計ベースのメトリクスは、RocksDB のステートストアから 1 分に 1 回のペースで収集されます。複数のセッションウィンドウで長時間にわたって集約が行われる状況では、ステートストアは複数の RocksDB インスタンスで構成されます。この場合、各メトリックは、ステートストアの複数の RocksDB インスタンスに対する集約について報告します。

bytes-written-rate
RocksDB ステートストアに書き込まれた 1 秒あたりの平均バイト数。
bytes-read-rate
RocksDB ステートストアから読み取られた 1 秒あたりの平均バイト数。
memtable-bytes-flushed-rate
memtable からディスクにフラッシュされた 1 秒あたりの平均バイト数。
memtable-hit-ratio
memtable に対するルックアップの総数と比較した、memtable のヒット率。
block-cache-data-hit-ratio
ブロックキャッシュに対するデータブロックのルックアップの総数と比較した、ブロックキャッシュでのデータブロックのヒット率。
block-cache-index-hit-ratio
ブロックキャッシュに対するインデックスブロックのルックアップの総数と比較した、ブロックキャッシュでのインデックスブロックのヒット率。
block-cache-filter-hit-ratio
ブロックキャッシュに対するフィルターブロックのルックアップの総数と比較した、ブロックキャッシュでのフィルターブロックのヒット率。
write-stall-duration-[avg | total]
書き込み停止の [平均 | 合計] 時間(ミリ秒)。
bytes-read-compaction-rate
圧縮中に読み取られた 1 秒あたりの平均バイト数。
number-open-files
現在開いているファイルの数。
number-file-errors-total
発生したエラーの合計数。

RocksDB のプロパティベースのメトリクス: 以下のプロパティベースのメトリクスはすべて、記録レベルは info であり、メトリクスへのアクセス時に記録されます。複数のセッションウィンドウで長時間にわたって集約が行われる状況では、ステートストアは複数の RocksDB インスタンスで構成されます。この場合、各メトリックは、そのステートストアのすべての RocksDB インスタンスの合計を報告します。ただし、block-cache-* という名前のブロックキャッシュメトリクスは例外です。ブロックキャッシュメトリクスは、各インスタンスで個別のブロックキャッシュが使用されている場合は、すべての RocksDB インスタンスの合計を報告しますが、すべてのインスタンスで単一のブロックキャッシュが共有されている場合は、その単一のインスタンスで記録された値を報告します。

num-immutable-mem-table
まだフラッシュされていない変更不可の memtable の数。
cur-size-active-mem-table
アクティブな memtable のおおよそのサイズ(バイト数)。
cur-size-all-mem-tables
アクティブ、およびフラッシュされていない変更不可の memtable のおおよそのサイズ(バイト数)。
size-all-mem-tables
アクティブ、フラッシュされていない変更不可、および固定されている変更不可の memtable のおおよそのサイズ(バイト数)。
num-entries-active-mem-table
アクティブな memtable のエントリの数。
num-entries-imm-mem-tables
フラッシュされていない変更不可の memtable のエントリの数。
num-deletes-active-mem-table
アクティブな memtable の削除エントリの数。
num-deletes-imm-mem-tables
フラッシュされていない変更不可の memtable の削除エントリの数。
mem-table-flush-pending
このメトリックは、memtable のフラッシュが保留中の場合は 1 を、それ以外の場合は 0 を返します。
num-running-flushes
現在実行されているフラッシュの数。
compaction-pending
このメトリックは、保留中の圧縮が 1 つ以上ある場合に 1 を、それ以外の場合は 0 を返します。
num-running-compactions
現在実行されている圧縮の数。
estimate-pending-compaction-bytes
すべてのレベルを目標サイズ未満に抑えるために、圧縮でディスクに再書き込みする必要がある推定の合計バイト数(レベル圧縮についてのみ有効)。
total-sst-files-size
すべてのソート済みシーケンステーブル(SST)ファイルの合計サイズ(バイト数)。
live-sst-files-size
最新のログ構造マージ(LSM)ツリーに属する、すべてのソート済みシーケンステーブル(SST)ファイルの合計サイズ(バイト数)。
num-live-versions
ライブバーションのログ構造マージ(LSM)ツリーの数。
block-cache-capacity
ブロックキャッシュのキャパシティ(バイト数)。
block-cache-usage
ブロックキャッシュ内に存在するエントリのメモリーサイズ(バイト数)。
block-cache-pinned-usage
ブロックキャッシュ内の固定されているエントリのメモリーサイズ(バイト数)。
estimate-num-keys
アクティブおよびフラッシュされていない変更不可の memtable およびストレージ内の推定されるキーの数。
estimate-table-readers-mem
ブロックキャッシュで使用されているメモリーを除外した、ソート済みシーケンステーブル(SST)の読み取りに使用される推定のメモリー(バイト数)。
background-errors
バックグラウンドエラーの合計数。

レコードキャッシュのメトリクス

以下のメトリクスの記録レベルはすべて debug です。

MBean: kafka.streams:type=stream-record-cache-metrics,thread-id=[threadId],task-id=[taskId],record-cache-id=[storeName]

hit-ratio-[avg | min | max]
キャッシュの [平均 | 最小 | 最大] ヒット率。ここでのヒット率とは、キャッシュの読み取りリクエストの合計数に対する、キャッシュの読み取りヒット数の割合です。

独自のメトリクスの追加

低レベルの Processor API を使用しているアプリケーション開発者は、独自のアプリケーションにメトリクスを追加できます。ProcessorContext#metrics() メソッドを呼び出すと、次の操作に使用できる StreamMetrics オブジェクトのハンドルを取得できます。

  • StreamMetrics#addLatencyRateTotalSensor() および StreamMetrics#addRateTotalSensor() を使用して、レイテンシとスループットに関するメトリクスを追加する。
  • StreamMetrics#addSensor() を使用して、他の任意の種類のメトリックを追加する。

ランタイムステータス情報

KafkaStreams インスタンスのステータス

重要

KafkaStreams インスタンスのランタイムステート(作成済み、バランス調整中など)を、ステートストアと混同しないでください。

Kafka Streams インスタンスのステートは、KafkaStreams.State 列挙体で定義されるいくつかのランタイムステートのいずれかで表されます。たとえば、作成されても実行されていなかったり、バランス調整中のためにステートストアがクエリを受け付けていなかったりすることがあります。ユーザーは、プログラムで KafkaStreams#state() メソッドを使用することで、現在のランタイムステートにアクセスできます。定義済みのステートはすべて、Kafka Streams の JavadocKafkaStreams.State のドキュメントに記載されています。

また、KafkaStreams#setStateListener() を使用すると、ステートが変更されるたびにトリガーされる KafkaStreams#StateListener メソッドを登録できます。

現在の KafkaStreams インスタンスのランタイムステートを確認するには、KafkaStreams#localThreadsMetadata() メソッドを使用します。localThreadsMetadata() メソッドは、ローカルストリームスレッドごとに 1 つの ThreadMetadata オブジェクトを返します。ThreadMetadata オブジェクトは、スレッドのランタイムステートと、スレッドに現在割り当てられているタスクのメタデータを表します。

KafkaStreams クライアントのランタイム情報の取得

次のローカル KafkaStreams クライアントについて、ランタイム情報を取得することができます。

KafkaStreams インスタンスごとに 1 つの管理クライアントがあり、その他のクライアントはすべて StreamThread ごとにあります。

ローカル KafkaStreams クライアントの名前を取得するには、ThreadMetadata クラスのクライアント ID メソッドを呼び出します。このようなメソッドとして、producerClientIds() などがあります。

クライアント名はクライアント ID 値に基づいて設定され、クライアント ID は StreamsConfig.CLIENT_ID_CONFIG および StreamsConfig.APPLICATION_ID_CONFIG 構成設定に従って割り当てられます。

  • CLIENT_ID_CONFIG が設定されている場合、Kafka Streams は CLIENT_ID_CONFIG をクライアント ID 値として使用します。

  • CLIENT_ID_CONFIG が設定されていない場合、Kafka Streams は APPLICATION_ID_CONFIG を使用し、ランダムな一意識別子(UUID)を付加します。

    clientId = StreamsConfig.APPLICATION_ID_CONFIG + "-" + <random-UUID>
    

Kafka Streams は、メインのクライアント ID にスレッド ID と説明文字列を追加して固有のクライアント名を作成します。

specificClientId = clientId + "-StreamThread-" + <thread-number> + <description>

たとえば、CLIENT_ID_CONFIG が "MyClientId" に設定されている場合、consumerClientId() メソッドからは MyClientId-StreamThread-2-consumer のような値が返されます。CLIENT_ID_CONFIG が設定されていない場合、APPLICATION_ID_CONFIG が "MyApplicationId" に設定されているとすれば、consumerClientId() メソッドからは MyApplicationId-8d8ce4a7-85bb-41f7-ac9c-fe6f3cc0959e-StreamThread-2-consumer のような値が返されます。

スレッド ID を取得するには、threadName() メソッドを呼び出します。

threadId = clientId + "-StreamThread-" + <thread-number>

構成設定に応じて、スレッド ID は、たとえば MyClientId-StreamThread-2MyApplicationId-8d8ce4a7-85bb-41f7-ac9c-fe6f3cc0959e-StreamThread-2 のようになります。

adminClientId()
クライアントアプリケーションの ID を取得します。これは、メインのクライアント ID 値に -admin を付加したものです。構成設定に応じて、戻り値は、たとえば MyClientId-adminMyApplicationId-8d8ce4a7-85bb-41f7-ac9c-fe6f3cc0959e-admin のようになります。
producerClientIds()

プロデューサークライアントの名前を取得します。"厳密に 1 回" のセマンティクス(EOS バージョン 1)が有効な場合は、タスクプロデューサー名のリストが返されます。それ以外(EOS が無効または EOS バージョン 2)の場合は、スレッドプロデューサーの名前が返されます。プロデューサークライアント名はすべて、メインのスレッド ID に -producer を付加したものになります。EOS バージョン 1 が有効な場合は、-<taskId> も追加されます。

タスク ID はサブトポロジー ID とパーティション番号を組み合わせたもので、<subTopologyId>_<partition> のようになります。subTopologyId はゼロ以上の整数です。

EOS バージョン 1 が有効な場合、producerClientIds() メソッドは、異なるタスク ID を持つクライアント名の Set を返します。構成設定に応じて、戻り値は MyClientId-StreamThread-2-1_4-producer のようになります。

EOS が有効でない場合、または EOS バージョン 2 が有効な場合、戻り値は単一のクライアント名になり、タスク ID は含まれません。たとえば、MyClientId-StreamThread-2-producer のようになります。

詳細については、「ストリームパーティションとタスク」を参照してください。

consumerClientId()
コンシューマークライアントの名前を取得します。コンシューマークライアント名は、メインのスレッド ID に -consumer を付加したもので、たとえば MyClientId-StreamThread-2-consumer のようになります。
restoreConsumerClientId()
復元コンシューマークライアントの名前を取得します。復元コンシューマークライアント名は、メインのスレッド ID に -restore-consumer を付加したもので、たとえば MyClientId-StreamThread-2-restore-consumer のようになります。

フォールトトレラントなステートストアの復元状況のモニタリング

アプリケーションの起動時には、保存されたステートがローカルディスクから読み取られるため、フォールトトレラントなステートストアでの復元プロセスは必要ありません。ただし、状況によっては、バックアップされた changelog トピックからの完全な復元が必要になることがあります(たとえば、障害によってローカルステートが消去された場合や、アプリケーションがステートレスな環境で実行され、保存されたデータが再起動時に失われた場合)。

changelog トピックに大量のデータがあると、復元プロセスに無視できないほど長い時間がかかることがあります。新しいデータの処理は復元プロセスが完了するまで始まらないため、復元の進行状況を確認するしくみを取り入れると役立ちます。

すべてのステートストアの復元を確認できるようにするには、アプリケーションで org.apache.kafka.streams.processor.StateRestoreListener インターフェイスのインスタンスを提供します。org.apache.kafka.streams.processor.StateRestoreListener を設定するには、KafkaStreams#setGlobalStateRestoreListener メソッドを呼び出します。

以下は、復元ステータスをコンソールに出力する基本的な実装の例です。

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

 public class ConsoleGlobalRestoreListerner implements StateRestoreListener {

    @Override
    public void onRestoreStart(final TopicPartition topicPartition,
                               final String storeName,
                               final long startingOffset,
                               final long endingOffset) {

        System.out.print("Started restoration of " + storeName + " partition " + topicPartition.partition());
        System.out.println(" total records to be restored " + (endingOffset - startingOffset));
    }

    @Override
    public void onBatchRestored(final TopicPartition topicPartition,
                                final String storeName,
                                final long batchEndOffset,
                                final long numRestored) {

        System.out.println("Restored batch " + numRestored + " for " + storeName + " partition " + topicPartition.partition());

    }

    @Override
    public void onRestoreEnd(final TopicPartition topicPartition,
                             final String storeName,
                             final long totalRestored) {

        System.out.println("Restoration complete for " + storeName + " partition " + topicPartition.partition());
    }
}

注意

StateRestoreListener インスタンスは、すべての org.apache.kafka.streams.processor.internals.StreamThread インスタンス間で共有され、グローバルストアにも使用されます。さらに、すべてのメソッドはステートレスと見なされます。ステートフルな操作が必要な場合は、内部で同期処理を提供する必要があります。

Confluent Control Center との統合

リリース 3.2 以降、 Confluent Control Center には、Kafka Streams アプリケーションの基盤の プロデューサーメトリクスコンシューマーメトリクス が表示されます。これらは、Apache Kafka® トピックとの間でデータの読み書きが必要になるたびに、Streams API によって内部で使用されます。これらのメトリクスは、たとえば、アプリケーションの "コンシューマーラグ" をモニタリングするために使用できます。コンシューマーラグは、アプリケーションが、現在の容量と利用可能な計算リソース で受信データ量に対応できるかどうかを示します。

Control Center では、1 つの Kafka Streams アプリケーション、つまりアプリケーションのすべての実行インスタンスが、単一のコンシューマーグループとして表示されます。

注釈

アプリケーションの復元コンシューマーは別に表示 : Streams API の内部では、フォールトトレランスとステート管理のために、専用の "復元" コンシューマーが使用されます。この復元コンシューマーは、自身が消費するトピックパーティションを手動で割り当てて管理し、アプリケーションのコンシューマーグループには含まれません。結果として、復元コンシューマーはアプリケーションとは別に表示されます。

注釈

このウェブサイトには、Apache License v2 の条件に基づいて Apache Software Foundation で開発されたコンテンツが含まれています。