Prometheus Metrics Sink Connector for Confluent Platform

The Kafka Connect Prometheus Metrics Sink connector exports data from multiple Apache Kafka® topics and makes the data available to an endpoint which is scraped by a Prometheus server. The connector accepts Struct and schemaless JSON as a Kafka record’s value. The name and values fields are required.

The values field refers to a metric’s values. This field is expected to be a Struct object when the Kafka record value is a Struct, and nested JSON when the Kafka record value is schemaless JSON. The input Struct or schemaless JSON object used as the record’s value should resemble the following:

{
  "name": string,
  "type": string,
  "timestamp": long,
  "dimensions": {
    "<dimension-1>": string,
    ...
  },
  "values": {
    "<datapoint-1>": double,
    "<datapoint-2>": double,
    ...
  }
}
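
As a rough illustration of the record shape above, the following Python sketch checks a schemaless JSON record before it is produced to Kafka. It is not part of the connector. The function name validate_metric_record and the exact checks are assumptions made for this example; the only rules taken from the text are that name and values are required, that dimension values are strings, and that datapoints are numeric.

```python
from numbers import Real
from typing import List

# Per the text above, only these two fields are required.
REQUIRED_FIELDS = ("name", "values")


def validate_metric_record(record: dict) -> List[str]:
    """Return a list of problems found in a metric record (hypothetical helper)."""
    problems = []
    for field in REQUIRED_FIELDS:
        if field not in record:
            problems.append(f"missing required field: {field}")

    values = record.get("values")
    if values is not None:
        if not isinstance(values, dict):
            problems.append("values must be a nested object")
        else:
            for key, value in values.items():
                # bool is a subclass of int in Python, so reject it explicitly.
                if isinstance(value, bool) or not isinstance(value, Real):
                    problems.append(f"values.{key} is not numeric")

    dimensions = record.get("dimensions") or {}
    if isinstance(dimensions, dict):
        for key, value in dimensions.items():
            if not isinstance(value, str):
                problems.append(f"dimensions.{key} is not a string")
    else:
        problems.append("dimensions must be a nested object")
    return problems


record = {
    "name": "sample_gauge",
    "type": "gauge",
    "timestamp": 1576236481,
    "dimensions": {"service": "ec2-2312"},
    "values": {"doubleValue": 5.6},
}
print(validate_metric_record(record))
```

An empty list means the record passed every check in this sketch; the connector itself may accept or reject records differently.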

Prerequisites

The following are required to run the Kafka Connect Prometheus Metrics sink connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above
  • Connect: Confluent Platform 4.1.0 or above
  • Java 1.8
  • Prometheus installed and prometheus.yml configured.

Limitations

The Prometheus connector has the following limitations:

  • Timestamp not supported: Prometheus assigns each sample the time at which the metric is scraped. Timestamps in Kafka records are ignored.
  • Pull-based Connector: Prometheus is a pull-based system. The connector starts an HTTP server on the worker node. All the metrics processed by the connector are made available at the worker HTTP endpoint. The endpoint is configured using the prometheus.listener.url property. The prometheus.listener.url HTTP endpoint must be added to the prometheus.yml configuration file.
  • Metric types: Almost all metrics are interpreted as gauge type. An exception to this is the counter type used for incrementing values. Examples of these types are provided in the following section. For more information, see Prometheus Metric Types.
  • Buffer limit: The connector buffers metrics from Kafka topics and makes them available when Prometheus scrapes the HTTP server endpoint. Buffer size is set at 3 million metric items to give Prometheus sufficient time to read and process metrics between consecutive scrapes.
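
Because the prometheus.listener.url endpoint has to be added to prometheus.yml, it can help to see how that URL splits into a scrape target and a metrics path. The Python sketch below renders a scrape_configs entry from the URL. The job name kafka-connect-prometheus-sink and the helper name scrape_config_for are placeholders invented for this example; scrape_configs, job_name, scheme, metrics_path, static_configs, and targets are standard Prometheus configuration keys.

```python
from urllib.parse import urlsplit


def scrape_config_for(listener_url: str,
                      job_name: str = "kafka-connect-prometheus-sink") -> str:
    """Render a prometheus.yml scrape_configs entry for the connector endpoint."""
    parts = urlsplit(listener_url)
    if parts.scheme not in ("http", "https") or not parts.netloc:
        raise ValueError(f"not an HTTP(S) URL: {listener_url!r}")
    # Prometheus itself defaults to /metrics when no path is configured.
    metrics_path = parts.path or "/metrics"
    return "\n".join([
        "scrape_configs:",
        f"  - job_name: {job_name}",
        f"    scheme: {parts.scheme}",
        f"    metrics_path: {metrics_path}",
        "    static_configs:",
        f"      - targets: ['{parts.netloc}']",
    ])


print(scrape_config_for("http://localhost:8889/metrics"))
```

If prometheus.yml already has a scrape_configs section, only the list entry would be merged into it rather than the whole snippet.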

Supported Metrics and Schemas

The connector supports metrics of type Gauge, Meter, Histogram, and Timer. Each metric type has a different schema. Kafka topics that contain these metrics must have records that adhere to these schemas.

Gauge schema

{
  "doubleValue": double
}

Meter schema

{
  "count": double,
  "oneMinuteRate": double,
  "fiveMinuteRate": double,
  "fifteenMinuteRate": double,
  "meanRate": double
}

Histogram schema

{
  "count": double,
  "max": double,
  "min": double,
  "mean": double,
  "stdDev": double,
  "sum": double,
  "median": double,
  "percentile75th": double,
  "percentile95th": double,
  "percentile98th": double,
  "percentile99th": double,
  "percentile999th": double
}

Timer schema

{
  "count": double,
  "oneMinuteRate": double,
  "fiveMinuteRate": double,
  "fifteenMinuteRate": double,
  "meanRate": double,
  "max": double,
  "min": double,
  "mean": double,
  "stdDev": double,
  "sum": double,
  "median": double,
  "percentile75th": double,
  "percentile95th": double,
  "percentile98th": double,
  "percentile99th": double,
  "percentile999th": double
}
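
The four schemas above can be summarized as sets of field names, which makes it easy to check a record's values object against its declared type. The Python sketch below is one way to do that. The lowercase type names for histogram and timer are assumed by analogy with the gauge and meter examples in this document, and check_values is a hypothetical helper. Whether the connector requires every field to be present is not stated here, so treat the result as advisory.

```python
METER_FIELDS = {
    "count", "oneMinuteRate", "fiveMinuteRate", "fifteenMinuteRate", "meanRate",
}
HISTOGRAM_FIELDS = {
    "count", "max", "min", "mean", "stdDev", "sum", "median",
    "percentile75th", "percentile95th", "percentile98th",
    "percentile99th", "percentile999th",
}
SCHEMA_FIELDS = {
    "gauge": {"doubleValue"},
    "meter": METER_FIELDS,
    "histogram": HISTOGRAM_FIELDS,
    # The Timer schema lists exactly the Meter fields plus the Histogram fields.
    "timer": METER_FIELDS | HISTOGRAM_FIELDS,
}


def check_values(metric_type: str, values: dict):
    """Return (missing, unexpected) field names for the given metric type.

    Raises KeyError for a type that is not one of the four schemas.
    """
    expected = SCHEMA_FIELDS[metric_type.lower()]
    present = set(values)
    return expected - present, present - expected


missing, unexpected = check_values("meter", {"count": 12, "meanRate": 5.1})
print(sorted(missing), sorted(unexpected))
```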

Record Mapping

Each value in the values Struct or nested JSON object is converted into Prometheus-readable form. The example below shows the original form:

{
  "name": "sample_meter_metric",
  "type": "meter",
  "timestamp": 23480239402348234,
  "dimensions": {
    "service": "ec2-2312",
    "method": "update"
  },
  "values": {
    "count": 12,
    "oneMinuteRate": 5.2,
    "fiveMinuteRate": 4.7,
    "fifteenMinuteRate": 4.9,
    "meanRate": 5.1
  }
}

The example below shows the converted Prometheus-readable form:

# HELP sample_meter_metric_count
# TYPE sample_meter_metric_count counter
sample_meter_metric_count{service="ec2-2312",method="update"} 12
# HELP sample_meter_metric_oneMinuteRate
# TYPE sample_meter_metric_oneMinuteRate gauge
sample_meter_metric_oneMinuteRate{service="ec2-2312",method="update"} 5.2
# HELP sample_meter_metric_fiveMinuteRate
# TYPE sample_meter_metric_fiveMinuteRate gauge
sample_meter_metric_fiveMinuteRate{service="ec2-2312",method="update"} 4.7
# HELP sample_meter_metric_fifteenMinuteRate
# TYPE sample_meter_metric_fifteenMinuteRate gauge
sample_meter_metric_fifteenMinuteRate{service="ec2-2312",method="update"} 4.9
# HELP sample_meter_metric_meanRate
# TYPE sample_meter_metric_meanRate gauge
sample_meter_metric_meanRate{service="ec2-2312",method="update"} 5.1
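
The mapping in this example can be approximated in a few lines of Python. This is a sketch inferred from the sample input and output only, not the connector's actual code: it assumes each datapoint becomes a metric named <name>_<datapoint>, that dimensions become labels, and that only the count datapoint is exposed as a counter while everything else is a gauge. Label-value escaping is left out for brevity, and to_exposition is a hypothetical name.

```python
def to_exposition(record: dict) -> str:
    """Convert one metric record into Prometheus text exposition lines."""
    name = record["name"]
    labels = ",".join(
        f'{key}="{value}"' for key, value in record.get("dimensions", {}).items()
    )
    label_part = f"{{{labels}}}" if labels else ""

    lines = []
    for field, value in record["values"].items():
        metric = f"{name}_{field}"
        # Assumption drawn from the example: only "count" is typed as a counter.
        metric_type = "counter" if field == "count" else "gauge"
        lines.append(f"# HELP {metric}")
        lines.append(f"# TYPE {metric} {metric_type}")
        lines.append(f"{metric}{label_part} {value}")
    return "\n".join(lines)


sample = {
    "name": "sample_meter_metric",
    "type": "meter",
    "timestamp": 23480239402348234,
    "dimensions": {"service": "ec2-2312", "method": "update"},
    "values": {
        "count": 12,
        "oneMinuteRate": 5.2,
        "fiveMinuteRate": 4.7,
        "fifteenMinuteRate": 4.9,
        "meanRate": 5.1,
    },
}
print(to_exposition(sample))
```

Note that the record's timestamp field plays no part in the output, which matches the limitation that Kafka record timestamps are ignored.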

Install the Prometheus Metrics Connector

You can install this connector by using the Confluent Hub instructions below, or you can manually download the ZIP file.

Install the connector using Confluent Hub

Prerequisite
Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run the following command to install the latest connector version. The connector must be installed on every machine where Connect will run.

confluent-hub install confluentinc/kafka-connect-prometheus-metrics:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-prometheus-metrics:1.0.2

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues Confluent enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent license properties for license properties and information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Prometheus Metrics Sink Connector Configuration Properties.

Note

For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.

Quick Start

In this Quick Start, you configure the Kafka Connect Prometheus Metrics sink connector to read records from Kafka topics and make them accessible from an HTTP server endpoint. Prometheus then scrapes the server endpoint.

Prerequisites

Start Confluent

Start the Confluent services using the following Confluent CLI command:

confluent local start

Important

Do not use the Confluent CLI in production environments.

Property-based example

Create a configuration file named prometheus-metrics-sink.properties with the following content. Put this file inside the Confluent Platform installation directory. This configuration is typically used with standalone workers.

Note

For details about using this connector with Kafka Connect Reporter, see Connect Reporter.

name=prometheus-connector
topics=test-topic
tasks.max=1
connector.class=io.confluent.connect.prometheus.PrometheusMetricsSinkConnector
confluent.topic.bootstrap.servers=localhost:9092
prometheus.listener.url=http://localhost:8889/metrics
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
reporter.result.topic.replication.factor=1
reporter.error.topic.replication.factor=1
behavior.on.error=log

Run the connector with the configuration properties file:

confluent local load prometheus-connector --config prometheus-metrics-sink.properties

The output should resemble:

{
  "name": "prometheus-connector",
  "config": {
    "topics": "test-topic",
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.prometheus.PrometheusMetricsSinkConnector",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "prometheus.listener.url": "http://localhost:8889/metrics",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "reporter.result.topic.replication.factor": "1",
    "reporter.error.topic.replication.factor": "1",
    "behavior.on.error": "log",
    "name": "prometheus-connector"
  },
  "tasks": [],
  "type": "sink"
}

Confirm that the connector is in a RUNNING state.

confluent local status prometheus-connector

The output should resemble:

{
  "name": "prometheus-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.1.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "127.0.1.1:8083"
    }
  ],
  "type": "sink"
}
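
The properties file used with standalone workers and the JSON body accepted by the Kafka Connect REST API carry the same settings in different shapes: the REST body wraps them as a name plus a config object. The Python sketch below converts one into the other. It handles only plain key=value lines and comments, not the full Java properties syntax (no line continuations or escapes), and properties_to_payload is a hypothetical helper name.

```python
import json


def properties_to_payload(text: str) -> dict:
    """Turn simple key=value properties text into a Connect REST payload."""
    config = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue  # skip blank lines and comments
        # Split on the first "=" only, so values may themselves contain "=".
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    # The REST API expects the connector name at the top level.
    name = config.pop("name")
    return {"name": name, "config": config}


PROPERTIES = """\
name=prometheus-connector
topics=test-topic
tasks.max=1
connector.class=io.confluent.connect.prometheus.PrometheusMetricsSinkConnector
prometheus.listener.url=http://localhost:8889/metrics
"""

print(json.dumps(properties_to_payload(PROPERTIES), indent=2))
```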

REST-based example

REST-based configuration is used for distributed workers. For more information, see the Kafka Connect REST API documentation.

Note

For details about using this connector with Kafka Connect Reporter, see Connect Reporter.

  1. Write the following JSON to config.json and configure all of the required values.

    {
      "name" : "prometheus-connector",
      "config" : {
        "topics":"test-topic",
        "connector.class" : "io.confluent.connect.prometheus.PrometheusMetricsSinkConnector",
        "tasks.max" : "1",
        "confluent.topic.bootstrap.servers":"localhost:9092",
        "prometheus.listener.url": "http://localhost:8889/metrics",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "reporter.result.topic.replication.factor": "1",
        "reporter.error.topic.replication.factor": "1",
        "behavior.on.error": "log"
      }
    }
    

    Note

    Change the confluent.topic.bootstrap.servers property to include your broker address(es) and change the confluent.topic.replication.factor to 3 for production use.

  2. Enter the following curl command to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.

    curl -sS -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors
    
  3. Enter the following curl command to update the configuration of the connector:

    curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/prometheus-connector/config
    
  4. Enter the following curl command to confirm that the connector is in a RUNNING state:

    curl http://localhost:8083/connectors/prometheus-connector/status | jq
    

    The output should resemble:

    {
      "name": "prometheus-connector",
      "connector": {
        "state": "RUNNING",
        "worker_id": "127.0.1.1:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "127.0.1.1:8083"
        }
      ],
      "type": "sink"
    }
    

    In the response from the /connectors/prometheus-connector/status endpoint, both the connector and its tasks should report the state RUNNING.

  5. Use the following command to produce Avro data to the Kafka topic test-topic:

    ./bin/kafka-avro-console-producer \
    --broker-list localhost:9092 --topic test-topic \
    --property value.schema='{"name": "metric","type": "record","fields": [{"name": "name","type": "string"},{"name": "type","type": "string"},{"name": "timestamp","type": "long"},{"name": "values","type": {"name": "values","type": "record","fields": [{"name":"doubleValue", "type": "double"}]}}]}'
    

    While the console is waiting for input, paste each of the following three records into the console.

    {"name":"kafka_gaugeMetric1", "type":"gauge","timestamp": 1576236481,"values": {"doubleValue": 5.639623848362502}}
    {"name":"kafka_gaugeMetric2", "type":"gauge","timestamp": 1576236481,"values": {"doubleValue": 5.639623848362502}}
    {"name":"kafka_gaugeMetric3", "type":"gauge","timestamp": 1576236481,"values": {"doubleValue": 5.639623848362502}}
    
  6. Check the Prometheus portal on localhost:9090 and verify that metrics were created.
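
The curl steps above for creating the connector and checking its status can also be scripted. The following Python sketch uses only the standard library and assumes the worker endpoint http://localhost:8083 from those steps. The helper names create_request, fetch_status, and is_running are invented for this example, and treating a connector with no tasks as not yet running is a choice made here rather than documented behavior.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # worker endpoint used in the steps above


def create_request(payload: dict, base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Build the POST /connectors request without sending it."""
    return urllib.request.Request(
        f"{base_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def fetch_status(name: str, base_url: str = CONNECT_URL) -> dict:
    """GET /connectors/<name>/status and decode the JSON body."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)


def is_running(status: dict) -> bool:
    """True when the connector and every task report RUNNING."""
    tasks = status.get("tasks", [])
    return (
        status.get("connector", {}).get("state") == "RUNNING"
        and bool(tasks)  # assumption: no tasks yet means not fully started
        and all(task.get("state") == "RUNNING" for task in tasks)
    )


# Example usage against a live worker (commented out so the sketch runs offline):
# with open("config.json") as f:
#     urllib.request.urlopen(create_request(json.load(f)))
# print(is_running(fetch_status("prometheus-connector")))
```

Keeping request construction separate from sending makes the payload and URL easy to inspect before anything touches the worker.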

Security

The Prometheus connector can create a secure server endpoint. For instructions, see Configure Prometheus Connector with Security.