Prometheus Metrics Sink Connector for Confluent Platform¶
The Prometheus Metrics Sink connector exports data from multiple Apache Kafka® topics
and makes the data available to an endpoint which is scraped by a Prometheus
server. The connector accepts Struct and schemaless JSON as a Kafka record’s
value. The name
and values
fields are required.
The values
field refers to a metric’s values. This field is expected to be a
Struct object when the Kafka record value is a Struct, and a nested JSON object
when the Kafka record value is schemaless JSON. The input Struct or schemaless
JSON object used as the record’s value should resemble the following:
{
  "name": string,
  "type": string,
  "timestamp": long,
  "dimensions": {
    "<dimension-1>": string,
    ...
  },
  "values": {
    "<datapoint-1>": double,
    "<datapoint-2>": double,
    ...
  }
}
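As a rough sanity check, a record that follows this structure can be built and serialized with plain Python. This is only an illustrative sketch: the field names come from the layout above, while the specific dimension name (service) and datapoint name (doubleValue) are example choices, not requirements of the connector.

```python
import json

# Illustrative record following the structure above; "service" and
# "doubleValue" are example names chosen for this sketch.
record = {
    "name": "sample_gauge_metric",
    "type": "gauge",
    "timestamp": 1576236481,
    "dimensions": {"service": "ec2-2312"},
    "values": {"doubleValue": 5.64},
}

# "name" and "values" are the required fields.
assert "name" in record and "values" in record

payload = json.dumps(record)
print(payload)
```

A record serialized this way could then be produced to the Kafka topic as a schemaless JSON value.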
Features¶
The Prometheus Sink connector includes the following features:
At least once delivery¶
This connector guarantees that records from the Kafka topic are delivered at least once. If the connector restarts, duplicate records may be exposed at the metrics endpoint.
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
Multiple tasks¶
The Prometheus Metrics Sink connector supports running one or more tasks. You
can specify the number of tasks in the tasks.max
configuration parameter.
Multiple tasks may improve performance when moving a large amount of data.
Metrics and schemas¶
The connector supports metrics of type Gauge, Meter, Histogram, and Timer. Each metric type has a different schema. Kafka topics that contain these metrics must have records that adhere to these schemas.
Gauge schema¶
{
  "doubleValue": double
}
Meter schema¶
{
  "count": double,
  "oneMinuteRate": double,
  "fiveMinuteRate": double,
  "fifteenMinuteRate": double,
  "meanRate": double
}
Histogram schema¶
{
  "count": double,
  "max": double,
  "min": double,
  "mean": double,
  "stdDev": double,
  "sum": double,
  "median": double,
  "percentile75th": double,
  "percentile95th": double,
  "percentile98th": double,
  "percentile99th": double,
  "percentile999th": double
}
Timer schema¶
{
  "count": double,
  "oneMinuteRate": double,
  "fiveMinuteRate": double,
  "fifteenMinuteRate": double,
  "meanRate": double,
  "max": double,
  "min": double,
  "mean": double,
  "stdDev": double,
  "sum": double,
  "median": double,
  "percentile75th": double,
  "percentile95th": double,
  "percentile98th": double,
  "percentile99th": double,
  "percentile999th": double
}
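One way to see how these schemas relate is a small validation sketch. The helper below is hypothetical (not part of the connector); it checks a record's values object against the expected field set for its metric type, using the field names listed in the schemas above.

```python
# Expected "values" fields per metric type, taken from the schemas above.
EXPECTED_FIELDS = {
    "gauge": {"doubleValue"},
    "meter": {"count", "oneMinuteRate", "fiveMinuteRate",
              "fifteenMinuteRate", "meanRate"},
    "histogram": {"count", "max", "min", "mean", "stdDev", "sum", "median",
                  "percentile75th", "percentile95th", "percentile98th",
                  "percentile99th", "percentile999th"},
}
# A timer combines the meter and histogram fields.
EXPECTED_FIELDS["timer"] = EXPECTED_FIELDS["meter"] | EXPECTED_FIELDS["histogram"]

def values_match_schema(record):
    """Return True when record["values"] has exactly the fields for its type."""
    expected = EXPECTED_FIELDS[record["type"]]
    return set(record["values"]) == expected

gauge_record = {"name": "g1", "type": "gauge", "values": {"doubleValue": 1.5}}
print(values_match_schema(gauge_record))  # True
```

Note that the timer field set is simply the union of the meter and histogram field sets, which is visible in the schemas above.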
Record mapping¶
Each value in the values
Struct or nested JSON object is converted into
Prometheus-readable form. The example below shows the original form:
{
  "name": "sample_meter_metric",
  "type": "meter",
  "timestamp": 23480239402348234,
  "dimensions": {
    "service": "ec2-2312",
    "method": "update"
  },
  "values": {
    "count": 12,
    "oneMinuteRate": 5.2,
    "fiveMinuteRate": 4.7,
    "fifteenMinuteRate": 4.9,
    "meanRate": 5.1
  }
}
The following example shows the converted Prometheus-readable form:
# HELP sample_meter_metric_count
# TYPE sample_meter_metric_count counter
sample_meter_metric_count{service="ec2-2312",method="update"} 12
# HELP sample_meter_metric_oneMinuteRate
# TYPE sample_meter_metric_oneMinuteRate gauge
sample_meter_metric_oneMinuteRate{service="ec2-2312",method="update"} 5.2
# HELP sample_meter_metric_fiveMinuteRate
# TYPE sample_meter_metric_fiveMinuteRate gauge
sample_meter_metric_fiveMinuteRate{service="ec2-2312",method="update"} 4.7
# HELP sample_meter_metric_fifteenMinuteRate
# TYPE sample_meter_metric_fifteenMinuteRate gauge
sample_meter_metric_fifteenMinuteRate{service="ec2-2312",method="update"} 4.9
# HELP sample_meter_metric_meanRate
# TYPE sample_meter_metric_meanRate gauge
sample_meter_metric_meanRate{service="ec2-2312",method="update"} 5.1
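The mapping above can be sketched in a few lines of Python. This is an illustrative re-implementation of the conversion, not the connector's actual code; it assumes, as the example output suggests, that a count field maps to the counter type and every other field maps to gauge.

```python
def to_exposition_format(record):
    """Render a metrics record in Prometheus text exposition format."""
    labels = ",".join(f'{k}="{v}"' for k, v in record["dimensions"].items())
    lines = []
    for field, value in record["values"].items():
        metric = f'{record["name"]}_{field}'
        # Assumption: "count" values become counters; everything else is a gauge.
        mtype = "counter" if field == "count" else "gauge"
        lines.append(f"# HELP {metric}")
        lines.append(f"# TYPE {metric} {mtype}")
        lines.append(f"{metric}{{{labels}}} {value}")
    return "\n".join(lines)

record = {
    "name": "sample_meter_metric",
    "dimensions": {"service": "ec2-2312", "method": "update"},
    "values": {"count": 12, "oneMinuteRate": 5.2},
}
print(to_exposition_format(record))
```

Each datapoint becomes its own time series, named by joining the record name and the datapoint field, with the dimensions carried over as labels.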
Prerequisites¶
The following are required to run the Prometheus Metrics sink connector:
- Kafka Broker: Confluent Platform 3.3.0 or above
- Connect: Confluent Platform 4.1.0 or above
- Java 1.8
- Prometheus installed and prometheus.yml configured.
Limitations¶
The Prometheus connector has the following limitations:
- Timestamp not supported: Prometheus uses the timestamp at which metrics are scraped. Timestamps in Kafka records are ignored.
- Pull-based connector: Prometheus is a pull-based system. The connector starts an HTTP server on the worker node. All the metrics processed by the connector are made available at the worker HTTP endpoint. The endpoint is configured using the prometheus.listener.url property. The prometheus.listener.url HTTP endpoint must be added to the prometheus.yml configuration file.
- Metric types: Almost all metrics are interpreted as the gauge type. The exception is the counter type, used for incrementing values. Examples of both types are shown in the preceding section. For more information, see Prometheus Metric Types.
- Buffer limit: The connector buffers metrics from Kafka topics and makes them available when Prometheus scrapes the HTTP server endpoint. The buffer size is set at 3 million metric items to give Prometheus sufficient time to read and process metrics between consecutive scrapes.
Install the Prometheus Metrics Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
You must install the connector on every machine where Connect will run.
An installation of the latest (latest) connector version.
To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:
confluent connect plugin install confluentinc/kafka-connect-prometheus-metrics:latest
You can install a specific version by replacing latest with a version number as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-prometheus-metrics:1.1.1-preview
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
See Confluent license properties for license properties and information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for Prometheus Metrics Sink Connector for Confluent Platform.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Quick Start¶
In this Quick Start, you configure the Prometheus Metrics sink connector to read records from Kafka topics and make them accessible from an HTTP server endpoint. Prometheus then scrapes the server endpoint.
Prerequisites¶
- Confluent Platform is installed.
- The Confluent CLI is installed.
- Prometheus is installed. See Prometheus Installation.
- The prometheus.yml file is configured to monitor the HTTP server endpoint exposed by the Connect worker.
Start Confluent¶
Start the Confluent services using the following Confluent CLI command:
confluent local start
Important
Do not use the Confluent CLI in production environments.
Property-based example¶
Create a configuration file prometheus-metrics-sink.properties
with the following content. Put this file inside the Confluent Platform
installation directory. This configuration is typically used with standalone
workers.
Note
For details about using this connector with Kafka Connect Reporter, see Connect Reporter.
name=prometheus-connector
topics=test-topic
tasks.max=1
connector.class=io.confluent.connect.prometheus.PrometheusMetricsSinkConnector
confluent.topic.bootstrap.servers=localhost:9092
prometheus.listener.url=http://localhost:8889/metrics
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
reporter.result.topic.replication.factor=1
reporter.error.topic.replication.factor=1
behavior.on.error=log
Run the connector with the configuration properties file:
confluent local load prometheus-connector --config prometheus-metrics-sink.properties
The output should resemble:
{
  "name": "prometheus-connector",
  "config": {
    "topics": "test-topic",
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.prometheus.PrometheusMetricsSinkConnector",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "prometheus.listener.url": "http://localhost:8889/metrics",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "reporter.result.topic.replication.factor": "1",
    "reporter.error.topic.replication.factor": "1",
    "behavior.on.error": "log",
    "name": "prometheus-connector"
  },
  "tasks": [],
  "type": "sink"
}
Confirm that the connector is in a RUNNING
state.
confluent local status prometheus-connector
The output should resemble:
{
  "name": "prometheus-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.1.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "127.0.1.1:8083"
    }
  ],
  "type": "sink"
}
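Given the status JSON shape shown above, a small script could confirm that the connector and all of its tasks are in the RUNNING state. The helper below is a hypothetical sketch, not part of the Confluent CLI or the connector.

```python
import json

def is_running(status):
    """Return True when the connector and every task report RUNNING."""
    connector_ok = status["connector"]["state"] == "RUNNING"
    tasks_ok = all(t["state"] == "RUNNING" for t in status["tasks"])
    return connector_ok and tasks_ok

# Sample response in the shape returned by `confluent local status`.
sample = json.loads("""
{
  "name": "prometheus-connector",
  "connector": {"state": "RUNNING", "worker_id": "127.0.1.1:8083"},
  "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "127.0.1.1:8083"}],
  "type": "sink"
}
""")
print(is_running(sample))  # True
```

In practice the status JSON would be fetched from the Connect REST API rather than embedded as a string.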
REST-based example¶
REST-based configuration is used for distributed workers. For more information, see the Kafka Connect REST API documentation.
Note
For details about using this connector with Kafka Connect Reporter, see Connect Reporter.
Write the following JSON to config.json and configure all of the required values.
{
  "name": "prometheus-connector",
  "config": {
    "topics": "test-topic",
    "connector.class": "io.confluent.connect.prometheus.PrometheusMetricsSinkConnector",
    "tasks.max": "1",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "prometheus.listener.url": "http://localhost:8889/metrics",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "reporter.result.topic.replication.factor": "1",
    "reporter.error.topic.replication.factor": "1",
    "behavior.on.error": "log"
  }
}
Note
Change the confluent.topic.bootstrap.servers property to include your broker address(es) and change the confluent.topic.replication.factor to 3 for production use.
Enter the following curl command to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.
curl -sS -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors
Enter the following curl command to update the configuration of the connector:
curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/prometheus-connector/config
Enter the following curl command to confirm that the connector is in a RUNNING state:
curl http://localhost:8083/connectors/prometheus-connector/status | jq
The output should resemble:
{
  "name": "prometheus-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.1.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "127.0.1.1:8083"
    }
  ],
  "type": "sink"
}
In the response from the /connectors/prometheus-connector/status endpoint, the state of the connector and its tasks should be RUNNING.
Use the following command to produce Avro data to the Kafka topic test-topic:
./bin/kafka-avro-console-producer \
  --broker-list localhost:9092 --topic test-topic \
  --property value.schema='{"name": "metric","type": "record","fields": [{"name": "name","type": "string"},{"name": "type","type": "string"},{"name": "timestamp","type": "long"},{"name": "values","type": {"name": "values","type": "record","fields": [{"name":"doubleValue", "type": "double"}]}}]}'
While the console is waiting for input, paste each of the following records into the console:
{"name":"kafka_gaugeMetric1", "type":"gauge","timestamp": 1576236481,"values": {"doubleValue": 5.639623848362502}}
{"name":"kafka_gaugeMetric1", "type":"gauge","timestamp": 1576236481,"values": {"doubleValue": 5.639623848362502}}
{"name":"kafka_gaugeMetric2", "type":"gauge","timestamp": 1576236481,"values": {"doubleValue": 5.639623848362502}}
{"name":"kafka_gaugeMetric3", "type":"gauge","timestamp": 1576236481,"values": {"doubleValue": 5.639623848362502}}
Check the Prometheus portal at localhost:9090 and verify that the metrics were created.
Example Prometheus Config File for Multiple Tasks¶
This connector can spin up multiple tasks. Tasks that run on a single worker expose their
metrics through the single endpoint specified by the prometheus.listener.url
configuration. If a worker does not host a connector task, Prometheus skips collecting
metrics from that worker’s endpoint.
In a multi-cluster environment, assuming the Connect clusters are on hosts
c1.svc.local, c2.svc.local, and c3.svc.local, and assuming prometheus.listener.url
is http://localhost:8889/metrics, the prometheus.yml configuration file should
look similar to the following:
# global config
global:
  scrape_interval: 10s     # Set the scrape interval to every 10 seconds. The default is every 1 minute.
  evaluation_interval: 20s # Evaluate rules every 20 seconds. The default is every 1 minute.
  # scrape_timeout is set to the global default (10s).

scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']

  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  # Prometheus targets are host:port pairs; the path is given separately in metrics_path.
  - job_name: 'connect'
    metrics_path: '/metrics'
    static_configs:
      - targets: ['c1.svc.local:8889', 'c2.svc.local:8889', 'c3.svc.local:8889']
Security¶
The Prometheus connector can create a secure server endpoint. For instructions, see Configure Prometheus Connector with Security.