Datadog Metrics Sink Connector for Confluent Platform¶
The Kafka Connect Datadog Metrics Sink connector is used to export data from
Apache Kafka® topics to Datadog using the Post timeseries API. The
connector accepts a Struct as a Kafka record’s value, which must contain name, timestamp, and values fields. The values field holds the metric value.
The input data should look like the following:
{
  "name": string,
  "type": string,                  -- optional (DEFAULT = "gauge")
  "timestamp": long,
  "dimensions": {                  -- optional
    "host": string,                -- optional
    "interval": int,               -- optional (DEFAULT = 0)
    <tag1-key>: <tag1-value>,      -- optional
    <tag2-key>: <tag2-value>,
    .....
  },
  "values": {
    "doubleValue": double
  }
}
This connector can start with a single task that handles all data export and can scale horizontally by adding more tasks. However, overall throughput is limited by Datadog. See API rate limiting for more information.
Features¶
The Datadog Metrics Sink connector includes the following features:
- At least once delivery
- Dead Letter Queue
- Multiple tasks
- Support for Kafka record value of type Struct, Schemaless JSON, and JSON String
- Batching multiple metrics
- Metrics and schemas
- Record mapping
At least once delivery¶
This connector guarantees that records from the Kafka topic are delivered at least once.
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
Multiple tasks¶
The Datadog Metrics Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter, which can lead to performance gains when the connector must export a large volume of records.
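For example, a minimal sketch of the relevant properties for running the connector with two tasks might look like the following (other required properties, such as topics and datadog.api.key, are omitted here):
connector.class=io.confluent.connect.datadog.metrics.DatadogMetricsSinkConnector
tasks.max=2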
Support for Kafka record value of type Struct, Schemaless JSON, and JSON String¶
The connector will attempt to fit Kafka record values of type Struct, schemaless JSON, and JSON string into one of the three defined metric types (gauge, rate, or count), depending on the type field. If the value for type is anything other than these three types, Datadog will treat it as gauge.
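For illustration only, a schemaless JSON record value for a count metric might look like the following (the field values are made up):
{"name": "perf.metric", "type": "count", "timestamp": 1575875976, "values": {"doubleValue": 12.0}}
The same metric expressed as a JSON string record value would be the escaped form of that document:
"{\"name\": \"perf.metric\", \"type\": \"count\", \"timestamp\": 1575875976, \"values\": {\"doubleValue\": 12.0}}"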
Batching multiple metrics¶
The connector tries to batch metrics in a single payload of maximum size 3.2 megabytes for each API request. For additional details, see Post timeseries points.
Metrics and schemas¶
The connector supports metrics of type Gauge, Rate, and Count. Each metric type has a different schema. Kafka topics that contain these metrics must have records that adhere to these schemas.
Gauge schema¶
The gauge metric submission type represents a value associated with a system entity or parameter that reports continuously over time.
{
"doubleValue": double
}
Rate schema¶
The rate metric submission type represents the number of events over a defined time interval (flush interval), normalized per second.
{
"doubleValue": double
}
Count schema¶
The count metric submission type represents the number of events that occur in a defined time interval, also known as the flush interval.
{
"doubleValue": double
}
Record mapping¶
Individual data in the provided Kafka record value is mapped to a Datadog Post Timeseries Metric API request body. The following shows an example of the mapping done by the connector:
{
  "name": "metric-1",
  "type": "rate",
  "timestamp": 1575366904,
  "dimensions": {
    "host": "host-1",
    "interval": 1,
    "tag1": "test",
    "tag2": "linux"
  },
  "host": "host-1",
  "values": {
    "doubleValue": 10.832442530901606
  }
}
The example record above is mapped to the Datadog Timeseries Metric Post API request body as shown below:
{
  "series": [
    {
      "host": "host-1",
      "metric": "metric-1",
      "points": [
        [
          "1575366904",
          "10.832442530901606"
        ]
      ],
      "tags": ["host:host-1", "interval:1", "tag1:test", "tag2:linux"],
      "type": "rate",
      "interval": 1
    }
  ]
}
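For reference, a request body like the one above could also be sent manually with curl to the Datadog Post timeseries endpoint. The following is only an illustrative sketch of the call the connector makes on your behalf; it assumes the COM domain and that the request body shown above is saved in a file named series.json (that filename is not part of the connector):
curl -X POST "https://api.datadoghq.com/api/v1/series" \
  -H "Content-Type: application/json" \
  -H "DD-API-KEY: <your-api-key>" \
  -d @series.json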
Limitations¶
The connector does not currently support Single Message Transformations (SMTs) that modify the topic name. Additionally, the following transformations are not allowed:
- io.debezium.transforms.ByLogicalTableRouter
- io.debezium.transforms.outbox.EventRouter
- org.apache.kafka.connect.transforms.RegexRouter
- org.apache.kafka.connect.transforms.TimestampRouter
- io.confluent.connect.transforms.MessageTimestampRouter
- io.confluent.connect.transforms.ExtractTopic$Key
- io.confluent.connect.transforms.ExtractTopic$Value
Install the Datadog Metrics connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
You must install the connector on every machine where Connect will run.
- Kafka Broker: Confluent Platform 3.3.0 or later.
- Connect: Confluent Platform 4.1.0 or later.
- Java 1.8.
- A Datadog account with at least reporting access to send data through the Post Timeseries Metric API. For more information, see the Datadog Documentation.
- An installation of the Confluent Hub Client. This is installed by default with Confluent Enterprise.
- An installation of the latest (latest) connector version. To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:
  confluent connect plugin install confluentinc/kafka-connect-datadog-metrics:latest
  You can install a specific version by replacing latest with a version number, as shown in the following example:
  confluent connect plugin install confluentinc/kafka-connect-datadog-metrics:1.1.2
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription which includes Confluent enterprise license keys to subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for Datadog Metrics Sink Connector for Confluent Platform.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Quick Start¶
In this Quick Start, you configure the Kafka Connect Datadog Metrics Sink connector to read records from Kafka topics and export the data to Datadog.
- Prerequisites
- Confluent Platform is installed.
- The Confluent CLI is installed.
- Get started with Datadog is completed.
- The Datadog API key is available. Find the API key under Integration > APIs > API Keys, accessible from the Datadog Dashboard.
Preliminary setup¶
To add a new connector plugin you must restart Connect. Use the Confluent CLI command to restart Connect.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.
confluent local services connect stop && confluent local services connect start
Your output should resemble:
Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Check if the Datadog plugin has been installed correctly and picked up by the plugin loader:
curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep datadog
Your output should resemble:
"io.confluent.connect.datadog.metrics.DatadogMetricsSinkConnector"
Sink Connector Configuration¶
Start the services using the Confluent CLI:
confluent local services start
Create a configuration file named datadog-metrics-sink-config.json with the following contents:
{
  "name": "datadog-metrics-sink",
  "config": {
    "topics": "datadog-metrics-topic",
    "connector.class": "io.confluent.connect.datadog.metrics.DatadogMetricsSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "datadog.api.key": "< your-api-key >",
    "datadog.domain": "COM",
    "behavior.on.error": "fail",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  }
}
Run this command to start the Datadog Metrics sink connector.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local services connect connector load datadog-metrics-sink --config datadog-metrics-sink-config.json
To check that the connector started successfully, view the Connect worker’s log by running:
confluent local services connect log
Produce test data to the datadog-metrics-topic topic in Kafka using the kafka-avro-console-producer command:
kafka-avro-console-producer \
--broker-list localhost:9092 --topic datadog-metrics-topic \
--property value.schema='{"name": "metric","type": "record","fields": [{"name": "name","type": "string"},{"name": "type","type": "string"},{"name": "timestamp","type": "long"}, {"name": "dimensions", "type": {"name": "dimensions", "type": "record", "fields": [{"name": "host", "type":"string"}, {"name":"interval", "type":"int"}, {"name": "tag1", "type":"string"}]}},{"name": "values","type": {"name": "values","type": "record","fields": [{"name":"doubleValue", "type": "double"}]}}]}'
Important
The timestamp must be current and in Unix epoch second format. Current is defined as not more than 10 minutes in the future or more than one hour in the past. The following example record uses a Unix epoch second timestamp:
{"name":"perf.metric", "type":"rate","timestamp": 1575875976, "dimensions": {"host": "metric.host1", "interval": 1, "tag1": "testing-data"},"values": {"doubleValue": 5.639623848362502}}
Using the Datadog Dashboard, you can view the metrics being produced. You can produce Avro and JSON data to a Kafka topic for this connector.
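If you prefer to verify from the command line instead of the dashboard, one option is to query the metric back through the Datadog v1 query API. The following is only a sketch; it assumes you also have a Datadog application key, which the connector itself does not require:
curl -G "https://api.datadoghq.com/api/v1/query" \
  -H "DD-API-KEY: <your-api-key>" \
  -H "DD-APPLICATION-KEY: <your-application-key>" \
  --data-urlencode "from=$(($(date +%s) - 3600))" \
  --data-urlencode "to=$(date +%s)" \
  --data-urlencode "query=perf.metric{*}"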
When completed, stop the Confluent services using the command:
confluent local services stop
Examples¶
Property-based example¶
Create a configuration file for the connector. This file is included with the connector in etc/kafka-connect-datadog-metrics/datadog-metrics-sink-connector.properties. This configuration is typically used with standalone workers.
Note
For details about using this connector with Kafka Connect Reporter, see Connect Reporter.
name=datadog-metrics-sink
topics=datadog-metrics-topic
connector.class=io.confluent.connect.datadog.metrics.DatadogMetricsSinkConnector
tasks.max=1
datadog.api.key=< Your Datadog Api key >
datadog.domain=< one of COM or EU >
behavior.on.error=< Optional Configuration >
reporter.bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
confluent.license=
Before starting the connector, make sure that the configurations in the properties file are properly set.
Note
Provide datadog.api.key, datadog.domain, and behavior.on.error, and then start the connector.
Then start the Datadog metrics connector by loading its configuration with the following command.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local services connect connector load datadog-metrics-sink --config datadog-metrics-sink-connector.properties
Your output should resemble:
{
  "name": "datadog-metrics-sink",
  "config": {
    "connector.class": "io.confluent.connect.datadog.metrics.DatadogMetricsSinkConnector",
    "tasks.max": "1",
    "topics": "datadog-metrics-topic",
    "datadog.api.key": "< your-api-key >",
    "datadog.domain": "COM",
    "behavior.on.error": "fail",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "reporter.bootstrap.servers": "localhost:9092"
  },
  "tasks": []
}
REST-based example¶
This configuration is typically used with distributed workers. Write the following JSON to connector.json, configure all of the required values, and use the command below to post the configuration to one of the distributed connect workers. Check here for more information about the Kafka Connect REST Interface.
Note
For details about using this connector with Kafka Connect Reporter, see Connect Reporter.
{
  "name": "datadog-metrics-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.datadog.metrics.DatadogMetricsSinkConnector",
    "tasks.max": "1",
    "topics": "datadog-metrics-topic",
    "datadog.domain": "COM",
    "datadog.api.key": "< your-api-key >",
    "behavior.on.error": "fail",
    "reporter.bootstrap.servers": "localhost:9092",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  }
}
Use curl to post the configuration to one of the Kafka Connect workers. Change
http://localhost:8083/
to the endpoint of one of your Kafka Connect workers.
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json \
http://localhost:8083/connectors/datadog-metrics-sink-connector/config
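After posting the configuration, you can confirm that the connector and its task are running by querying the Connect REST API status endpoint, for example:
curl -s http://localhost:8083/connectors/datadog-metrics-sink-connector/status | jq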