Datadog Metrics Sink Connector for Confluent Platform¶
Kafka Connect Datadog Metrics Sink Connector は、Post timeseries API を使用して Apache Kafka® のトピックから Datadog にデータをエクスポートするために使用されます。このコネクターは、Kafka レコードの値として構造体を受け取ります。ここでは、name
、timestamp
、values
の各フィールドが必要です。values
フィールドでは、メトリクスの値が参照されます。
入力データは以下のようになります。
{
"name": string,
"type": string, -- optional (DEFAULT = "gauge")
"timestamp": long,
"dimensions": { -- optional
"host": string, -- optional
"interval": int, -- optional (DEFAULT = 0)
<tag1-key>: <tag1-value>, -- optional
<tag2-key>: <tag2-value>,
.....
},
"values": {
"doubleValue": double
}
}
このコネクターは、データの全エクスポートをサポートする 1 つ以上のタスクから開始して、さらにタスクを追加して水平に拡張できます。ただし、パフォーマンスは、Datadog により制限されます。詳細については、API のレート制限 を参照してください。
機能¶
Datadog Metrics Sink Connector には、以下の機能があります。
- 少なくとも 1 回のデリバリー
- デッドレターキュー
- 複数のタスク
- 構造体型、スキーマレス JSON 型、JSON 文字列型の Kafka レコード値をサポート
- 複数のメトリクスのバッチ処理
- メトリクスとスキーマ
- レコードマッピング
少なくとも 1 回のデリバリー¶
このコネクターによって、Kafka のトピックのレコードが少なくとも 1 回は配信されることが保証されます。
複数のタスク¶
The Datadog Metrics Sink connector supports running one or more tasks. You can specify
the number of tasks in the tasks.max
configuration parameter. This can lead
to huge performance gains when multiple files need to be parsed.
構造体型、スキーマレス JSON 型、JSON 文字列型の Kafka レコード値をサポート¶
このコネクターは、Kafka レコードの構造体型、スキーマレス JSON 型、JSON 文字列型の values
を、type
フィールドに応じて定義済みの 3 つのメトリックタイプ(gauge
、rate
、または count
)のいずれかに当てはめようとします。また、type
の値が前述の 3 つ以外のタイプである場合、Datadog では gauge
として扱われます。
複数のメトリクスのバッチ処理¶
このコネクターでは、API リクエストごとに最大サイズ 3.2 MB の単一ペイロードで、メトリクスのバッチ処理を試行します。詳細については、『Submit metrics』を参照してください。
メトリクスとスキーマ¶
コネクターでは、メトリクスタイプとして Gauge、Rate、Count がサポートされています。それぞれのメトリクス タイプのスキーマは異なります。これらのメトリクスを含む Kafka のトピックは、これらのスキーマに準拠するレコードを保持する必要があります。
Gauge スキーマ¶
gauge
メトリック送信タイプは、長い時間をかけて継続的にレポートされる、システムエンティティやパラメーターに関連付けられた値を表します。
{
"doubleValue": double
}
Count スキーマ¶
count
メトリック送信タイプは、定義した時間間隔の間に発生したイベント数を表します。定義された時間間隔は、フラッシュ間隔とも呼ばれます。
{
"doubleValue": double
}
レコードマッピング¶
提供される Kafka レコード値の個々のデータは、Datadog Post Timeseries Metric API リクエスト本文にマップされます。以下はコネクターによって実行されるマッピングの例です。
{
"name": "metric-1",
"type": "rate",
"timestamp": 1575366904,
"dimensions": {
"host" : "host-1",
"interval" : 1,
"tag1" : "test",
"tag2" : "linux"
},
"host": "host-1",
"values": {
"doubleValue": 10.832442530901606
}
}
上の例のレコードは、下に示すように Datadog TimeSeries Metric Post API
リクエスト本文にマップされます。
{
"series":
[
{
"host":"host-1",
"metric":"metric-1",
"points":
[
[
"1575366904",
"10.832442530901606"
]
],
"tags":["host:host1", "interval:1", "tag1:test", "tag2:linux"],
"type":"rate",
"interval":1
}
]
}
制限¶
現在、コネクターではトピック名を変更する Single Message Transformations(SMT) をサポートしていません。また、以下の変換は許可されていません。
io.debezium.transforms.ByLogicalTableRouter
io.debezium.transforms.outbox.EventRouter
org.apache.kafka.connect.transforms.RegexRouter
org.apache.kafka.connect.transforms.TimestampRouter
io.confluent.connect.transforms.MessageTimestampRouter
io.confluent.connect.transforms.ExtractTopic$Key
io.confluent.connect.transforms.ExtractTopic$Value
前提条件¶
Kafka Connect Datadog コネクターを実行するには、以下が必要です。
- Kafka ブローカー: Confluent Platform 3.3.0 以上
- Connect: Confluent Platform 4.1.0 以上
- Java 1.8
Post Timeseries Metric API
でデータを送信するために、少なくとも レポートアクセス を持つ Datadog アカウント。詳細については、Datadog のドキュメント を参照してください。
Datadog Metrics Connector のインストール¶
このコネクターは、Confluent Hub クライアントのインストール手順 に従うか、手作業で ZIP ファイルをダウンロードしてインストールします。
前提条件¶
注釈
このコネクターは、Connect が実行されるすべてのマシンにインストールする必要があります。
Confluent Hub クライアント のインストール。
注釈
これは、Confluent Enterprise とともにデフォルトでインストールされます。
コネクターの最新(
latest
)バージョンのインストール。コネクターの
latest
バージョンをインストールするには、Confluent Platform のインストールディレクトリに移動し、次のコマンドを実行します。confluent-hub install confluentinc/kafka-connect-datadog-metrics:latest
特定のバージョンをインストールするには、次の例に示すように
latest
をバージョン番号に置き換えます。confluent-hub install confluentinc/kafka-connect-datadog-metrics:1.1.2
コネクターの手動インストール¶
コネクターの ZIP ファイル をダウンロードして展開し、コネクターの手動インストール 手順 に従ってください。
ライセンス¶
ライセンスのプロパティについては、「Confluent Platform ライセンス」を参照してください。ライセンストピックの詳細については、「datadog_metrics_sink_license_topic_configuration」を参照してください。
構成プロパティ¶
このコネクター構成プロパティの網羅的なリストについては、「Datadog Metrics Sink Connector 構成プロパティ」を参照してください。
クイックスタート¶
このクイックスタートでは、Kafka Connect Datadog Metrics Sink Connector を、Kafka のトピックからレコードを読み取って Datadog にデータをエクスポートするように構成します。
- 前提条件
- Confluent Platform がインストールされている。
- Confluent CLI がインストールされている。
- 『Get started with Datadog』を完了している。
- Datadog API キーを利用できる。API キーは、Datadog ダッシュボードから
Integration > APIs > API Keys
にアクセスして確認できます。
事前セットアップ¶
新しいコネクタープラグインを追加する場合は、Connect を再起動する必要があります。Confluent CLI コマンドを使用して Connect を再起動します。
confluent local services connect stop && confluent local services connect start
出力は以下のようになります。
Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Datadog プラグインが正しくインストールされ、プラグインローダーによって選択されていることを確認します。
curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep datadog
出力は以下のようになります。
"io.confluent.connect.datadog.metrics.DatadogMetricsSinkConnector"
シンクコネクターの構成¶
Confluent CLI を使用してサービスを開始します。
confluent local services start
datadog-metrics-sink-config.json という名前の構成ファイルを以下の内容で作成します。
{
"name": "datadog-metrics-sink",
"config": {
"topics": "datadog-metrics-topic",
"connector.class": "io.confluent.connect.datadog.metrics.DatadogMetricsSinkConnector",
"tasks.max": "1",
"key.converter": "io.confluent.connect.string.StringConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.json.JsonConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"datadog.api.key": "< your-api-key >"
"datadog.domain": "COM"
"behavior.on.error": "fail",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1"
}
}
このコマンドを実行して、Datadog Metrics Sink Connector を起動します。
confluent local services connect connector load datadog-metrics-sink --config datadog-metrics-sink-config.json
コネクターが正常に開始されたことを確認するには、以下を実行して Connect ワーカーのログを表示します。
confluent local services connect log
Confluent CLI confluent local services kafka produce コマンドを使用して、テストデータを Kafka の datadog-metrics-topic
トピックに生成します。
kafka-avro-console-producer \
--broker-list localhost:9092 --topic datadog-metrics-topic \
--property value.schema='{"name": "metric","type": "record","fields": [{"name": "name","type": "string"},{"name": "type","type": "string"},{"name": "timestamp","type": "long"}, {"name": "dimensions", "type": {"name": "dimensions", "type": "record", "fields": [{"name": "host", "type":"string"}, {"name":"interval", "type":"int"}, {"name": "tag1", "type":"string"}]}},{"name": "values","type": {"name": "values","type": "record","fields": [{"name":"doubleValue", "type": "double"}]}}]}'
重要
タイムスタンプは、Unix エポック秒形式としての "現在時刻" にする必要があります。現在時刻は、未来の 10 分以内、過去 1 時間以内と定義されます。
Unix エポック秒形式
{"name":"perf.metric", "type":"rate","timestamp": 1575875976, "dimensions": {"host": "metric.host1", "interval": 1, "tag1": "testing-data"},"values": {"doubleValue": 5.639623848362502}}
Datadog ダッシュボードを使用して、生成されるメトリクスを表示できます。このコネクター用に、AVRO および JSON データを Kafka のトピックに生成することができます。
完了したら、以下のコマンドを使用して Confluent サービスを停止します。
confluent local stop
例¶
プロパティベースの例¶
コネクターの構成ファイルを作成します。このファイルは、コネクターの etc/kafka-connect-datadog-metrics/datadog-metrics-sink-connector.properties
に含まれています。この構成は通常、スタンドアロンワーカー で使用されます。
注釈
このコネクターを Kafka Connect Reporter と併せて使用する方法の詳細については、「Connect Reporter」を参照してください。
name=datadog-metrics-sink
topics=datadog-metrics-topic
connector.class=io.confluent.connect.datadog.metrics.DatadogMetricsSinkConnector
tasks.max=1
datadog.api.key=< Your Datadog Api key >
datadog.domain=< anyone of COM/EU >
behavior.on.error=< Optional Configuration >
reporter.bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
confluent.license=
コネクターを起動する前に、datadog.properties
の構成が正しく設定されていることを確認します。
注釈
datadog.api.key
、datadog.domain
、および behavior.on.error
を指定して、コネクターを起動してください。
以下のコマンドを使用して構成を読み込んで、Datadog Metrics Sink Connector を起動します。
confluent local services connect connector load datadog-metrics-sink --config datadog-metrics-sink-connector.properties
{
"name": "datadog-metrics-sink",
"config": {
"connector.class": "io.confluent.connect.datadog.metrics.DatadogMetricsSinkConnector",
"tasks.max":"1",
"topics":"datadog-metrics-topic",
"datadog.api.key": "< your-api-key > "
"datadog.domain": "COM"
"behavior.on.error": "fail",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://localhost:8081",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081",
"confluent.topic.bootstrap.servers":"localhost:9092",
"confluent.topic.replication.factor":"1",
"reporter.bootstrap.servers": "localhost:9092"
},
"tasks": []
}
REST ベースの例¶
この構成は通常、 分散ワーカー で使用されます。以下の JSON を connector.json
に書き込み、すべての必要な値を構成します。それから以下のコマンドを使用して、分散されたコネクトワーカーのいずれかに構成をポストします。Kafka Connect REST インターフェイス の詳細については、こちらを参照してください。
注釈
このコネクターを Kafka Connect Reporter と併せて使用する方法の詳細については、「Connect Reporter」を参照してください。
{
"name" : "datadog-metrics-sink-connector",
"config" : {
"connector.class": "io.confluent.connect.datadog.metrics.DatadogMetricsSinkConnector",
"tasks.max": "1",
"datadog.domain": "COM",
"datadog.api.key": "< your-api-key >",
"behavior.on.error": "fail",
"reporter.bootstrap.servers": "localhost:9092",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1"
}
}
いずれかの Kafka Connect ワーカーに構成をポストするには、curl を使用します。http://localhost:8083/
を、いずれかの Kafka Connect ワーカーのエンドポイントに変更します。
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json \
http://localhost:8083/connectors/datadog-metrics-sink-connector/config