Monitor Kafka Consumer Lag in Confluent Cloud

Monitoring consumer lag in Apache Kafka® is essential to ensure the smooth functioning of your Kafka cluster. Consumer lag refers to the delay between the production and consumption of messages in Kafka, which can have a significant impact on the overall performance of your system. You can monitor Kafka consumer lag with Confluent Cloud using the Metrics API or the Cloud Console.

Consumer lag is a combination of both offset lag and consumer latency. Use Confluent Control Center to monitor consumer latency. Use Java client metrics and the Kafka Admin API to monitor offset lag.

See also

For an example that showcases how to monitor a Kafka client application and Confluent Cloud metrics, and that steps through various failure scenarios to show the resulting metrics, see Observability for Kafka Clients to Confluent Cloud.

Use the Metrics API to monitor Kafka Consumer Lag

Confluent recommends using the Metrics API to monitor how consumer lag changes over time. To monitor at the topic and consumer group level of detail, you can use a supported integration. To view data at the more detailed consumer and partition level, you can begin from the example query.
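As a rough illustration of a consumer-group and partition-level query, the following Python sketch builds a request body for the Metrics API query endpoint. The metric name, the label names in `group_by`, the filter field, the endpoint mentioned in the comment, and the placeholder cluster ID `lkc-xxxxx` are assumptions that should be verified against the Metrics API reference. The snippet only constructs and prints the JSON; it does not send a request.

```python
import json


def build_consumer_lag_query(cluster_id, start_iso, end_iso):
    """Build a query body for consumer lag (all field values are assumptions)."""
    return {
        # Assumed metric name for consumer lag measured in offsets.
        "aggregations": [{"metric": "io.confluent.kafka.server/consumer_lag_offsets"}],
        # Restrict the query to one Kafka cluster.
        "filter": {"field": "resource.kafka.id", "op": "EQ", "value": cluster_id},
        "granularity": "PT1M",
        # Assumed label names; one series per group, topic, and partition.
        "group_by": ["metric.consumer_group_id", "metric.topic", "metric.partition"],
        "intervals": [f"{start_iso}/{end_iso}"],
        "limit": 100,
    }


# Hypothetical cluster ID and time window.
query = build_consumer_lag_query(
    "lkc-xxxxx", "2024-01-01T00:00:00Z", "2024-01-01T01:00:00Z"
)
# The body would be POSTed to the Metrics API query endpoint
# (assumed: https://api.telemetry.confluent.cloud/v2/metrics/cloud/query).
print(json.dumps(query, indent=2))
```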

Note that this metric differs from the output of the kafka-consumer-groups command in two ways:

  • Consumer groups do not update lag values during rebalancing
  • Consumer groups with no active consumers return no result

For an example of the rebalance behavior, consider a consumer group with two active consumers that lag behind by three and four records, respectively. Metrics API returns values of three and four for those consumers. A rebalance begins, and producers send more records to the topic, but those records cannot be consumed until the rebalance completes. The rebalance takes two minutes. During those two minutes, the reported lag values stay at three and four, even though 20 additional records are sent to the topic. When the rebalance completes, Metrics API returns values of 13 and 14 for those consumers, which corresponds to 10 of the new records for each consumer. Metrics API does not update consumer lag values during a rebalance.
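The arithmetic in this example can be sketched in a few lines of Python. This is only a toy model of the reported values, not how Metrics API is implemented; the function name `reported_lag` is hypothetical, and the sketch assumes the 20 new records are split evenly between the two consumers.

```python
def reported_lag(last_reported, actual, rebalancing):
    """Return the lag values a monitor would show: frozen while rebalancing."""
    return list(last_reported) if rebalancing else list(actual)


before = [3, 4]     # lag per consumer when the rebalance begins
new_records = 20    # records produced during the two-minute rebalance

# Assumption: the new records are divided evenly between the two consumers.
actual = [lag + new_records // len(before) for lag in before]

during = reported_lag(before, actual, rebalancing=True)
after = reported_lag(before, actual, rebalancing=False)
print(during)  # [3, 4]
print(after)   # [13, 14]
```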

Now consider an example where two consumers are unresponsive and no longer send heartbeats. Metrics API no longer returns data for this consumer group. Metrics API does not expose consumer lag for topics and partitions that do not have an active consumer.

Important

Metrics API does not include consumers that use the assign() method. This is because the coordinator of a consumer group does not manage consumer assignment for consumers assigned to topics and partitions using assign().

Use the Confluent Cloud Console to Monitor Kafka Consumer Lag

  1. Select your cluster name.

  2. Choose Clients and select the Consumer Lag tab. A list of consumer groups displays.

    Figure: Consumer lag overview page in Confluent Cloud

    You may notice a couple of clients that have the naming convention consumer-nnnn. These client names represent the Confluent Cloud interface itself.

  3. Select a consumer group from the list to see lag details for that group.

    Figure: Consumer lag detail for a consumer group in Confluent Cloud

For more information on creating a consumer, see Quick Start for Confluent Cloud.

Note

Cloud Console uses the Metrics API to monitor consumer lag, so its output differs from the output of kafka-consumer-groups. For more information, see Use the Metrics API to monitor Kafka Consumer Lag.

Use Client Interceptors to monitor Kafka Consumer Latency

Consumer latency refers to the time it takes for a consumer to receive a message after it has been produced. This value can be affected by various factors, including network latency, processing time, and message size.

You can use Confluent Control Center to track consumer latency and more.
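To make the definition concrete, the following Python sketch computes latency for a single record as the time it was received minus the record's timestamp. It is a simplified illustration, not how Control Center measures latency; the function name and the timestamps are hypothetical, and the result is only meaningful if the producer and consumer clocks are reasonably synchronized.

```python
import time


def consumer_latency_ms(record_timestamp_ms, received_at_ms=None):
    """Latency: time the consumer received the record minus the record's timestamp."""
    if received_at_ms is None:
        # Default to the current wall-clock time in epoch milliseconds.
        received_at_ms = int(time.time() * 1000)
    return received_at_ms - record_timestamp_ms


# Hypothetical timestamps in epoch milliseconds.
produced_at = 1_700_000_000_000
received_at = 1_700_000_000_250
print(consumer_latency_ms(produced_at, received_at))  # 250
```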

Use Java Client Metrics to monitor Kafka Offset Lag

Offset lag is the difference between the latest offset available in a Kafka topic partition and the offset that a consumer group has consumed. This value indicates how far behind the consumer group is from the latest available data.

You can monitor the records-lag-max metric from the Java consumer.
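The following Python sketch shows the underlying arithmetic: per-partition offset lag, and the maximum across partitions, which is the kind of value a maximum-lag metric summarizes. It illustrates the definition only and is not the Java client's implementation; the function name and the offsets are hypothetical.

```python
def offset_lag(log_end_offsets, consumed_offsets):
    """Per-partition lag: latest available offset minus the consumed offset."""
    return {
        partition: log_end_offsets[partition] - consumed_offsets[partition]
        for partition in log_end_offsets
    }


# Hypothetical offsets keyed by partition number.
log_end = {0: 120, 1: 95, 2: 40}
consumed = {0: 100, 1: 95, 2: 10}

lag = offset_lag(log_end, consumed)
print(lag)                # {0: 20, 1: 0, 2: 30}
print(max(lag.values()))  # 30, the largest lag across partitions
```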

Use Kafka Admin API to monitor Offset Lag

You can monitor offsets by using the Kafka Admin API and the associated CLI, which enables accessing lag information programmatically. For more information, see AdminClient Configurations.

Prerequisites

  • Access to Confluent Cloud
  • Java version 1.7.0_111 or greater, 1.8.0_102 or greater, or 1.9
  • A web browser
  • A Confluent Platform installation

  1. Create a client properties file named client_ssl.properties that contains the Confluent Cloud client configuration. You can find this information in the CLI and client configuration tab of Confluent Cloud Console. Configure this example for your environment:

    ssl.endpoint.identification.algorithm=https
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="<cluster-api-key>" \
    password="<cluster-api-secret>";
    security.protocol=SASL_SSL
    

    Note

    Use a Kafka cluster API key and secret for the username and password configs. Using a Confluent Cloud API key causes authentication to fail.

  2. Set the BOOTSTRAP_SERVERS variable to the Confluent Cloud cluster bootstrap URL. You can find this value by clicking Cluster settings from the Cloud Console interface.

    BOOTSTRAP_SERVERS="<bootstrap-url>"
    
  3. From the Confluent Platform installation home, list the consumer groups. Confluent Cloud properties are passed in with the --command-config argument. A bootstrap server must be provided to the script.

    ./bin/kafka-consumer-groups --bootstrap-server ${BOOTSTRAP_SERVERS} --command-config \
    client_ssl.properties --list

    Your output should resemble:

    _confluent-healthcheck
    example-group
    

    Note

    If the previous command causes a timeout error, try increasing the timeout to 10 seconds by using the --timeout option, for example, --timeout 10000.

  4. For each consumer group, check its offsets using this command. This command only shows information about consumers that use the Java consumer API (i.e., non-ZooKeeper-based consumers).

    ./bin/kafka-consumer-groups --bootstrap-server ${BOOTSTRAP_SERVERS} \
     --command-config client_ssl.properties --describe --group _confluent-healthcheck
    

    Your output should resemble:

    TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                             HOST          CLIENT-ID
    _confluent-healthcheck  0          13164704        13164773        69   healthcheck-agent-bf8d1655-63a6-4061-b680-0f11cdf182e5  /100.96.67.0  healthcheck-agent
    _confluent-healthcheck  1          13161581        13161650        69   healthcheck-agent-bf8d1655-63a6-4061-b680-0f11cdf182e5  /100.96.67.0  healthcheck-agent
    _confluent-healthcheck  2          12229509        12229578        69   healthcheck-agent-bf8d1655-63a6-4061-b680-0f11cdf182e5  /100.96.67.0  healthcheck-agent
    _confluent-healthcheck  3          86              86              0    healthcheck-agent-bf8d1655-63a6-4061-b680-0f11cdf182e5  /100.96.67.0  healthcheck-agent
    ...
    

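    The fifth column (LAG) shows the lag, which is the difference between the latest offset in the log (LOG-END-OFFSET) and the last committed offset (CURRENT-OFFSET). For partition 0 in this output, 13164773 - 13164704 = 69.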
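To track lag programmatically from this command's output, one option is to parse the text and sum the LAG column. The following Python sketch does this for a sample that mirrors the output above. The function name `parse_lag` and the shortened consumer ID `agent-1` are hypothetical, and the parser assumes whitespace-separated columns with a header row that contains TOPIC, PARTITION, and LAG.

```python
SAMPLE_OUTPUT = """\
TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST          CLIENT-ID
_confluent-healthcheck  0          13164704        13164773        69   agent-1      /100.96.67.0  healthcheck-agent
_confluent-healthcheck  1          13161581        13161650        69   agent-1      /100.96.67.0  healthcheck-agent
_confluent-healthcheck  2          12229509        12229578        69   agent-1      /100.96.67.0  healthcheck-agent
_confluent-healthcheck  3          86              86              0    agent-1      /100.96.67.0  healthcheck-agent
"""


def parse_lag(describe_output):
    """Map (topic, partition) to lag from `kafka-consumer-groups --describe` text."""
    rows = [line.split() for line in describe_output.splitlines() if line.strip()]
    # Locate the header row so that column positions are not hard-coded.
    header_index = next(i for i, fields in enumerate(rows) if "LAG" in fields)
    header = rows[header_index]
    topic_col = header.index("TOPIC")
    partition_col = header.index("PARTITION")
    lag_col = header.index("LAG")

    lags = {}
    for fields in rows[header_index + 1:]:
        # Skip short rows and rows whose lag is not a number (for example "-").
        if len(fields) <= lag_col or not fields[lag_col].isdigit():
            continue
        lags[(fields[topic_col], int(fields[partition_col]))] = int(fields[lag_col])
    return lags


lags = parse_lag(SAMPLE_OUTPUT)
print(sum(lags.values()))  # total lag across the listed partitions: 207
```

Looking up column positions from the header keeps the parser working if the tool prints additional leading columns.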

Known kafka-consumer-groups Issues

If you encounter the following error:

Error: Executing consumer group command failed due to Failed to construct kafka consumer

or if the command fails with a TimeoutException, increase the value of request.timeout.ms in client_ssl.properties. The kafka-consumer-groups command-line tool defaults this setting to 5000 milliseconds. For example:

request.timeout.ms=60000