Important
You are viewing documentation for an older version of Confluent Platform.
Monitoring your application¶
Metrics¶
Accessing Metrics¶
Accessing Metrics via JMX and Reporters¶
The Kafka Streams library reports a variety of metrics through JMX. It can also be configured to report
metrics through additional pluggable reporters, which you set with the metric.reporters configuration
option. The easiest way to view the available metrics is through tools such as JConsole, which let you
browse JMX MBeans.
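As a rough programmatic counterpart to browsing MBeans in JConsole, the following sketch lists every MBean registered under the kafka.streams domain. It uses only the standard javax.management API and assumes it runs inside the same JVM as the KafkaStreams instance; the class name StreamsMBeanLister is hypothetical. In a JVM where no Kafka Streams application is running, the returned set is simply empty.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import java.util.TreeSet;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper, not part of Kafka Streams.
final class StreamsMBeanLister {

    // Returns the canonical names of all MBeans in the kafka.streams domain
    // that are registered with this JVM's platform MBean server, sorted.
    static Set<String> listStreamsMBeans() {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        Set<String> names = new TreeSet<>();
        try {
            // "kafka.streams:*" matches every MBean whose domain is kafka.streams.
            ObjectName pattern = new ObjectName("kafka.streams:*");
            for (ObjectName name : server.queryNames(pattern, null)) {
                names.add(name.getCanonicalName());
            }
        } catch (MalformedObjectNameException e) {
            // The pattern is a constant, so this indicates a programming error.
            throw new IllegalStateException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        for (String name : listStreamsMBeans()) {
            System.out.println(name);
        }
    }
}
```

To inspect a remote application instead, the same query could be issued over a JMX connector, which is outside the scope of this sketch.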
Accessing Metrics Programmatically¶
The entire metrics registry of a KafkaStreams instance can be accessed read-only through the method
KafkaStreams#metrics(). The metrics registry contains all the available metrics listed below.
See the documentation of KafkaStreams in the Kafka Streams Javadocs for details.
Configuring Metrics Granularity¶
Kafka Streams metrics have two recording levels: debug and info. The debug level records all
metrics, while the info level (the default) records only the thread metrics. Use the
metrics.recording.level configuration option to specify which metrics you want collected; see
Optional configuration parameters.
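A minimal sketch of setting the recording level in code is shown below. The configuration key is written as a plain string so that the example has no Kafka dependency; in an application you would more likely use the constant StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG and merge these properties with the rest of your Streams configuration. The class and method names are hypothetical. Note that the configuration values are the uppercase strings "INFO" and "DEBUG".

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka Streams.
final class StreamsMetricsConfig {

    // Builds only the metrics-related part of a Kafka Streams configuration.
    static Properties metricsProperties(boolean collectDebugMetrics) {
        Properties props = new Properties();
        // "DEBUG" records all metrics; "INFO" records only the info-level ones.
        props.put("metrics.recording.level", collectDebugMetrics ? "DEBUG" : "INFO");
        return props;
    }
}
```

Because debug-level metrics are recorded per task, processor node, and state store, enabling them only when needed is a reasonable default.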
Built-in Metrics¶
Thread Metrics¶
All the following metrics have a recording level of info.
MBean: kafka.streams:type=stream-metrics,thread.client-id=[threadId]
[commit | poll | process | punctuate]-latency-[avg | max]
- The [average | maximum] execution time in ms, for the respective operation, across all running tasks of this thread.
[commit | poll | process | punctuate]-rate
- The average number of respective operations per second across all tasks.
task-created-rate
- The average number of newly created tasks per second.
task-closed-rate
- The average number of tasks closed per second.
skipped-records-rate
- The average number of skipped records per second. This metric helps monitor if the rate of record consumption and rate of record processing are equal or not. The difference should be negligible during normal operations.
Task Metrics¶
All the following metrics have a recording level of debug.
MBean: kafka.streams:type=stream-task-metrics,client-id=[threadId],task-id=[taskId]
commit-latency-[avg | max]
- The [average | maximum] commit time in ns for this task.
commit-rate
- The average number of commit calls per second.
Processor Node Metrics¶
All the following metrics have a recording level of debug.
MBean: kafka.streams:type=stream-processor-node-metrics,client-id=[threadId],task-id=[taskId],processor-node-id=[processorNodeId]
[process | punctuate | create | destroy]-latency-[avg | max]
- The [average | maximum] execution time in ns, for the respective operation.
[process | punctuate | create | destroy]-rate
- The average number of respective operations per second.
forward-rate
- The average rate of records being forwarded downstream, from source nodes only, per second. This metric can be used to understand how fast the library is consuming from source topics.
skippedDueToDeserializationError-rate
- The average number of skipped records due to deserialization error per second. This metric is only registered for source nodes in the topology that are piping deserialized messages from Kafka topics and forwarding to downstream processors.
State Store Metrics¶
All the following metrics have a recording level of debug.
MBean: kafka.streams:type=stream-[storeType]-state-metrics,client-id=[threadId],task-id=[taskId],[storeType]-state-id=[storeName]
[put | put-if-absent | get | delete | put-all | all | range | flush | restore]-latency-[avg | max]
- The [average | maximum] execution time in ns, for the respective operation.
[put | put-if-absent | get | delete | put-all | all | range | flush | restore]-rate
- The average rate of respective operations per second for this store.
Record Cache Metrics¶
All the following metrics have a recording level of debug.
MBean: kafka.streams:type=stream-record-cache-metrics,client-id=[threadId],task-id=[taskId],record-cache-id=[storeName]
hitRatio-[avg | min | max]
- The [average | minimum | maximum] cache hit ratio defined as the ratio of cache read hits over the total cache read requests.
Adding Your Own Metrics¶
Application developers using the low-level Processor API can add additional metrics to their application. The ProcessorContext#metrics() method provides a handle to the StreamsMetrics object, which you can use to:
- Add latency and throughput metrics via StreamsMetrics#addLatencyAndThroughputSensor() and StreamsMetrics#addThroughputSensor().
- Add any other type of metric via StreamsMetrics#addSensor().
Run-time Status Information¶
Status of KafkaStreams instances¶
Important
Don’t confuse the run-time state of a KafkaStreams instance (e.g. created, rebalancing) with state stores!
A Kafka Streams instance may be in one of several run-time states, as defined in the enum KafkaStreams.State.
For example, it might be created but not running; or it might be rebalancing and thus its state stores are not available
for querying. Users can access the current run-time state programmatically using the method KafkaStreams#state().
The documentation of KafkaStreams.State in the Kafka Streams Javadocs lists all the
available states.
Also, you can use KafkaStreams#setStateListener() to register a KafkaStreams.StateListener callback that will be
triggered whenever the state changes.
Use the KafkaStreams#localThreadsMetadata() method to check the runtime
state of the current KafkaStreams instance. The localThreadsMetadata()
method returns a ThreadMetadata object for each local stream thread. The
ThreadMetadata object describes the runtime state of a thread and the
metadata for the thread’s currently assigned tasks.
Monitoring the Restoration Progress of Fault-tolerant State Stores¶
When you start your application, fault-tolerant state stores usually do not need a restoration process because the persisted state is read from local disk. However, a full restore from the backing changelog topic is sometimes required (e.g., a failure wiped out the local state, or your application runs in a stateless environment and persisted data is lost on restarts).
If you have a significant amount of data in the changelog topic, the restoration process could take a non-negligible amount of time. Given that processing of new data won’t start until the restoration process is completed, having a window into the progress of restoration is useful.
To observe the restoration of all state stores, provide your application with an instance of the org.apache.kafka.streams.processor.StateRestoreListener
interface. You set the listener by calling the KafkaStreams#setGlobalStateRestoreListener method.
A basic implementation example that prints restoration status to the console:
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// Prints the restoration progress of every state store to the console.
public class ConsoleGlobalRestoreListener implements StateRestoreListener {

    // Called once per store partition before restoration begins.
    @Override
    public void onRestoreStart(final TopicPartition topicPartition,
                               final String storeName,
                               final long startingOffset,
                               final long endingOffset) {
        System.out.print("Started restoration of " + storeName + " partition " + topicPartition.partition());
        System.out.println(" total records to be restored " + (endingOffset - startingOffset));
    }

    // Called after each batch of records has been restored.
    @Override
    public void onBatchRestored(final TopicPartition topicPartition,
                                final String storeName,
                                final long batchEndOffset,
                                final long numRestored) {
        System.out.println("Restored batch of " + numRestored + " records for " + storeName + " partition " + topicPartition.partition());
    }

    // Called once per store partition when restoration has finished.
    @Override
    public void onRestoreEnd(final TopicPartition topicPartition,
                             final String storeName,
                             final long totalRestored) {
        System.out.println("Restoration complete for " + storeName + " partition " + topicPartition.partition());
    }
}
Attention
The StateRestoreListener instance is shared across all org.apache.kafka.streams.processor.internals.StreamThread
instances and also used for global stores.
Furthermore, it is assumed all methods are stateless. If any stateful operations are desired, then the user will need to provide synchronization internally.
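Because the listener is shared across stream threads, any state it keeps has to be thread-safe. The sketch below shows one possible way to track the fraction restored per store partition using only java.util.concurrent types. It is not part of Kafka Streams: the class RestoreProgressTracker and its methods are hypothetical, and it assumes you call them from the corresponding StateRestoreListener callbacks, passing topicPartition.partition() as the partition.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical thread-safe helper, meant to be called from a StateRestoreListener.
final class RestoreProgressTracker {

    // Progress of one store partition: records expected and records restored so far.
    private static final class Progress {
        final long total;
        final AtomicLong restored = new AtomicLong();

        Progress(long total) {
            this.total = total;
        }
    }

    private final ConcurrentMap<String, Progress> progressByKey = new ConcurrentHashMap<>();

    private static String key(String storeName, int partition) {
        return storeName + "-" + partition;
    }

    // Call from onRestoreStart.
    void started(String storeName, int partition, long startingOffset, long endingOffset) {
        long total = Math.max(0L, endingOffset - startingOffset);
        progressByKey.put(key(storeName, partition), new Progress(total));
    }

    // Call from onBatchRestored. Returns the fraction restored so far, in [0.0, 1.0],
    // or 0.0 if no restore is currently tracked for this store partition.
    double batchRestored(String storeName, int partition, long numRestored) {
        Progress p = progressByKey.get(key(storeName, partition));
        if (p == null) {
            return 0.0;
        }
        long done = p.restored.addAndGet(numRestored);
        return p.total == 0 ? 1.0 : Math.min(1.0, (double) done / p.total);
    }

    // Call from onRestoreEnd to stop tracking this store partition.
    void finished(String storeName, int partition) {
        progressByKey.remove(key(storeName, partition));
    }
}
```

A ConcurrentHashMap with an AtomicLong per entry keeps the callbacks lock-free; a synchronized map would also work if contention is not a concern.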
Integration with Confluent Control Center¶
Since the 3.2 release, Confluent Control Center will display the underlying producer metrics and consumer metrics of a Kafka Streams application, which the Streams API uses internally whenever data needs to be read from or written to Kafka topics. These metrics can be used, for example, to monitor the so-called “consumer lag” of an application, which indicates whether an application – at its current capacity and available computing resources – is able to keep up with the incoming data volume.
A Kafka Streams application, i.e. all its running instances, appears as a single consumer group in Control Center.
Note
Restore consumers of an application are displayed separately: Behind the scenes, the Streams API uses a dedicated “restore” consumer for the purposes of fault tolerance and state management. This restore consumer manually assigns and manages the topic partitions it consumes from and is not a member of the application’s consumer group. As a result, the restore consumers will be displayed separately from their application.