Kafka Streams アプリケーションのモニタリング¶
メトリクス¶
メトリクスへのアクセス¶
JMX と Reporter によるメトリクスへのアクセス¶
Kafka Streams ライブラリは、JMX を通じてさまざまなメトリクスを報告します。また、metrics.reporters
構成オプションを使用して、プラグ可能な他の統計情報レポーターを通じて統計情報を報告するように構成することもできます。利用可能なメトリクスを確認するには、JMX MBeans を参照できる JConsole などのツールを使用するのが最も簡単です。
プログラムによるメトリクスヘのアクセス¶
KafkaStreams#metrics()
メソッドを使用すると、KafkaStreams
インスタンスのメトリクスレジストリ全体に読み取り専用でアクセスできます。メトリクスレジストリには、以下に列挙する利用可能なメトリクスがすべて含まれています。詳細については、 Kafka Streams の Javadoc で KafkaStreams
のドキュメントを参照してください。
メトリクスの粒度の構成¶
Kafka Streams のメトリクスには、デフォルトで debug
と info
という 2 つの記録レベルがあります。debug
レベルではすべてのメトリクスが記録され、info
レベルでは一部だけが記録されます。収集するメトリクスを指定するには、metrics.recording.level
構成オプションを使用します。「オプションの構成パラメーター」を参照してください。
組み込みのメトリクス¶
クライアントのメトリクス¶
以下のメトリクスの記録レベルはすべて info
です。
MBean: kafka.streams:type=stream-metrics,client-id=[clientId]
version
- Kafka Streams クライアントのバージョン。
commit-id
- Kafka Streams クライアントのバージョン管理コミット ID。
application-id
- Kafka Streams クライアントのアプリケーション ID。
topology-description
- Kafka Streams クライアントで実行されているトポロジーの説明。
state
- Kafka Streams クライアントのステート。
alive-stream-threads
- 実行中またはバランス調整に参加中の、存続しているストリームスレッドの現在の数。
failed-stream-threads
- Kafka Streams クライアントの起動以降に失敗したストリームスレッドの数。
スレッドのメトリクス¶
以下のメトリクスの記録レベルはすべて info
です。
MBean: kafka.streams:type=stream-thread-metrics,thread-id=[threadId]
[commit | poll | process | punctuate]-latency-[avg | max]
- このスレッドで実行中のすべてのタスクにおける、それぞれの操作の [平均 | 最大] 実行時間(ミリ秒)。
[commit | poll | process | punctuate]-rate
- すべてのタスクにおける、それぞれの操作の 1 秒あたりの平均数。
[commit | poll | process | punctuate]-total
- すべてのタスクにおける、それぞれの操作の合計数。
[commit | poll | process | punctuate]-ratio
- スレッドにおいて、アクティブタスクに対するそれぞれの操作に費やされた時間の割合。
poll-records-[avg | max]
- 1 回のイテレーションの間にコンシューマーからポーリングされたレコードの [平均 | 最大] 数。
process-records-[avg | max]
- 1 回のイテレーションの間に処理されたレコードの [平均 | 最大] 数。
task-created-rate
- 新しく作成されたタスクの 1 秒あたりの平均数。
task-created-total
- 新しく作成されたタスクの合計数。
task-closed-rate
- 終了したタスクの 1 秒あたりの平均数。
task-closed-total
- 終了したタスクの合計数。
タスクのメトリクス¶
以下のメトリクスの記録レベルはすべて debug
です。
MBean: kafka.streams:type=stream-task-metrics,thread-id=[threadId],task-id=[taskId]
[commit | process]-latency-[avg | max]
- このタスクにおける、それぞれの操作の [平均 | 最大] 実行時間(ナノ秒)。
[commit | process]-rate
- このタスクにおける、それぞれの操作の 1 秒あたりの平均数。
[commit | process]-total
- このタスクにおける、それぞれの操作の合計数。
record-lateness-[avg | max]
- このタスクにおける [平均 | 最大] 観測遅延時間(ストリーム時刻 - レコードのタイムスタンプ)。順序外のレコードの詳細については、「順序外の処理」を参照してください。
enforced-processing-rate
- このタスクで実行された処理の 1 秒あたりの平均数。
enforced-processing-total
- このタスクで実行された処理の合計数。
dropped-records-rate
- このタスク内で破棄されたレコードの平均数。
dropped-records-total
- このタスク内で破棄されたレコードの合計数。
active-process-ratio
- スレッドに割り当てられたすべてのアクティブタスクのうち、このアクティブタスクの処理に費やされた時間の割合。
プロセッサーノードのメトリクス¶
以下のメトリクスは、特定の種類のノードでのみ利用できます。process-rate
および process-total
メトリクスはソースプロセッサーノードでのみ利用でき、suppression-emit-rate
および suppression-emit-total
メトリクスは抑制操作ノードでのみ利用できます。これらのメトリクスの記録レベルはすべて debug
です。
MBean: kafka.streams:type=stream-processor-node-metrics,thread-id=[threadId],task-id=[taskId],processor-node-id=[processorNodeId]
process-rate
- ソースノードによって処理されたレコードの 1 秒あたりの平均数。
process-total
- ソースノードによって処理されたレコードの合計数。
suppression-emit-rate
- 抑制操作ノードからダウンストリームに送信されたレコードの割合。
process-rate
メトリックと比較すると、抑制された更新の数を確認できます。 suppression-emit-total
- 抑制操作ノードからダウンストリームに送信されたレコードの合計数。
process-total
メトリックと比較すると、抑制された更新の数を確認できます。
ステートストアのメトリクス¶
以下のメトリクスの記録レベルはすべて debug
です。カスタマイズされたユーザーのステートストアの場合、store-scope
の値は StoreSupplier#metricsScope()
で指定されます。組み込みのステートストアについては、現在、次の値が用意されています。
in-memory-state
in-memory-lru-state
in-memory-window-state
in-memory-suppression
(抑制バッファ)rocksdb-state
(RocksDB ベースのキーと値のストア)rocksdb-window-state
(RocksDB ベースのウィンドウストア)rocksdb-session-state
(RocksDB ベースのセッションストア)
suppression-buffer-size-avg
、suppression-buffer-size-max
、suppression-buffer-count-avg
、suppression-buffer-count-max
の各メトリクス は、抑制バッファでのみ利用できます。その他のすべてのメトリクスは、抑制バッファでは利用できません。
MBean: kafka.streams:type=stream-state-metrics,thread-id=[threadId],task-id=[taskId],[storeType]-id=[storeName]
[put | put-if-absent | get | delete | put-all | all | range | flush | restore]-latency-[avg | max]
- それぞれの操作の平均実行時間(ナノ秒)。
[put | put-if-absent | get | delete | put-all | all | range | flush | restore]-rate
- このストアに対するそれぞれの操作の 1 秒あたりの平均レート。
[put | put-if-absent | get | delete | put-all | all | range | flush | restore]-total
- このストアに対するそれぞれの操作の合計数。
suppression-buffer-size-[avg | max]
- バッファに格納されたデータの平均サイズまたは最大サイズ(バイト数)。必要に応じて、
BufferConfig.maxBytes(...)
の値を決定するために使用できます。 suppression-buffer-count-[avg | max]
- バッファ内のレコードの平均数または最大数。必要に応じて、
BufferConfig.maxRecords(...)
の値を決定するために使用できます。
RocksDB のメトリクス¶
以下のメトリクスの記録レベルはすべて debug
です。これらのメトリクスは、RocksDB のステートストアから毎分収集されます。複数のセッションウィンドウで長時間にわたって集約が行われる状況では、ステートストアは複数の RocksDB インスタンスで構成されます。この場合、各メトリックは、ステートストアの複数の RocksDB インスタンスに対する集約について報告します。組み込みの RocksDB ステートストアに対応する storeType
の値は次のとおりです。
rocksdb-state
(RocksDB ベースのキーと値のストア)rocksdb-window-state
(RocksDB ベースのウィンドウストア)rocksdb-session-state
(RocksDB ベースのセッションストア)
MBean: kafka.streams:type=stream-state-metrics,thread-id=[threadId],task-id=[taskId],[storeType]-id=[storeName]
bytes-written-rate
- RocksDB ステートストアに書き込まれた 1 秒あたりの平均バイト数。
bytes-read-rate
- RocksDB ステートストアから読み取られた 1 秒あたりの平均バイト数。
memtable-bytes-flushed-rate
- memtable からディスクにフラッシュされた 1 秒あたりの平均バイト数。
memtable-hit-ratio
- memtable に対するルックアップの総数と比較した、memtable のヒット率。
block-cache-data-hit-ratio
- ブロックキャッシュに対するデータブロックのルックアップの総数と比較した、ブロックキャッシュでのデータブロックのヒット率。
block-cache-index-hit-ratio
- ブロックキャッシュに対するインデックスブロックのルックアップの総数と比較した、ブロックキャッシュでのインデックスブロックのヒット率。
block-cache-filter-hit-ratio
- ブロックキャッシュに対するフィルターブロックのルックアップの総数と比較した、ブロックキャッシュでのフィルターブロックのヒット率。
write-stall-duration-[avg | total]
- 書き込み停止の [平均 | 合計] 時間(ミリ秒)。
bytes-read-compaction-rate
- 圧縮中に読み取られた 1 秒あたりの平均バイト数。
number-open-files
- 現在開いているファイルの数。
number-file-errors-total
- 発生したエラーの合計数。
レコードキャッシュのメトリクス¶
以下のメトリクスの記録レベルはすべて debug
です。
MBean: kafka.streams:type=stream-record-cache-metrics,thread-id=[threadId],task-id=[taskId],record-cache-id=[storeName]
hit-ratio-[avg | min | max]
- キャッシュの [平均 | 最小 | 最大] ヒット率。ここでのヒット率とは、キャッシュの読み取りリクエストの合計数に対する、キャッシュの読み取りヒット数の割合です。
独自のメトリクスの追加¶
低レベルの Processor API を使用しているアプリケーション開発者は、独自のアプリケーションにメトリクスを追加できます。ProcessorContext#metrics()
メソッドを呼び出すと、次の操作に使用できる StreamMetrics
オブジェクトのハンドルを取得できます。
StreamMetrics#addLatencyRateTotalSensor()
およびStreamMetrics#addRateTotalSensor()
を使用して、レイテンシとスループットに関するメトリクスを追加する。StreamMetrics#addSensor()
を使用して、他の任意の種類のメトリックを追加する。
ランタイムステータス情報¶
KafkaStreams
インスタンスのステータス¶
重要
KafkaStreams
インスタンスのランタイムステート(作成済み、バランス調整中など)を、ステートストアと混同しないでください。
Kafka Streams インスタンスのステートは、KafkaStreams.State
列挙体で定義されるいくつかのランタイムステートのいずれかで表されます。たとえば、作成されても実行されていなかったり、バランス調整中のためにステートストアがクエリを受け付けていなかったりすることがあります。ユーザーは、プログラムで KafkaStreams#state()
メソッドを使用することで、現在のランタイムステートにアクセスできます。定義済みのステートはすべて、Kafka Streams の Javadoc の KafkaStreams.State
のドキュメントに記載されています。
また、KafkaStreams#setStateListener()
を使用すると、ステートが変更されるたびにトリガーされる KafkaStreams#StateListener
メソッドを登録できます。
現在の KafkaStreams
インスタンスのランタイムステートを確認するには、KafkaStreams#localThreadsMetadata()
メソッドを使用します。localThreadsMetadata()
メソッドは、ローカルストリームスレッドごとに 1 つの ThreadMetadata
オブジェクトを返します。ThreadMetadata
オブジェクトは、スレッドのランタイムステートと、スレッドに現在割り当てられているタスクのメタデータを表します。
KafkaStreams クライアントのランタイム情報の取得¶
次のローカル KafkaStreams
クライアントについて、ランタイム情報を取得することができます。
KafkaStreams
インスタンスごとに 1 つの管理クライアントがあり、その他のクライアントはすべて StreamThread
ごとにあります。
ローカル KafkaStreams
クライアントの名前を取得するには、ThreadMetadata
クラスのクライアント ID メソッドを呼び出します。このようなメソッドとして、producerClientIds()
などがあります。
クライアント名はクライアント ID 値に基づいて設定され、クライアント ID は StreamsConfig.CLIENT_ID_CONFIG
および StreamsConfig.APPLICATION_ID_CONFIG
構成設定に従って割り当てられます。
CLIENT_ID_CONFIG
が設定されている場合、Kafka Streams はCLIENT_ID_CONFIG
をクライアント ID 値として使用します。CLIENT_ID_CONFIG
が設定されていない場合、Kafka Streams はAPPLICATION_ID_CONFIG
を使用し、ランダムな一意識別子(UUID)を付加します。clientId = StreamsConfig.APPLICATION_ID_CONFIG + "-" + <random-UUID>
Kafka Streams は、メインのクライアント ID にスレッド ID と説明文字列を追加して固有のクライアント名を作成します。
specificClientId = clientId + "-StreamThread-" + <thread-number> + <description>
たとえば、CLIENT_ID_CONFIG
が "MyClientId" に設定されている場合、consumerClientId()
メソッドからは MyClientId-StreamThread-2-consumer
のような値が返されます。CLIENT_ID_CONFIG
が設定されていない場合、APPLICATION_ID_CONFIG
が "MyApplicationId" に設定されているとすれば、consumerClientId()
メソッドからは MyApplicationId-8d8ce4a7-85bb-41f7-ac9c-fe6f3cc0959e-StreamThread-2-consumer
のような値が返されます。
スレッド ID を取得するには、threadName()
メソッドを呼び出します。
threadId = clientId + "-StreamThread-" + <thread-number>
構成設定に応じて、スレッド ID は、たとえば MyClientId-StreamThread-2
や MyApplicationId-8d8ce4a7-85bb-41f7-ac9c-fe6f3cc0959e-StreamThread-2
のようになります。
- adminClientId()
- クライアントアプリケーションの ID を取得します。これは、メインのクライアント ID 値に
-admin
を付加したものです。構成設定に応じて、戻り値は、たとえばMyClientId-admin
やMyApplicationId-8d8ce4a7-85bb-41f7-ac9c-fe6f3cc0959e-admin
のようになります。
- producerClientIds()
プロデューサークライアントの名前を取得します。"厳密に 1 回" のセマンティクス(EOS バージョン 1)が有効な場合は、タスクプロデューサー名のリストが返されます。それ以外(EOS が無効または EOS バージョン 2)の場合は、スレッドプロデューサーの名前が返されます。プロデューサークライアント名はすべて、メインのスレッド ID に
-producer
を付加したものになります。EOS バージョン 1 が有効な場合は、-<taskId>
も追加されます。タスク ID はサブトポロジー ID とパーティション番号を組み合わせたもので、
<subTopologyId>_<partition>
のようになります。subTopologyId
はゼロ以上の整数です。EOS バージョン 1 が有効な場合、
producerClientIds()
メソッドは、異なるタスク ID を持つクライアント名のSet
を返します。構成設定に応じて、戻り値はMyClientId-StreamThread-2-1_4-producer
のようになります。EOS が有効でない場合、または EOS バージョン 2 が有効な場合、戻り値は単一のクライアント名になり、タスク ID は含まれません。たとえば、
MyClientId-StreamThread-2-producer
のようになります。詳細については、「ストリームパーティションとタスク」を参照してください。
- consumerClientId()
- コンシューマークライアントの名前を取得します。コンシューマークライアント名は、メインのスレッド ID に
-consumer
を付加したもので、たとえばMyClientId-StreamThread-2-consumer
のようになります。
- restoreConsumerClientId()
- 復元コンシューマークライアントの名前を取得します。復元コンシューマークライアント名は、メインのスレッド ID に
-restore-consumer
を付加したもので、たとえばMyClientId-StreamThread-2-restore-consumer
のようになります。
フォールトトレラントなステートストアの復元状況のモニタリング¶
アプリケーションの起動時には、保存されたステートがローカルディスクから読み取られるため、フォールトトレラントなステートストアでの復元プロセスは必要ありません。ただし、状況によっては、バックアップされた changelog トピックからの完全な復元が必要になることがあります(たとえば、障害によってローカルステートが消去された場合や、アプリケーションがステートレスな環境で実行され、保存されたデータが再起動時に失われた場合)。
changelog トピックに大量のデータがあると、復元プロセスに無視できないほど長い時間がかかることがあります。新しいデータの処理は復元プロセスが完了するまで始まらないため、復元の進行状況を確認するしくみを取り入れると役立ちます。
すべてのステートストアの復元を確認できるようにするには、アプリケーションで org.apache.kafka.streams.processor.StateRestoreListener
インターフェイスのインスタンスを提供します。org.apache.kafka.streams.processor.StateRestoreListener
を設定するには、KafkaStreams#setGlobalStateRestoreListener
メソッドを呼び出します。
以下は、復元ステータスをコンソールに出力する基本的な実装の例です。
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;
public class ConsoleGlobalRestoreListerner implements StateRestoreListener {
@Override
public void onRestoreStart(final TopicPartition topicPartition,
final String storeName,
final long startingOffset,
final long endingOffset) {
System.out.print("Started restoration of " + storeName + " partition " + topicPartition.partition());
System.out.println(" total records to be restored " + (endingOffset - startingOffset));
}
@Override
public void onBatchRestored(final TopicPartition topicPartition,
final String storeName,
final long batchEndOffset,
final long numRestored) {
System.out.println("Restored batch " + numRestored + " for " + storeName + " partition " + topicPartition.partition());
}
@Override
public void onRestoreEnd(final TopicPartition topicPartition,
final String storeName,
final long totalRestored) {
System.out.println("Restoration complete for " + storeName + " partition " + topicPartition.partition());
}
}
注意
StateRestoreListener
インスタンスは、すべての org.apache.kafka.streams.processor.internals.StreamThread
インスタンス間で共有され、グローバルストアにも使用されます。さらに、すべてのメソッドはステートレスと見なされます。ステートフルな操作が必要な場合は、内部で同期処理を提供する必要があります。
Confluent Control Center との統合¶
リリース 3.2 以降、 Confluent Control Center には、Kafka Streams アプリケーションの基盤の プロデューサーメトリクス と コンシューマーメトリクス が表示されます。これらは、Apache Kafka® トピックとの間でデータの読み書きが必要になるたびに、Streams API によって内部で使用されます。これらのメトリクスは、たとえば、アプリケーションの "コンシューマーラグ" をモニタリングするために使用できます。コンシューマーラグは、アプリケーションが、現在の容量と利用可能な計算リソース で受信データ量に対応できるかどうかを示します。
Control Center では、1 つの Kafka Streams アプリケーション、つまりアプリケーションのすべての実行インスタンスが、単一のコンシューマーグループとして表示されます。
注釈
アプリケーションの復元コンシューマーは別に表示 : Streams API の内部では、フォールトトレランスとステート管理のために、専用の "復元" コンシューマーが使用されます。この復元コンシューマーは、自身が消費するトピックパーティションを手動で割り当てて管理し、アプリケーションのコンシューマーグループには含まれません。結果として、復元コンシューマーはアプリケーションとは別に表示されます。
注釈
このウェブサイトには、Apache License v2 の条件に基づいて Apache Software Foundation で開発されたコンテンツが含まれています。