Datadog Metrics Sink Connector for Confluent Platform

The Kafka Connect Datadog Metrics Sink connector is used to export data from Apache Kafka® topics to Datadog using the Post timeseries API. The connector accepts a Struct as a Kafka record’s value, which must contain name, timestamp, and values fields. The values field holds the metric value.

The input data should look like the following:

{
    "name": string,
    "type": string,               -- optional (DEFAULT = "gauge")
    "timestamp": long,
    "dimensions": {               -- optional
      "host": string,             -- optional
      "interval": int,            -- optional (DEFAULT = 0)
      <tag1-key>: <tag1-value>,   -- optional
      <tag2-key>: <tag2-value>,
      .....
    },
    "values": {
      "doubleValue": double
    }
}

This connector can start with as few as one task handling all data export and can scale horizontally by adding more tasks. However, performance is limited by Datadog. See API rate limiting for more information.

Features

The Datadog Metrics Sink connector includes the following features:

At least once delivery

This connector guarantees that records from the Kafka topic are delivered at least once.

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
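
As a minimal sketch, the DLQ is enabled through the standard Kafka Connect sink error-handling properties. The settings below (added to the connector configuration shown later in this document) route records that fail conversion or processing to a dead letter queue topic; the topic name dlq-datadog-metrics is a placeholder you can change:

errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-datadog-metrics
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true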

Multiple tasks

The Datadog Metrics Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve performance when the connector consumes from topics that have multiple partitions.
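
For example, the following fragment (a sketch that assumes the source topic has at least three partitions) caps the connector at three tasks; Connect never starts more tasks than there are topic partitions to consume:

topics=datadog-metrics-topic
tasks.max=3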

Support for Kafka record value of type Struct, Schemaless JSON, and JSON String

The connector attempts to fit Kafka record values of type Struct, schemaless JSON, and JSON string into one of the three defined metric types (gauge, rate, or count) depending on the type field. If the value for type is anything other than these three types, Datadog treats it as gauge.
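
For example, the following schemaless JSON record (the metric name and value are hypothetical) declares a type that is not gauge, rate, or count, so Datadog treats the submitted metric as a gauge:

{"name": "app.queue.depth", "type": "histogram", "timestamp": 1575366904, "values": {"doubleValue": 42.0}}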

Batching multiple metrics

The connector tries to batch metrics in a single payload of maximum size 3.2 megabytes for each API request. For additional details, see Post timeseries points.

Metrics and schemas

The connector supports metrics of type Gauge, Rate, and Count. Each metric type has a different schema. Kafka topics that contain these metrics must have records that adhere to these schemas.

Gauge schema

The gauge metric submission type represents a value associated with a system entity or parameter that is reported continuously over time.

{
  "doubleValue": double
}

Rate schema

The rate metric submission type represents the number of events over a defined time interval (the flush interval), normalized per second.

{
  "doubleValue": double
}

Count schema

The count metric submission type represents the number of events that occur in a defined time interval, also known as the flush interval.

{
  "doubleValue": double
}
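
As an illustration, a complete record for a count metric that follows the input format shown earlier might look like the following; all field values here are hypothetical:

{
  "name": "requests.error.count",
  "type": "count",
  "timestamp": 1575366904,
  "dimensions": {
    "host": "host-1",
    "interval": 10
  },
  "values": {
    "doubleValue": 7.0
  }
}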

Record mapping

Fields in the provided Kafka record value are mapped to a Datadog Post timeseries API request body. The following shows an example of the mapping done by the connector:

{
  "name": "metric-1",
  "type": "rate",
  "timestamp": 1575366904,
  "dimensions": {
    "host" : "host-1",
    "interval" : 1,
    "tag1" : "test",
    "tag2" : "linux"
  },
  "host": "host-1",
  "values": {
    "doubleValue": 10.832442530901606
  }
}

The example record above is mapped to the Datadog Post timeseries API request body as shown below:

{
  "series":
  [
    {
      "host":"host-1",
      "metric":"metric-1",
      "points":
      [
        [
          "1575366904",
          "10.832442530901606"
        ]
      ],
      "tags":["host:host1", "interval:1", "tag1:test", "tag2:linux"],
      "type":"rate",
      "interval":1
    }
  ]
}
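
For reference, the connector submits payloads of this shape to the Datadog v1 series endpoint. The following is a minimal, hand-written sketch of an equivalent request made with curl, not the connector's exact call; replace <your-api-key> with a real key:

curl -X POST "https://api.datadoghq.com/api/v1/series" \
  -H "Content-Type: application/json" \
  -H "DD-API-KEY: <your-api-key>" \
  -d '{"series":[{"host":"host-1","metric":"metric-1","points":[[1575366904,10.832442530901606]],"tags":["tag1:test","tag2:linux"],"type":"rate","interval":1}]}'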

Limitations

The connector does not currently support Single Message Transformations (SMTs) that modify the topic name. Additionally, the following transformations are not allowed:

  • io.debezium.transforms.ByLogicalTableRouter
  • io.debezium.transforms.outbox.EventRouter
  • org.apache.kafka.connect.transforms.RegexRouter
  • org.apache.kafka.connect.transforms.TimestampRouter
  • io.confluent.connect.transforms.MessageTimestampRouter
  • io.confluent.connect.transforms.ExtractTopic$Key
  • io.confluent.connect.transforms.ExtractTopic$Value

Install the Datadog Metrics connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.

  • Kafka Broker: Confluent Platform 3.3.0 or later.

  • Connect: Confluent Platform 4.1.0 or later.

  • Java 1.8.

  • A Datadog account with at least reporting access to send data through the Post timeseries API. For more information, see the Datadog Documentation.

  • An install of the Confluent Hub Client. This is installed by default with Confluent Enterprise.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent connect plugin install confluentinc/kafka-connect-datadog-metrics:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent connect plugin install confluentinc/kafka-connect-datadog-metrics:1.1.2
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys for subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.
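
If you have a license key, it is supplied through the confluent.license property (left empty in the property-based example later in this document for trial use). A sketch, with placeholder values:

confluent.license=<your-license-key>
confluent.topic=_confluent-command
confluent.topic.replication.factor=3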

Configuration Properties

For a complete list of configuration properties for this connector, see Configuration Reference for Datadog Metrics Sink Connector for Confluent Platform.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Quick Start

In this Quick Start, you configure the Kafka Connect Datadog Metrics Sink connector to read records from Kafka topics and export the data to Datadog.

Prerequisites

Preliminary setup

To add a new connector plugin, you must restart Connect. Use the Confluent CLI command to restart Connect.

Tip

The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

confluent local services connect stop && confluent local services connect start

Your output should resemble:

Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]

Check if the Datadog plugin has been installed correctly and picked up by the plugin loader:

curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep datadog

Your output should resemble:

"io.confluent.connect.datadog.metrics.DatadogMetricsSinkConnector"

Sink Connector Configuration

Start the services using the Confluent CLI:

confluent local services start

Create a configuration file named datadog-metrics-sink-config.json with the following contents:

{
  "name": "datadog-metrics-sink",
  "config": {
    "topics": "datadog-metrics-topic",
    "connector.class": "io.confluent.connect.datadog.metrics.DatadogMetricsSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "datadog.api.key": "< your-api-key >",
    "datadog.domain": "COM",
    "behavior.on.error": "fail",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  }
}

Run this command to start the Datadog Metrics sink connector.

Caution

You must include a double dash (--) between the connector name and your flag. For more information, see this post.

confluent local services connect connector load datadog-metrics-sink --config datadog-metrics-sink-config.json

To check that the connector started successfully, view the Connect worker’s log by running:

confluent local services connect log

Produce test data to the datadog-metrics-topic topic in Kafka using the kafka-avro-console-producer command:

kafka-avro-console-producer \
  --broker-list localhost:9092 --topic datadog-metrics-topic \
  --property value.schema='{"name": "metric","type": "record","fields": [{"name": "name","type": "string"},{"name": "type","type": "string"},{"name": "timestamp","type": "long"}, {"name": "dimensions", "type": {"name": "dimensions", "type": "record", "fields": [{"name": "host", "type":"string"}, {"name":"interval", "type":"int"}, {"name": "tag1", "type":"string"}]}},{"name": "values","type": {"name": "values","type": "record","fields": [{"name":"doubleValue", "type": "double"}]}}]}'

Important

The timestamp should be a current Unix epoch time, in seconds. Current is defined as not more than 10 minutes in the future and not more than one hour in the past.

Unix epoch second format

{"name":"perf.metric", "type":"rate","timestamp": 1575875976, "dimensions": {"host": "metric.host1", "interval": 1, "tag1": "testing-data"},"values": {"doubleValue": 5.639623848362502}}

Using the Datadog dashboard, you can view the metrics being produced. You can produce Avro and JSON data to a Kafka topic for this connector.
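
If you prefer to verify from the command line rather than the dashboard, you can query the metric back through Datadog's v1 query API. This sketch assumes the perf.metric name and host tag from the record above, uses a hypothetical time window around its timestamp, and requires both an API key and an application key:

curl -G "https://api.datadoghq.com/api/v1/query" \
  -H "DD-API-KEY: <your-api-key>" \
  -H "DD-APPLICATION-KEY: <your-app-key>" \
  --data-urlencode "from=1575875900" \
  --data-urlencode "to=1575876100" \
  --data-urlencode "query=avg:perf.metric{host:metric.host1}"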

When completed, stop the Confluent services using the command:

confluent local services stop

Examples

Property-based example

Create a configuration file for the connector. This file is included with the connector in etc/kafka-connect-datadog-metrics/datadog-metrics-sink-connector.properties. This configuration is typically used with standalone workers.

Note

For details about using this connector with Kafka Connect Reporter, see Connect Reporter.

name=datadog-metrics-sink
topics=datadog-metrics-topic
connector.class=io.confluent.connect.datadog.metrics.DatadogMetricsSinkConnector
tasks.max=1
datadog.api.key=< Your Datadog API key >
datadog.domain=< any one of COM/EU >
behavior.on.error=< Optional Configuration >
reporter.bootstrap.servers=localhost:9092

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
confluent.license=

Before starting the connector, make sure that the Datadog properties in the configuration file are set properly.

Note

Provide datadog.api.key, datadog.domain, and behavior.on.error, and then start the connector.

Then start the Datadog metrics connector by loading its configuration with the following command.

Caution

You must include a double dash (--) between the connector name and your flag. For more information, see this post.

confluent local services connect connector load datadog-metrics-sink --config datadog-metrics-sink-connector.properties

Your output should resemble:

{
 "name": "datadog-metrics-sink",
 "config": {
     "connector.class": "io.confluent.connect.datadog.metrics.DatadogMetricsSinkConnector",
     "tasks.max":"1",
     "topics":"datadog-metrics-topic",
     "datadog.api.key": "< your-api-key > "
     "datadog.domain": "COM"
     "behavior.on.error": "fail",
     "key.converter":"io.confluent.connect.avro.AvroConverter",
     "key.converter.schema.registry.url":"http://localhost:8081",
     "value.converter":"io.confluent.connect.avro.AvroConverter",
     "value.converter.schema.registry.url":"http://localhost:8081",
     "confluent.topic.bootstrap.servers":"localhost:9092",
     "confluent.topic.replication.factor":"1",
     "reporter.bootstrap.servers": "localhost:9092"
 },
  "tasks": []
}

REST-based example

This configuration is typically used with distributed workers. Write the following JSON to connector.json, configure all of the required values, and use the command below to post the configuration to one of the distributed Connect workers. See the Kafka Connect REST Interface documentation for more information.

Note

For details about using this connector with Kafka Connect Reporter, see Connect Reporter.

{
  "name" : "datadog-metrics-sink-connector",
  "config" : {
   "connector.class": "io.confluent.connect.datadog.metrics.DatadogMetricsSinkConnector",
   "tasks.max": "1",
   "datadog.domain": "COM",
   "datadog.api.key": "< your-api-key >",
   "behavior.on.error": "fail",
   "reporter.bootstrap.servers": "localhost:9092",
   "confluent.topic.bootstrap.servers": "localhost:9092",
   "confluent.topic.replication.factor": "1"
  }
}

Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.

curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
To update an existing connector instead, send a PUT request to its config endpoint. Note that this endpoint expects the request body to contain only the contents of the config object, without the surrounding name wrapper (shown here as a separate file, connector-config.json):

curl -s -X PUT -H 'Content-Type: application/json' --data @connector-config.json \
http://localhost:8083/connectors/datadog-metrics-sink-connector/config
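
After submitting the configuration, you can confirm that the connector and its task are running by querying the Connect REST API status endpoint:

curl -s http://localhost:8083/connectors/datadog-metrics-sink-connector/status | jq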