Important
You are viewing documentation for an older version of Confluent Platform. For the latest, click here.
Prometheus Metrics Sink Connector for Confluent Platform¶
The Kafka Connect Prometheus Metrics Sink Connector is used to export data
from multiple Apache Kafka® topics and make them available to an endpoint which is
scraped by a Prometheus server. The connector accepts Struct and schemaless JSON
as a Kafka record’s value. The name
and values
fields are required.
The values
field refers to a metric’s values. This field is expected to be a Struct object when the Kafka record value is Struct type, and nested JSON when the Kafka record value schemaless JSON. The input Struct or schemaless JSON object used as the record’s value should resemble the following:
{
"name": string,
"type": string,
"timestamp": long,
"dimensions": {
"<dimension-1>": string,
...
},
"values": {
"<datapoint-1>": double,
"<datapoint-2>": double,
...
}
}
Prerequisites¶
The following are required to run the Kafka Connect Prometheus Metrics sink connector:
- Kafka Broker: Confluent Platform 3.3.0 or above
- Connect: Confluent Platform 4.1.0 or above
- Java 1.8
- Prometheus installed and prometheus.yml configured.
Limitations¶
The Prometheus connector has the following limitations:
- Timestamp not supported: Prometheus uses the timestamp when metrics are scraped. Timestamps in Kafka records are ignored.
- Pull-based Connector: Prometheus is a pull-based system. The connector starts an HTTP server on the worker node. All the metrics processed by the connector are made available at the worker HTTP endpoint. The endpoint is configured using the
prometheus.scrape.url
property. Theprometheus.scrape.url
HTTP endpoint must be added to theprometheus.yml
configuration file. - Metric types: Almost all metrics are interpreted as
gauge
type. An exception to this is thecounter
type used for incrementing values. Examples of these types are provided in the following section. For more information, see Prometheus Metric Types. - Buffer limit: The connector buffers metrics from Kafka topics and makes them available when Prometheus scrapes the HTTP server endpoint. Buffer size is set at 3 million metric items to give Prometheus sufficient time to read and process metrics between consecutive scrapes.
Supported Metrics and Schemas¶
The connector supports metrics of type Gauge, Meter, Histogram, and Timer. Each metric type has a different schema. Kafka topics that contain these metrics must have records that adhere to these schemas.
Gauge schema¶
{
"doubleValue": double
}
Meter schema¶
{
"count": double,
"oneMinuteRate": double,
"fiveMinuteRate": double,
"fifteenMinuteRate": double,
"meanRate": double
}
Histogram schema¶
{
"count": double,
"max": double,
"min": double,
"mean": double,
"stdDev": double,
"sum": double,
"median": double,
"percentile75th": double,
"percentile95th": double,
"percentile98th": double,
"percentile99th": double,
"percentile999th": double,
}
Timer schema¶
{
"count": double,
"oneMinuteRate": double,
"fiveMinuteRate": double,
"fifteenMinuteRate": double,
"meanRate": double,
"max": double,
"min": double,
"mean": double,
"stdDev": double,
"sum": double,
"median": double,
"percentile75th": double,
"percentile95th": double,
"percentile98th": double,
"percentile99th": double,
"percentile999th": double
}
Record Mapping¶
Each value in the values
Struct or nested JSON object is converted into Prometheus-readable form. For example below shows the original form:
{
"name": "sample_meter_metric",
"type": "meter",
"timestamp": 23480239402348234,
"dimensions": {
"service": "ec2-2312",
"method": "update"
},
"values": {
"count": 12,
"oneMinuteRate": 5.2,
"fiveMinuteRate": 4.7,
"fifteenMinuteRate": 4.9,
"meanRate": 5.1"
}
}
The example below shows the converted Prometheus-readable form:
# HELP sample_meter_metric_count
# TYPE sample_meter_metric_count counter
sample_meter_metric_count{service="ec2-2312",method="update"} 12
# HELP sample_meter_metric_oneMinuteRate
# TYPE sample_meter_metric_oneMinuteRate gauge
sample_meter_metric_oneMinuteRate{service="ec2-2312",method="update"} 5.2
# HELP sample_meter_metric_fiveMinuteRate
# TYPE sample_meter_metric_fiveMinuteRate gauge
sample_meter_metric_fiveMinuteRate{service="ec2-2312",method="update"} 4.7
# HELP sample_meter_metric_fifteenMinuteRate
# TYPE sample_meter_metric_fifteenMinuteRate gauge
sample_meter_metric_fifteenMinuteRate{service="ec2-2312",method="update"} 4.9
# HELP sample_meter_metric_meanRate
# TYPE sample_meter_metric_meanRate gauge
sample_meter_metric_meanRate{service="ec2-2312",method="update"} 5.1
Install the Prometheus Metrics Connector¶
You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.
Install the connector using Confluent Hub¶
- Prerequisite
- Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest
) connector version. The connector must be installed on every machine where Connect will run.
confluent-hub install confluentinc/kafka-connect-prometheus-metrics:latest
You can install a specific version by replacing latest
with a version number. For example:
confluent-hub install confluentinc/kafka-connect-prometheus-metrics:1.0.2
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Prometheus Metrics Sink Connector Configuration Properties.
Note
For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster in Connect Kafka Connect to Confluent Cloud.
Quick Start¶
In this Quick Start, you configure the Kafka Connect Prometheus Metrics sink connector to read records from Kafka topics and make them accessible from an HTTP server endpoint. Prometheus then scrapes the server endpoint.
- Prerequisites
- Confluent Platform is installed.
- The Confluent CLI is installed.
- Prometheus is installed. See Prometheus Installation.
- The prometheus.yml file is configured to monitor the HTTP server endpoint exposed by the Connect worker.
Start Confluent¶
Start the Confluent services using the following Confluent CLI command:
confluent local start
Important
Do not use the Confluent CLI in production environments.
Property-based example¶
Create a configuration file prometheus-metrics-sink.properties
with the
following content. Put this file inside the Confluent Platform installation directory. This
configuration is used typically along with standalone workers.
Note
For details about using this connector with Kafka Connect Reporter, see Connect Reporter.
name=prometheus-connector
topics=test-topic
tasks.max=1
connector.class=io.confluent.connect.prometheus.PrometheusMetricsSinkConnector
confluent.topic.bootstrap.servers=localhost:9092
prometheus.scrape.url=http://localhost:8889/metrics
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
reporter.result.topic.replication.factor=1
reporter.error.topic.replication.factor=1
behavior.on.error=log
Run the connector with the configuration properties file:
confluent local load prometheus-connector -- -d prometheus-metrics-sink.properties
The output should resemble:
{
"name": "prometheus-connector",
"config": {
"topics": "test-topic",
"tasks.max": "1",
"connector.class": "io.confluent.connect.prometheus.PrometheusMetricsSinkConnector",
"confluent.topic.bootstrap.servers": "localhost:9092",
"prometheus.scrape.url": "http://localhost:8889/metrics",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"reporter.result.topic.replication.factor": "1",
"reporter.error.topic.replication.factor": "1",
"behavior.on.error": "log",
"name": "prometheus-connector"
},
"tasks": [],
"type": "sink"
}
Confirm that the connector is in a RUNNING
state.
confluent local status prometheus-connector
The output should resemble:
{
"name": "prometheus-connector",
"connector": {
"state": "RUNNING",
"worker_id": "127.0.1.1:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "127.0.1.1:8083",
}
],
"type": "sink"
}
REST-based example¶
REST-based configuration is used for distributed workers. For more information, see the Kafka Connect REST API documentation.
Note
For details about using this connector with Kafka Connect Reporter, see Connect Reporter.
Write the following JSON to
config.json
and configure all of the required values.{ "name" : "prometheus-connector", "config" : { "topics":"test-topic", "connector.class" : "io.confluent.connect.prometheus.PrometheusMetricsSinkConnector", "tasks.max" : "1", "confluent.topic.bootstrap.servers":"localhost:9092", "prometheus.scrape.url": "http://localhost:8889/metrics", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://localhost:8081", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://localhost:8081", "reporter.result.topic.replication.factor": "1", "reporter.error.topic.replication.factor": "1", "behavior.on.error": "log" } }
Note
Change the
confluent.topic.bootstrap.servers
property to include your broker address(es) and change theconfluent.topic.replication.factor
to3
for production use.Enter the following curl command to post the configuration to one of the Kafka Connect workers. Change
http://localhost:8083/
to the endpoint of one of your Kafka Connect worker(s).curl -sS -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors
Enter the following curl command to update the configuration of the connector:
curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/prometheus-connector/config
Enter the following curl command to confirm that the connector is in a
RUNNING
state:curl http://localhost:8083/connectors/prometheus-connector/status | jq
The output should resemble:
{ "name": "prometheus-connector", "connector": { "state": "RUNNING", "worker_id": "127.0.1.1:8083" }, "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "127.0.1.1:8083", } ], "type": "sink" }
Search for the endpoint
/connectors/prometheus-connector/status
, the state of the connector and tasks should have status asRUNNING
.Use the following command to produce Avro data to the Kafka topic:
test-topic
:./bin/kafka-avro-console-producer \ --broker-list localhost:9092 --topic test-topic \ --property value.schema='{"name": "metric","type": "record","fields": [{"name": "name","type": "string"},{"name": "type","type": "string"},{"name": "timestamp","type": "long"},{"name": "values","type": {"name": "values","type": "record","fields": [{"name":"doubleValue", "type": "double"}]}}]}'
While the console is waiting for the input, use the following three records and paste each of them on the console.
{"name":"kafka_gaugeMetric1", "type":"gauge","timestamp": 1576236481,"values": {"doubleValue": 5.639623848362502}} {"name":"kafka_gaugeMetric1", "type":"gauge","timestamp": 1576236481,"values": {"doubleValue": 5.639623848362502}} {"name":"kafka_gaugeMetric2", "type":"gauge","timestamp": 1576236481,"values": {"doubleValue": 5.639623848362502}} {"name":"kafka_gaugeMetric3", "type":"gauge","timestamp": 1576236481,"values": {"doubleValue": 5.639623848362502}}
Check the Prometheus portal on
localhost:9090
and verify that metrics were created.
Security¶
The Prometheus connector can create a secure server endpoint. For instructions, see Configure Prometheus Connector with Security.