Monitoring Kafka Streams Applications

Metrics

Accessing Metrics

Accessing Metrics via JMX and Reporters

The Kafka Streams library reports a variety of metrics through JMX. It can also be configured to report stats through additional pluggable stats reporters by setting the metric.reporters configuration option. The easiest way to view the available metrics is through tools such as JConsole, which let you browse JMX MBeans.
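
As a programmatic counterpart to browsing with JConsole, the following minimal sketch lists the MBeans registered under the kafka.streams domain in the current JVM. It relies only on the standard javax.management API. The class and method names are illustrative, and the result is empty unless a Kafka Streams client is running in the same process.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import java.util.TreeSet;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Illustrative helper (not part of Kafka Streams).
final class StreamsMBeanLister {

    // Returns the sorted canonical names of all MBeans in the kafka.streams domain.
    static Set<String> listStreamsMBeans(MBeanServer server) {
        try {
            ObjectName pattern = new ObjectName("kafka.streams:*");
            Set<String> names = new TreeSet<>();
            for (ObjectName name : server.queryNames(pattern, null)) {
                names.add(name.getCanonicalName());
            }
            return names;
        } catch (MalformedObjectNameException e) {
            // The pattern above is a constant, so this is not expected to happen.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        listStreamsMBeans(server).forEach(System.out::println);
    }
}
```

Connecting to a remote JVM would need a JMX connector instead of the platform MBean server; that setup is outside the scope of this sketch.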

Accessing Metrics Programmatically

The entire metrics registry of a KafkaStreams instance can be accessed read-only through the method KafkaStreams#metrics(). The metrics registry will contain all the available metrics listed below. See the documentation of KafkaStreams in the Kafka Streams Javadocs for details.

Configuring Metrics Granularity

Kafka Streams has metrics with three recording levels: info, debug, and trace. The info level records only some of the metrics, the debug level records most of them, and the trace level records all possible metrics. Use the metrics.recording.level configuration option to specify which metrics you want collected. For details, see Optional configuration parameters.
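
A minimal sketch of setting the recording level, assuming the configuration is supplied as java.util.Properties. The keys are written as plain strings so the snippet has no Kafka dependency. The application ID and broker address are placeholder values, and the class name is illustrative.

```java
import java.util.Properties;

// Illustrative helper (not part of Kafka Streams).
final class MetricsLevelConfig {

    // Builds client properties that raise the metrics recording level to DEBUG.
    static Properties debugMetricsProperties() {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");     // placeholder application ID
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
        props.put("metrics.recording.level", "DEBUG");     // collect debug-level metrics too
        return props;
    }
}
```

The resulting Properties would typically be passed to the KafkaStreams constructor together with the topology.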

Built-in Metrics

Client Metrics

All of the following metrics have a recording level of info.

MBean: kafka.streams:type=stream-metrics,client-id=[clientId]

version
The version of the Kafka Streams client.
commit-id
The version control commit ID of the Kafka Streams client.
application-id
The application ID of the Kafka Streams client.
topology-description
The description of the topology executed in the Kafka Streams client.
state
The state of the Kafka Streams client.
alive-stream-threads
The current number of alive stream threads that are running or participating in rebalance.
failed-stream-threads
The number of failed stream threads since the start of the Kafka Streams client.

Thread Metrics

All of the following metrics have a recording level of info.

MBean: kafka.streams:type=stream-thread-metrics,thread-id=[threadId]

[commit | poll | process | punctuate]-latency-[avg | max]
The [average | maximum] execution time in ms, for the respective operation, across all running tasks of this thread.
[commit | poll | process | punctuate]-rate
The average number of respective operations per second across all tasks.
[commit | poll | process | punctuate]-total
The total number of respective operations across all tasks.
[commit | poll | process | punctuate]-ratio
The fraction of time the thread spent on the respective operations for active tasks.
poll-records-[avg | max]
The [average | maximum] number of records polled from consumer within an iteration.
process-records-[avg | max]
The [average | maximum] number of records processed within an iteration.
task-created-rate
The average number of newly-created tasks per second.
task-created-total
The total number of newly-created tasks.
task-closed-rate
The average number of tasks closed per second.
task-closed-total
The total number of tasks closed.
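
The thread metrics above can also be read through JMX from inside the same JVM. The sketch below assumes that each metric is exposed as an MBean attribute with the same name as the metric (for example, process-rate) and that the MBean name carries the thread-id key shown above. The class and method names are illustrative.

```java
import java.lang.management.ManagementFactory;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Illustrative helper (not part of Kafka Streams).
final class ThreadMetricReader {

    // Reads one attribute, such as "process-rate", from every
    // stream-thread-metrics MBean, keyed by the thread-id of the MBean name.
    static Map<String, Object> readThreadMetric(MBeanServer server, String metricName) {
        Map<String, Object> values = new LinkedHashMap<>();
        try {
            ObjectName pattern = new ObjectName("kafka.streams:type=stream-thread-metrics,*");
            for (ObjectName name : server.queryNames(pattern, null)) {
                values.put(name.getKeyProperty("thread-id"), server.getAttribute(name, metricName));
            }
        } catch (JMException e) {
            throw new IllegalStateException("Could not read " + metricName, e);
        }
        return values;
    }

    public static void main(String[] args) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        readThreadMetric(server, "process-rate")
                .forEach((thread, value) -> System.out.println(thread + " -> " + value));
    }
}
```

The map is empty when no Kafka Streams client is running in the process.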

Task Metrics

All of the following metrics have a recording level of debug, except for the dropped-records-*, active-process-ratio, and record-e2e-latency-* metrics, which have a recording level of info.

MBean: kafka.streams:type=stream-task-metrics,thread-id=[threadId],task-id=[taskId]

[commit | process]-latency-[avg | max]
The [average | maximum] execution time in ns, for the respective operation for this task.
[commit | process]-rate
The average number of respective operations per second for this task.
[commit | process]-total
The total number of respective operations for this task.
record-lateness-[avg | max]
The [average | maximum] observed lateness (stream time - record timestamp) for this task. For more information on out-of-order records, see Out-of-Order Handling.
enforced-processing-rate
The average number of enforced processings per second for this task.
enforced-processing-total
The total number of enforced processings for this task.
dropped-records-rate
The average number of records dropped per second within this task.
dropped-records-total
The total number of records dropped within this task.
active-process-ratio
The fraction of time the thread spent on processing this active task among all assigned active tasks.

record-e2e-latency-[avg | min | max]
The [average | minimum | maximum] end-to-end latency of a record, measured by comparing the record timestamp with the system time when it has been fully processed by the node.

Processor Node Metrics

The following metrics are only available on certain types of nodes. The process-rate and process-total metrics are only available for source processor nodes, and the suppression-emit-rate and suppression-emit-total metrics are only available for suppression operation nodes. All of the metrics have a recording level of debug.

MBean: kafka.streams:type=stream-processor-node-metrics,thread-id=[threadId],task-id=[taskId],processor-node-id=[processorNodeId]

process-rate
The average number of records processed per second by a source node.
process-total
The total number of records processed by a source node.
suppression-emit-rate
The rate at which records have been emitted downstream from suppression operation nodes. Compare with the process-rate metric to determine how many updates are being suppressed.
suppression-emit-total
The total number of records that have been emitted downstream from suppression operation nodes. Compare with the process-total metric to determine how many updates are being suppressed.

State Store Metrics

All of the following metrics have a recording level of debug, except for the record-e2e-latency-* metrics, which have a recording level of trace. For user-customized state stores, the store-scope value (shown as [storeType] in the MBean name below) is specified in StoreSupplier#metricsScope(). For built-in state stores, the current values are:

  • in-memory-state
  • in-memory-lru-state
  • in-memory-window-state
  • in-memory-suppression (for suppression buffers)
  • rocksdb-state (for RocksDB backed key-value store)
  • rocksdb-window-state (for RocksDB backed window store)
  • rocksdb-session-state (for RocksDB backed session store)

Metrics suppression-buffer-size-avg, suppression-buffer-size-max, suppression-buffer-count-avg, and suppression-buffer-count-max are only available for suppression buffers. All other metrics are not available for suppression buffers.

MBean: kafka.streams:type=stream-state-metrics,thread-id=[threadId],task-id=[taskId],[storeType]-id=[storeName]

[put | put-if-absent | get | delete | put-all | all | range | flush | restore]-latency-[avg | max]
The [average | maximum] execution time in ns, for the respective operation.
[put | put-if-absent | get | delete | put-all | all | range | flush | restore]-rate
The average rate of respective operations per second for this store.
[put | put-if-absent | get | delete | put-all | all | range | flush | restore]-total
The total number of respective operations for this store.
suppression-buffer-size-[avg | max]
The average or maximum size of buffered data, in bytes. This helps you choose a value for BufferConfig.maxBytes(...), if desired.
suppression-buffer-count-[avg | max]
The average or maximum number of records in the buffer. This helps you choose a value for BufferConfig.maxRecords(...), if desired.
record-e2e-latency-[avg | min | max]
The [average | minimum | maximum] end-to-end latency of a record, measured by comparing the record timestamp with the system time when it has been fully processed by the node.

RocksDB Metrics

RocksDB metrics are grouped into statistics-based metrics and properties-based metrics. Statistics-based metrics are recorded from statistics that a RocksDB state store collects. Properties-based metrics are recorded from properties that RocksDB exposes. Statistics collected by RocksDB provide cumulative measurements over time, for example, bytes written to the state store. Properties exposed by RocksDB provide current measurements, for example, the amount of memory currently used.

The built-in RocksDB state stores have these values for storeType:

  • rocksdb-state (for RocksDB-backed key-value stores)
  • rocksdb-window-state (for RocksDB-backed window stores)
  • rocksdb-session-state (for RocksDB-backed session stores)

MBean: kafka.streams:type=stream-state-metrics,thread-id=[threadId],task-id=[taskId],[storeType]-id=[storeName]

RocksDB Statistics-based Metrics: All of the following statistics-based metrics have a recording level of debug, because collecting statistics in RocksDB may have an impact on performance. Statistics-based metrics are collected from the RocksDB state stores at a rate of once per minute. If a state store consists of multiple RocksDB instances, which is the case for aggregations over time and session windows, each metric reports an aggregation over the RocksDB instances of the state store.

bytes-written-rate
The average number of bytes written per second to the RocksDB state store.
bytes-read-rate
The average number of bytes read per second from the RocksDB state store.
memtable-bytes-flushed-rate
The average number of bytes flushed per second from the memtable to disk.
memtable-hit-ratio
The ratio of memtable hits, relative to all lookups to the memtable.
block-cache-data-hit-ratio
The ratio of block cache hits for data blocks, relative to all lookups for data blocks to the block cache.
block-cache-index-hit-ratio
The ratio of block cache hits for index blocks, relative to all lookups for index blocks to the block cache.
block-cache-filter-hit-ratio
The ratio of block cache hits for filter blocks, relative to all lookups for filter blocks to the block cache.
write-stall-duration-[avg | total]
The [average | total] duration of write stalls, in ms.
bytes-read-compaction-rate
The average number of bytes read per second during compaction.
number-open-files
The number of currently open files.
number-file-errors-total
The total number of file errors that occurred.

RocksDB Properties-based Metrics: All of the following properties-based metrics have a recording level of info and are recorded when the metrics are accessed. If a state store consists of multiple RocksDB instances, which is the case for aggregations over time and session windows, each metric reports the sum over all the RocksDB instances of the state store, except for the block cache metrics, named block-cache-*. The block cache metrics report the sum over all RocksDB instances if each instance uses its own block cache, and they report the recorded value from only one instance if a single block cache is shared among all instances.

num-immutable-mem-table
The number of immutable memtables that have not yet been flushed.
cur-size-active-mem-table
The approximate size of the active memtable in bytes.
cur-size-all-mem-tables
The approximate size of active and unflushed immutable memtables in bytes.
size-all-mem-tables
The approximate size of active, unflushed immutable, and pinned immutable memtables in bytes.
num-entries-active-mem-table
The number of entries in the active memtable.
num-entries-imm-mem-tables
The number of entries in the unflushed immutable memtables.
num-deletes-active-mem-table
The number of delete entries in the active memtable.
num-deletes-imm-mem-tables
The number of delete entries in the unflushed immutable memtables.
mem-table-flush-pending
This metric reports 1 if a memtable flush is pending, otherwise it reports 0.
num-running-flushes
The number of currently running flushes.
compaction-pending
This metric reports 1 if at least one compaction is pending, otherwise it reports 0.
num-running-compactions
The number of currently running compactions.
estimate-pending-compaction-bytes
The estimated total number of bytes a compaction needs to rewrite on disk to get all levels down to under target size (only valid for level compaction).
total-sst-files-size
The total size in bytes of all Sorted Sequence Table (SST) files.
live-sst-files-size
The total size in bytes of all Sorted Sequence Table (SST) files that belong to the latest log-structured merge (LSM) tree.
num-live-versions
The number of live versions of the log-structured merge (LSM) tree.
block-cache-capacity
The capacity of the block cache in bytes.
block-cache-usage
The memory size of the entries residing in block cache in bytes.
block-cache-pinned-usage
The memory size for the entries being pinned in the block cache in bytes.
estimate-num-keys
The estimated number of keys in the active and unflushed immutable memtables and storage.
estimate-table-readers-mem
The estimated memory in bytes used for reading Sorted Sequence Tables (SSTs), excluding memory used in block cache.
background-errors
The total number of background errors.

Record Cache Metrics

All of the following metrics have a recording level of debug.

MBean: kafka.streams:type=stream-record-cache-metrics,thread-id=[threadId],task-id=[taskId],record-cache-id=[storeName]

hit-ratio-[avg | min | max]
The [average | minimum | maximum] cache hit ratio defined as the ratio of cache read hits over the total cache read requests.

Adding Your Own Metrics

Application developers using the low-level Processor API can add additional metrics to their application. The ProcessorContext#metrics() method provides a handle to the StreamsMetrics object, which you can use to:

  • Add latency and throughput metrics via StreamsMetrics#addLatencyRateTotalSensor() and StreamsMetrics#addRateTotalSensor().
  • Add any other type of metric via StreamsMetrics#addSensor().

Runtime Status Information

Status of KafkaStreams instances

Important

Don’t confuse the runtime state of a KafkaStreams instance (e.g. created, rebalancing) with state stores!

A Kafka Streams instance may be in one of several run-time states, as defined in the enum KafkaStreams.State. For example, it might be created but not running; or it might be rebalancing and thus its state stores are not available for querying. Users can access the current runtime state programmatically using the method KafkaStreams#state(). The documentation of KafkaStreams.State in the Kafka Streams Javadocs lists all the available states.

Also, you can use KafkaStreams#setStateListener() to register a KafkaStreams.StateListener, whose onChange() method is called whenever the state changes.

Use the KafkaStreams#localThreadsMetadata() method to check the runtime state of the current KafkaStreams instance. The localThreadsMetadata() method returns a ThreadMetadata object for each local stream thread. The ThreadMetadata object describes the runtime state of a thread and the metadata for the thread’s currently assigned tasks.

Get Runtime Information on KafkaStreams Clients

You can get runtime information on these local KafkaStreams clients:

  • Admin client
  • Producer clients
  • Consumer client
  • Restore consumer client

There is one admin client per KafkaStreams instance, and all other clients are per StreamThread.

Get the names of local KafkaStreams clients by calling the client ID methods on the ThreadMetadata class, like producerClientIds().

Client names are based on a client ID value, which is assigned according to the StreamsConfig.CLIENT_ID_CONFIG and StreamsConfig.APPLICATION_ID_CONFIG configuration settings.

  • If CLIENT_ID_CONFIG is set, Kafka Streams uses CLIENT_ID_CONFIG for the client ID value.

  • If CLIENT_ID_CONFIG isn’t set, Kafka Streams uses APPLICATION_ID_CONFIG and appends a random unique identifier (UUID):

    clientId = StreamsConfig.APPLICATION_ID_CONFIG + "-" + <random-UUID>
    

Kafka Streams creates names for specific clients by appending a thread ID and a descriptive string to the main client ID.

specificClientId = clientId + "-StreamThread-" + <thread-number> + <description>

For example, if CLIENT_ID_CONFIG is set to “MyClientId”, the consumerClientId() method returns a value that resembles MyClientId-StreamThread-2-consumer. If CLIENT_ID_CONFIG isn’t set, and APPLICATION_ID_CONFIG is set to “MyApplicationId”, the consumerClientId() method returns a value that resembles MyApplicationId-8d8ce4a7-85bb-41f7-ac9c-fe6f3cc0959e-StreamThread-2-consumer.

Call the threadName() method to get the thread ID:

threadId = clientId + "-StreamThread-" + <thread-number>

Depending on the configuration settings, an example thread ID resembles MyClientId-StreamThread-2 or MyApplicationId-8d8ce4a7-85bb-41f7-ac9c-fe6f3cc0959e-StreamThread-2.
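
The naming rules above can be sketched as plain string functions. This is an illustration of the scheme as described in this section, not the Kafka Streams implementation. The class and method names are hypothetical, and actual IDs should be read from ThreadMetadata.

```java
import java.util.UUID;

// Illustrative helper (not part of Kafka Streams).
final class ClientNames {

    // Main client ID: the configured client ID if one is set,
    // otherwise "<applicationId>-<UUID>".
    static String clientId(String configuredClientId, String applicationId, UUID uuid) {
        if (configuredClientId != null && !configuredClientId.isEmpty()) {
            return configuredClientId;
        }
        return applicationId + "-" + uuid;
    }

    // Thread ID: "<clientId>-StreamThread-<thread-number>".
    static String threadId(String clientId, int threadNumber) {
        return clientId + "-StreamThread-" + threadNumber;
    }

    // Consumer client name: the thread ID plus the "-consumer" description.
    static String consumerClientId(String threadId) {
        return threadId + "-consumer";
    }

    public static void main(String[] args) {
        String id = clientId("MyClientId", "MyApplicationId", UUID.randomUUID());
        System.out.println(consumerClientId(threadId(id, 2)));
    }
}
```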

adminClientId()
Gets the ID of the admin client, which is the main client ID value appended with -admin. Depending on configuration settings, the return value resembles MyClientId-admin or MyApplicationId-8d8ce4a7-85bb-41f7-ac9c-fe6f3cc0959e-admin.
producerClientIds()
Gets the names of producer clients. If exactly-once semantics (EOS version 1) is active, returns the list of task producer names, otherwise (EOS disabled or EOS version 2) returns the thread producer name. All producer client names are the main thread ID appended with -producer. If EOS version 1 is active, a -<taskId> is included.

A task ID is a sub-topology ID and a partition number, <subTopologyId>_<partition>. The subTopologyId is an integer greater than or equal to zero.

If EOS version 1 is active, the producerClientIds() method returns a Set of client names that have different task IDs. Depending on configuration settings, the return value resembles MyClientId-StreamThread-2-1_4-producer.

If EOS isn’t active or EOS version 2 is active, the return value is a single client name that doesn’t have a task ID, for example, MyClientId-StreamThread-2-producer.

For more information, see Stream Partitions and Tasks.

consumerClientId()
Gets the name of the consumer client. The consumer client name is the main thread ID appended with -consumer, for example, MyClientId-StreamThread-2-consumer.
restoreConsumerClientId()
Gets the name of the restore consumer client. The restore consumer client name is the main thread ID appended with -restore-consumer, for example, MyClientId-StreamThread-2-restore-consumer.
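
The producer naming rules and the task ID format described above can be illustrated with a few string helpers. This is a sketch of the scheme as documented here, with hypothetical class and method names. It does not call Kafka Streams, and it assumes task IDs of the plain form <subTopologyId>_<partition>.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Illustrative helper (not part of Kafka Streams).
final class ProducerNames {

    // Splits "<subTopologyId>_<partition>" into {subTopologyId, partition}.
    static int[] parseTaskId(String taskId) {
        int sep = taskId.indexOf('_');
        if (sep <= 0 || sep == taskId.length() - 1) {
            throw new IllegalArgumentException("Not a task ID: " + taskId);
        }
        return new int[] {
            Integer.parseInt(taskId.substring(0, sep)),
            Integer.parseInt(taskId.substring(sep + 1))
        };
    }

    // EOS version 1: one producer per task, so the task ID is part of the name.
    static Set<String> taskProducerNames(String threadId, List<String> taskIds) {
        Set<String> names = new TreeSet<>();
        for (String taskId : taskIds) {
            names.add(threadId + "-" + taskId + "-producer");
        }
        return names;
    }

    // EOS disabled or EOS version 2: a single producer per thread.
    static String threadProducerName(String threadId) {
        return threadId + "-producer";
    }

    public static void main(String[] args) {
        System.out.println(taskProducerNames("MyClientId-StreamThread-2", Arrays.asList("1_4", "0_2")));
        System.out.println(threadProducerName("MyClientId-StreamThread-2"));
    }
}
```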

Monitoring the Restoration Progress of Fault-tolerant State Stores

When your application starts up, fault-tolerant state stores usually don’t need a restoration process, because the persisted state is read from local disk. But there could be situations when a full restore from the backing changelog topic is required (e.g., a failure wiped out the local state, or your application runs in a stateless environment and persisted data is lost on restarts).

If you have a significant amount of data in the changelog topic, the restoration process could take a non-negligible amount of time. Given that processing of new data won’t start until the restoration process is completed, having a window into the progress of restoration is useful.

To observe the restoration of all state stores, provide your application with an instance of the org.apache.kafka.streams.processor.StateRestoreListener interface. Register the listener by calling the KafkaStreams#setGlobalStateRestoreListener method.

A basic implementation example that prints restoration status to the console:

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// Prints the restoration status of each state store partition to the console.
public class ConsoleGlobalRestoreListener implements StateRestoreListener {

    // Called once before the restoration of a store partition begins.
    @Override
    public void onRestoreStart(final TopicPartition topicPartition,
                               final String storeName,
                               final long startingOffset,
                               final long endingOffset) {

        System.out.print("Started restoration of " + storeName + " partition " + topicPartition.partition());
        System.out.println(" total records to be restored " + (endingOffset - startingOffset));
    }

    // Called after each batch of records has been restored.
    @Override
    public void onBatchRestored(final TopicPartition topicPartition,
                                final String storeName,
                                final long batchEndOffset,
                                final long numRestored) {

        System.out.println("Restored batch " + numRestored + " for " + storeName + " partition " + topicPartition.partition());
    }

    // Called once after the restoration of a store partition has completed.
    @Override
    public void onRestoreEnd(final TopicPartition topicPartition,
                             final String storeName,
                             final long totalRestored) {

        System.out.println("Restoration complete for " + storeName + " partition " + topicPartition.partition());
    }
}

Attention

The StateRestoreListener instance is shared across all org.apache.kafka.streams.processor.internals.StreamThread instances and is also used for global stores. Furthermore, all of its methods are assumed to be stateless. If you need stateful operations, you must provide the synchronization yourself.
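
If a listener needs to keep state, for example to report restoration progress as a percentage, the state has to be safe for concurrent use. The following sketch shows one possible approach with a ConcurrentHashMap and atomic counters. The class is hypothetical and has no Kafka dependency; its three methods are meant to be called from the corresponding StateRestoreListener callbacks. The total is approximated as endingOffset - startingOffset, as in the console example above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative, thread-safe progress tracker (not part of Kafka Streams).
final class RestoreProgressTracker {

    private static final class Progress {
        final long total;                              // approximate records to restore
        final AtomicLong restored = new AtomicLong();  // records restored so far

        Progress(long total) {
            this.total = total;
        }
    }

    private final ConcurrentMap<String, Progress> progress = new ConcurrentHashMap<>();

    // Call from onRestoreStart.
    void start(String storeName, int partition, long startingOffset, long endingOffset) {
        long total = Math.max(0L, endingOffset - startingOffset);
        progress.put(key(storeName, partition), new Progress(total));
    }

    // Call from onBatchRestored. Returns the percentage restored so far,
    // or 0.0 if no restoration was started for this store partition.
    double batchRestored(String storeName, int partition, long numRestored) {
        Progress p = progress.get(key(storeName, partition));
        if (p == null) {
            return 0.0;
        }
        long done = p.restored.addAndGet(numRestored);
        return p.total == 0 ? 100.0 : Math.min(100.0, 100.0 * done / p.total);
    }

    // Call from onRestoreEnd.
    void end(String storeName, int partition) {
        progress.remove(key(storeName, partition));
    }

    private static String key(String storeName, int partition) {
        return storeName + "-" + partition;
    }
}
```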

Integration with Confluent Control Center

Since the 3.2 release, Confluent Control Center will display the underlying producer metrics and consumer metrics of a Kafka Streams application, which the Streams API uses internally whenever data needs to be read from or written to Apache Kafka® topics. These metrics can be used, for example, to monitor the so-called “consumer lag” of an application, which indicates whether an application – at its current capacity and available computing resources – is able to keep up with the incoming data volume.
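
To make the notion of consumer lag concrete: for each partition, lag is the difference between the log end offset and the offset the consumer group has committed. The sketch below computes this from two plain maps. The class name, the string partition keys, and the choice to treat a missing committed offset as 0 are assumptions made for illustration, and the code does not query Kafka or Control Center.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Illustrative lag calculation (not part of Kafka Streams or Control Center).
final class ConsumerLag {

    // Lag per partition = log end offset - committed offset, never below zero.
    // Assumption: a partition without a committed offset counts as committed at 0.
    static Map<String, Long> lagByPartition(Map<String, Long> endOffsets,
                                            Map<String, Long> committedOffsets) {
        Map<String, Long> lag = new TreeMap<>();
        for (Map.Entry<String, Long> entry : endOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            lag.put(entry.getKey(), Math.max(0L, entry.getValue() - committed));
        }
        return lag;
    }

    public static void main(String[] args) {
        Map<String, Long> end = new HashMap<>();
        end.put("orders-0", 120L);
        end.put("orders-1", 80L);
        Map<String, Long> committed = new HashMap<>();
        committed.put("orders-0", 100L);
        System.out.println(lagByPartition(end, committed));
    }
}
```

A lag that keeps growing over time suggests that the application cannot keep up with the incoming data volume.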

A Kafka Streams application, i.e. all its running instances, appears as a single consumer group in Control Center.

Note

Restore consumers of an application are displayed separately: Behind the scenes, the Streams API uses a dedicated “restore” consumer for the purposes of fault tolerance and state management. This restore consumer manually assigns and manages the topic partitions it consumes from and is not a member of the application’s consumer group. As a result, the restore consumers will be displayed separately from their application.