Prometheus Metrics Sink Connector for Confluent Platform

The Prometheus Metrics Sink connector exports data from multiple Apache Kafka® topics and makes the data available at an endpoint that is scraped by a Prometheus server. The connector accepts Struct and schemaless JSON as a Kafka record’s value. The name and values fields are required.

The values field refers to a metric’s values. This field is expected to be a Struct object when the Kafka record value is a Struct, and a nested JSON object when the Kafka record value is schemaless JSON. The input Struct or schemaless JSON object used as the record’s value should resemble the following:

{
  "name": string,
  "type": string,
  "timestamp": long,
  "dimensions": {
    "<dimension-1>": string,
    ...
  },
  "values": {
    "<datapoint-1>": double,
    "<datapoint-2>": double,
    ...
  }
}

Features

The Prometheus Sink connector includes the following features:

At least once delivery

This connector guarantees that records are delivered at least once from the Kafka topic. If the connector restarts, some records may be delivered more than once.

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
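
The DLQ is configured through the standard Kafka Connect sink error-handling properties. The following is a minimal sketch, assuming an example DLQ topic name of dlq-prometheus-metrics (not a connector default):

# Route records that fail conversion or transformation to a DLQ topic.
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-prometheus-metrics
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true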

Multiple tasks

The Prometheus Metrics Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Multiple tasks may improve performance when moving a large amount of data.
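
For example, to run three tasks you would set the following in the connector configuration (the value 3 is illustrative; choose a task count appropriate for your workload):

tasks.max=3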

Metrics and schemas

The connector supports metrics of type Gauge, Meter, Histogram, and Timer. Each metric type has a different schema. Kafka topics that contain these metrics must have records that adhere to these schemas.

Gauge schema

{
  "doubleValue": double
}

Meter schema

{
  "count": double,
  "oneMinuteRate": double,
  "fiveMinuteRate": double,
  "fifteenMinuteRate": double,
  "meanRate": double
}

Histogram schema

{
  "count": double,
  "max": double,
  "min": double,
  "mean": double,
  "stdDev": double,
  "sum": double,
  "median": double,
  "percentile75th": double,
  "percentile95th": double,
  "percentile98th": double,
  "percentile99th": double,
  "percentile999th": double
}

Timer schema

{
  "count": double,
  "oneMinuteRate": double,
  "fiveMinuteRate": double,
  "fifteenMinuteRate": double,
  "meanRate": double,
  "max": double,
  "min": double,
  "mean": double,
  "stdDev": double,
  "sum": double,
  "median": double,
  "percentile75th": double,
  "percentile95th": double,
  "percentile98th": double,
  "percentile99th": double,
  "percentile999th": double
}

Record mapping

Each value in the values Struct or nested JSON object is converted into a Prometheus-readable form: the connector appends each values key to the metric name and exposes each dimension as a Prometheus label. The example below shows the original form:

{
  "name": "sample_meter_metric",
  "type": "meter",
  "timestamp": 23480239402348234,
  "dimensions": {
    "service": "ec2-2312",
    "method": "update"
  },
  "values": {
    "count": 12,
    "oneMinuteRate": 5.2,
    "fiveMinuteRate": 4.7,
    "fifteenMinuteRate": 4.9,
    "meanRate": 5.1
  }
}

The following example shows the converted Prometheus-readable form:

# HELP sample_meter_metric_count
# TYPE sample_meter_metric_count counter
sample_meter_metric_count{service="ec2-2312",method="update"} 12
# HELP sample_meter_metric_oneMinuteRate
# TYPE sample_meter_metric_oneMinuteRate gauge
sample_meter_metric_oneMinuteRate{service="ec2-2312",method="update"} 5.2
# HELP sample_meter_metric_fiveMinuteRate
# TYPE sample_meter_metric_fiveMinuteRate gauge
sample_meter_metric_fiveMinuteRate{service="ec2-2312",method="update"} 4.7
# HELP sample_meter_metric_fifteenMinuteRate
# TYPE sample_meter_metric_fifteenMinuteRate gauge
sample_meter_metric_fifteenMinuteRate{service="ec2-2312",method="update"} 4.9
# HELP sample_meter_metric_meanRate
# TYPE sample_meter_metric_meanRate gauge
sample_meter_metric_meanRate{service="ec2-2312",method="update"} 5.1

Prerequisites

The following are required to run the Prometheus Metrics sink connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above
  • Connect: Confluent Platform 4.1.0 or above
  • Java 1.8
  • Prometheus installed and prometheus.yml configured (an example prometheus.yml appears at the end of this page).

Limitations

The Prometheus connector has the following limitations:

  • Timestamp not supported: Prometheus uses the timestamp when metrics are scraped. Timestamps in Kafka records are ignored.
  • Pull-based Connector: Prometheus is a pull-based system. The connector starts an HTTP server on the worker node. All the metrics processed by the connector are made available at the worker HTTP endpoint. The endpoint is configured using the prometheus.listener.url property, and this endpoint must be added to the prometheus.yml configuration file, as shown in the sketch after this list.
  • Metric types: Almost all metrics are interpreted as the gauge type. The exception is the counter type, which is used for incrementing values. Examples of these types are shown in the Record mapping section above. For more information, see Prometheus Metric Types.
  • Buffer limit: The connector buffers metrics from Kafka topics and makes them available when Prometheus scrapes the HTTP server endpoint. Buffer size is set at 3 million metric items to give Prometheus sufficient time to read and process metrics between consecutive scrapes.
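
For example, if a single worker hosts the connector and prometheus.listener.url is set to http://localhost:8889/metrics (as in the Quick Start below), a minimal scrape configuration in prometheus.yml might look like the following sketch; a multi-worker example appears at the end of this page:

scrape_configs:
  - job_name: 'connect-prometheus-sink'  # illustrative job name
    metrics_path: /metrics               # path portion of prometheus.listener.url
    static_configs:
      - targets: ['localhost:8889']      # host:port portion of prometheus.listener.url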

Install the Prometheus Metrics Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent connect plugin install confluentinc/kafka-connect-prometheus-metrics:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent connect plugin install confluentinc/kafka-connect-prometheus-metrics:1.1.1-preview
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys for subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

See Confluent license properties for license properties and information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Configuration Reference for Prometheus Metrics Sink Connector for Confluent Platform.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Quick Start

In this Quick Start, you configure the Prometheus Metrics sink connector to read records from Kafka topics and make them accessible from an HTTP server endpoint. Prometheus then scrapes the server endpoint.

Prerequisites

  • Confluent Platform
  • Confluent CLI (requires separate installation)

Start Confluent

Start the Confluent services using the following Confluent CLI command:

confluent local start

Important

Do not use the Confluent CLI in production environments.

Property-based example

Create a configuration file prometheus-metrics-sink.properties with the following content and put the file inside the Confluent Platform installation directory. This configuration is typically used with standalone workers.

Note

For details about using this connector with Kafka Connect Reporter, see Connect Reporter.

name=prometheus-connector
topics=test-topic
tasks.max=1
connector.class=io.confluent.connect.prometheus.PrometheusMetricsSinkConnector
confluent.topic.bootstrap.servers=localhost:9092
prometheus.listener.url=http://localhost:8889/metrics
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
reporter.result.topic.replication.factor=1
reporter.error.topic.replication.factor=1
behavior.on.error=log

Run the connector with the configuration properties file:

confluent local load prometheus-connector --config prometheus-metrics-sink.properties

The output should resemble:

{
  "name": "prometheus-connector",
  "config": {
    "topics": "test-topic",
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.prometheus.PrometheusMetricsSinkConnector",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "prometheus.listener.url": "http://localhost:8889/metrics",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "reporter.result.topic.replication.factor": "1",
    "reporter.error.topic.replication.factor": "1",
    "behavior.on.error": "log",
    "name": "prometheus-connector"
  },
  "tasks": [],
  "type": "sink"
}

Confirm that the connector is in a RUNNING state.

confluent local status prometheus-connector

The output should resemble:

{
  "name": "prometheus-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.1.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "127.0.1.1:8083"
    }
  ],
  "type": "sink"
}

REST-based example

REST-based configuration is used for distributed workers. For more information, see the Kafka Connect REST API documentation.

Note

For details about using this connector with Kafka Connect Reporter, see Connect Reporter.

  1. Write the following JSON to config.json and configure all of the required values.

    {
      "name" : "prometheus-connector",
      "config" : {
        "topics":"test-topic",
        "connector.class" : "io.confluent.connect.prometheus.PrometheusMetricsSinkConnector",
        "tasks.max" : "1",
        "confluent.topic.bootstrap.servers":"localhost:9092",
        "prometheus.listener.url": "http://localhost:8889/metrics",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "reporter.result.topic.replication.factor": "1",
        "reporter.error.topic.replication.factor": "1",
        "behavior.on.error": "log"
      }
    }
    

    Note

    Change the confluent.topic.bootstrap.servers property to include your broker address(es) and change the confluent.topic.replication.factor to 3 for production use.

  2. Enter the following curl command to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect worker(s).

    curl -sS -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors
    
  3. Enter the following curl command to update the configuration of the connector:

    curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/prometheus-connector/config
    
  4. Enter the following curl command to confirm that the connector is in a RUNNING state:

    curl http://localhost:8083/connectors/prometheus-connector/status | jq
    

    The output should resemble:

    {
      "name": "prometheus-connector",
      "connector": {
        "state": "RUNNING",
        "worker_id": "127.0.1.1:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "127.0.1.1:8083"
        }
      ],
      "type": "sink"
    }
    

    In the response from the /connectors/prometheus-connector/status endpoint, the connector state and all task states should be RUNNING.

  5. Use the following command to produce Avro data to the Kafka topic test-topic:

     ./bin/kafka-avro-console-producer \
    --broker-list localhost:9092 --topic test-topic \
    --property value.schema='{"name": "metric","type": "record","fields": [{"name": "name","type": "string"},{"name": "type","type": "string"},{"name": "timestamp","type": "long"},{"name": "values","type": {"name": "values","type": "record","fields": [{"name":"doubleValue", "type": "double"}]}}]}'
    

    While the console producer is waiting for input, paste each of the following three records into the console:

    {"name":"kafka_gaugeMetric1", "type":"gauge","timestamp": 1576236481,"values": {"doubleValue": 5.639623848362502}}
    {"name":"kafka_gaugeMetric2", "type":"gauge","timestamp": 1576236481,"values": {"doubleValue": 5.639623848362502}}
    {"name":"kafka_gaugeMetric3", "type":"gauge","timestamp": 1576236481,"values": {"doubleValue": 5.639623848362502}}
    
  6. Check the Prometheus web UI at http://localhost:9090 and verify that the metrics were created.
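
    You can also query the connector's HTTP listener directly to confirm the metrics are exposed. A quick check, assuming the prometheus.listener.url value from the configuration above:

    curl http://localhost:8889/metrics

    The output should include kafka_gaugeMetric1, kafka_gaugeMetric2, and kafka_gaugeMetric3 in the Prometheus exposition format shown in the Record mapping section.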

Example Prometheus Config File for Multiple Tasks

This connector can spin up multiple tasks. Tasks that come up on a single worker expose their metrics through the single endpoint specified by the prometheus.listener.url configuration property. If a worker does not host a connector task, Prometheus skips collecting metrics from that worker’s endpoint.

In a multi-worker environment, assuming the Connect workers run on hosts c1.svc.local, c2.svc.local, and c3.svc.local, and assuming prometheus.listener.url is http://localhost:8889/metrics, the prometheus.yml configuration file should look similar to the following:

# global config
global:
  scrape_interval:     10s # Set the scrape interval to every 10 seconds. The default is every 1 minute.
  evaluation_interval: 20s # Evaluate rules every 20 seconds. The default is every 1 minute.
  # scrape_timeout is set to the global default (10s).

scrape_configs:
# The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
- job_name: 'prometheus'
  static_configs:
    - targets: ['localhost:9090']

# The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
- job_name: 'connect'
  metrics_path: /metrics # path portion of prometheus.listener.url (/metrics is the Prometheus default)
  static_configs:
    # Prometheus targets are host:port pairs; the scheme and path are configured separately.
    - targets: ['c1.svc.local:8889', 'c2.svc.local:8889', 'c3.svc.local:8889']
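
After editing prometheus.yml, you can validate the file with promtool, which ships with Prometheus. A quick sanity check (the path below is an example; point it at your own configuration file):

promtool check config /etc/prometheus/prometheus.yml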

Security

The Prometheus connector can create a secure server endpoint. For instructions, see Configure Prometheus Connector with Security.