Monitor ksqlDB Persistent Queries on Confluent Cloud

You can use the Confluent Cloud Console and Metrics API to monitor your persistent queries’ health and troubleshoot problems.

Health Indicators

You can monitor the following health indicators:

Processing Errors

Processing errors is a metric that measures the total number of record processing errors that ksqlDB has encountered while processing continuous queries. A processing error is a data-source-related error. For example, if a query is reading a topic that contains JSON-formatted records, it might hit a record that does not contain valid JSON. When ksqlDB hits a processing error, it skips the record and logs the details of the error to the processing log.

You can use the Metrics API to query the current and historical processing error counts.
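
As a concrete sketch of what such a request can look like, the following curl command posts a query to the Metrics API. The metric name io.confluent.ksql.cluster/processing_errors, the lksqlc-xxxxxx cluster ID, the time interval, and the CLOUD_API_KEY/CLOUD_API_SECRET environment variables are placeholders and assumptions; confirm the exact metric and label names against the Metrics API metric descriptors for your organization.

# Hourly processing-error counts for one ksqlDB cluster (placeholder IDs and interval).
curl -s -u "$CLOUD_API_KEY:$CLOUD_API_SECRET" \
  -H "Content-Type: application/json" \
  https://api.telemetry.confluent.cloud/v2/metrics/cloud/query \
  -d '{
    "aggregations": [
      { "metric": "io.confluent.ksql.cluster/processing_errors" }
    ],
    "filter": {
      "field": "resource.ksql.id",
      "op": "EQ",
      "value": "lksqlc-xxxxxx"
    },
    "granularity": "PT1H",
    "intervals": [ "2023-01-01T00:00:00Z/2023-01-02T00:00:00Z" ]
  }'

The other ksqlDB health metrics described on this page, such as query restarts, storage utilization, and query saturation, can generally be queried with the same request shape by swapping in the corresponding metric name.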

Query Restarts

Query restarts is a metric that measures the total number of times a query has had to restart due to a hard failure. A hard failure indicates some problem in the environment that prevents a query from making progress. For example, a query may not be able to write results to its output topic because an admin has revoked its write permissions. Repeated restarts indicate that a query is not able to make any progress.

You can use the Metrics API to query the current and historical query restart counts.

Storage Utilization

Storage utilization is a metric that measures the percentage of available storage that ksqlDB has used to store state for stateful queries, like aggregations and joins. If this value reaches 100% (1.0), ksqlDB can’t continue to process records, because it has no available storage space left to maintain state.

You can view the current and historical storage utilization in the Performance tab in the ksqlDB web interface. You can also query storage utilization using the Confluent Cloud Metrics API.

Important

You must monitor the storage utilization metric and ensure that you take appropriate action if utilization approaches 100%, or your cluster may experience degraded performance. For more information, see High Storage Utilization.

Consumer Group Lag

Consumer group lag measures the difference between the current processing offset and the latest offset for each source topic partition a query consumes. While this value may occasionally spike, consumer lag that’s consistently growing indicates that ksqlDB may not be able to keep up with records as they’re appended to the topics that a query consumes from.

To monitor consumer group lag, you’ll need to get your queries’ consumer groups, and then monitor lag for those groups.

To get the consumer group for a query, use the ksqlDB EXPLAIN statement:

EXPLAIN <QUERY ID>;

This statement returns a response that looks like the following:

{
  "queryDescription": {
    "consumerGroupId": "<consumer group id>"
  }
}
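
If you prefer to run EXPLAIN from the command line instead of the Cloud Console editor, the following is a minimal sketch that posts the statement to the ksqlDB cluster’s REST API /ksql endpoint. The <ksqldb-endpoint> placeholder and the KSQLDB_API_KEY/KSQLDB_API_SECRET variables are assumptions; use your cluster’s endpoint (shown in the Cloud Console) and a ksqlDB API key.

# Run EXPLAIN over the ksqlDB REST API (placeholders shown); the response contains
# the queryDescription object shown above.
curl -s -u "$KSQLDB_API_KEY:$KSQLDB_API_SECRET" \
  -H "Content-Type: application/vnd.ksql.v1+json" \
  "https://<ksqldb-endpoint>/ksql" \
  -d '{
    "ksql": "EXPLAIN <QUERY ID>;",
    "streamsProperties": {}
  }'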

Once you have the consumer group ID, you can use the admin API or UI to Monitor Kafka Consumer Lag in Confluent Cloud.
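
As an alternative to the UI, you can also pull lag from the Metrics API, which exposes a consumer lag metric on the Kafka cluster resource. The following is a minimal sketch, assuming the io.confluent.kafka.server/consumer_lag_offsets metric and the metric.consumer_group_id and metric.topic labels (verify these against the metric descriptors), with placeholder cluster ID, consumer group ID, and interval. Note that the filter uses resource.kafka.id, because lag is reported by the Kafka cluster rather than by ksqlDB.

# Per-topic lag for one consumer group on a Kafka cluster (placeholders shown).
curl -s -u "$CLOUD_API_KEY:$CLOUD_API_SECRET" \
  -H "Content-Type: application/json" \
  https://api.telemetry.confluent.cloud/v2/metrics/cloud/query \
  -d '{
    "aggregations": [
      { "metric": "io.confluent.kafka.server/consumer_lag_offsets" }
    ],
    "filter": {
      "op": "AND",
      "filters": [
        { "field": "resource.kafka.id", "op": "EQ", "value": "lkc-xxxxxx" },
        { "field": "metric.consumer_group_id", "op": "EQ", "value": "<consumer group id>" }
      ]
    },
    "granularity": "PT5M",
    "group_by": [ "metric.topic" ],
    "intervals": [ "2023-01-01T00:00:00Z/2023-01-01T06:00:00Z" ]
  }'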

Query Saturation

Query saturation measures the percentage of time a query-processing thread spends processing records, for example serializing and deserializing records, evaluating expressions, and reading and writing state for aggregations and joins. When a query-processing thread is not doing work, it’s waiting for new records to process, so a thread that is almost always busy likely can’t keep up with its source topic partitions. This metric returns the maximum value across all queries in a ksqlDB cluster. Check this metric when consumer group lag is growing.

You can inspect current and historical values of this metric in the Performance tab in the ksqlDB web interface. You can also use the Metrics API to query the current and historical saturation.

The following sections describe how to troubleshoot when one of these indicators signals a problem.

Troubleshooting

Processing Errors

If your cluster is repeatedly hitting record processing errors, you can inspect the processing log to get more details about the errors. From there, you should be able to debug and fix the upstream application that is emitting bad records to the source topic that ksqlDB is processing.

Query Restarts

If your cluster is repeatedly hitting query restarts, you can see details about the underlying failures by running the ksqlDB EXPLAIN statement:

EXPLAIN <QUERY ID>;

This statement returns a response that looks like the following:

{
  "queryDescription": {
    "queryErrors": <list of query failures>
  }
}

The returned error contains a “type” field. If the value of the “type” field is “USER”, ksqlDB has categorized the underlying failure as a condition that you must fix. If you observe repeated errors with “SYSTEM” or “UNKNOWN” types, contact the Confluent Support team to investigate the issue.

High Storage Utilization

If your cluster is running out of storage space, you have a couple of options.

The simplest option is to provision a cluster with more CSUs. Each CSU provides 125 GB of storage space, but for 8-CSU and 12-CSU clusters, ksqlDB automatically maintains replicas of your data. For more information, see CSUs and storage.

Before provisioning a cluster with more CSUs, you may want to make sure that your storage usage is not skewed such that the storage cannot be distributed across the cluster. To check this, use the Metrics API to query the storage usage broken down by task.
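
A sketch of such a request follows, assuming the task-level storage metric is named io.confluent.ksql.cluster/task_stored_bytes and exposes a metric.task_id label (confirm both against the metric descriptors for your organization), with a placeholder cluster ID and interval.

# Stored bytes broken down by task for one ksqlDB cluster (placeholders shown).
curl -s -u "$CLOUD_API_KEY:$CLOUD_API_SECRET" \
  -H "Content-Type: application/json" \
  https://api.telemetry.confluent.cloud/v2/metrics/cloud/query \
  -d '{
    "aggregations": [
      { "metric": "io.confluent.ksql.cluster/task_stored_bytes" }
    ],
    "filter": {
      "field": "resource.ksql.id",
      "op": "EQ",
      "value": "lksqlc-xxxxxx"
    },
    "granularity": "PT1H",
    "group_by": [ "metric.task_id" ],
    "intervals": [ "2023-01-01T00:00:00Z/2023-01-01T01:00:00Z" ],
    "limit": 25
  }'

If a handful of tasks account for most of the stored bytes, your storage usage is skewed across tasks.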

Each CSU adds 125 GB of storage, and each task can run on only one CSU. If you have a small number of tasks that store 125 GB of data or more, adding CSUs won’t help, because the storage from these tasks can’t be distributed across the cluster.

If your storage usage is skewed, or you want to try to optimize your storage usage, you can find the queries that use the most storage and either drop or change those queries or their data to prevent skew or reduce their footprint. Use the Metrics API to query current and historical state usage by query.
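
If the metric descriptors for your cluster also expose a query-level label (metric.query_id is assumed here), you can group the same stored-bytes request by query instead of by task. The following POST body, sent to the same Metrics API query endpoint as in the previous examples, is one possible sketch with placeholder cluster ID and interval.

{
  "aggregations": [
    { "metric": "io.confluent.ksql.cluster/task_stored_bytes" }
  ],
  "filter": {
    "field": "resource.ksql.id",
    "op": "EQ",
    "value": "lksqlc-xxxxxx"
  },
  "granularity": "P1D",
  "group_by": [ "metric.query_id" ],
  "intervals": [ "2023-01-01T00:00:00Z/2023-01-08T00:00:00Z" ],
  "limit": 25
}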

Ultimately, the feasibility of preventing skew or reducing footprint depends on your workload.

  • You may find that the source topic for a table has just one partition, and therefore a query that materializes the table runs only a single stateful task. You can mitigate this situation by repartitioning your source topic to increase the number of partitions.
  • You may find that one or a few partitions of your source topic contain all or most of its data. In this scenario, consider changing the partitioning scheme.

Growing Consumer Lag

To troubleshoot growing consumer lag, first determine whether your query is actually processing data by looking at the current committed offset. If this value is steadily increasing, records are being processed. If not, the query is probably failing and you’ll need to look at the query status page to see why.

If the query isn’t making progress, it may be bottlenecked on an external service (like Apache Kafka®), or on ksqlDB resources. If the query is bottlenecked on ksqlDB resources, you must provision another ksqlDB cluster with more CSUs.

To determine whether ksqlDB is bottlenecked on limited resources, look at the query saturation metric. If this metric’s value is consistently higher than 80 percent (0.80), ksqlDB is likely not able to keep up with the append rate at its source topics, and you should try adding CSUs.

Note that even if query saturation is high, provisioning a cluster with more CSUs may not help, because the load on your queries may be skewed toward a small set of tasks that can’t be distributed across more CSUs. Querying utilization at the query or task level isn’t currently supported, so for now you’ll need to try adding CSUs.