Monitoring Persistent Queries

You can use the Confluent Cloud Console and Metrics API to monitor your persistent queries’ health and troubleshoot problems.

Health Indicators

You can monitor the following indicators of health:

Storage Utilization

Storage utilization is a metric that measures the percentage of available storage that ksqlDB has used to store state for stateful queries, like aggregations and joins. If this value reaches 100% (1.0), ksqlDB can no longer process records, because it has no available storage space left to maintain state.

You can view the current and historical storage utilization in the Performance tab in the ksqlDB web interface. You can also query storage utilization using the Confluent Cloud Metrics API.
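
For example, a minimal sketch of a Metrics API request body for storage utilization might look like the following. It assumes the io.confluent.ksql/storage_utilization metric and the resource.ksql.id label, with a placeholder ksqlDB cluster ID; confirm the exact metric and label names in the Metrics API reference. POST the payload to the Metrics API query endpoint (https://api.telemetry.confluent.cloud/v2/metrics/cloud/query) using a Cloud API key.

{
  "aggregations": [
    { "metric": "io.confluent.ksql/storage_utilization" }
  ],
  "filter": {
    "field": "resource.ksql.id",
    "op": "EQ",
    "value": "lksqlc-xxxxx"
  },
  "granularity": "PT1M",
  "intervals": ["2023-01-01T00:00:00Z/PT1H"]
}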

Consumer Group Lag

Consumer group lag measures the difference between the current processing offset and the latest offset for each source topic partition a query consumes. While this value may occasionally spike, consumer lag that’s consistently growing indicates that ksqlDB may not be able to keep up with records as they’re being appended to the topics the query consumes from.

To monitor consumer group lag, you’ll need to get your queries’ consumer groups, and then monitor lag for those groups.

To get the consumer group for a query, use the ksqlDB EXPLAIN statement:

EXPLAIN <QUERY ID>;

This returns a response that includes the consumer group ID and looks like the following:

{
  "queryDescription": {
    "consumerGroupId": "<consumer group id>"
  }
}

Once you have the consumer group ID, you can use the admin API or UI to Monitor Consumer Lag.
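
As a sketch, assuming your Kafka cluster exports the io.confluent.kafka.server/consumer_lag_offsets metric to the Metrics API, a request body that reports lag for the consumer group returned by EXPLAIN might look like the following. The Kafka cluster ID and consumer group ID are placeholders; confirm the metric and label names in the Metrics API reference.

{
  "aggregations": [
    { "metric": "io.confluent.kafka.server/consumer_lag_offsets" }
  ],
  "filter": {
    "op": "AND",
    "filters": [
      { "field": "resource.kafka.id", "op": "EQ", "value": "lkc-xxxxx" },
      { "field": "metric.consumer_group_id", "op": "EQ", "value": "<consumer group id>" }
    ]
  },
  "granularity": "PT1M",
  "intervals": ["2023-01-01T00:00:00Z/PT1H"],
  "group_by": ["metric.topic"]
}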

The following sections describe how to troubleshoot when one of these indicators signals a problem.

Troubleshooting

High Storage Utilization

If your cluster is running out of storage space, you have a couple of options.

The simplest option is to provision a cluster with more CSUs. Each CSU provides 125 GB of storage. However, be aware that for 8- and 12-CSU clusters, ksqlDB automatically maintains replicas of your data, so to expand storage past 4 CSUs you will need to expand to 12 CSUs.

Before provisioning a cluster with more CSUs, you may want to make sure that your storage usage is not skewed in a way that prevents it from being distributed across the cluster. To check this, use the Metrics API to query storage usage broken down by task. Each CSU adds 125 GB of storage, and each task runs on only one CSU, so if a small number of tasks each store 125 GB of data or more, adding CSUs will not help, because the storage from those tasks cannot be distributed across the cluster.
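
As a sketch, assuming a per-task metric such as io.confluent.ksql/task_stored_bytes with a metric.task_id label (confirm both in the Metrics API reference), a request body that breaks storage down by task might look like this:

{
  "aggregations": [
    { "metric": "io.confluent.ksql/task_stored_bytes" }
  ],
  "filter": {
    "field": "resource.ksql.id",
    "op": "EQ",
    "value": "lksqlc-xxxxx"
  },
  "granularity": "PT1H",
  "intervals": ["2023-01-01T00:00:00Z/P1D"],
  "group_by": ["metric.task_id"],
  "limit": 25
}

Tasks whose stored bytes are at or near 125 GB are the ones whose state cannot be spread across additional CSUs.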

If your storage usage is skewed, or you want to optimize your storage usage, you can find the queries that use the most storage and either drop them or change the query or data to prevent skew or reduce the footprint. Use the Metrics API to query current and historical state usage by query.
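
For example, if you decide a materialized table is no longer worth its footprint, you can terminate the persistent query that maintains it and drop the table. This is only a sketch with hypothetical names; the query ID and table name come from your own environment.

-- Stop the persistent query that maintains the table's state (hypothetical query ID).
TERMINATE CTAS_ORDERS_BY_USER_5;

-- Drop the table; DELETE TOPIC also deletes the backing Kafka topic.
DROP TABLE ORDERS_BY_USER DELETE TOPIC;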

Ultimately, the feasibility of preventing skew or reducing footprint depends on your workload. That said, here are some examples of common problems:

* You might find that the source topic for a table has just 1 partition, and therefore a query that materializes the table runs only a single stateful task. You can mitigate this by repartitioning your source topic to increase the number of partitions, as sketched after this list.
* You might find that one or a few partitions of your source topic contain all or most of its data. In this scenario, consider how you can change the partitioning scheme.
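
As a sketch of the repartitioning approach in the first bullet, you could create a repartitioned copy of the source stream and point the materializing query at it. The stream names, partition count, and key column here are hypothetical.

-- Create a copy of the source stream with more partitions, keyed by a column
-- that distributes records more evenly (hypothetical names and values).
CREATE STREAM clicks_repartitioned
  WITH (KAFKA_TOPIC='clicks_repartitioned', PARTITIONS=6) AS
  SELECT *
  FROM clicks
  PARTITION BY user_id
  EMIT CHANGES;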

Growing Consumer Lag

To troubleshoot growing consumer lag, first determine whether your query is actually processing data by looking at the current committed offset. If this value is steadily increasing, records are being processed. If not, the query is probably failing and you’ll need to look at the query status page to see why.

If the query is making progress, it may be bottlenecked on an external service (like Apache Kafka®), or on ksqlDB resources. If the query is bottlenecked on ksqlDB resources, you’ll need to provision another ksqlDB cluster with more CSUs.

To tell whether ksqlDB is bottlenecked on limited resources, look at a metric called query saturation. Query saturation measures the percentage of time a query processing thread spends processing records (for example, serializing and deserializing, evaluating expressions, and reading and writing state for aggregations and joins). When a query processing thread is not doing work, it’s waiting for new records to process, so if a thread is almost always busy, it likely cannot keep up with its source topic partitions. If this metric’s value is consistently higher than 80% (0.80), ksqlDB is likely not able to keep up with the append rate at its source topics, and you should try adding CSUs.

You can look at current and historical values of this metric in the Performance tab in the ksqlDB web interface. You can also use the Metrics API to query the current and historical saturation.
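
For example, a sketch of a Metrics API request body for query saturation, assuming the io.confluent.ksql/query_saturation metric and a placeholder ksqlDB cluster ID (confirm the metric name in the Metrics API reference):

{
  "aggregations": [
    { "metric": "io.confluent.ksql/query_saturation" }
  ],
  "filter": {
    "field": "resource.ksql.id",
    "op": "EQ",
    "value": "lksqlc-xxxxx"
  },
  "granularity": "PT5M",
  "intervals": ["2023-01-01T00:00:00Z/PT6H"]
}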

Note that even if query saturation is high, it may not help to provision a cluster with more CSUs. This is because it’s possible that the load coming to your queries is skewed to a small set of tasks that cannot be distributed across more CSUs. We currently do not support querying utilization at the query or task level, so you will need to try adding CSUs for now.