Prometheus Metrics Sink Connector for Confluent Platform
The Kafka Connect Prometheus Metrics Sink connector exports data from multiple
Apache Kafka® topics and makes the data available at an endpoint that is scraped by a
Prometheus server. The connector accepts Struct and schemaless JSON as a Kafka
record’s value. The name and values fields are required.
The values field holds a metric’s values. This field is expected to be a Struct object when the Kafka record value is a Struct, and a nested JSON object when the Kafka record value is schemaless JSON. The input Struct or schemaless JSON object used as the record’s value should resemble the following:
{
"name": string,
"type": string,
"timestamp": long,
"dimensions": {
"<dimension-1>": string,
...
},
"values": {
"<datapoint-1>": double,
"<datapoint-2>": double,
...
}
}
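As a rough illustration of the shape described above, the following Python sketch checks a record before it is produced to Kafka. The function name validate_record and the exact checks are assumptions made for this example; they are not part of the connector.

```python
from numbers import Real

REQUIRED_FIELDS = ("name", "values")  # the two fields the connector requires


def validate_record(record: dict) -> list[str]:
    """Return a list of problems found in a metric record (empty if none)."""
    problems = []
    for field in REQUIRED_FIELDS:
        if field not in record:
            problems.append(f"missing required field: {field}")
    values = record.get("values")
    if values is not None:
        if not isinstance(values, dict):
            problems.append("values must be a nested object")
        else:
            for key, value in values.items():
                # bool is a subclass of int, so exclude it explicitly
                if isinstance(value, bool) or not isinstance(value, Real):
                    problems.append(f"values.{key} must be numeric")
    return problems


record = {
    "name": "sample_meter_metric",
    "type": "meter",
    "timestamp": 1576236481,
    "dimensions": {"service": "ec2-2312", "method": "update"},
    "values": {"count": 12, "oneMinuteRate": 5.2},
}
print(validate_record(record))         # []
print(validate_record({"name": "x"}))  # ['missing required field: values']
```

A check like this only mirrors the documented shape; the connector's own validation may be stricter or more lenient.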
Prerequisites
The following are required to run the Kafka Connect Prometheus Metrics sink connector:
- Kafka Broker: Confluent Platform 3.3.0 or above
- Connect: Confluent Platform 4.1.0 or above
- Java 1.8
- Prometheus installed and prometheus.yml configured.
Limitations
The Prometheus connector has the following limitations:
- Timestamp not supported: Prometheus uses the timestamp when metrics are scraped. Timestamps in Kafka records are ignored.
- Pull-based Connector: Prometheus is a pull-based system. The connector starts an HTTP server on the worker node. All the metrics processed by the connector are made available at the worker HTTP endpoint. The endpoint is configured using the prometheus.listener.url property. The prometheus.listener.url HTTP endpoint must be added to the prometheus.yml configuration file.
- Metric types: Almost all metrics are interpreted as gauge type. An exception to this is the counter type used for incrementing values. Examples of these types are provided in the following section. For more information, see Prometheus Metric Types.
- Buffer limit: The connector buffers metrics from Kafka topics and makes them available when Prometheus scrapes the HTTP server endpoint. Buffer size is set at 3 million metric items to give Prometheus sufficient time to read and process metrics between consecutive scrapes.
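Because the prometheus.listener.url endpoint has to be registered in prometheus.yml, it can help to split the URL into the pieces a Prometheus scrape job uses (scheme, target, and metrics_path). The sketch below does this with Python's standard library. The helper name scrape_target is hypothetical, and the URL is the one used in this page's Quick Start.

```python
from urllib.parse import urlsplit


def scrape_target(listener_url: str) -> dict:
    """Split a listener URL into the parts of a Prometheus scrape job."""
    parts = urlsplit(listener_url)
    return {
        "scheme": parts.scheme,
        "target": parts.netloc,                    # host:port for static_configs targets
        "metrics_path": parts.path or "/metrics",  # Prometheus defaults to /metrics
    }


print(scrape_target("http://localhost:8889/metrics"))
# {'scheme': 'http', 'target': 'localhost:8889', 'metrics_path': '/metrics'}
```

The resulting values correspond to the scheme, metrics_path, and static_configs targets entries of a scrape job in prometheus.yml.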
Supported Metrics and Schemas
The connector supports metrics of type Gauge, Meter, Histogram, and Timer. Each metric type has a different schema. Kafka topics that contain these metrics must have records that adhere to these schemas.
Gauge schema
{
"doubleValue": double
}
Meter schema
{
"count": double,
"oneMinuteRate": double,
"fiveMinuteRate": double,
"fifteenMinuteRate": double,
"meanRate": double
}
Histogram schema
{
"count": double,
"max": double,
"min": double,
"mean": double,
"stdDev": double,
"sum": double,
"median": double,
"percentile75th": double,
"percentile95th": double,
"percentile98th": double,
"percentile99th": double,
"percentile999th": double
}
Timer schema
{
"count": double,
"oneMinuteRate": double,
"fiveMinuteRate": double,
"fifteenMinuteRate": double,
"meanRate": double,
"max": double,
"min": double,
"mean": double,
"stdDev": double,
"sum": double,
"median": double,
"percentile75th": double,
"percentile95th": double,
"percentile98th": double,
"percentile99th": double,
"percentile999th": double
}
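The four schemas above can be expressed as sets of field names, which makes it easy to check a values object before producing it. This is a minimal sketch: the names SCHEMAS and check_values are assumptions for illustration, and how the connector itself treats missing or extra fields is not stated here.

```python
# Field names copied from the Gauge, Meter, Histogram, and Timer schemas.
METER_FIELDS = {
    "count", "oneMinuteRate", "fiveMinuteRate", "fifteenMinuteRate", "meanRate",
}
HISTOGRAM_FIELDS = {
    "count", "max", "min", "mean", "stdDev", "sum", "median",
    "percentile75th", "percentile95th", "percentile98th",
    "percentile99th", "percentile999th",
}
SCHEMAS = {
    "gauge": {"doubleValue"},
    "meter": METER_FIELDS,
    "histogram": HISTOGRAM_FIELDS,
    # The Timer schema lists exactly the Meter fields plus the Histogram fields.
    "timer": METER_FIELDS | HISTOGRAM_FIELDS,
}


def check_values(metric_type: str, values: dict) -> tuple[list, list]:
    """Return (missing, unexpected) field names for the given metric type."""
    expected = SCHEMAS[metric_type.lower()]
    present = set(values)
    return sorted(expected - present), sorted(present - expected)


missing, unexpected = check_values("meter", {"count": 12, "meanRate": 5.1})
print(missing)     # ['fifteenMinuteRate', 'fiveMinuteRate', 'oneMinuteRate']
print(unexpected)  # []
```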
Record Mapping
Each value in the values Struct or nested JSON object is converted into Prometheus-readable form. The example below shows the original form:
{
"name": "sample_meter_metric",
"type": "meter",
"timestamp": 23480239402348234,
"dimensions": {
"service": "ec2-2312",
"method": "update"
},
"values": {
"count": 12,
"oneMinuteRate": 5.2,
"fiveMinuteRate": 4.7,
"fifteenMinuteRate": 4.9,
"meanRate": 5.1
}
}
The example below shows the converted Prometheus-readable form:
# HELP sample_meter_metric_count
# TYPE sample_meter_metric_count counter
sample_meter_metric_count{service="ec2-2312",method="update"} 12
# HELP sample_meter_metric_oneMinuteRate
# TYPE sample_meter_metric_oneMinuteRate gauge
sample_meter_metric_oneMinuteRate{service="ec2-2312",method="update"} 5.2
# HELP sample_meter_metric_fiveMinuteRate
# TYPE sample_meter_metric_fiveMinuteRate gauge
sample_meter_metric_fiveMinuteRate{service="ec2-2312",method="update"} 4.7
# HELP sample_meter_metric_fifteenMinuteRate
# TYPE sample_meter_metric_fifteenMinuteRate gauge
sample_meter_metric_fifteenMinuteRate{service="ec2-2312",method="update"} 4.9
# HELP sample_meter_metric_meanRate
# TYPE sample_meter_metric_meanRate gauge
sample_meter_metric_meanRate{service="ec2-2312",method="update"} 5.1
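The mapping shown above can be approximated in a few lines of Python. This is a sketch inferred from the example only, not the connector's implementation: it assumes that the count field becomes a counter and every other field a gauge, and it does not escape special characters in label values. The function name to_prometheus_text is hypothetical.

```python
def to_prometheus_text(record: dict) -> str:
    """Render one metric record as Prometheus text, one series per value."""
    labels = ",".join(
        f'{key}="{value}"' for key, value in record.get("dimensions", {}).items()
    )
    label_part = f"{{{labels}}}" if labels else ""
    lines = []
    for field, value in record["values"].items():
        metric = f'{record["name"]}_{field}'
        # Assumption drawn from the example: only "count" is a counter.
        prom_type = "counter" if field == "count" else "gauge"
        lines.append(f"# HELP {metric}")
        lines.append(f"# TYPE {metric} {prom_type}")
        lines.append(f"{metric}{label_part} {value}")
    return "\n".join(lines)


sample = {
    "name": "sample_meter_metric",
    "type": "meter",
    "dimensions": {"service": "ec2-2312", "method": "update"},
    "values": {
        "count": 12,
        "oneMinuteRate": 5.2,
        "fiveMinuteRate": 4.7,
        "fifteenMinuteRate": 4.9,
        "meanRate": 5.1,
    },
}
print(to_prometheus_text(sample))
```

Run against the sample record, the sketch prints the same fifteen lines as the converted form above.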
Install the Prometheus Metrics Connector
You can install this connector by following the Confluent Hub instructions below, or you can
manually download the ZIP file.
Install the connector using Confluent Hub
- Prerequisite: Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run the following command to install the latest connector version. The connector must be installed on every machine where Connect will run.
confluent-hub install confluentinc/kafka-connect-prometheus-metrics:latest
You can install a specific version by replacing latest with a version number. For example:
confluent-hub install confluentinc/kafka-connect-prometheus-metrics:1.0.2
Quick Start
In this Quick Start, you configure the Kafka Connect Prometheus Metrics sink
connector to read records from Kafka topics and make them accessible from an HTTP
server endpoint. Prometheus then scrapes the server endpoint.
- Prerequisites
-
Start Confluent
Start the Confluent services using the following Confluent CLI command:
Important
Do not use the Confluent CLI in production environments.
Property-based example
Create a configuration file prometheus-metrics-sink.properties with the
following content. Put this file inside the Confluent Platform installation directory. This
configuration is typically used with standalone workers.
Note
For details about using this connector with Kafka Connect Reporter, see Connect Reporter.
name=prometheus-connector
topics=test-topic
tasks.max=1
connector.class=io.confluent.connect.prometheus.PrometheusMetricsSinkConnector
confluent.topic.bootstrap.servers=localhost:9092
prometheus.listener.url=http://localhost:8889/metrics
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
reporter.result.topic.replication.factor=1
reporter.error.topic.replication.factor=1
behavior.on.error=log
Run the connector with the configuration properties file:
confluent local load prometheus-connector --config prometheus-metrics-sink.properties
The output should resemble:
{
"name": "prometheus-connector",
"config": {
"topics": "test-topic",
"tasks.max": "1",
"connector.class": "io.confluent.connect.prometheus.PrometheusMetricsSinkConnector",
"confluent.topic.bootstrap.servers": "localhost:9092",
"prometheus.listener.url": "http://localhost:8889/metrics",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"reporter.result.topic.replication.factor": "1",
"reporter.error.topic.replication.factor": "1",
"behavior.on.error": "log",
"name": "prometheus-connector"
},
"tasks": [],
"type": "sink"
}
Confirm that the connector is in a RUNNING state.
confluent local status prometheus-connector
The output should resemble:
{
"name": "prometheus-connector",
"connector": {
"state": "RUNNING",
"worker_id": "127.0.1.1:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "127.0.1.1:8083"
],
"type": "sink"
}
REST-based example
REST-based configuration is used for distributed workers. For more information, see the Kafka Connect REST API documentation.
Note
For details about using this connector with Kafka Connect Reporter, see Connect Reporter.
Write the following JSON to config.json and configure all of the required values.
{
"name" : "prometheus-connector",
"config" : {
"topics":"test-topic",
"connector.class" : "io.confluent.connect.prometheus.PrometheusMetricsSinkConnector",
"tasks.max" : "1",
"confluent.topic.bootstrap.servers":"localhost:9092",
"prometheus.listener.url": "http://localhost:8889/metrics",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"reporter.result.topic.replication.factor": "1",
"reporter.error.topic.replication.factor": "1",
"behavior.on.error": "log"
}
}
Note
Change the confluent.topic.bootstrap.servers property to include your broker address(es) and change the confluent.topic.replication.factor to 3 for production use.
Enter the following curl command to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.
curl -sS -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors
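Enter the following curl command to update the configuration of the connector. This endpoint expects only the contents of the config object, so jq is used to extract it from config.json:
jq .config config.json | curl -s -X PUT -H 'Content-Type: application/json' --data @- http://localhost:8083/connectors/prometheus-connector/config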
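For scripted deployments, the create and update calls can also be prepared from Python. The sketch below only builds the requests and does not send them. The helper name connector_request is hypothetical, and the worker address is the Quick Start default. It relies on the Kafka Connect REST API taking a name/config wrapper for POST /connectors and the bare configuration map for PUT /connectors/<name>/config.

```python
import json
from urllib.request import Request


def connector_request(base_url: str, name: str, config: dict, update: bool = False) -> Request:
    """Build (but do not send) a create or update request for a connector."""
    base = base_url.rstrip("/")
    if update:
        # PUT takes only the configuration map.
        url, method, body = f"{base}/connectors/{name}/config", "PUT", config
    else:
        # POST takes the name together with the configuration map.
        url, method, body = f"{base}/connectors", "POST", {"name": name, "config": config}
    return Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method=method,
    )


config = {
    "topics": "test-topic",
    "connector.class": "io.confluent.connect.prometheus.PrometheusMetricsSinkConnector",
    "tasks.max": "1",
    "prometheus.listener.url": "http://localhost:8889/metrics",
}
create = connector_request("http://localhost:8083/", "prometheus-connector", config)
update = connector_request("http://localhost:8083/", "prometheus-connector", config, update=True)
print(create.get_method(), create.full_url)
print(update.get_method(), update.full_url)
```

Passing either request to urllib.request.urlopen would send it to the worker.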
Enter the following curl command to confirm that the connector is in a RUNNING state:
curl http://localhost:8083/connectors/prometheus-connector/status | jq
The output should resemble:
{
"name": "prometheus-connector",
"connector": {
"state": "RUNNING",
"worker_id": "127.0.1.1:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "127.0.1.1:8083"
}
],
"type": "sink"
}
In the response from the /connectors/prometheus-connector/status endpoint, the state of the connector and of each task should be RUNNING.
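The same check can be automated once the status response has been parsed into a dictionary. This is a minimal sketch: the function name all_running is an assumption, and treating an empty task list as "not running" is a design choice of this example rather than documented behavior.

```python
def all_running(status: dict) -> bool:
    """True when the connector and every one of its tasks report RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    # Require at least one task so a connector without tasks is not reported as healthy.
    return connector_ok and bool(tasks) and all(
        task.get("state") == "RUNNING" for task in tasks
    )


status = {
    "name": "prometheus-connector",
    "connector": {"state": "RUNNING", "worker_id": "127.0.1.1:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "127.0.1.1:8083"}],
    "type": "sink",
}
print(all_running(status))  # True
```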
Use the following command to produce Avro data to the Kafka topic test-topic:
./bin/kafka-avro-console-producer \
--broker-list localhost:9092 --topic test-topic \
--property value.schema='{"name": "metric","type": "record","fields": [{"name": "name","type": "string"},{"name": "type","type": "string"},{"name": "timestamp","type": "long"},{"name": "values","type": {"name": "values","type": "record","fields": [{"name":"doubleValue", "type": "double"}]}}]}'
While the console is waiting for input, paste each of the following three records into the console.
{"name":"kafka_gaugeMetric1", "type":"gauge","timestamp": 1576236481,"values": {"doubleValue": 5.639623848362502}}
{"name":"kafka_gaugeMetric2", "type":"gauge","timestamp": 1576236481,"values": {"doubleValue": 5.639623848362502}}
{"name":"kafka_gaugeMetric3", "type":"gauge","timestamp": 1576236481,"values": {"doubleValue": 5.639623848362502}}
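If you want to generate test records rather than paste them by hand, a short Python script can print them one JSON object per line, which is the input shape the console producer reads. The helper name gauge_record is hypothetical; the metric names, timestamp, and value are the ones used in the records above.

```python
import json


def gauge_record(name: str, value: float, timestamp: int) -> dict:
    """Build one gauge record matching the value schema passed to the producer."""
    return {
        "name": name,
        "type": "gauge",
        "timestamp": timestamp,
        "values": {"doubleValue": float(value)},
    }


records = [
    gauge_record(f"kafka_gaugeMetric{i}", 5.639623848362502, 1576236481)
    for i in range(1, 4)
]
for record in records:
    # One compact JSON object per line.
    print(json.dumps(record))
```

The printed lines can be piped into the kafka-avro-console-producer command shown earlier.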
Check the Prometheus portal on localhost:9090 and verify that metrics were created.