Confluent Cloud Metrics
The Confluent Cloud Metrics API provides a comprehensive way to monitor and analyze the health and performance of your data streaming workloads. Use it to query metrics related to data streaming, connectors, governance, and Apache Flink® stream processing. For information about third-party integrations with the Metrics API, see Integrate Confluent Cloud Metrics API with Third-party Monitoring Tools.
Metrics quick start
The Metrics quick start is meant to be instructional and to help you get started with using the metrics that Confluent Cloud provides. The Metrics API supports a diverse set of querying patterns to provide usage and performance analysis over time. For more information on the Confluent Cloud Metrics API, see the API Reference.
- Considerations
You must use an API key that is resource-scoped for resource management to communicate with the Metrics API
API keys that are resource-scoped for a Kafka cluster cause an authentication error
Metrics in Confluent Cloud are available either through first-class integrations with third-party monitoring providers or by directly querying the Confluent Cloud Metrics API
To reduce the operational burden of monitoring, users who want to monitor Confluent Cloud are encouraged to use an integration with a third-party monitoring provider
- Prerequisites
API Key resource-scoped for resource management
Confluent Cloud resources to monitor
- To get started with the Metrics API
Create an API key and run example queries. For more information, see Create an API key to authenticate to the Metrics API and Run example queries.
See also
For an example that shows how to monitor a Kafka client application and Confluent Cloud metrics, and that steps through various failure scenarios to show the resulting metrics, see Observability for Kafka Clients to Confluent Cloud.
Create an API key to authenticate to the Metrics API
To securely access Confluent Cloud metrics programmatically or via third-party monitoring tools, you need both the MetricsViewer role for authorization and an API key for authentication.
The MetricsViewer role provides service account access to the Metrics API for all clusters in an organization. This role also enables service accounts to import metrics into third-party metrics platforms.
To create an API key to authenticate to the Metrics API:
Run the following commands to add a role binding for MetricsViewer to a new
service account. Remember to log in with the confluent login command first.
Create the service account:
confluent iam service-account create MetricsImporter --description "A test service account to import Confluent Cloud metrics into our monitoring system"
Your output should resemble:
+-------------+--------------------------------+
| ID          | sa-123abc                      |
| Name        | MetricsImporter                |
| Description | A test service account to     |
|             | import Confluent Cloud metrics |
|             | into our monitoring system     |
+-------------+--------------------------------+
Make note of the ID field.
Add the MetricsViewer role binding to the service account:
confluent iam rbac role-binding create --role MetricsViewer --principal User:sa-123abc
Your output should resemble:
+-----------+----------------+
| Principal | User:sa-123abc |
| Role      | MetricsViewer  |
+-----------+----------------+
List the role bindings to confirm that the MetricsViewer role binding was created:
confluent iam rbac role-binding list --principal User:sa-123abc
Your output should resemble:
    Principal    | Email |     Role      | Environment | ...
-----------------+-------+---------------+-------------+----
 User:sa-123abc  |       | MetricsViewer |             |
List the existing service accounts:
confluent iam service-account list
Your output should resemble:
     ID     |              Name              |            Description
------------+--------------------------------+-----------------------------------
 sa-1a2b3c  | test-account                   | for testing
 sa-112233  | ProactiveSupport.1614189731753 | SA for Proactive Support
 sa-aabbcc  | KSQL.lksqlc-ab123              | SA for KSQL w/ ID lksqlc-ab123
            |                                | and Name ksqlDB_app_0
 ...

Create an API key and add it to the new service account:
confluent api-key create --resource cloud --service-account sa-123abc
Your output should resemble:
It may take a couple of minutes for the API key to be ready. Save the API key and secret. The secret is not retrievable later.

+---------+------------------------------------------------------------------+
| API Key | 1234567ABCDEFGHI                                                 |
| Secret  | ABCDEF123456.................................................... |
+---------+------------------------------------------------------------------+
Save the API key and secret in a secure location.
In the administration menu (☰) in the upper-right corner of the Confluent Cloud user interface, click ADMINISTRATION > API keys.
Click Add key.
Click the Granular access tile to set the scope for the API key. Click Next.
Click Create a new one and specify the service account name and, optionally, a description. Click Next.
The API key and secret are generated for the service account. You will need this API key and secret to connect to the cluster, so be sure to store them securely. Click Save. The new service account with the API key and associated ACLs is created. When you return to the API access tab, you can view the newly created API key to confirm.
Return to Accounts & access in the administration menu, and in the Accounts tab, click Service accounts to view your service accounts.
Select the service account that you want to assign the MetricsViewer role to.
In the service account’s details page, click Access.
In the tree view, open the resource where you want the service account to have the MetricsViewer role.
Click Add role assignment and select the MetricsViewer tile. Click Save.
When you return to Accounts & access, you can view the resources for the organization, and also see that the service account you created has the MetricsViewer role binding.
Discover resources and metrics with the Metrics API
The Confluent Cloud Metrics API provides endpoints for programmatic discovery of available
resources and their metrics. This resource and metric metadata is represented by
descriptor objects.
The discovery endpoints can be used to avoid hardcoding metric and resource names into client scripts.
- Considerations:
HTTPie can be installed using most common software package managers by following the documentation.
cURL is a standard component of most operating systems, but if you don’t have cURL, you can install it by following the documentation.
- Prerequisites
Either HTTPie, cURL, or your favorite tool for making RESTful requests.
API Key resource-scoped for resource management
Confluent Cloud resources to monitor
Discover available resources
A resource represents the entity against which metrics are collected, for example, a Kafka cluster, a Kafka connector, or a ksqlDB application.
Get a description of the available resources by sending a GET request to the
descriptors/resources endpoint of the API:
http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/descriptors/resources' --auth '<API_KEY>:<SECRET>'
curl -X GET 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/descriptors/resources' -u '<API_KEY>:<SECRET>'
This returns a JSON document describing the available resources to query and their labels.
Discover available metrics
Get a description of the available metrics by sending a GET request to the
descriptors/metrics endpoint of the API:
http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/descriptors/metrics?resource_type=kafka' --auth '<API_KEY>:<SECRET>'
curl -X GET 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/descriptors/metrics?resource_type=kafka' -u '<API_KEY>:<SECRET>'
Note
The resource_type query parameter is required to specify the type of resource for which to list metrics. The valid resource types can be determined using the /descriptors/resources endpoint.
This returns a JSON document describing the available metrics to query and their labels.
A human-readable list of the current metrics is available in the API Reference.
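If you want to script this discovery instead of calling the endpoints by hand, the following Python sketch (using the requests library; not an official client) lists the available resource types and then fetches the metric descriptors for each one. The data, type, and name fields reflect the response shapes described above; verify them against the API Reference, and note that pagination is not handled here.

import requests

BASE = "https://api.telemetry.confluent.cloud/v2/metrics/cloud"
AUTH = ("<API_KEY>", "<SECRET>")  # API key resource-scoped for resource management

# List the available resource types, then the metrics defined for each type.
resources = requests.get(f"{BASE}/descriptors/resources", auth=AUTH)
resources.raise_for_status()
for resource in resources.json()["data"]:
    resource_type = resource["type"]
    print(f"Resource type: {resource_type}")
    metrics = requests.get(
        f"{BASE}/descriptors/metrics",
        auth=AUTH,
        params={"resource_type": resource_type},
    )
    metrics.raise_for_status()
    for metric in metrics.json()["data"]:
        print(f"  {metric['name']}")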
Metric data export and automated consumption
The Metrics API provides actionable operational metrics about your Confluent Cloud deployment, allowing for integration with external monitoring and observability platforms.
- Prerequisites:
API Key resource-scoped for resource management.
Confluent Cloud resources to monitor.
External monitoring tool, such as Prometheus, Grafana, or Datadog.
Export endpoint
Use the /export endpoint to export current metric values in OpenMetrics
or Prometheus format, suitable for import into an
external monitoring system. The /export endpoint returns the single most recent data point for each metric, for each distinct combination of labels.
For more information, see the /export endpoint in the
Metrics API reference.
- To use the export endpoint:
Create a Confluent Cloud API Key/Secret with the MetricsViewer role.
Specify the target resource IDs, such as Kafka cluster IDs, using query parameters to limit the metrics to only the resources you want to monitor.
Set up an external monitoring tool such as Prometheus to initiate a periodic
GET request, or a scrape, to the /export endpoint. The endpoint returns a payload in an OpenMetrics-compatible format, which your external tool can immediately ingest, store, and use for visualization and alerting.
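As a quick way to see what a scrape returns before configuring a monitoring tool, the following Python sketch performs a one-off GET against the /export endpoint for a single Kafka cluster and prints the OpenMetrics payload. The resource.kafka.id query parameter and the lkc-XXXXX placeholder follow the usage described above; confirm the exact parameter names in the Metrics API reference.

import requests

EXPORT_URL = "https://api.telemetry.confluent.cloud/v2/metrics/cloud/export"
AUTH = ("<API_KEY>", "<SECRET>")  # API key created for the MetricsViewer service account

# Fetch the most recent data point for each metric of one Kafka cluster,
# returned in OpenMetrics/Prometheus exposition format.
response = requests.get(EXPORT_URL, auth=AUTH, params={"resource.kafka.id": "lkc-XXXXX"})
response.raise_for_status()
print(response.text)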
Discovery endpoint
Use the /discovery endpoint to configure your monitoring tool to dynamically discover and scrape metrics from all
authorized Confluent Cloud resources. For more information, see the /discovery endpoint in the
Metrics API reference.
- To use the discovery endpoint:
Create a Confluent Cloud API Key/Secret with the MetricsViewer role.
Configure a Prometheus scrape job with the
/discovery endpoint URL and the API key and secret. When the Prometheus server connects to the /discovery endpoint, it receives a Prometheus-compatible list of scrape targets, which eliminates the need to list resource IDs in the scrape configuration file.
Prometheus automatically uses the response from the /discovery endpoint to dynamically generate, or relabel, the list of active target URLs for its metric scraping jobs.
If a new resource is created, such as a new Kafka cluster, Prometheus automatically adds the new target on its next scrape cycle, eliminating the need to manually add new resources to the scrape configuration file.
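To inspect the target list before wiring it into Prometheus, you can fetch the discovery response directly. The sketch below leaves the endpoint URL as a placeholder because the exact path is documented in the Metrics API reference; the rest is ordinary authenticated HTTP.

import requests

# Replace with the /discovery endpoint URL from the Metrics API reference.
DISCOVERY_URL = "<DISCOVERY_ENDPOINT_URL>"
AUTH = ("<API_KEY>", "<SECRET>")  # API key created for the MetricsViewer service account

# Fetch the Prometheus-compatible list of scrape targets and print it.
response = requests.get(DISCOVERY_URL, auth=AUTH)
response.raise_for_status()
print(response.text)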
KIP-714 Client Metrics
With the implementation of KIP-714, Kafka clients can now push selected metrics (for example, connection counts, latency, production/consumption rates) directly to Confluent Cloud brokers. This enhancement improves observability by allowing client metrics to be centrally available through the Confluent Cloud Metrics API.
Monitor client metrics collected by Confluent Cloud
The following client metrics are pushed to Confluent Cloud and available for monitoring based on KIP-714.
| Metric Name in Kafka | Metric Name in Confluent Cloud | Description |
|---|---|---|
| | | Average produce request latency |
Run example queries
The Confluent Cloud Metrics API has an expressive query language that allows users to flexibly filter and group timeseries data. The example queries below are provided as templates. Additional examples can be found in the Cloud Console, which also uses the Confluent Cloud Metrics API.
Timestamps in metrics queries use Coordinated Universal Time (UTC). Use either UTC or an offset appropriate for your location.
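Every example below follows the same pattern: write a query document to a file and POST it to the /query endpoint. If you prefer to script the queries, a small helper along these lines (a sketch using the Python requests library, not an official client) can submit any of the query files shown in this section:

import json
import sys

import requests

QUERY_URL = "https://api.telemetry.confluent.cloud/v2/metrics/cloud/query"
AUTH = ("<API_KEY>", "<SECRET>")  # API key resource-scoped for resource management


def run_query(path):
    """POST the query document stored at `path` and return the parsed response."""
    with open(path) as f:
        payload = json.load(f)
    response = requests.post(QUERY_URL, auth=AUTH, json=payload)
    response.raise_for_status()
    return response.json()


if __name__ == "__main__":
    # Example: python run_query.py received_bytes_query.json
    print(json.dumps(run_query(sys.argv[1]), indent=2))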
Query for bytes produced to the cluster per minute grouped by topic
This query measures bytes produced (ingress). If you want to query bytes consumed
(egress), see Query for bytes consumed from the cluster per minute grouped by topic. Note that if you are using Cluster
Linking, the received_bytes metric does not include the mirror-in bytes to the cluster. You can use the cluster_link_destination_response_bytes metric to query the mirror-in bytes instead.
Create a file named
received_bytes_query.json using the following template. Be sure to change lkc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.server/received_bytes"}],
  "filter": {"field": "resource.kafka.id", "op": "EQ", "value": "lkc-XXXXX"},
  "granularity": "PT1M",
  "group_by": ["metric.topic"],
  "intervals": ["2019-12-19T11:00:00-05:00/2019-12-19T11:05:00-05:00"],
  "limit": 25
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < received_bytes_query.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @received_bytes_query.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "timestamp": "2019-12-19T16:00:00Z", "metric.topic": "test-topic", "value": 72.0 }, { "timestamp": "2019-12-19T16:01:00Z", "metric.topic": "test-topic", "value": 139.0 }, { "timestamp": "2019-12-19T16:02:00Z", "metric.topic": "test-topic", "value": 232.0 }, { "timestamp": "2019-12-19T16:03:00Z", "metric.topic": "test-topic", "value": 0.0 }, { "timestamp": "2019-12-19T16:04:00Z", "metric.topic": "test-topic", "value": 0.0 } ] }
Query for bytes consumed from the cluster per minute grouped by topic
This query measures bytes consumed (egress). If you want to query bytes produced
(ingress), see Query for bytes produced to the cluster per minute grouped by topic. Note that if you are using
Cluster Linking, the sent_bytes metric also includes the mirror-out bytes
from the cluster.
Create a file named
sent_bytes_query.json using the following template. Be sure to change lkc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.server/sent_bytes"}],
  "filter": {"field": "resource.kafka.id", "op": "EQ", "value": "lkc-XXXXX"},
  "granularity": "PT1M",
  "group_by": ["metric.topic"],
  "intervals": ["2019-12-19T11:00:00-05:00/2019-12-19T11:05:00-05:00"],
  "limit": 25
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < sent_bytes_query.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @sent_bytes_query.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "timestamp": "2019-12-19T16:01:00Z", "metric.topic": "test-topic", "value": 0.0 }, { "timestamp": "2019-12-19T16:02:00Z", "metric.topic": "test-topic", "value": 157.0 }, { "timestamp": "2019-12-19T16:03:00Z", "metric.topic": "test-topic", "value": 371.0 }, { "timestamp": "2019-12-19T16:04:00Z", "metric.topic": "test-topic", "value": 0.0 } ] }
Note
If you haven’t produced data during the time window, the dataset is empty for a given topic. For more details on
sent_bytes and received_bytes in Cluster Linking, refer to Cluster Linking Performance Limits.
Query for max retained bytes per hour over 2 hours for a cluster lkc-XXXXX
Create a file named
cluster_retained_bytes_query.json using the following template. Be sure to change lkc-XXXXX and the timestamp values to match your needs:

{
  "aggregations": [{"metric": "io.confluent.kafka.server/retained_bytes"}],
  "filter": {"field": "resource.kafka.id", "op": "EQ", "value": "lkc-XXXXX"},
  "granularity": "PT1H",
  "intervals": ["2019-12-19T11:00:00-05:00/P0Y0M0DT2H0M0S"],
  "limit": 5
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < cluster_retained_bytes_query.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @cluster_retained_bytes_query.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "timestamp": "2019-12-19T16:00:00Z", "value": 507350.0 }, { "timestamp": "2019-12-19T17:00:00Z", "value": 507350.0 } ] }
Query for average consumer lag over the last hour grouped by topic and consumer group
Create a file named
consumer_lag_max_hour.json using the following template. Be sure to change lkc-XXXXX, and note that the interval covers the last hour with a 1-minute granularity.

{
  "aggregations": [{"metric": "io.confluent.kafka.server/consumer_lag_offsets"}],
  "filter": {"field": "resource.kafka.id", "op": "EQ", "value": "lkc-XXXXX"},
  "granularity": "PT1M",
  "group_by": ["metric.consumer_group_id", "metric.topic"],
  "intervals": ["PT1H/now"],
  "limit": 25
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < consumer_lag_max_hour.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @consumer_lag_max_hour.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "metric.consumer_group_id": "group_1", "metric.topic": "test_topic_1", "timestamp": "2022-03-23T21:00:00Z", "value": 0.0 }, { "metric.consumer_group_id": "group_2", "metric.topic": "test_topic_2", "timestamp": "2022-03-23T21:00:00Z", "value": 6.0 } ] }
Query for the number of streaming units used per hour for ksqlDB cluster lksqlc-XXXXX
Create a file named
ksql_streaming_unit_count.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.ksql/streaming_unit_count"}],
  "filter": {"field": "resource.ksql.id", "op": "EQ", "value": "lksqlc-XXXXX"},
  "granularity": "PT1H",
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"],
  "group_by": ["resource.ksql.id"]
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_streaming_unit_count.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @ksql_streaming_unit_count.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "resource.ksql.id": "lksqlc-XXXXX", "timestamp": "2021-02-24T10:00:00Z", "value": 4.0 } ] }
Query for the max % of storage used over all CSUs for a ksqlDB cluster lksqlc-XXXXX
Create a file named
ksql_storage_utilization.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.ksql/storage_utilization"}],
  "filter": {"field": "resource.ksql.id", "op": "EQ", "value": "lksqlc-xxxxx"},
  "granularity": "PT1M",
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"]
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_storage_utilization.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @ksql_storage_utilization.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "resource.ksql.id": "lksqlc-XXXXX", "timestamp": "2021-02-24T10:00:00Z", "value": 0.85 } ] }
Query for the bytes of ksqlDB storage used by a query on ksqlDB cluster lksqlc-XXXXX
Create a file named
ksql_query_storage.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.ksql/task_stored_bytes"}],
  "filter": {"field": "resource.ksql.id", "op": "EQ", "value": "lksqlc-xxxxx"},
  "granularity": "PT1M",
  "group_by": ["metric.query_id"],
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"]
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_query_storage.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @ksql_query_storage.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "resource.ksql.id": "lksqlc-XXXXX", "metric.query_id": "CTAS_PAGEVIEWS_2", "timestamp": "2021-02-24T10:00:00Z", "value": 7688174488 } ] }
Query for the bytes of storage used by a task on ksqlDB cluster lksqlc-XXXXX
Create a file named
ksql_task_storage.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.ksql/task_stored_bytes"}],
  "filter": {"field": "resource.ksql.id", "op": "EQ", "value": "lksqlc-xxxxx"},
  "granularity": "PT1M",
  "group_by": ["metric.query_id", "metric.task_id"],
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"]
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_task_storage.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @ksql_task_storage.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "resource.ksql.id": "lksqlc-XXXXX", "metric.task_id": "1_1", "metric.query_id": "CTAS_PAGEVIEWS_2", "timestamp": "2021-02-24T10:00:00Z", "value": 1079295760 } ] }
Query for the query saturation on ksqlDB cluster lksqlc-XXXXX
Create a file named
ksql_query_saturation.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.ksql/query_saturation"}],
  "filter": {"field": "resource.ksql.id", "op": "EQ", "value": "lksqlc-xxxxx"},
  "granularity": "PT1M",
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"]
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_query_saturation.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @ksql_query_saturation.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "resource.ksql.id": "lksqlc-XXXXX", "timestamp": "2021-02-24T10:00:00Z", "value": 0.85 } ] }
Query for the total bytes consumed by ksqlDB cluster lksqlc-XXXXX
Create a file named
ksql_bytes_consumed.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.ksql/consumed_total_bytes"}],
  "filter": {"field": "resource.ksql.id", "op": "EQ", "value": "lksqlc-xxxxx"},
  "granularity": "PT1M",
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"]
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_bytes_consumed.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @ksql_bytes_consumed.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "resource.ksql.id": "lksqlc-XXXXX", "timestamp": "2021-02-24T10:00:00Z", "value": 1024 } ] }
Query for the total bytes produced by ksqlDB cluster lksqlc-XXXXX
Create a file named
ksql_bytes_produced.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.ksql/produced_total_bytes"}],
  "filter": {"field": "resource.ksql.id", "op": "EQ", "value": "lksqlc-xxxxx"},
  "granularity": "PT1M",
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"]
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_bytes_produced.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @ksql_bytes_produced.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "resource.ksql.id": "lksqlc-XXXXX", "timestamp": "2021-02-24T10:00:00Z", "value": 1024 } ] }
Query for the total topic offsets processed by task on ksqlDB cluster lksqlc-XXXXX
Create a file named
ksql_offsets_processed.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.ksql/offsets_processed_total"}],
  "filter": {"field": "resource.ksql.id", "op": "EQ", "value": "lksqlc-xxxxx"},
  "granularity": "PT1M",
  "group_by": ["metric.query_id", "metric.task_id"],
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"]
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_offsets_processed.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @ksql_offsets_processed.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "resource.ksql.id": "lksqlc-XXXXX", "timestamp": "2021-02-24T10:00:00Z", "metric.task_id": "1_1", "metric.query_id": "CTAS_PAGEVIEWS_2", "value": 123 } ] }
Query for the total topic offsets processed by all tasks of query on ksqlDB cluster lksqlc-XXXXX
Create a file named
ksql_offsets_processed.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.ksql/offsets_processed_total"}],
  "filter": {"field": "resource.ksql.id", "op": "EQ", "value": "lksqlc-xxxxx"},
  "granularity": "PT1M",
  "group_by": ["metric.query_id"],
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"]
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_offsets_processed.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @ksql_offsets_processed.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "resource.ksql.id": "lksqlc-XXXXX", "timestamp": "2021-02-24T10:00:00Z", "metric.query_id": "CTAS_PAGEVIEWS_2", "value": 123 } ] }
Query for the current committed offset lag by task on ksqlDB cluster lksqlc-XXXXX
Create a file named
ksql_offset_lag.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.ksql/committed_offset_lag"}],
  "filter": {"field": "resource.ksql.id", "op": "EQ", "value": "lksqlc-xxxxx"},
  "granularity": "PT1M",
  "group_by": ["metric.query_id", "metric.task_id"],
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"]
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_offset_lag.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @ksql_offset_lag.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "resource.ksql.id": "lksqlc-XXXXX", "timestamp": "2021-02-24T10:00:00Z", "metric.task_id": "1_1", "metric.query_id": "CTAS_PAGEVIEWS_2", "value": 456 } ] }
Query for the current total committed offset lag for all tasks in query on ksqlDB cluster lksqlc-XXXXX
Create a file named
ksql_offset_lag.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.ksql/committed_offset_lag"}],
  "filter": {"field": "resource.ksql.id", "op": "EQ", "value": "lksqlc-xxxxx"},
  "granularity": "PT1M",
  "group_by": ["metric.query_id"],
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"]
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_offset_lag.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @ksql_offset_lag.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "resource.ksql.id": "lksqlc-XXXXX", "timestamp": "2021-02-24T10:00:00Z", "metric.query_id": "CTAS_PAGEVIEWS_2", "value": 456 } ] }
Query for the total number of processing errors by query on ksqlDB cluster lksqlc-XXXXX
Create a file named
ksql_processing_errors.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.ksql/processing_errors_total"}],
  "filter": {"field": "resource.ksql.id", "op": "EQ", "value": "lksqlc-xxxxx"},
  "granularity": "PT1M",
  "group_by": ["metric.query_id"],
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"]
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_processing_errors.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @ksql_processing_errors.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "resource.ksql.id": "lksqlc-XXXXX", "timestamp": "2021-02-24T10:00:00Z", "metric.query_id": "CTAS_PAGEVIEWS_2", "value": 16 } ] }
Query for the total number of restarts due to failure by query on ksqlDB cluster lksqlc-XXXXX
Create a file named
ksql_query_restarts.json using the following template. Be sure to change lksqlc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.ksql/query_restarts"}],
  "filter": {"field": "resource.ksql.id", "op": "EQ", "value": "lksqlc-xxxxx"},
  "granularity": "PT1M",
  "group_by": ["metric.query_id"],
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"]
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < ksql_query_restarts.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @ksql_query_restarts.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "resource.ksql.id": "lksqlc-XXXXX", "timestamp": "2021-02-24T10:00:00Z", "metric.query_id": "CTAS_PAGEVIEWS_2", "value": 3 } ] }
Query for the number of schemas in the Schema Registry cluster lsrc-XXXXX
Create a file named
schema_count.json using the following template. Be sure to change lsrc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"time_agg": "MAX", "agg": "SUM", "metric": "io.confluent.kafka.schema_registry/schema_count"}],
  "filter": {"field": "resource.schema_registry.id", "op": "EQ", "value": "lsrc-XXXXX"},
  "granularity": "PT1M",
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T10:01:00Z"],
  "group_by": ["resource.schema_registry.id"]
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < schema_count.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @schema_count.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "resource.schema_registry.id": "lsrc-XXXXX", "timestamp": "2021-02-24T10:00:00Z", "value": 1.0 } ] }
Query for the hourly number of records received by a sink connector lcc-XXXXX
Create a file named
sink_connector_record_number.json using the following template. Be sure to change lcc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.connect/received_records"}],
  "filter": {"field": "resource.connector.id", "op": "EQ", "value": "lcc-XXXXX"},
  "granularity": "PT1H",
  "intervals": ["2021-02-24T10:00:00Z/2021-02-24T11:00:00Z"],
  "group_by": ["resource.connector.id"]
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your Confluent Cloud (--resource cloud) credentials.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < sink_connector_record_number.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @sink_connector_record_number.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "resource.connector.id": "lcc-XXXXX", "timestamp": "2021-02-24T10:00:00Z", "value": 26455991.0 } ] }
Query for the total free memory on a custom connector clcc-XXXXX
Create a file named
custom_connector_free_memory.json using the following template. Be sure to change clcc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.system/memory_free_bytes"}],
  "filter": {"field": "resource.custom_connector.id", "op": "EQ", "value": "clcc-XXXXX"},
  "granularity": "PT1H",
  "intervals": ["2023-05-09T10:00:00Z/2023-05-09T15:00:00Z"],
  "group_by": ["resource.custom_connector.id"]
}

Submit the query as a POST using the following command.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud-custom/query' --auth '<API_KEY>:<SECRET>' < custom_connector_free_memory.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud-custom/query' -u '<API_KEY>:<SECRET>' -d @custom_connector_free_memory.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "resource.custom_connector.id": "clcc-XXXXXX", "timestamp": "2023-05-09T10:00:00Z", "value": 125229329.06666666 }, { "resource.custom_connector.id": "clcc-XXXXXX", "timestamp": "2023-05-09T11:00:00Z", "value": 125193966.93333334 }, { "resource.custom_connector.id": "clcc-XXXXXX", "timestamp": "2023-05-09T12:00:00Z", "value": 125140241.06666666 }, { "resource.custom_connector.id": "clcc-XXXXXX", "timestamp": "2023-05-09T13:00:00Z", "value": 125099622.4 }, { "resource.custom_connector.id": "clcc-XXXXXX", "timestamp": "2023-05-09T14:00:00Z", "value": 124849493.33333333 } ] }
For Confluent Cloud UI metrics for custom connectors, see View metrics.
Query for the total percent CPU used by a custom connector clcc-XXXXX
Create a file named
custom_connector_percent_cpu.json using the following template. Be sure to change clcc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.system/cpu_load_percent"}],
  "filter": {"field": "resource.custom_connector.id", "op": "EQ", "value": "clcc-XXXXX"},
  "granularity": "PT1H",
  "intervals": ["2023-05-09T10:00:00Z/2023-05-09T15:00:00Z"],
  "group_by": ["resource.custom_connector.id"]
}

Submit the query as a POST using the following command.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud-custom/query' --auth '<API_KEY>:<SECRET>' < custom_connector_percent_cpu.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud-custom/query' -u '<API_KEY>:<SECRET>' -d @custom_connector_percent_cpu.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "resource.custom_connector.id": "clcc-XXXXX", "timestamp": "2023-05-09T10:00:00Z", "value": 0.021009808092643977 }, { "resource.custom_connector.id": "clcc-XXXXX", "timestamp": "2023-05-09T11:00:00Z", "value": 0.01990721858932965 }, { "resource.custom_connector.id": "clcc-XXXXX", "timestamp": "2023-05-09T12:00:00Z", "value": 0.020799848444189233 }, { "resource.custom_connector.id": "clcc-XXXXX", "timestamp": "2023-05-09T13:00:00Z", "value": 0.019948515028905416 }, { "resource.custom_connector.id": "clcc-XXXXX", "timestamp": "2023-05-09T14:00:00Z", "value": 0.020734587261390117 } ] }
For Confluent Cloud UI metrics for custom connectors, see View metrics.
Query for metrics for a specific Principal ID
The metric.principal_id label can be used to filter metrics by specific users or service accounts. Metrics such as io.confluent.kafka.server/active_connection_count and io.confluent.kafka.server/request_count support filtering by the
metric.principal_id label. To see all metrics that currently support this label, see the API Reference.
Create a file named
principal_query.json using the following template. Be sure to change lkc-XXXXX and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.kafka.server/active_connection_count"}],
  "filter": {"field": "resource.kafka.id", "op": "EQ", "value": "lkc-XXXXX"},
  "granularity": "PT1H",
  "group_by": ["metric.principal_id"],
  "intervals": ["2022-01-01T00:00:00Z/PT1H"],
  "limit": 5
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < principal_query.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @principal_query.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "metric.principal_id": "sa-abc123", "timestamp": "2022-01-01T00:00:00Z", "value": 430.99999999997 }, { "metric.principal_id": "u-def456", "timestamp": "2022-01-01T00:00:00Z", "value": 427.93333333332 }, { "metric.principal_id": "u-abc123", "timestamp": "2022-01-01T00:00:00Z", "value": 333.19999999997 } ], "meta": { "pagination": { "next_page_token": "eyJ2ZXJzaW9uIjoiMSIsInJlcXVlc3RI", "page_size": 5 } }
Note
Topics without reported metric values during the specified interval aren’t returned.
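To turn the per-principal connection counts into a quick report, you can aggregate the returned data points by principal. The sketch below assumes a parsed response shaped like the example output above and does not handle pagination, which the meta.pagination block indicates may be needed for larger result sets:

from collections import defaultdict

# Parsed JSON returned by the principal_id query above (example values).
response = {
    "data": [
        {"metric.principal_id": "sa-abc123", "timestamp": "2022-01-01T00:00:00Z", "value": 430.99999999997},
        {"metric.principal_id": "u-def456", "timestamp": "2022-01-01T00:00:00Z", "value": 427.93333333332},
        {"metric.principal_id": "u-abc123", "timestamp": "2022-01-01T00:00:00Z", "value": 333.19999999997},
    ]
}

# Average connection count per principal over the returned interval.
totals = defaultdict(float)
counts = defaultdict(int)
for point in response["data"]:
    principal = point["metric.principal_id"]
    totals[principal] += point["value"]
    counts[principal] += 1

for principal in sorted(totals, key=totals.get, reverse=True):
    print(f"{principal}: {totals[principal] / counts[principal]:.1f} connections")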
Query for the total number of records a Flink SQL statement has received
Create a file named
num_records_in.json using the following template. Be sure to change the compute pool ID (lfcp-XXXXXX), the statement name (XXXXXXXX-XXXX-XXXX), and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.flink/num_records_in"}],
  "filter": {
    "op": "AND",
    "filters": [
      {"field": "resource.flink_statement.name", "op": "EQ", "value": "XXXXXXXX-XXXX-XXXX"},
      {"field": "resource.compute_pool.id", "op": "EQ", "value": "lfcp-XXXXXX"}
    ]
  },
  "granularity": "PT1M",
  "intervals": ["2023-10-23T16:30:00/2023-10-23T16:35:00"],
  "limit": 5
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < num_records_in.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @num_records_in.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "timestamp": "2023-10-23T16:30:00Z", "value": 115.0 }, { "timestamp": "2023-10-23T16:31:00Z", "value": 116.0 }, { "timestamp": "2023-10-23T16:32:00Z", "value": 116.0 }, { "timestamp": "2023-10-23T16:33:00Z", "value": 131.0 }, { "timestamp": "2023-10-23T16:34:00Z", "value": 127.0 } ] }
Query for the total number of records a Flink SQL statement has emitted
Create a file named
num_records_out.json using the following template. Be sure to change the compute pool ID (lfcp-XXXXXX), the statement name (XXXXXXXX-XXXX-XXXX), and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.flink/num_records_out"}],
  "filter": {
    "op": "AND",
    "filters": [
      {"field": "resource.flink_statement.name", "op": "EQ", "value": "XXXXXXXX-XXXX-XXXX"},
      {"field": "resource.compute_pool.id", "op": "EQ", "value": "lfcp-XXXXXX"}
    ]
  },
  "granularity": "PT1M",
  "intervals": ["2023-10-23T16:30:00/2023-10-23T16:35:00"],
  "limit": 5
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < num_records_out.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @num_records_out.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "timestamp": "2023-10-23T16:30:00Z", "value": 115.0 }, { "timestamp": "2023-10-23T16:31:00Z", "value": 116.0 }, { "timestamp": "2023-10-23T16:32:00Z", "value": 116.0 }, { "timestamp": "2023-10-23T16:33:00Z", "value": 131.0 }, { "timestamp": "2023-10-23T16:34:00Z", "value": 127.0 } ] }
Query for the backlog of a Flink SQL statement
This metric represents the total number of available records after the consumer offset in a Kafka partition for a Flink SQL statement, across all operators.
Create a file named
pending_records.json using the following template. Be sure to change the compute pool ID (lfcp-XXXXXX), the statement name (XXXXXXXX-XXXX-XXXX), and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.flink/pending_records"}],
  "filter": {
    "op": "AND",
    "filters": [
      {"field": "resource.flink_statement.name", "op": "EQ", "value": "XXXXXXXX-XXXX-XXXX"},
      {"field": "resource.compute_pool.id", "op": "EQ", "value": "lfcp-XXXXXX"}
    ]
  },
  "granularity": "PT1M",
  "intervals": ["2023-10-23T16:30:00/2023-10-23T16:35:00"],
  "limit": 5
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < pending_records.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @pending_records.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "timestamp": "2023-10-23T16:30:00Z", "value": 0.0 }, { "timestamp": "2023-10-23T16:31:00Z", "value": 0.0 }, { "timestamp": "2023-10-23T16:32:00Z", "value": 0.0 }, { "timestamp": "2023-10-23T16:33:00Z", "value": 0.0 }, { "timestamp": "2023-10-23T16:34:00Z", "value": 0.0 } ] }
Note
The above value may not always be zero. A non-zero value indicates some backlog associated with the Flink statement.
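If you poll this query periodically, a simple threshold check on the latest data point is often enough to flag a growing backlog. The sketch below assumes a parsed response shaped like the example above; the threshold is hypothetical and should be tuned for your workload.

# Parsed JSON returned by the pending_records query above (example values).
response = {
    "data": [
        {"timestamp": "2023-10-23T16:33:00Z", "value": 0.0},
        {"timestamp": "2023-10-23T16:34:00Z", "value": 0.0},
    ]
}

BACKLOG_THRESHOLD = 1000  # hypothetical threshold; tune for your workload

# Check only the most recent data point returned by the query.
latest = max(response["data"], key=lambda point: point["timestamp"])
if latest["value"] > BACKLOG_THRESHOLD:
    print(f"Backlog of {latest['value']:.0f} pending records at {latest['timestamp']}")
else:
    print("Backlog is within the threshold")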
Query for the total number of records all Flink SQL statements leveraging a Flink compute pool have received
Create a file named
num_records_in.json using the following template. Be sure to change the compute pool ID (lfcp-XXXXXX) and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.flink/num_records_in"}],
  "filter": {"field": "resource.compute_pool.id", "op": "EQ", "value": "lfcp-XXXXXX"},
  "granularity": "PT1M",
  "intervals": ["2023-10-25T16:30:00/2023-10-25T16:35:00"],
  "limit": 5
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < num_records_in.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @num_records_in.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "timestamp": "2023-10-25T16:30:00Z", "value": 236.0 }, { "timestamp": "2023-10-25T16:31:00Z", "value": 228.0 }, { "timestamp": "2023-10-25T16:32:00Z", "value": 240.0 }, { "timestamp": "2023-10-25T16:33:00Z", "value": 230.0 }, { "timestamp": "2023-10-25T16:34:00Z", "value": 252.0 } ] }
Query for the total number of records all Flink SQL statements leveraging a Flink compute pool have emitted
Create a file named
num_records_out.json using the following template. Be sure to change the compute pool ID (lfcp-XXXXXX) and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.flink/num_records_out"}],
  "filter": {"field": "resource.compute_pool.id", "op": "EQ", "value": "lfcp-XXXXXX"},
  "granularity": "PT1M",
  "intervals": ["2023-10-25T16:30:00/2023-10-25T16:35:00"],
  "limit": 5
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < num_records_out.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @num_records_out.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "timestamp": "2023-10-25T16:30:00Z", "value": 236.0 }, { "timestamp": "2023-10-25T16:31:00Z", "value": 228.0 }, { "timestamp": "2023-10-25T16:32:00Z", "value": 240.0 }, { "timestamp": "2023-10-25T16:33:00Z", "value": 230.0 }, { "timestamp": "2023-10-25T16:34:00Z", "value": 252.0 } ] }
Query for the total backlog of all Flink SQL statements leveraging a Flink compute pool
This metric represents the total number of available records after the consumer offset in a Kafka partition for all Flink SQL statements leveraging a Flink compute pool, across all operators.
Create a file named
pending_records.json using the following template. Be sure to change the compute pool ID (lfcp-XXXXXX) and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.flink/pending_records"}],
  "filter": {"field": "resource.compute_pool.id", "op": "EQ", "value": "lfcp-XXXXXX"},
  "granularity": "PT1M",
  "intervals": ["2023-10-25T16:30:00/2023-10-25T16:35:00"],
  "limit": 5
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < pending_records.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @pending_records.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "timestamp": "2023-10-25T16:30:00Z", "value": 0.0 }, { "timestamp": "2023-10-25T16:31:00Z", "value": 0.0 }, { "timestamp": "2023-10-25T16:32:00Z", "value": 0.0 }, { "timestamp": "2023-10-25T16:33:00Z", "value": 0.0 }, { "timestamp": "2023-10-25T16:34:00Z", "value": 0.0 } ] }
Note
The above value may not always be zero. A non-zero value indicates the combined backlog associated with the Flink statements leveraging the Flink compute pool in the query.
Query for the absolute number of CFUs at a given moment in a Flink compute pool
This metric represents the absolute number of CFUs, that is, the current usage of a Flink compute pool at a given moment.
Create a file named
current_cfus.json using the following template. Be sure to change the compute pool ID (lfcp-XXXXXX) and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.flink/compute_pool_utilization/current_cfus"}],
  "filter": {"field": "resource.compute_pool.id", "op": "EQ", "value": "lfcp-XXXXXX"},
  "granularity": "PT1M",
  "intervals": ["2024-05-15T14:00:00/2024-05-15T14:05:00"],
  "limit": 5
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < current_cfus.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @current_cfus.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "timestamp": "2024-05-15T14:00:00Z", "value": 3.0 }, { "timestamp": "2024-05-15T14:01:00Z", "value": 3.0 }, { "timestamp": "2024-05-15T14:02:00Z", "value": 3.0 }, { "timestamp": "2024-05-15T14:03:00Z", "value": 3.0 }, { "timestamp": "2024-05-15T14:04:00Z", "value": 3.0 } ] }
Query for the maximum number of CFUs assigned to a Flink compute pool
This metric represents the maximum number of CFUs assigned to a Flink compute pool. When Flink statements are running, the compute pool automatically scales up to this maximum.
Create a file named
cfu_limit.json using the following template. Be sure to change the compute pool ID (lfcp-XXXXXX) and the timestamp values to match your needs.

{
  "aggregations": [{"metric": "io.confluent.flink/compute_pool_utilization/cfu_limit"}],
  "filter": {"field": "resource.compute_pool.id", "op": "EQ", "value": "lfcp-XXXXXX"},
  "granularity": "PT1M",
  "intervals": ["2024-05-15T14:00:00/2024-05-15T14:05:00"],
  "limit": 5
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < cfu_limit.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @cfu_limit.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "timestamp": "2024-05-15T14:00:00Z", "value": 10.0 }, { "timestamp": "2024-05-15T14:01:00Z", "value": 10.0 }, { "timestamp": "2024-05-15T14:02:00Z", "value": 10.0 }, { "timestamp": "2024-05-15T14:03:00Z", "value": 10.0 }, { "timestamp": "2024-05-15T14:04:00Z", "value": 10.0 } ] }
Query for the statement status for a given Flink SQL statement
This metric represents the status of a Flink SQL statement.
Create a file named
statement_status.json using the following template. Be sure to change the statement name and the compute pool ID to match your needs.

{
  "aggregations": [{"metric": "io.confluent.flink/statement_status"}],
  "filter": {
    "op": "AND",
    "filters": [
      {"field": "resource.flink_statement.name", "op": "EQ", "value": "workspace-2025-03-25-130905-70059bd3-2462-4ee8-8fb0-d33f41e44471"},
      {"field": "resource.compute_pool.id", "op": "EQ", "value": "lfcp-3mx0zj"}
    ]
  },
  "granularity": "PT1M",
  "intervals": ["now-6h/now"],
  "group_by": ["resource.flink_statement.uid", "metric.status"],
  "limit": 1000
}

Submit the query as a POST using the following command. Be sure to change API_KEY and SECRET to match your environment.

http 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' --auth '<API_KEY>:<SECRET>' < statement_status.json
curl -X POST 'https://api.telemetry.confluent.cloud/v2/metrics/cloud/query' -u '<API_KEY>:<SECRET>' -d @statement_status.json -H 'Content-Type: application/json'
Your output should resemble:
{ "data": [ { "timestamp": "2025-03-10T09:27:00Z", "value": 1.0, "resource.flink_statement.uid": "e26f074c-0a26-465d-86b2-79ee685973f2", "metric.status": "RUNNING" }, { "timestamp": "2025-03-10T09:32:00Z", "value": 1.0, "resource.flink_statement.uid": "e26f074c-0a26-465d-86b2-79ee685973f2", "metric.status": "RUNNING" }, { "timestamp": "2025-03-10T09:34:00Z", "value": 1.0, "resource.flink_statement.uid": "e26f074c-0a26-465d-86b2-79ee685973f2", "metric.status": "RUNNING" }, { "timestamp": "2025-03-10T09:36:00Z", "value": 1.0, "resource.flink_statement.uid": "e26f074c-0a26-465d-86b2-79ee685973f2", "metric.status": "RUNNING" } ] }