.. _streams_monitoring:

Monitoring your application
===========================

**Table of Contents**

.. contents::
   :local:

Metrics
-------

Accessing Metrics
^^^^^^^^^^^^^^^^^

Accessing Metrics via JMX and Reporters
"""""""""""""""""""""""""""""""""""""""

The Kafka Streams library reports a variety of metrics through JMX.  It can also be configured to report stats to
additional pluggable stats reporters via the ``metrics.reporters`` configuration option.  The easiest way to view the
available metrics is through tools such as
`JConsole <http://docs.oracle.com/javase/8/docs/technotes/guides/management/jconsole.html>`__, which allow you to
browse JMX MBeans.

Accessing Metrics Programmatically
""""""""""""""""""""""""""""""""""

The entire metrics registry of a ``KafkaStreams`` instance can be accessed read-only through the method
``KafkaStreams#metrics()`` (a sketch of iterating over this registry appears at the end of this Metrics section).
The metrics registry will contain all the available metrics listed below.  See the documentation of ``KafkaStreams``
in the :ref:`Kafka Streams Javadocs ` for details.

Configuring Metrics Granularity
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

By default Kafka Streams has metrics with two recording levels: ``debug`` and ``info``.  The ``debug`` level records
all metrics, while the ``info`` level records only some of them.  Use the ``metrics.recording.level`` configuration
option to specify which metrics you want collected; see :ref:`streams_developer-guide_optional-configs`.

Built-in Metrics
^^^^^^^^^^^^^^^^

Thread Metrics
""""""""""""""

All the following metrics have a recording level of ``info``.

**MBean: kafka.streams:type=stream-metrics,thread.client-id=[threadId]**

``[commit | poll | process | punctuate]-latency-[avg | max]``
  The [average | maximum] execution time in ms, for the respective operation, across all running tasks of this thread.

``[commit | poll | process | punctuate]-rate``
  The average number of respective operations per second across all tasks.

``task-created-rate``
  The average number of newly created tasks per second.

``task-closed-rate``
  The average number of tasks closed per second.

``skipped-records-rate``
  The average number of skipped records per second.  This metric helps monitor whether the rate of record consumption
  and the rate of record processing are equal.  The difference should be negligible during normal operations.

Task Metrics
""""""""""""

All the following metrics have a recording level of ``debug``.

**MBean: kafka.streams:type=stream-task-metrics,streams-task-id=[taskId]**

``commit-latency-[avg | max]``
  The [average | maximum] commit time in ns for this task.

``commit-rate``
  The average number of commit calls per second.

Processor Node Metrics
""""""""""""""""""""""

All the following metrics have a recording level of ``debug``.

**MBean: kafka.streams:type=stream-processor-node-metrics,processor-node-id=[nodeId]**

``[process | punctuate | create | destroy]-latency-[avg | max]``
  The [average | maximum] execution time in ns, for the respective operation.

``[process | punctuate | create | destroy]-rate``
  The average number of respective operations per second.

``forward-rate``
  The average rate of records being forwarded downstream, from source nodes only, per second.  This metric can be used
  to understand how fast the library is consuming from source topics.

State Store Metrics
"""""""""""""""""""

All the following metrics have a recording level of ``debug``.

**MBean: kafka.streams:type=stream-[store-type]-metrics**

``[put | put-if-absent | get | delete | put-all | all | range | flush | restore]-latency-[avg | max]``
  The [average | maximum] execution time in ns, for the respective operation.

``[put | put-if-absent | get | delete | put-all | all | range | flush | restore]-rate``
  The average rate of respective operations per second for this store.

Record Cache Metrics
""""""""""""""""""""

All the following metrics have a recording level of ``debug``.

**MBean: kafka.streams:type=stream-record-cache-metrics,record-cache-id=[cacheId]**

``hitRatio-[avg | min | max]``
  The [average | minimum | maximum] cache hit ratio, defined as the ratio of cache read hits over the total cache read
  requests.

Adding Your Own Metrics
^^^^^^^^^^^^^^^^^^^^^^^

Application developers using the :ref:`low-level Processor API ` can add additional metrics to their application.
The ``ProcessorContext#metrics()`` method provides a handle to the ``StreamsMetrics`` object, which you can use to:

* Add latency and throughput metrics via ``StreamsMetrics#addLatencyAndThroughputSensor()`` and
  ``StreamsMetrics#addThroughputSensor()`` (see the sketch below).
* Add any other type of metric via ``StreamsMetrics#addSensor()``.
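
As an illustration, here is a minimal sketch of a processor that registers a custom throughput sensor via
``ProcessorContext#metrics()``.  The class name and the scope, entity, and operation names passed to
``addThroughputSensor()`` are hypothetical and only determine how the resulting metrics are named.

.. sourcecode:: java

    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.streams.StreamsMetrics;
    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorContext;

    // Hypothetical processor that counts every record it forwards.
    public class MeteredPassThroughProcessor extends AbstractProcessor<String, String> {

        private StreamsMetrics metrics;
        private Sensor processedSensor;

        @Override
        public void init(final ProcessorContext context) {
            super.init(context);
            metrics = context.metrics();
            // Register a custom throughput sensor in the application's
            // metrics registry, alongside the built-in metrics.
            processedSensor = metrics.addThroughputSensor(
                "my-scope", "metered-pass-through", "records-processed",
                Sensor.RecordingLevel.INFO);
        }

        @Override
        public void process(final String key, final String value) {
            metrics.recordThroughput(processedSensor, 1);  // one record seen
            context().forward(key, value);
        }
    }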
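
Similarly, a small sketch of reading the registry back through ``KafkaStreams#metrics()``, which covers the built-in
metrics listed above as well as any custom sensors.  The helper method name is hypothetical, and ``streams`` stands
for an already configured and started instance.

.. sourcecode:: java

    import java.util.Map;

    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.streams.KafkaStreams;

    // Hypothetical helper: dump all currently known metrics to stdout.
    public static void logAllMetrics(final KafkaStreams streams) {
        // The returned map is read-only.
        final Map<MetricName, ? extends Metric> metrics = streams.metrics();
        for (final Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            final MetricName name = entry.getKey();
            System.out.println(name.group() + " / " + name.name()
                + " = " + entry.getValue().value());
        }
    }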

.. _streams_monitoring-runtime_status:

Run-time Status Information
---------------------------

Status of ``KafkaStreams`` instances
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. important::
   Don't confuse the run-time state of a ``KafkaStreams`` instance (e.g. created, rebalancing) with state stores!

A Kafka Streams instance may be in one of several run-time states, as defined in the enum ``KafkaStreams.State``.
For example, it might be created but not running; or it might be rebalancing and thus its state stores are not
available for querying.  Users can access the current run-time state programmatically using the method
``KafkaStreams#state()``.  The documentation of ``KafkaStreams.State`` in the :ref:`Kafka Streams Javadocs ` lists
all the available states.  Also, you can use ``KafkaStreams#setStateListener()`` to register a
``KafkaStreams.StateListener`` that will be notified whenever the state changes (a sketch of such a listener appears
at the end of this document).

Integration with Confluent Control Center
------------------------------------------

In the 3.2 release, :ref:`Confluent Control Center ` will display the underlying :ref:`producer metrics ` and
:ref:`consumer metrics ` of a Kafka Streams application, which the Streams API uses internally whenever data needs to
be read from or written to Kafka topics.  These metrics can be used, for example, to monitor the so-called "consumer
lag" of an application, which indicates whether an application -- at its
:ref:`current capacity and available computing resources ` -- is able to keep up with the incoming data volume.

A Kafka Streams application, i.e. all of its running instances, appears as a single consumer group in Control Center.

.. note::
   **Restore consumers of an application are displayed separately:**  Behind the scenes, the Streams API uses a
   dedicated "restore" consumer for the purposes of fault tolerance and state management.  This restore consumer
   manually assigns and manages the topic partitions it consumes from and is not a member of the application's
   consumer group.  As a result, the restore consumers will be displayed separately from their application.
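
To round out the run-time status discussion above, here is a minimal sketch of registering a state listener before
starting an instance.  The helper method name and log output are hypothetical; only ``KafkaStreams#setStateListener()``,
``KafkaStreams.StateListener``, and ``KafkaStreams#state()`` come from the API described above.

.. sourcecode:: java

    import org.apache.kafka.streams.KafkaStreams;

    // Hypothetical helper: attach a listener, then start the instance.
    public static void startWithStateLogging(final KafkaStreams streams) {
        streams.setStateListener(new KafkaStreams.StateListener() {
            @Override
            public void onChange(final KafkaStreams.State newState,
                                 final KafkaStreams.State oldState) {
                // Invoked on every transition, e.g. REBALANCING -> RUNNING.
                System.out.println("State changed from " + oldState + " to " + newState);
            }
        });
        streams.start();

        // The current state can also be polled at any time, e.g. to defer
        // interactive queries while the instance is rebalancing.
        System.out.println("Current state: " + streams.state());
    }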