.. title:: Monitor Kafka Streams Applications in Confluent Platform

.. meta::
   :description: Learn about JMX metrics provided by the Kafka Streams library for monitoring your Kafka Streams applications.

.. _streams_monitoring:

Monitor |kstreams| Applications in |cp|
=======================================

|ak-tm| reports a variety of metrics through JMX. You can configure your
|kstreams| applications to report stats using pluggable reporter configuration
settings.

Metrics
-------

Access metrics
^^^^^^^^^^^^^^

Access metrics using JMX and reporters
""""""""""""""""""""""""""""""""""""""

The |kstreams| library reports a variety of metrics through JMX. It can also be
configured to report stats to additional pluggable stats reporters by using the
``metrics.reporters`` configuration option. The easiest way to view the
available metrics is through tools such as `JConsole `__, which allow you to
browse JMX MBeans.

For all |ak-tm| metrics, see :ref:`kafka_monitoring`. For Cluster Linking
metrics, see :ref:`cluster-linking-metrics`.

Access metrics programmatically
"""""""""""""""""""""""""""""""

The entire metrics registry of a |kstreams| instance can be accessed read-only
through the method ``KafkaStreams#metrics()``. The metrics registry contains
all of the available metrics listed below. For more information, see
:ref:`Kafka Streams Javadocs `.
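For example, the following is a minimal sketch that prints the group, name, and
current value of every metric in the registry. The ``MetricsDump`` class name
is hypothetical; the ``KafkaStreams#metrics()`` method and the ``MetricName``
and ``Metric`` accessors are part of the public |ak| API.

.. sourcecode:: java

   import java.util.Map;

   import org.apache.kafka.common.Metric;
   import org.apache.kafka.common.MetricName;
   import org.apache.kafka.streams.KafkaStreams;

   public class MetricsDump {

       // Print the group, name, and current value of every metric
       // exposed by the given KafkaStreams instance.
       public static void dumpMetrics(final KafkaStreams streams) {
           final Map<MetricName, ? extends Metric> metrics = streams.metrics();
           for (final Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
               final MetricName name = entry.getKey();
               System.out.println(name.group() + " / " + name.name() + " = " + entry.getValue().metricValue());
           }
       }
   }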
The metrics for |kstreams| have a four-level hierarchy:

- At the top level, there are client-level metrics for each running |kstreams|
  client.
- Each client has *stream threads* with their own metrics.
- Each stream thread has *tasks* with their own metrics.
- Each task has a number of *processor nodes* with their own metrics. Also,
  each task has a number of *state stores* and *record caches*, all of which
  have their own metrics.

Configure metrics granularity
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

By default, |kstreams| has metrics with three recording levels: ``info``,
``debug``, and ``trace``. The ``debug`` level records most metrics, while the
``info`` level records only some of them. The ``trace`` level records all
possible metrics.

Use the ``metrics.recording.level`` configuration option to specify which
metrics you want collected, for example:

.. code:: properties

   metrics.recording.level=info

For more information, see :ref:`streams_developer-guide_optional-configs`.

Built-in metrics
^^^^^^^^^^^^^^^^

.. _client-metrics:

Client metrics
""""""""""""""

All of the following metrics have a recording level of ``info``.

**MBean: kafka.streams:type=stream-metrics,client-id=[clientId]**

``version``
   The version of the Kafka Streams client.

``commit-id``
   The version control commit ID of the Kafka Streams client.

``application-id``
   The application ID of the Kafka Streams client.

``topology-description``
   The description of the topology executed in the Kafka Streams client.

``state``
   The state of the Kafka Streams client.

``alive-stream-threads``
   The current number of alive stream threads that are running or
   participating in rebalance.

``failed-stream-threads``
   The number of failed stream threads since the start of the Kafka Streams
   client.

.. _thread-metrics:

Thread metrics
""""""""""""""

All of the following metrics have a recording level of ``info``.

**MBean: kafka.streams:type=stream-thread-metrics,thread-id=[threadId]**

``blocked-time-ns-total``
   The total time the |kstreams| thread spent blocked on |ak| since it was
   started, in nanoseconds (ns). You can sample this metric periodically and
   use the difference between samples to measure time blocked during an
   interval, which is useful for debugging |kstreams| application performance,
   because it gives the proportion of time the application was blocked on
   |ak| versus processing messages. For a sampling example, see the sketch
   after this list.

``[commit | poll | process | punctuate]-latency-[avg | max]``
   The [average | maximum] execution time in ms, for the respective operation,
   across all running tasks of this thread.

``[commit | poll | process | punctuate]-rate``
   The average number of respective operations per second across all tasks.

``[commit | poll | process | punctuate]-total``
   The total number of respective operations across all tasks.

``[commit | poll | process | punctuate]-ratio``
   The fraction of time the thread spent on the respective operations for
   active tasks.

``poll-records-[avg | max]``
   The [average | maximum] number of records polled from the consumer within
   an iteration.

``process-records-[avg | max]``
   The [average | maximum] number of records processed within an iteration.

``task-created-rate``
   The average number of newly-created tasks per second.

``task-created-total``
   The total number of newly-created tasks.

``task-closed-rate``
   The average number of tasks closed per second.

``task-closed-total``
   The total number of tasks closed.

``thread-start-time``
   The epoch time when the |kstreams| thread was started, which is useful for
   computing the processing ratio during the first interval after the thread
   starts.
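As noted for ``blocked-time-ns-total``, sampling the metric periodically lets
you measure how long the stream threads were blocked on |ak| during an
interval. The following is a minimal sketch of such a sampler. The
``BlockedTimeSampler`` class name and its helper methods are hypothetical; the
sketch assumes a running ``KafkaStreams`` instance and uses only the public
``KafkaStreams#metrics()`` API.

.. sourcecode:: java

   import java.util.Map;

   import org.apache.kafka.common.Metric;
   import org.apache.kafka.common.MetricName;
   import org.apache.kafka.streams.KafkaStreams;

   public class BlockedTimeSampler {

       // Sum blocked-time-ns-total across all stream threads of this instance.
       private static double totalBlockedNanos(final KafkaStreams streams) {
           double totalNanos = 0.0;
           for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
               if ("blocked-time-ns-total".equals(entry.getKey().name())) {
                   totalNanos += ((Number) entry.getValue().metricValue()).doubleValue();
               }
           }
           return totalNanos;
       }

       // Sample the metric twice, intervalMs apart, and report how much
       // blocked time accumulated during the interval.
       public static void sampleOnce(final KafkaStreams streams, final long intervalMs) throws InterruptedException {
           final double before = totalBlockedNanos(streams);
           Thread.sleep(intervalMs);
           final double blockedNanos = totalBlockedNanos(streams) - before;
           System.out.println("Threads were blocked for " + blockedNanos / 1_000_000.0
               + " ms during a " + intervalMs + " ms interval");
       }
   }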
.. _task-metrics:

Task metrics
""""""""""""

All of the following metrics have a recording level of ``debug``, except for
the ``dropped-records-*``, ``active-process-ratio``, and
``record-e2e-latency-*`` metrics, which have a recording level of ``info``.
All latency metrics are reported in nanoseconds (ns).

**MBean: kafka.streams:type=stream-task-metrics,thread-id=[threadId],task-id=[taskId]**

``commit-latency-[avg | max]``
   The [average | maximum] execution time, in nanoseconds (ns), for committing.

``commit-rate``
   The average number of commit calls per second.

``commit-total``
   The total number of commit calls.

``process-latency-[avg | max]``
   The [average | maximum] execution time, in nanoseconds (ns), for processing
   records in this task.

``process-rate``
   The average number of records processed per second by this task.

``process-total``
   The total number of records processed by this task.

``record-lateness-[avg | max]``
   The [average | maximum] observed lateness (:ref:`stream time ` - record
   timestamp) for this task. For more information on out-of-order records, see
   :ref:`streams_concepts_out-of-order-handling`.

``enforced-processing-rate``
   The average number of enforced processings per second for this task.

``enforced-processing-total``
   The total number of enforced processings for this task.

``dropped-records-rate``
   The average number of records dropped per second within this task.

``dropped-records-total``
   The total number of records dropped within this task.

``active-process-ratio``
   The fraction of time the thread spent on processing this active task,
   relative to all of its assigned active tasks.

.. _processor-node-metrics:

Processor node metrics
""""""""""""""""""""""

The following metrics are available only on certain types of nodes.

- The ``process-rate`` and ``process-total`` metrics are available only for
  source processor nodes.
- The ``suppression-emit-rate`` and ``suppression-emit-total`` metrics are
  available only for suppression operation nodes.
- The ``record-e2e-latency-*`` metrics are available only for source processor
  nodes and terminal nodes (nodes without successor nodes).

All of the metrics have a recording level of ``debug``, except for the
``record-e2e-latency-*`` metrics, which have a recording level of ``info``.

**MBean: kafka.streams:type=stream-processor-node-metrics,thread-id=[threadId],task-id=[taskId],processor-node-id=[processorNodeId]**

``bytes-consumed-total``
   The total number of bytes consumed by a source processor node.

``bytes-produced-total``
   The total number of bytes produced by a sink processor node.

``process-rate``
   The average number of records processed per second by a source node.

``process-total``
   The total number of records processed by a source node.

``suppression-emit-rate``
   The rate at which records are emitted downstream from suppression operation
   nodes. Compare with the ``process-rate`` metric to determine how many
   updates are being suppressed.

``suppression-emit-total``
   The total number of records that have been emitted downstream from
   suppression operation nodes. Compare with the ``process-total`` metric to
   determine how many updates are being suppressed.

``record-e2e-latency-[avg | min | max]``
   The [average | minimum | maximum] end-to-end latency of a record, measured
   by comparing the record timestamp with the system time when it has been
   fully processed by the node.

``records-consumed-total``
   The total number of records consumed by a source processor node.

``records-produced-total``
   The total number of records produced by a sink processor node.

.. _state-store-metrics:

State store metrics
"""""""""""""""""""

All of the following metrics have a recording level of ``debug``, except for
the ``record-e2e-latency-*`` metrics, which have a recording level of
``trace``.

The ``store-scope`` value is specified in ``StoreSupplier#metricsScope()`` for
user-customized state stores. For built-in state stores, the value is one of
the following:

* ``in-memory-state``
* ``in-memory-lru-state``
* ``in-memory-window-state``
* ``in-memory-suppression`` (for suppression buffers)
* ``rocksdb-state`` (for RocksDB-backed key-value stores)
* ``rocksdb-window-state`` (for RocksDB-backed window stores)
* ``rocksdb-session-state`` (for RocksDB-backed session stores)

The ``suppression-buffer-size-avg``, ``suppression-buffer-size-max``,
``suppression-buffer-count-avg``, and ``suppression-buffer-count-max`` metrics
are available only for suppression buffers. All other metrics are not
available for suppression buffers.

All latency metrics are reported in nanoseconds (ns).

**MBean: kafka.streams:type=stream-state-metrics,thread-id=[threadId],task-id=[taskId],[storeType]-id=[storeName]**

``[put | put-if-absent | get | delete | put-all | all | range | flush | restore]-latency-[avg | max]``
   The [average | maximum] execution time, in nanoseconds (ns), for the
   respective operation on this store.

``[put | put-if-absent | get | delete | put-all | all | range | flush | restore]-rate``
   The average rate of respective operations per second for this store.

``[put | put-if-absent | get | delete | put-all | all | range | flush | restore]-total``
   The total number of respective operations for this store.
``suppression-buffer-size-[avg | max]``
   The [average | maximum] size of buffered data, in bytes. This helps you
   choose a value for ``BufferConfig.maxBytes(...)``, if desired.

``suppression-buffer-count-[avg | max]``
   The [average | maximum] number of records in the buffer. This helps you
   choose a value for ``BufferConfig.maxRecords(...)``, if desired.

``record-e2e-latency-[avg | min | max]``
   The [average | minimum | maximum] end-to-end latency of a record, measured
   by comparing the record timestamp with the system time when it has been
   fully processed by the node.

.. _rocksdb-metrics:

RocksDB metrics
"""""""""""""""

RocksDB metrics are grouped into statistics-based metrics and properties-based
metrics.

- Statistics-based metrics are recorded from statistics that a RocksDB state
  store collects.
- Properties-based metrics are recorded from properties that RocksDB exposes.

Statistics collected by RocksDB provide cumulative measurements over time, for
example, bytes written to the state store. Properties exposed by RocksDB
provide current measurements, for example, the amount of memory currently
used.

The built-in RocksDB state stores have these values for ``store-scope``:

* ``rocksdb-state`` (for RocksDB-backed key-value stores)
* ``rocksdb-window-state`` (for RocksDB-backed window stores)
* ``rocksdb-session-state`` (for RocksDB-backed session stores)

**RocksDB Statistics-based Metrics:**

All of the following metrics have a recording level of ``debug``, because
collecting statistics in `RocksDB may have an impact on performance `__.
Statistics-based metrics are collected every minute from the RocksDB state
stores. If a state store consists of multiple RocksDB instances, which is the
case for WindowStores and SessionStores, each metric reports an aggregation
over the RocksDB instances of the state store.

**MBean: kafka.streams:type=stream-state-metrics,thread-id=[threadId],task-id=[taskId],[storeType]-id=[storeName]**

``bytes-written-[rate | total]``
   The [average per-second | total] number of bytes written to the RocksDB
   state store.

``bytes-read-[rate | total]``
   The [average per-second | total] number of bytes read from the RocksDB
   state store.

``memtable-bytes-flushed-[rate | total]``
   The [average per-second | total] number of bytes flushed from the memtable
   to disk.

``memtable-hit-ratio``
   The ratio of memtable hits, relative to all lookups to the memtable.

``memtable-flush-time-[avg | min | max]``
   The [average | minimum | maximum] duration of memtable flushes to disk, in
   ms.

``block-cache-data-hit-ratio``
   The ratio of block cache hits for data blocks, relative to all lookups for
   data blocks to the block cache.
``block-cache-index-hit-ratio``
   The ratio of block cache hits for index blocks, relative to all lookups for
   index blocks to the block cache.

``block-cache-filter-hit-ratio``
   The ratio of block cache hits for filter blocks, relative to all lookups
   for filter blocks to the block cache.

``write-stall-duration-[avg | total]``
   The [average | total] duration of write stalls, in ms.

``bytes-read-compaction-rate``
   The average number of bytes read per second during compaction.

``bytes-written-compaction-rate``
   The average number of bytes written per second during compaction.

``compaction-time-[avg | min | max]``
   The [average | minimum | maximum] duration of disk compactions, in ms.

``number-open-files``
   The number of currently open files.

``number-file-errors-total``
   The total number of file errors that occurred.

**RocksDB Properties-based Metrics:**

All of the following properties-based metrics have a recording level of
``info`` and are recorded when the metrics are accessed. If a state store
consists of multiple RocksDB instances, which is the case for WindowStores and
SessionStores, each metric reports the sum over all the RocksDB instances of
the state store, except for the block cache metrics, named ``block-cache-*``.
The block cache metrics report the sum over all RocksDB instances if each
instance uses its own block cache, and they report the recorded value from
only one instance if a single block cache is shared among all instances.

``num-immutable-mem-table``
   The number of immutable memtables that have not yet been flushed.

``cur-size-active-mem-table``
   The approximate size of the active memtable, in bytes.

``cur-size-all-mem-tables``
   The approximate size of active and unflushed immutable memtables, in bytes.

``size-all-mem-tables``
   The approximate size of active, unflushed immutable, and pinned immutable
   memtables, in bytes.

``num-entries-active-mem-table``
   The number of entries in the active memtable.

``num-entries-imm-mem-tables``
   The number of entries in the unflushed immutable memtables.

``num-deletes-active-mem-table``
   The number of delete entries in the active memtable.

``num-deletes-imm-mem-tables``
   The number of delete entries in the unflushed immutable memtables.

``mem-table-flush-pending``
   This metric reports ``1`` if a memtable flush is pending, otherwise it
   reports ``0``.

``num-running-flushes``
   The number of currently running flushes.

``compaction-pending``
   This metric reports ``1`` if at least one compaction is pending, otherwise
   it reports ``0``.

``num-running-compactions``
   The number of currently running compactions.

``estimate-pending-compaction-bytes``
   The estimated total number of bytes a compaction needs to rewrite on disk
   to get all levels down to under target size (only valid for level
   compaction).

``total-sst-files-size``
   The total size, in bytes, of all Sorted Sequence Table (SST) files.

``live-sst-files-size``
   The total size, in bytes, of all Sorted Sequence Table (SST) files that
   belong to the latest log-structured merge (LSM) tree.

``num-live-versions``
   The number of live versions of the log-structured merge (LSM) tree.

``block-cache-capacity``
   The capacity of the block cache, in bytes.

``block-cache-usage``
   The memory size of the entries residing in the block cache, in bytes.

``block-cache-pinned-usage``
   The memory size of the entries pinned in the block cache, in bytes.

``estimate-num-keys``
   The estimated number of keys in the active and unflushed immutable
   memtables and storage.
``estimate-table-readers-mem``
   The estimated memory, in bytes, used for reading Sorted Sequence Tables
   (SSTs), excluding memory used in the block cache.

``background-errors``
   The total number of background errors.

.. _record-cache-metrics:

Record cache metrics
""""""""""""""""""""

All of the following metrics have a recording level of ``debug``.

**MBean: kafka.streams:type=stream-record-cache-metrics,thread-id=[threadId],task-id=[taskId],record-cache-id=[storeName]**

``hit-ratio-[avg | min | max]``
   The [average | minimum | maximum] cache hit ratio, defined as the ratio of
   cache read hits over the total cache read requests.

Add your own metrics
^^^^^^^^^^^^^^^^^^^^

Application developers using the :ref:`low-level Processor API ` can add
additional metrics to their application. The ``ProcessorContext#metrics()``
method provides a handle to the ``StreamsMetrics`` object, which you can use
to:

* Add latency and throughput metrics via
  ``StreamsMetrics#addLatencyRateTotalSensor()`` and
  ``StreamsMetrics#addRateTotalSensor()``.
* Add any other type of metric via ``StreamsMetrics#addSensor()``, as shown in
  the sketch after this list.
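For example, the following is a minimal sketch of a custom processor that
counts the records it processes through a rate-and-total sensor. It assumes
the ``org.apache.kafka.streams.processor.api`` flavor of the Processor API;
the ``CountingProcessor`` class name and the ``my-scope`` and ``my-entity``
names are hypothetical.

.. sourcecode:: java

   import org.apache.kafka.common.metrics.Sensor;
   import org.apache.kafka.streams.StreamsMetrics;
   import org.apache.kafka.streams.processor.api.Processor;
   import org.apache.kafka.streams.processor.api.ProcessorContext;
   import org.apache.kafka.streams.processor.api.Record;

   public class CountingProcessor implements Processor<String, String, String, String> {

       private ProcessorContext<String, String> context;
       private Sensor processedSensor;

       @Override
       public void init(final ProcessorContext<String, String> context) {
           this.context = context;
           final StreamsMetrics metrics = context.metrics();
           // Registers records-processed-rate and records-processed-total
           // metrics for this sensor under the given (hypothetical) scope
           // and entity names.
           this.processedSensor = metrics.addRateTotalSensor(
               "my-scope",
               "my-entity",
               "records-processed",
               Sensor.RecordingLevel.INFO);
       }

       @Override
       public void process(final Record<String, String> record) {
           // Record one occurrence per processed record, then forward unchanged.
           processedSensor.record(1.0d);
           context.forward(record);
       }
   }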
.. _streams_monitoring-runtime_status:

Runtime status information
--------------------------

Status of |kstreams| instances
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. important::

   Don't confuse the runtime state of a ``KafkaStreams`` instance, for
   example, ``created`` or ``rebalancing``, with state stores.

A |kstreams| instance may be in one of several runtime states, as defined in
the enum ``KafkaStreams.State``. For example, it might be created but not
running, or it might be rebalancing, in which case its state stores are not
available for querying. Users can access the current runtime state
programmatically using the method ``KafkaStreams#state()``. The documentation
of ``KafkaStreams.State`` in the :ref:`Kafka Streams Javadocs ` lists all of
the available states. Also, you can use ``KafkaStreams#setStateListener()`` to
register a ``KafkaStreams.StateListener`` that is triggered whenever the state
changes.

Use the ``KafkaStreams#localThreadsMetadata()`` method to check the runtime
state of the current ``KafkaStreams`` instance. The ``localThreadsMetadata()``
method returns a ``ThreadMetadata`` object for each local stream thread. The
``ThreadMetadata`` object describes the runtime state of a thread and the
metadata for the thread's currently assigned tasks.

Get runtime information on |kstreams| clients
"""""""""""""""""""""""""""""""""""""""""""""

You can get runtime information on these local ``KafkaStreams`` clients:

- :ref:`Admin client <kafka-admin-client-id>`
- :ref:`Producer clients <kafka-producer-client-id>`
- :ref:`Consumer client <kafka-consumer-client-id>`
- :ref:`Restore consumer client <kafka-restore-consumer-client-id>`

There is one admin client per ``KafkaStreams`` instance, and all other clients
are per ``StreamThread``. Get the names of local ``KafkaStreams`` clients by
calling the client ID methods on the ``ThreadMetadata`` class, like
``producerClientIds()``.

Client names are based on a client ID value, which is assigned according to
the ``StreamsConfig.CLIENT_ID_CONFIG`` and
``StreamsConfig.APPLICATION_ID_CONFIG`` configuration settings.

* If ``CLIENT_ID_CONFIG`` is set, Kafka Streams uses ``CLIENT_ID_CONFIG`` for
  the client ID value.
* If ``CLIENT_ID_CONFIG`` isn't set, Kafka Streams uses
  ``APPLICATION_ID_CONFIG`` and appends a random unique identifier (UUID):

  ::

     clientId = StreamsConfig.APPLICATION_ID_CONFIG + "-" + <random-UUID>

Kafka Streams creates names for specific clients by appending a thread ID and
a descriptive string to the main client ID:

::

   specificClientId = clientId + "-StreamThread-" + <threadIndex> + "-" + <description>

For example, if ``CLIENT_ID_CONFIG`` is set to "MyClientId", the
``consumerClientId()`` method returns a value that resembles
``MyClientId-StreamThread-2-consumer``. If ``CLIENT_ID_CONFIG`` isn't set, and
``APPLICATION_ID_CONFIG`` is set to "MyApplicationId", the
``consumerClientId()`` method returns a value that resembles
``MyApplicationId-8d8ce4a7-85bb-41f7-ac9c-fe6f3cc0959e-StreamThread-2-consumer``.

Call the ``threadName()`` method to get the thread ID:

::

   threadId = clientId + "-StreamThread-" + <threadIndex>

Depending on the configuration settings, an example thread ID resembles
``MyClientId-StreamThread-2`` or
``MyApplicationId-8d8ce4a7-85bb-41f7-ac9c-fe6f3cc0959e-StreamThread-2``.

.. _kafka-admin-client-id:

adminClientId()
   Gets the ID of the admin client, which is the main client ID value appended
   with ``-admin``. Depending on configuration settings, the return value
   resembles ``MyClientId-admin`` or
   ``MyApplicationId-8d8ce4a7-85bb-41f7-ac9c-fe6f3cc0959e-admin``. The admin
   client ID doesn't contain a thread ID.

.. _kafka-producer-client-id:

producerClientIds()
   Gets the names of the producer clients. If exactly-once semantics (EOS
   version 1) is active, this method returns the list of task producer names;
   otherwise (EOS disabled or EOS version 2), it returns the thread producer
   name. All producer client names are the main thread ID appended with
   ``-producer``. If EOS version 1 is active, a ``-<taskId>`` is included. A
   task ID is a sub-topology ID and a partition number, joined as
   ``<subTopologyId>_<partition>``. The ``subTopologyId`` is an integer
   greater than or equal to zero.

   If EOS version 1 is active, the ``producerClientIds()`` method returns a
   ``Set`` of client names that have different task IDs. Depending on
   configuration settings, the return value resembles
   ``MyClientId-StreamThread-2-1_4-producer``. If EOS isn't active, or EOS
   version 2 is active, the return value is a single client name that doesn't
   have a task ID, for example, ``MyClientId-StreamThread-2-producer``. For
   more information, see :ref:`streams_architecture_tasks`.

.. _kafka-consumer-client-id:

consumerClientId()
   Gets the name of the consumer client. The consumer client name is the main
   thread ID appended with ``-consumer``, for example,
   ``MyClientId-StreamThread-2-consumer``.

.. _kafka-restore-consumer-client-id:

restoreConsumerClientId()
   Gets the name of the restore consumer client. The restore consumer client
   name is the main thread ID appended with ``-restore-consumer``, for
   example, ``MyClientId-StreamThread-2-restore-consumer``.
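The following is a minimal sketch that prints all of these client IDs for each
local stream thread. The ``ClientIdDump`` class name is hypothetical; the
``localThreadsMetadata()`` method and the ``ThreadMetadata`` accessors are
part of the public |ak| Streams API.

.. sourcecode:: java

   import org.apache.kafka.streams.KafkaStreams;
   import org.apache.kafka.streams.processor.ThreadMetadata;

   public class ClientIdDump {

       // Print the runtime state and client IDs of every local stream thread.
       public static void printClientIds(final KafkaStreams streams) {
           for (final ThreadMetadata thread : streams.localThreadsMetadata()) {
               System.out.println("thread: " + thread.threadName());
               System.out.println("  state:            " + thread.threadState());
               System.out.println("  consumer:         " + thread.consumerClientId());
               System.out.println("  restore consumer: " + thread.restoreConsumerClientId());
               System.out.println("  producers:        " + thread.producerClientIds());
               System.out.println("  admin:            " + thread.adminClientId());
           }
       }
   }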
.. _streams_monitoring_state-store_monitor_restoration:

Monitor the restoration progress of fault-tolerant state stores
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

When you start your application, fault-tolerant state stores don't usually
need a restoration process, because the persisted state is read from local
disk. But there are situations in which a full restore from the backing
changelog topic is required, for example, when a failure wiped out the local
state, or when your application runs in a stateless environment and persisted
data is lost on restarts. If you have a significant amount of data in the
changelog topic, the restoration process could take a non-negligible amount
of time. Given that processing of new data won't start until the restoration
process is completed, having a window into the progress of restoration is
useful.

To observe the restoration of all state stores, provide your application with
an instance of the ``org.apache.kafka.streams.processor.StateRestoreListener``
interface. Set the listener by calling the
``KafkaStreams#setGlobalStateRestoreListener`` method. The following code
shows a basic implementation example that prints restoration status to the
console.

.. sourcecode:: java

   import org.apache.kafka.common.TopicPartition;
   import org.apache.kafka.streams.processor.StateRestoreListener;

   public class ConsoleGlobalRestoreListener implements StateRestoreListener {

       @Override
       public void onRestoreStart(final TopicPartition topicPartition,
                                  final String storeName,
                                  final long startingOffset,
                                  final long endingOffset) {

           System.out.print("Started restoration of " + storeName + " partition " + topicPartition.partition());
           System.out.println(" total records to be restored " + (endingOffset - startingOffset));
       }

       @Override
       public void onBatchRestored(final TopicPartition topicPartition,
                                   final String storeName,
                                   final long batchEndOffset,
                                   final long numRestored) {

           System.out.println("Restored batch " + numRestored + " for " + storeName + " partition " + topicPartition.partition());
       }

       @Override
       public void onRestoreEnd(final TopicPartition topicPartition,
                                final String storeName,
                                final long totalRestored) {

           System.out.println("Restoration complete for " + storeName + " partition " + topicPartition.partition());
       }
   }

.. important::

   The ``StateRestoreListener`` instance is shared across all
   ``org.apache.kafka.streams.processor.internals.StreamThread`` instances and
   is also used for global stores. Furthermore, all methods are assumed to be
   stateless. If any stateful operations are desired, then you must provide
   synchronization internally.

Integration with |c3|
---------------------

Since the 3.2 release, :ref:`Confluent Control Center ` displays the
underlying :ref:`producer metrics ` and :ref:`consumer metrics ` of a
|kstreams| application, which the |kstreams| API uses internally whenever data
needs to be read from or written to |ak| topics. You can use these metrics,
for example, to monitor the so-called "consumer lag" of an application, which
indicates whether an application, at its :ref:`current capacity and available
computing resources `, is able to keep up with the incoming data volume.

In |c3-short|, all of the running instances of a |kstreams| application appear
as a single consumer group. Restore consumers of an application are displayed
separately. Behind the scenes, the Streams API uses a dedicated "restore"
consumer for the purposes of fault tolerance and state management. This
restore consumer manually assigns and manages the topic partitions it consumes
from and is not a member of the application's consumer group. As a result, the
restore consumers are displayed separately from their application.

.. include:: ../.hidden/docs-common/home/includes/ak-share.rst