Confluent Cloud Metrics

The Confluent Cloud Metrics API provides a comprehensive way to monitor and analyze the health and performance of your data streaming workloads. Use it to query metrics related to data streaming, connectors, governance, and Apache Flink® stream processing.

This guide shows you how to use the Metrics API to:

  • Discover resources and metrics: Programmatically find available entities and the metrics they expose.
  • Run example queries: Get started with queries for common monitoring use cases.
  • Integrate with third-party tools: Connect with Datadog, Dynatrace, Grafana, and Prometheus.
  • Monitor client-side metrics: Understand how Kafka brokers collect client metrics through KIP-714 for centralized observability.

Metrics quick start

The Metrics quick start is an instructional guide to help you get started with the metrics that Confluent Cloud provides. The Metrics API supports a diverse set of querying patterns to provide usage and performance analysis over time. For more information on the Confluent Cloud Metrics API, see the API Reference.

Considerations
  • You must use an API key that is resource-scoped for resource management to communicate with the Metrics API
  • API keys that are resource-scoped for a Kafka cluster cause an authentication error
  • Metrics in Confluent Cloud are available either through first-class integrations with third-party monitoring providers or by directly querying the Confluent Cloud Metrics API
  • Users who would like to monitor Confluent Cloud are encouraged to use an integration to reduce the operational burden of monitoring
Prerequisites

Create an API key to authenticate to the Metrics API. For example:

confluent login
confluent environment use env-abc123
confluent kafka cluster use lkc-YYYYY
confluent api-key create --resource cloud

See also

For an example that shows how to monitor a Kafka client application and Confluent Cloud metrics, and that steps through various failure scenarios to show the resulting metrics, see Observability for Kafka Clients to Confluent Cloud.

Add the MetricsViewer role to a new service account

The MetricsViewer role provides service account access to the Metrics API for all clusters in an organization. This role also enables service accounts to import metrics into third-party metrics platforms.

To assign the MetricsViewer role to a new service account:

Run the following commands to add a role binding for MetricsViewer to a new service account. Remember to log in with the confluent login command first.

  1. Create the service account:

    confluent iam service-account create MetricsImporter --description "A test service account to import Confluent Cloud metrics into our monitoring system"
    

    Your output should resemble:

    +-------------+--------------------------------+
    | ID          | sa-123abc                      |
    | Name        | MetricsImporter                |
    | Description | A test service account to      |
    |             | import Confluent Cloud metrics |
    |             | into our monitoring system     |
    +-------------+--------------------------------+
    
  2. Make note of the ID field.

  3. Add the MetricsViewer role binding to the service account:

    confluent iam rbac role-binding create --role MetricsViewer --principal User:sa-123abc
    

    Your output should resemble:

    +-----------+----------------+
    | Principal | User:sa-123abc |
    | Role      | MetricsViewer  |
    +-----------+----------------+
    
  4. List the role bindings to confirm that the MetricsViewer role was created:

    confluent iam rbac role-binding list --principal User:sa-123abc
    

    Your output should resemble:

        Principal    | Email |     Role      | Environment | ...
    -----------------+-------+---------------+-------------+----
      User:sa-123abc |       | MetricsViewer |             |
    
  5. List the existing service accounts:

    confluent iam service-account list
    

    Your output should resemble:

         ID     |              Name              |           Description
    ------------+--------------------------------+-----------------------------------
     sa-1a2b3c | test-account                   | for testing
     sa-112233 | ProactiveSupport.1614189731753 | SA for Proactive Support
     sa-aabbcc | KSQL.lksqlc-ab123              | SA for KSQL w/ ID lksqlc-ab123
               |                                | and Name ksqlDB_app_0
     ...
    
  6. Create an API key and add it to the new service account:

    confluent api-key create --resource cloud --service-account sa-123abc
    

    Your output should resemble:

    It may take a couple of minutes for the API key to be ready.
    Save the API key and secret. The secret is not retrievable later.
    +---------+------------------------------------------------------------------+
    | API Key | 1234567ABCDEFGHI                                                 |
    | Secret  | ABCDEF123456.................................................... |
    +---------+------------------------------------------------------------------+
    
  7. Save the API key and secret in a secure location.

Integrate with third-party monitoring tools

Integrating directly with a third-party monitoring tool allows you to monitor Confluent Cloud alongside the rest of your applications.

Datadog

Datadog provides an integration where users can input a Confluent Cloud API key (resource-scoped for resource management) into the Datadog UI, select resources to monitor, and see metrics in minutes using an out-of-the-box dashboard. If you use Datadog, create your Confluent Cloud API key and follow the instructions from Datadog to get started. After configuring the integration, search the Datadog dashboards for “Confluent Cloud Overview,” the default Confluent Cloud dashboard at Datadog. Clone the default dashboard so that you can edit it to suit your needs.

Dynatrace

Dynatrace provides an extension where users can input a Confluent Cloud API key (resource-scoped for resource management) into the Dynatrace Monitoring Configuration, select resources to monitor, and see metrics in minutes in a prebuilt dashboard. If you use Dynatrace, create your Confluent Cloud API key (resource-scoped for resource management) and follow the instructions to get started.

Grafana Cloud

Grafana Labs provides an integration where users can input a Confluent Cloud API key (resource-scoped for resource management) into the Grafana Cloud UI, select resources to monitor, and see metrics in minutes using an out-of-the-box dashboard. If you use Grafana Cloud, create your Confluent Cloud API key (resource-scoped for resource management) and follow the instructions to get started.

Prometheus

Prometheus servers can scrape the Confluent Cloud Metrics API directly by making use of the export endpoint. This endpoint returns the single most recent data point for each metric, for each distinct combination of labels in the Prometheus exposition or Open Metrics format. For more information, see Export metric values.
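A Prometheus scrape job for the export endpoint might look like the following sketch. The job name, interval, cluster ID, and credentials are placeholders; adjust them for your environment, and note that the resource.kafka.id parameter selects which cluster's metrics are exported:

```yaml
scrape_configs:
  - job_name: confluent-cloud
    scrape_interval: 1m
    scrape_timeout: 1m
    honor_timestamps: true
    static_configs:
      - targets:
          - api.telemetry.confluent.cloud
    scheme: https
    basic_auth:
      username: <API_KEY>
      password: <SECRET>
    metrics_path: /v2/metrics/cloud/export
    params:
      "resource.kafka.id":
        - lkc-XXXXX
```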

New Relic OpenTelemetry

You can collect metrics about your Confluent Cloud-managed Kafka deployment with the New Relic OpenTelemetry collector. The collector is a component of OpenTelemetry that collects, processes, and exports telemetry data to New Relic, or any observability back-end. For more information, see Monitoring Confluent Cloud Kafka with OpenTelemetry Collector.

Discover resources and metrics with the Metrics API

The following examples use HTTPie and cURL. You can install HTTPie with most common software package managers by following its documentation. cURL is a standard component of most operating systems; if you don’t have it, you can install it by following the cURL documentation.

The Confluent Cloud Metrics API provides endpoints for programmatic discovery of available resources and their metrics. This resource and metric metadata is represented by descriptor objects.

The discovery endpoints can be used to avoid hardcoding metric and resource names into client scripts.

Discover available resources

A resource represents the entity against which metrics are collected, for example, a Kafka cluster, a connector, or a ksqlDB application.

Get a description of the available resources by sending a GET request to the descriptors/resources endpoint of the API:

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/descriptors/resources' --auth '<API_KEY>:<SECRET>'

This returns a JSON document describing the available resources to query and their labels.
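The same request can be issued from a script. The following is a minimal Python sketch using only the standard library; the endpoint matches the example above, the credentials are placeholders, and the final call is commented out so nothing runs until you supply a real key:

```python
import base64
import json
import urllib.request

API_KEY = "<API_KEY>"  # placeholder: API key resource-scoped for resource management
SECRET = "<SECRET>"    # placeholder
BASE = "https://api.telemetry.confluent.cloud/v2/metrics/cloud"


def get_descriptor(path: str) -> dict:
    """Send an authenticated GET to a Metrics API discovery endpoint."""
    # Basic auth header, equivalent to HTTPie's --auth flag
    token = base64.b64encode(f"{API_KEY}:{SECRET}".encode()).decode()
    req = urllib.request.Request(
        f"{BASE}/{path}",
        headers={"Authorization": f"Basic {token}"},
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)


# resources = get_descriptor("descriptors/resources")  # requires real credentials
```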

Discover available metrics

Get a description of the available metrics by sending a GET request to the descriptors/metrics endpoint of the API:

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/descriptors/metrics?resource_type=kafka' --auth '<API_KEY>:<SECRET>'

Note

The resource_type query parameter is required to specify the type of resource for which to list metrics. The valid resource types can be determined using the /descriptors/resources endpoint.

This returns a JSON document describing the available metrics to query and their labels.

A human-readable list of the current metrics is available in the API Reference.

KIP-714 Client Metrics

With the implementation of KIP-714, Kafka clients can now push selected metrics (for example, connection counts, latency, production/consumption rates) directly to Kafka brokers. This enhancement improves observability by allowing cluster operators to collect client metrics from brokers using OpenTelemetry, simplifying the monitoring of client behavior.
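On the client side, clients that implement KIP-714 expose an enable.metrics.push setting (enabled by default where supported) that controls whether the client pushes metrics to the brokers. The following is a configuration sketch only, assuming the confluent-kafka Python client and its librdkafka-style dotted keys; the bootstrap server and credentials are placeholders:

```python
# Sketch only: the bootstrap server and credentials are placeholders.
producer_config = {
    "bootstrap.servers": "pkc-XXXXX.us-west-2.aws.confluent.cloud:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "<API_KEY>",
    "sasl.password": "<SECRET>",
    # KIP-714: allow the client to push selected metrics to the brokers.
    # This is the default on clients that support KIP-714; set it to False
    # to opt out.
    "enable.metrics.push": True,
}

# from confluent_kafka import Producer  # requires the confluent-kafka package
# producer = Producer(producer_config)  # uncomment with real credentials
```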

Monitor client metrics collected by Kafka brokers

The following client metrics are pushed to Kafka brokers and available for monitoring based on KIP-714. Metrics are categorized by client type and functional area for easier reference.

Note

Optional client metrics can vary by client version and configuration. For the authoritative and up-to-date list, see KIP-714.

Producer Metrics

Connection Metrics (Required)

  Metric Name                                          | Description
  -----------------------------------------------------+---------------------------------------------------------
  org.apache.kafka.producer.connection.creation.total  | Total number of connections created.

Request Latency Metrics (Required)

  Metric Name                                          | Description
  -----------------------------------------------------+---------------------------------------------------------
  org.apache.kafka.producer.node.request.latency.avg   | Average request latency per node.
  org.apache.kafka.producer.node.request.latency.max   | Maximum request latency per node.

Record Processing Metrics (Optional)

  Metric Name                                          | Description
  -----------------------------------------------------+---------------------------------------------------------
  org.apache.kafka.producer.record.send.total          | Total number of records sent.
  org.apache.kafka.producer.record.error.total         | Total number of record send errors.
  org.apache.kafka.producer.record.retry.total         | Total number of retried record sends.

Throttling Metrics (Optional)

  Metric Name                                          | Description
  -----------------------------------------------------+---------------------------------------------------------
  org.apache.kafka.producer.produce.throttle.time.avg  | Average time spent throttled during produce operations.
  org.apache.kafka.producer.produce.throttle.time.max  | Maximum time spent throttled during produce operations.

Consumer Metrics

Connection Metrics (Required)

  Metric Name                                                | Description
  -----------------------------------------------------------+------------------------------------------------------
  org.apache.kafka.consumer.connection.creation.total        | Total number of connections created.

Request Latency Metrics (Required)

  Metric Name                                                | Description
  -----------------------------------------------------------+------------------------------------------------------
  org.apache.kafka.consumer.node.request.latency.avg         | Average request latency per node.
  org.apache.kafka.consumer.node.request.latency.max         | Maximum request latency per node.

Coordinator Metrics (Optional)

  Metric Name                                                | Description
  -----------------------------------------------------------+------------------------------------------------------
  org.apache.kafka.consumer.coordinator.commit.latency.avg   | Average offset commit latency.
  org.apache.kafka.consumer.coordinator.commit.latency.max   | Maximum offset commit latency.

Fetch Metrics (Optional)

  Metric Name                                                | Description
  -----------------------------------------------------------+------------------------------------------------------
  org.apache.kafka.consumer.fetch.latency.avg                | Average fetch latency.
  org.apache.kafka.consumer.fetch.latency.max                | Maximum fetch latency.

Throttling Metrics (Optional)

  Metric Name                                                | Description
  -----------------------------------------------------------+------------------------------------------------------
  org.apache.kafka.consumer.fetch.throttle.time.avg          | Average time spent throttled during fetch operations.
  org.apache.kafka.consumer.fetch.throttle.time.max          | Maximum time spent throttled during fetch operations.

For the complete specification and additional implementation details, refer to KIP-714: Client metrics and observability.

Run example queries

The Confluent Cloud Metrics API has an expressive query language that allows users to flexibly filter and group timeseries data. The example queries below are provided as templates. You can find additional examples in the Cloud Console, which also uses the Confluent Cloud Metrics API.

Timestamps in metrics queries are in UTC (Coordinated Universal Time). Use either UTC or an offset appropriate for your location.
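For example, an ISO-8601 interval string covering the most recent minutes can be computed in UTC like this (a convenience sketch; the examples below simply hardcode their intervals):

```python
from datetime import datetime, timedelta, timezone


def last_minutes_interval(minutes: int) -> str:
    """Build an ISO-8601 start/end interval in UTC for a metrics query."""
    end = datetime.now(timezone.utc).replace(second=0, microsecond=0)
    start = end - timedelta(minutes=minutes)
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    return f"{start.strftime(fmt)}/{end.strftime(fmt)}"


print(last_minutes_interval(5))  # for example: 2019-12-19T16:00:00Z/2019-12-19T16:05:00Z
```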

Query for bytes produced to the cluster per minute grouped by topic

This query measures bytes produced (ingress). If you want to query bytes consumed (egress), see Query for bytes consumed from the cluster per minute grouped by topic. Note that if you are using Cluster Linking, the received_bytes metric does not include the mirror-in bytes to the cluster. Use the cluster_link_destination_response_bytes metric to query the mirror-in bytes instead.

  1. Create a file named received_bytes_query.json using the following template. Be sure to change lkc-XXXXX and the timestamp values to match your needs.

    {
      "aggregations": [
        {
          "metric": "io.confluent.kafka.server/received_bytes"
        }
      ],
      "filter": {
        "field": "resource.kafka.id",
        "op": "EQ",
        "value": "lkc-XXXXX"
      },
      "granularity": "PT1M",
      "group_by": [
        "metric.topic"
      ],
      "intervals": [
        "2019-12-19T11:00:00-05:00/2019-12-19T11:05:00-05:00"
      ],
      "limit": 25
    }
    
  2. Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

    http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < received_bytes_query.json
    

    Your output should resemble:

    {
      "data": [
        {
          "timestamp": "2019-12-19T16:00:00Z",
          "metric.topic": "test-topic",
          "value": 72.0
        },
        {
          "timestamp": "2019-12-19T16:01:00Z",
          "metric.topic": "test-topic",
          "value": 139.0
        },
        {
          "timestamp": "2019-12-19T16:02:00Z",
          "metric.topic": "test-topic",
          "value": 232.0
        },
        {
          "timestamp": "2019-12-19T16:03:00Z",
          "metric.topic": "test-topic",
          "value": 0.0
        },
        {
          "timestamp": "2019-12-19T16:04:00Z",
          "metric.topic": "test-topic",
          "value": 0.0
        }
      ]
    }
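The same POST can be scripted. The following is a minimal Python sketch using only the standard library; the payload mirrors received_bytes_query.json above, the credentials and cluster ID are placeholders, and the request itself is commented out so nothing runs without real credentials:

```python
import base64
import json
import urllib.request

# Same payload as received_bytes_query.json above (placeholder cluster ID).
query = {
    "aggregations": [{"metric": "io.confluent.kafka.server/received_bytes"}],
    "filter": {"field": "resource.kafka.id", "op": "EQ", "value": "lkc-XXXXX"},
    "granularity": "PT1M",
    "group_by": ["metric.topic"],
    "intervals": ["2019-12-19T11:00:00-05:00/2019-12-19T11:05:00-05:00"],
    "limit": 25,
}


def post_query(api_key: str, secret: str, payload: dict) -> dict:
    """POST a query document to the Metrics API and return the parsed response."""
    token = base64.b64encode(f"{api_key}:{secret}".encode()).decode()
    req = urllib.request.Request(
        "https://api.telemetry.confluent.cloud/v2/metrics/cloud/query",
        data=json.dumps(payload).encode(),
        headers={
            "Authorization": f"Basic {token}",
            "Content-Type": "application/json",
        },
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)


# result = post_query("<API_KEY>", "<SECRET>", query)  # requires real credentials
```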
    

Query for bytes consumed from the cluster per minute grouped by topic

This query measures bytes consumed (egress). If you want to query bytes produced (ingress), see Query for bytes produced to the cluster per minute grouped by topic. Note that if you are using Cluster Linking, the sent_bytes metric also includes the mirror-out bytes from the cluster.

  1. Create a file named sent_bytes_query.json using the following template. Be sure to change lkc-XXXXX and the timestamp values to match your needs.

    {
      "aggregations": [
        {
          "metric": "io.confluent.kafka.server/sent_bytes"
        }
      ],
      "filter": {
        "field": "resource.kafka.id",
        "op": "EQ",
        "value": "lkc-XXXXX"
      },
      "granularity": "PT1M",
      "group_by": [
        "metric.topic"
      ],
      "intervals": [
        "2019-12-19T11:00:00-05:00/2019-12-19T11:05:00-05:00"
      ],
      "limit": 25
    }
    
  2. Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

    http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < sent_bytes_query.json
    

    Your output should resemble:

    {
      "data": [
        {
          "timestamp": "2019-12-19T16:01:00Z",
          "metric.topic": "test-topic",
          "value": 0.0
        },
        {
          "timestamp": "2019-12-19T16:02:00Z",
          "metric.topic": "test-topic",
          "value": 157.0
        },
        {
          "timestamp": "2019-12-19T16:03:00Z",
          "metric.topic": "test-topic",
          "value": 371.0
        },
        {
          "timestamp": "2019-12-19T16:04:00Z",
          "metric.topic": "test-topic",
          "value": 0.0
        }
      ]
    }
    

    Note

    If you haven’t produced data during the time window, the dataset is empty for the given topic. For more details on sent_bytes and received_bytes with Cluster Linking, see Cluster Linking Performance Limits.

Query for max retained bytes per hour over 2 hours for a cluster lkc-XXXXX

  1. Create a file named cluster_retained_bytes_query.json using the following template. Be sure to change lkc-XXXXX and the timestamp values to match your needs:

    {
      "aggregations": [
        {
          "metric": "io.confluent.kafka.server/retained_bytes"
        }
      ],
      "filter": {
        "field": "resource.kafka.id",
        "op": "EQ",
        "value": "lkc-XXXXX"
      },
      "granularity": "PT1H",
      "intervals": [
        "2019-12-19T11:00:00-05:00/P0Y0M0DT2H0M0S"
      ],
      "limit": 5
    }
    
  2. Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

    http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < cluster_retained_bytes_query.json
    

    Your output should resemble:

    {
      "data": [
        {
          "timestamp": "2019-12-19T16:00:00Z",
          "value": 507350.0
        },
        {
          "timestamp": "2019-12-19T17:00:00Z",
          "value": 507350.0
        }
      ]
    }
    

Query for average consumer lag over the last hour grouped by topic and consumer group

  1. Create a file named consumer_lag_max_hour.json using the following template. Be sure to change lkc-XXXXX and note the interval is for the last hour with a 1-minute granularity.

    {
      "aggregations": [
        {
          "metric": "io.confluent.kafka.server/consumer_lag_offsets"
        }
      ],
      "filter": {
        "field": "resource.kafka.id",
        "op": "EQ",
        "value": "lkc-XXXXX"
      },
      "granularity": "PT1M",
      "group_by": [
        "metric.consumer_group_id",
        "metric.topic"
      ],
      "intervals": [
        "PT1H/now"
      ],
      "limit": 25
    }
    
  2. Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

    http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < consumer_lag_max_hour.json
    

    Your output should resemble:

    {
      "data": [
        {
          "metric.consumer_group_id": "group_1",
          "metric.topic": "test_topic_1",
          "timestamp": "2022-03-23T21:00:00Z",
          "value": 0.0
        },
        {
          "metric.consumer_group_id": "group_2",
          "metric.topic": "test_topic_2",
          "timestamp": "2022-03-23T21:00:00Z",
          "value": 6.0
        }
      ]
    }
    

Query for the number of streaming units used per hour for ksqlDB cluster lksqlc-XXXXX

  1. Create a file named ksql_streaming_unit_count.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

    {
      "aggregations": [
        {
          "metric": "io.confluent.kafka.ksql/streaming_unit_count"
        }
      ],
      "filter": {
        "field": "resource.ksql.id",
        "op": "EQ",
        "value": "lksqlc-XXXXX"
      },
      "granularity": "PT1H",
      "intervals": [
        "2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"
      ],
      "group_by": [
        "resource.ksql.id"
      ]
    }
    
  2. Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

    http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_streaming_unit_count.json
    

    Your output should resemble:

    {
      "data": [
        {
          "resource.ksql.id": "lksqlc-XXXXX",
          "timestamp": "2021-02-24T10:00:00Z",
          "value": 4.0
        }
      ]
    }
    

Query for the max % of storage used over all CSUs for a ksqlDB cluster lksqlc-XXXXX

  1. Create a file named ksql_storage_utilization.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

    {
      "aggregations": [
        {
          "metric": "io.confluent.kafka.ksql/storage_utilization"
        }
      ],
      "filter": {
        "field": "resource.ksql.id",
        "op": "EQ",
        "value": "lksqlc-xxxxx"
      },
      "granularity": "PT1M",
      "intervals": [
        "2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"
      ]
    }
    
  2. Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

    http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_storage_utilization.json
    

    Your output should resemble:

    {
      "data": [
        {
          "resource.ksql.id": "lksqlc-XXXXX",
          "timestamp": "2021-02-24T10:00:00Z",
          "value": 0.85
        }
      ]
    }
    

Query for the bytes of ksqlDB storage used by a query on ksqlDB cluster lksqlc-XXXXX

  1. Create a file named ksql_query_storage.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

    {
      "aggregations": [
        {
          "metric": "io.confluent.kafka.ksql/task_stored_bytes"
        }
      ],
      "filter": {
        "field": "resource.ksql.id",
        "op": "EQ",
        "value": "lksqlc-xxxxx"
      },
      "granularity": "PT1M",
      "group_by": [
        "metric.query_id"
      ],
      "intervals": [
        "2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"
      ]
    }
    
  2. Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

    http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_query_storage.json
    

    Your output should resemble:

    {
      "data": [
        {
          "resource.ksql.id": "lksqlc-XXXXX",
          "metric.query_id": "CTAS_PAGEVIEWS_2",
          "timestamp": "2021-02-24T10:00:00Z",
          "value": 7688174488
        }
      ]
    }
    

Query for the bytes of storage used by a task on ksqlDB cluster lksqlc-XXXXX

  1. Create a file named ksql_task_storage.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

    {
      "aggregations": [
        {
          "metric": "io.confluent.kafka.ksql/task_stored_bytes"
        }
      ],
      "filter": {
        "field": "resource.ksql.id",
        "op": "EQ",
        "value": "lksqlc-xxxxx"
      },
      "granularity": "PT1M",
      "group_by": [
        "metric.query_id",
        "metric.task_id"
      ],
      "intervals": [
        "2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"
      ]
    }
    
  2. Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

    http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_task_storage.json
    

    Your output should resemble:

    {
      "data": [
        {
          "resource.ksql.id": "lksqlc-XXXXX",
          "metric.task_id": "1_1",
          "metric.query_id": "CTAS_PAGEVIEWS_2",
          "timestamp": "2021-02-24T10:00:00Z",
          "value": 1079295760
        }
      ]
    }
    

Query for the query saturation on ksqlDB cluster lksqlc-XXXXX

  1. Create a file named ksql_query_saturation.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

    {
      "aggregations": [
        {
          "metric": "io.confluent.kafka.ksql/query_saturation"
        }
      ],
      "filter": {
        "field": "resource.ksql.id",
        "op": "EQ",
        "value": "lksqlc-xxxxx"
      },
      "granularity": "PT1M",
      "intervals": [
        "2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"
      ]
    }
    
  2. Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

    http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_query_saturation.json
    

    Your output should resemble:

    {
      "data": [
        {
          "resource.ksql.id": "lksqlc-XXXXX",
          "timestamp": "2021-02-24T10:00:00Z",
          "value": 0.85
        }
      ]
    }
    

Query for the total bytes consumed by ksqlDB cluster lksqlc-XXXXX

  1. Create a file named ksql_bytes_consumed.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

    {
      "aggregations": [
        {
          "metric": "io.confluent.kafka.ksql/consumed_total_bytes"
        }
      ],
      "filter": {
        "field": "resource.ksql.id",
        "op": "EQ",
        "value": "lksqlc-xxxxx"
      },
      "granularity": "PT1M",
      "intervals": [
        "2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"
      ]
    }
    
  2. Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

    http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_bytes_consumed.json
    

    Your output should resemble:

    {
      "data": [
        {
          "resource.ksql.id": "lksqlc-XXXXX",
          "timestamp": "2021-02-24T10:00:00Z",
          "value": 1024
        }
      ]
    }
    

Query for the total bytes produced by ksqlDB cluster lksqlc-XXXXX

  1. Create a file named ksql_bytes_produced.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

    {
      "aggregations": [
        {
          "metric": "io.confluent.kafka.ksql/produced_total_bytes"
        }
      ],
      "filter": {
        "field": "resource.ksql.id",
        "op": "EQ",
        "value": "lksqlc-xxxxx"
      },
      "granularity": "PT1M",
      "intervals": [
        "2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"
      ]
    }
    
  2. Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

    http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_bytes_produced.json
    

    Your output should resemble:

    {
      "data": [
        {
          "resource.ksql.id": "lksqlc-XXXXX",
          "timestamp": "2021-02-24T10:00:00Z",
          "value": 1024
        }
      ]
    }
    

Query for the total topic offsets processed by task on ksqlDB cluster lksqlc-XXXXX

  1. Create a file named ksql_offsets_processed.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

    {
      "aggregations": [
        {
          "metric": "io.confluent.kafka.ksql/offsets_processed_total"
        }
      ],
      "filter": {
        "field": "resource.ksql.id",
        "op": "EQ",
        "value": "lksqlc-xxxxx"
      },
      "granularity": "PT1M",
      "group_by": [
          "metric.query_id",
          "metric.task_id"
      ],
      "intervals": [
        "2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"
      ]
    }
    
  2. Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

    http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_offsets_processed.json
    

    Your output should resemble:

    {
      "data": [
        {
          "resource.ksql.id": "lksqlc-XXXXX",
          "timestamp": "2021-02-24T10:00:00Z",
          "metric.task_id": "1_1",
          "metric.query_id": "CTAS_PAGEVIEWS_2",
          "value": 123
        }
      ]
    }
    

Query for the total topic offsets processed by all tasks of query on ksqlDB cluster lksqlc-XXXXX

  1. Create a file named ksql_offsets_processed.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

    {
      "aggregations": [
        {
          "metric": "io.confluent.kafka.ksql/offsets_processed_total"
        }
      ],
      "filter": {
        "field": "resource.ksql.id",
        "op": "EQ",
        "value": "lksqlc-xxxxx"
      },
      "granularity": "PT1M",
      "group_by": [
          "metric.query_id"
      ],
      "intervals": [
        "2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"
      ]
    }
    
  2. Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

    http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_offsets_processed.json
    

    Your output should resemble:

    {
      "data": [
        {
          "resource.ksql.id": "lksqlc-XXXXX",
          "timestamp": "2021-02-24T10:00:00Z",
          "metric.query_id": "CTAS_PAGEVIEWS_2",
          "value": 123
        }
      ]
    }
    

Query for the current committed offset lag by task on ksqlDB cluster lksqlc-XXXXX

  1. Create a file named ksql_offset_lag.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

    {
      "aggregations": [
        {
          "metric": "io.confluent.kafka.ksql/committed_offset_lag"
        }
      ],
      "filter": {
        "field": "resource.ksql.id",
        "op": "EQ",
        "value": "lksqlc-xxxxx"
      },
      "granularity": "PT1M",
      "group_by": [
          "metric.query_id",
          "metric.task_id"
      ],
      "intervals": [
        "2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"
      ]
    }
    
  2. Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

    http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_offset_lag.json
    

    Your output should resemble:

    {
      "data": [
        {
          "resource.ksql.id": "lksqlc-XXXXX",
          "timestamp": "2021-02-24T10:00:00Z",
          "metric.task_id": "1_1",
          "metric.query_id": "CTAS_PAGEVIEWS_2",
          "value": 456
        }
      ]
    }
    

Query for the current total committed offset lag for all tasks in query on ksqlDB cluster lksqlc-XXXXX

  1. Create a file named ksql_offset_lag.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

    {
      "aggregations": [
        {
          "metric": "io.confluent.kafka.ksql/committed_offset_lag"
        }
      ],
      "filter": {
        "field": "resource.ksql.id",
        "op": "EQ",
        "value": "lksqlc-xxxxx"
      },
      "granularity": "PT1M",
      "group_by": [
          "metric.query_id"
      ],
      "intervals": [
        "2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"
      ]
    }
    
  2. Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

    http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_offset_lag.json
    

    Your output should resemble:

    {
      "data": [
        {
          "resource.ksql.id": "lksqlc-XXXXX",
          "timestamp": "2021-02-24T10:00:00Z",
          "metric.query_id": "CTAS_PAGEVIEWS_2",
          "value": 456
        }
      ]
    }
    

Query for the total number of processing errors by query on ksqlDB cluster lksqlc-XXXXX

  1. Create a file named ksql_processing_errors.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

    {
      "aggregations": [
        {
          "metric": "io.confluent.kafka.ksql/processing_errors_total"
        }
      ],
      "filter": {
        "field": "resource.ksql.id",
        "op": "EQ",
        "value": "lksqlc-xxxxx"
      },
      "granularity": "PT1M",
      "group_by": [
          "metric.query_id"
      ],
      "intervals": [
        "2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"
      ]
    }
    
  2. Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

    http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_processing_errors.json
    

    Your output should resemble:

    {
      "data": [
        {
          "resource.ksql.id": "lksqlc-XXXXX",
          "timestamp": "2021-02-24T10:00:00Z",
          "metric.query_id": "CTAS_PAGEVIEWS_2",
          "value": 16
        }
      ]
    }
    

Query for the total number of restarts due to failure by query on ksqlDB cluster lksqlc-XXXXX

  1. Create a file named ksql_query_restarts.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

    {
      "aggregations": [
        {
          "metric": "io.confluent.kafka.ksql/query_restarts"
        }
      ],
      "filter": {
        "field": "resource.ksql.id",
        "op": "EQ",
        "value": "lksqlc-xxxxx"
      },
      "granularity": "PT1M",
      "group_by": [
          "metric.query_id"
      ],
      "intervals": [
        "2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"
      ]
    }
    
  2. Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

    http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_query_restarts.json
    

    Your output should resemble:

    {
      "data": [
        {
          "resource.ksql.id": "lksqlc-XXXXX",
          "timestamp": "2021-02-24T10:00:00Z",
          "metric.query_id": "CTAS_PAGEVIEWS_2",
          "value": 3
        }
      ]
    }
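
A common use of the restart metric is alerting on any query that restarted at all during the interval. A small sketch, using invented sample data in the same shape as the response above:

```python
# Sample response points (values invented); a restart count above zero
# indicates the query failed and was restarted during the interval.
response = {
    "data": [
        {"metric.query_id": "CTAS_PAGEVIEWS_2", "timestamp": "2021-02-24T10:00:00Z", "value": 3},
        {"metric.query_id": "CSAS_CLICKS_1", "timestamp": "2021-02-24T10:00:00Z", "value": 0},
    ]
}

restarted = sorted({p["metric.query_id"] for p in response["data"] if p["value"] > 0})
print(restarted)  # ['CTAS_PAGEVIEWS_2']
```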
    

Query for the number of schemas in the Schema Registry cluster lsrc-XXXXX

  1. Create a file named schema_count.json using the following template. Be sure to change lsrc-XXXXX and the timestamp values to match your needs.

    {
      "aggregations": [
        {
          "time_agg": "MAX",
          "agg": "SUM",
          "metric": "io.confluent.kafka.schema_registry/schema_count"
        }
      ],
      "filter": {
        "field": "resource.schema_registry.id",
        "op": "EQ",
        "value": "lsrc-XXXXX"
      },
      "granularity": "PT1M",
      "intervals": [
        "2021-02-24T10:00:00Z/2021-02-24T10:01:00Z"
      ],
      "group_by": [
        "resource.schema_registry.id"
      ]
    }
    
  2. Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

    http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < schema_count.json
    

    Your output should resemble:

    {
      "data": [
        {
          "resource.schema_registry.id": "lsrc-XXXXX",
          "timestamp": "2021-02-24T10:00:00Z",
          "value": 1.0
        }
      ]
    }
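
The httpie commands in this guide map onto a plain HTTP POST with Basic authentication, so you can script them without httpie. A sketch using only the Python standard library; the key and secret are placeholders, the payload is abbreviated, and the actual network call is left commented out so the sketch stays offline:

```python
import base64
import json
import urllib.request

API_KEY, SECRET = "<API_KEY>", "<SECRET>"  # placeholders for your cloud-scoped key
payload = {  # abbreviated; use a full template like schema_count.json
    "aggregations": [{"metric": "io.confluent.kafka.schema_registry/schema_count"}],
}

# httpie's --auth '<API_KEY>:<SECRET>' becomes a Basic Authorization header.
token = base64.b64encode(f"{API_KEY}:{SECRET}".encode()).decode()
request = urllib.request.Request(
    "https://api.telemetry.confluent.cloud/v2/metrics/cloud/query",
    data=json.dumps(payload).encode(),
    headers={
        "Authorization": f"Basic {token}",
        "Content-Type": "application/json",
    },
    method="POST",
)
# body = json.load(urllib.request.urlopen(request))  # performs the real call

print(request.get_header("Content-type"))  # application/json
```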
    

Query for the hourly number of records received by a sink connector lcc-XXXXX

  1. Create a file named sink_connector_record_number.json using the following template. Be sure to change lcc-XXXXX and the timestamp values to match your needs.

    {
      "aggregations": [
        {
          "metric": "io.confluent.kafka.connect/received_records"
        }
      ],
      "filter": {
        "field": "resource.connector.id",
        "op": "EQ",
        "value": "lcc-XXXXX"
      },
      "granularity": "PT1H",
      "intervals": [
        "2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"
      ],
      "group_by": [
        "resource.connector.id"
      ]
    }
    
  2. Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to your Cloud API key credentials (created with --resource cloud).

    http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < sink_connector_record_number.json
    

    Your output should resemble:

    {
      "data": [
        {
          "resource.connector.id": "lcc-XXXXX",
          "timestamp": "2021-02-24T10:00:00Z",
          "value": 26455991.0
        }
      ]
    }
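
Because this query uses PT1H granularity, each value is an hourly total; dividing by 3600 gives an approximate average throughput in records per second. A quick sketch using the sample value from the response above:

```python
hourly_records = 26455991.0  # value from the sample response (PT1H granularity)
per_second = hourly_records / 3600
print(round(per_second))  # 7349
```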
    

Query for the total free memory on a custom connector clcc-XXXXX

  1. Create a file named custom_connector_free_memory.json using the following template. Be sure to change clcc-XXXXX and the timestamp values to match your needs.

    {
      "aggregations": [
        {
          "metric": "io.confluent.system/memory_free_bytes"
        }
      ],
      "filter": {
        "field": "resource.custom_connector.id",
        "op": "EQ",
        "value": "clcc-XXXXX"
      },
      "granularity": "PT1H",
      "intervals": [
        "2023-05-09T10:00:00Z/2023-05-09T15:00:00Z"
      ],
      "group_by": [
        "resource.custom_connector.id"
      ]
    }
    
  2. Submit the query as a POST using the following command.

    http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud-custom/query' --auth '<API_KEY>:<SECRET>' < custom_connector_free_memory.json
    

    Your output should resemble:

    {
       "data": [
          {
                "resource.custom_connector.id": "clcc-XXXXXX",
                "timestamp": "2023-05-09T10:00:00Z",
                "value": 125229329.06666666
          },
          {
                "resource.custom_connector.id": "clcc-XXXXXX",
                "timestamp": "2023-05-09T11:00:00Z",
                "value": 125193966.93333334
          },
          {
                "resource.custom_connector.id": "clcc-XXXXXX",
                "timestamp": "2023-05-09T12:00:00Z",
                "value": 125140241.06666666
          },
          {
                "resource.custom_connector.id": "clcc-XXXXXX",
                "timestamp": "2023-05-09T13:00:00Z",
                "value": 125099622.4
          },
          {
                "resource.custom_connector.id": "clcc-XXXXXX",
                "timestamp": "2023-05-09T14:00:00Z",
                "value": 124849493.33333333
          }
       ]
    }
    

For Confluent Cloud UI metrics for custom connectors, see View metrics.
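
The free-memory values are reported in bytes; converting to MiB and tracking the minimum over the window gives a quick headroom check. A sketch using rounded values from the sample response above:

```python
# Hourly free-memory samples in bytes (rounded from the sample response).
samples = [125229329.07, 125193966.93, 125140241.07, 125099622.4, 124849493.33]

min_free_mib = min(samples) / (1024 * 1024)
print(f"{min_free_mib:.1f} MiB")  # 119.1 MiB
```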

Query for the total percent CPU used by a custom connector clcc-XXXXX

  1. Create a file named custom_connector_percent_cpu.json using the following template. Be sure to change clcc-XXXXX and the timestamp values to match your needs.

    {
      "aggregations": [
        {
          "metric": "io.confluent.system/cpu_load_percent"
        }
      ],
      "filter": {
        "field": "resource.custom_connector.id",
        "op": "EQ",
        "value": "clcc-XXXXX"
      },
      "granularity": "PT1H",
      "intervals": [
        "2023-05-09T10:00:00Z/2023-05-09T15:00:00Z"
      ],
      "group_by": [
        "resource.custom_connector.id"
      ]
    }
    
  2. Submit the query as a POST using the following command.

    http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud-custom/query' --auth '<API_KEY>:<SECRET>' < custom_connector_percent_cpu.json
    

    Your output should resemble:

    {
       "data": [
          {
                "resource.custom_connector.id": "clcc-XXXXX",
                "timestamp": "2023-05-09T10:00:00Z",
                "value": 0.021009808092643977
          },
          {
                "resource.custom_connector.id": "clcc-XXXXX",
                "timestamp": "2023-05-09T11:00:00Z",
                "value": 0.01990721858932965
          },
          {
                "resource.custom_connector.id": "clcc-XXXXX",
                "timestamp": "2023-05-09T12:00:00Z",
                "value": 0.020799848444189233
          },
          {
                "resource.custom_connector.id": "clcc-XXXXX",
                "timestamp": "2023-05-09T13:00:00Z",
                "value": 0.019948515028905416
          },
          {
                "resource.custom_connector.id": "clcc-XXXXX",
                "timestamp": "2023-05-09T14:00:00Z",
                "value": 0.020734587261390117
          }
       ]
    }
    

For Confluent Cloud UI metrics for custom connectors, see View metrics.
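
For a spiky metric like CPU load, averaging the per-hour samples over the window gives a more stable number to trend or alert on. A sketch using rounded values from the sample response above:

```python
# Hourly cpu_load_percent samples (rounded from the sample response).
samples = [0.02101, 0.01991, 0.02080, 0.01995, 0.02073]

avg_load = sum(samples) / len(samples)
print(f"{avg_load:.4f}")  # 0.0205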

Query for metrics for a specific Principal ID

Use the metric.principal_id label to filter metrics by specific users or service accounts. Metrics such as io.confluent.kafka.server/active_connection_count and io.confluent.kafka.server/request_count support filtering by this label. To see all metrics that currently support it, see the API Reference.

  1. Create a file named principal_query.json using the following template. Be sure to change lkc-XXXXX and the timestamp values to match your needs.

    {
      "aggregations": [
        {
          "metric": "io.confluent.kafka.server/active_connection_count"
        }
      ],
      "filter": {
        "field": "resource.kafka.id",
        "op": "EQ",
        "value": "lkc-XXXXX"
      },
      "granularity": "PT1H",
      "group_by": [
        "metric.principal_id"
      ],
      "intervals": [
        "2022-01-01T00:00:00Z/PT1H"
      ],
      "limit": 5
    }
    
  2. Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

    http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < principal_query.json
    

    Your output should resemble:

    {
      "data": [
        {
          "metric.principal_id": "sa-abc123",
          "timestamp": "2022-01-01T00:00:00Z",
          "value": 430.99999999997
        },
        {
          "metric.principal_id": "u-def456",
          "timestamp": "2022-01-01T00:00:00Z",
          "value": 427.93333333332
        },
        {
          "metric.principal_id": "u-abc123",
          "timestamp": "2022-01-01T00:00:00Z",
          "value": 333.19999999997
        }
      ],
      "meta": {
        "pagination": {
          "next_page_token": "eyJ2ZXJzaW9uIjoiMSIsInJlcXVlc3RI",
          "page_size": 5
        }
      }
    }
    

    Note

    Principals without reported metric values during the specified interval aren’t returned.
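
When the result set exceeds the limit, the response includes meta.pagination.next_page_token, and you request further pages until no token is returned; exactly how the token is passed back is documented in the API Reference. The paging loop itself is independent of transport, so the sketch below stubs the fetch function entirely:

```python
def collect_all_pages(fetch):
    """fetch(page_token) -> response dict; follow next_page_token until exhausted."""
    rows, token = [], None
    while True:
        body = fetch(token)
        rows.extend(body.get("data", []))
        token = body.get("meta", {}).get("pagination", {}).get("next_page_token")
        if not token:
            return rows

# Stub standing in for real Metrics API responses (two pages, values invented).
pages = {
    None: {"data": [{"metric.principal_id": "sa-abc123", "value": 431.0}],
           "meta": {"pagination": {"next_page_token": "tok-2", "page_size": 1}}},
    "tok-2": {"data": [{"metric.principal_id": "u-def456", "value": 427.9}],
              "meta": {"pagination": {"page_size": 1}}},
}
rows = collect_all_pages(lambda token: pages[token])
print(len(rows))  # 2
```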

Query for the absolute number of CFUs at a given moment in a Flink compute pool

This metric reports the absolute number of CFUs that a Flink compute pool is using at a given moment.

  1. Create a file named current_cfus.json using the following template. Be sure to change the compute pool ID (lfcp-XXXXXX), and the timestamp values to match your needs.

    {
      "aggregations": [
        {
          "metric": "io.confluent.flink/compute_pool_utilization/current_cfus"
        }
      ],
      "filter": {
        "field": "resource.compute_pool.id",
        "op": "EQ",
        "value": "lfcp-XXXXXX"
      },
      "granularity": "PT1M",
      "intervals": ["2024-05-15T14:00:00/2024-05-15T14:05:00"],
      "limit": 5
    }
    
  2. Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

    http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < current_cfus.json
    

    Your output should resemble:

    {
      "data": [
        {
          "timestamp": "2024-05-15T14:00:00Z",
          "value": 3.0
        },
        {
          "timestamp": "2024-05-15T14:01:00Z",
          "value": 3.0
        },
        {
          "timestamp": "2024-05-15T14:02:00Z",
          "value": 3.0
        },
        {
          "timestamp": "2024-05-15T14:03:00Z",
          "value": 3.0
        },
        {
          "timestamp": "2024-05-15T14:04:00Z",
          "value": 3.0
        }
      ]
    }
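
Granularities like PT1M are ISO-8601 durations, which is handy when checking how many data points to expect per interval. A small sketch that parses the simple PTnHnMnS forms used here (this helper is an illustration, not part of the API):

```python
import re

def iso_duration_seconds(granularity):
    """Parse simple ISO-8601 durations of the PTnHnMnS form into seconds."""
    m = re.fullmatch(r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?", granularity)
    h, mi, s = (int(g) if g else 0 for g in m.groups())
    return h * 3600 + mi * 60 + s

# A 5-minute interval at PT1M granularity yields up to 5 data points.
print(300 // iso_duration_seconds("PT1M"))  # 5
```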
    

Query for the maximum number of CFUs assigned to a Flink compute pool

This metric represents the maximum number of CFUs assigned to a Flink compute pool. When Flink statements are running, the compute pool autoscales up to this limit.

  1. Create a file named cfu_limit.json using the following template. Be sure to change the compute pool ID (lfcp-XXXXXX), and the timestamp values to match your needs.

    {
      "aggregations": [
        {
          "metric": "io.confluent.flink/compute_pool_utilization/cfu_limit"
        }
      ],
      "filter": {
        "field": "resource.compute_pool.id",
        "op": "EQ",
        "value": "lfcp-XXXXXX"
      },
      "granularity": "PT1M",
      "intervals": ["2024-05-15T14:00:00/2024-05-15T14:05:00"],
      "limit": 5
    }
    
  2. Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

    http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < cfu_limit.json
    

    Your output should resemble:

    {
      "data": [
        {
          "timestamp": "2024-05-15T14:00:00Z",
          "value": 10.0
        },
        {
          "timestamp": "2024-05-15T14:01:00Z",
          "value": 10.0
        },
        {
          "timestamp": "2024-05-15T14:02:00Z",
          "value": 10.0
        },
        {
          "timestamp": "2024-05-15T14:03:00Z",
          "value": 10.0
        },
        {
          "timestamp": "2024-05-15T14:04:00Z",
          "value": 10.0
        }
      ]
    }
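
current_cfus and cfu_limit pair naturally: dividing one by the other gives pool utilization, a useful signal for sizing the pool. A sketch using the sample values from the two responses above:

```python
current_cfus, cfu_limit = 3.0, 10.0  # sample values from the responses above
utilization_pct = 100 * current_cfus / cfu_limit
print(f"{utilization_pct:.0f}%")  # 30%
```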
    

Query for the statement status for a given Flink SQL statement

This metric represents the status of a Flink SQL statement.

  1. Create a file named statement_status.json using the following template. Be sure to change the statement name and the compute pool ID (lfcp-3mx0zj in this example) to match your environment.

    {
      "aggregations": [
        {
          "metric": "io.confluent.flink/statement_status"
        }
      ],
      "filter": {
        "op": "AND",
        "filters": [
          {
            "field": "resource.flink_statement.name",
            "op": "EQ",
            "value": "workspace-2025-03-25-130905-70059bd3-2462-4ee8-8fb0-d33f41e44471"
          },{
            "field": "resource.compute_pool.id",
            "op": "EQ",
            "value": "lfcp-3mx0zj"
        }
    
        ]
      },
      "granularity": "PT1M",
      "intervals": ["now-6h/now"],
      "group_by": [
        "resource.flink_statement.uid",
        "metric.status"
      ],
      "limit": 1000
    }
    
  2. Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

    http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < statement_status.json
    

    Your output should resemble:

    {
      "data": [
        {
          "timestamp": "2025-03-10T09:27:00Z",
          "value": 1.0,
          "resource.flink_statement.uid": "e26f074c-0a26-465d-86b2-79ee685973f2",
          "metric.status": "RUNNING"
        },
        {
          "timestamp": "2025-03-10T09:32:00Z",
          "value": 1.0,
          "resource.flink_statement.uid": "e26f074c-0a26-465d-86b2-79ee685973f2",
          "metric.status": "RUNNING"
        },
        {
          "timestamp": "2025-03-10T09:34:00Z",
          "value": 1.0,
          "resource.flink_statement.uid": "e26f074c-0a26-465d-86b2-79ee685973f2",
          "metric.status": "RUNNING"
        },
        {
          "timestamp": "2025-03-10T09:36:00Z",
          "value": 1.0,
          "resource.flink_statement.uid": "e26f074c-0a26-465d-86b2-79ee685973f2",
          "metric.status": "RUNNING"
        }
      ]
    }
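
Because statement_status is a per-minute time series, dashboards usually reduce it to the most recent status for each statement. A sketch over sample data in the shape of the response above:

```python
# Two sample points (shape taken from the response above).
response = {
    "data": [
        {"timestamp": "2025-03-10T09:27:00Z", "value": 1.0,
         "resource.flink_statement.uid": "e26f074c-0a26-465d-86b2-79ee685973f2",
         "metric.status": "RUNNING"},
        {"timestamp": "2025-03-10T09:36:00Z", "value": 1.0,
         "resource.flink_statement.uid": "e26f074c-0a26-465d-86b2-79ee685973f2",
         "metric.status": "RUNNING"},
    ]
}

# Keep only the latest reported status per statement uid.
latest = {}
for p in sorted(response["data"], key=lambda p: p["timestamp"]):
    latest[p["resource.flink_statement.uid"]] = p["metric.status"]

print(latest)  # {'e26f074c-0a26-465d-86b2-79ee685973f2': 'RUNNING'}
```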