Datadog Metrics Sink Connector for Confluent Platform

Kafka Connect Datadog Metrics Sink Connector は、Post timeseries API を使用して Apache Kafka® のトピックから Datadog にデータをエクスポートするために使用されます。このコネクターは、Kafka レコードの値として構造体を受け取ります。ここでは、nametimestampvalues の各フィールドが必要です。values フィールドでは、メトリクスの値が参照されます。

入力データは以下のようになります。

{
    "name": string,
    "type": string,               -- optional (DEFAULT = "gauge")
    "timestamp": long,
    "dimensions": {               -- optional
      "host": string,             -- optional
      "interval": int,            -- optional (DEFAULT = 0)
      <tag1-key>: <tag1-value>,   -- optional
      <tag2-key>: <tag2-value>,
      .....
    },
    "values": {
      "doubleValue": double
    }
}

このコネクターは、データの全エクスポートをサポートする 1 つ以上のタスクから開始して、さらにタスクを追加して水平に拡張できます。ただし、パフォーマンスは、Datadog により制限されます。詳細については、API のレート制限 を参照してください。

機能

Datadog Metrics Sink Connector には、以下の機能があります。

少なくとも 1 回のデリバリー

このコネクターによって、Kafka のトピックのレコードが少なくとも 1 回は配信されることが保証されます。

デッドレターキュー

このコネクターは、デッドレターキュー(DLQ)機能をサポートします。DLQ へのアクセスとその使用方法の詳細については、「デッドレターキュー」を参照してください。

複数のタスク

The Datadog Metrics Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. This can lead to huge performance gains when multiple files need to be parsed.

構造体型、スキーマレス JSON 型、JSON 文字列型の Kafka レコード値をサポート

このコネクターは、Kafka レコードの構造体型、スキーマレス JSON 型、JSON 文字列型の values を、type フィールドに応じて定義済みの 3 つのメトリックタイプ(gaugerate、または count)のいずれかに当てはめようとします。また、type の値が前述の 3 つ以外のタイプである場合、Datadog では gauge として扱われます。

複数のメトリクスのバッチ処理

このコネクターでは、API リクエストごとに最大サイズ 3.2 MB の単一ペイロードで、メトリクスのバッチ処理を試行します。詳細については、『Submit metrics』を参照してください。

メトリクスとスキーマ

コネクターでは、メトリクスタイプとして Gauge、Rate、Count がサポートされています。それぞれのメトリクス タイプのスキーマは異なります。これらのメトリクスを含む Kafka のトピックは、これらのスキーマに準拠するレコードを保持する必要があります。

Gauge スキーマ

gauge メトリック送信タイプは、長い時間をかけて継続的にレポートされる、システムエンティティやパラメーターに関連付けられた値を表します。

{
  "doubleValue": double
}

Rate スキーマ

rate メトリック送信タイプは、定義した時間間隔(フラッシュ間隔)のイベント数を秒あたりに正規化した数を表します。

{
  "doubleValue": double
}

Count スキーマ

count メトリック送信タイプは、定義した時間間隔の間に発生したイベント数を表します。定義された時間間隔は、フラッシュ間隔とも呼ばれます。

{
  "doubleValue": double
}

レコードマッピング

提供される Kafka レコード値の個々のデータは、Datadog Post Timeseries Metric API リクエスト本文にマップされます。以下はコネクターによって実行されるマッピングの例です。

{
  "name": "metric-1",
  "type": "rate",
  "timestamp": 1575366904,
  "dimensions": {
    "host" : "host-1",
    "interval" : 1,
    "tag1" : "test",
    "tag2" : "linux"
  },
  "host": "host-1",
  "values": {
    "doubleValue": 10.832442530901606
  }
}

上の例のレコードは、下に示すように Datadog TimeSeries Metric Post API リクエスト本文にマップされます。

{
  "series":
  [
    {
      "host":"host-1",
      "metric":"metric-1",
      "points":
      [
        [
          "1575366904",
          "10.832442530901606"
        ]
      ],
      "tags":["host:host1", "interval:1", "tag1:test", "tag2:linux"],
      "type":"rate",
      "interval":1
    }
  ]
}

制限

現在、コネクターではトピック名を変更する Single Message Transformations(SMT) をサポートしていません。また、以下の変換は許可されていません。

  • io.debezium.transforms.ByLogicalTableRouter
  • io.debezium.transforms.outbox.EventRouter
  • org.apache.kafka.connect.transforms.RegexRouter
  • org.apache.kafka.connect.transforms.TimestampRouter
  • io.confluent.connect.transforms.MessageTimestampRouter
  • io.confluent.connect.transforms.ExtractTopic$Key
  • io.confluent.connect.transforms.ExtractTopic$Value

前提条件

Kafka Connect Datadog コネクターを実行するには、以下が必要です。

  • Kafka ブローカー: Confluent Platform 3.3.0 以上
  • Connect: Confluent Platform 4.1.0 以上
  • Java 1.8
  • Post Timeseries Metric API でデータを送信するために、少なくとも レポートアクセス を持つ Datadog アカウント。詳細については、Datadog のドキュメント を参照してください。

Datadog Metrics Connector のインストール

このコネクターは、Confluent Hub クライアントのインストール手順 に従うか、手作業で ZIP ファイルをダウンロードしてインストールします。

前提条件

注釈

このコネクターは、Connect が実行されるすべてのマシンにインストールする必要があります。

  • Confluent Hub クライアント のインストール。

    注釈

    これは、Confluent Enterprise とともにデフォルトでインストールされます。

  • コネクターの最新(latest)バージョンのインストール。

    コネクターの latest バージョンをインストールするには、Confluent Platform のインストールディレクトリに移動し、次のコマンドを実行します。

    confluent-hub install confluentinc/kafka-connect-datadog-metrics:latest
    

    特定のバージョンをインストールするには、次の例に示すように latest をバージョン番号に置き換えます。

    confluent-hub install confluentinc/kafka-connect-datadog-metrics:1.1.2
    

コネクターの手動インストール

コネクターの ZIP ファイル をダウンロードして展開し、コネクターの手動インストール 手順 に従ってください。

ライセンス

ライセンスのプロパティについては、「Confluent Platform ライセンス」を参照してください。ライセンストピックの詳細については、「datadog_metrics_sink_license_topic_configuration」を参照してください。

構成プロパティ

このコネクター構成プロパティの網羅的なリストについては、「Datadog Metrics Sink Connector 構成プロパティ」を参照してください。

クイックスタート

このクイックスタートでは、Kafka Connect Datadog Metrics Sink Connector を、Kafka のトピックからレコードを読み取って Datadog にデータをエクスポートするように構成します。

前提条件
  • Confluent Platform がインストールされている。
  • Confluent CLI がインストールされている。
  • Get started with Datadog』を完了している。
  • Datadog API キーを利用できる。API キーは、Datadog ダッシュボードから Integration > APIs > API Keys にアクセスして確認できます。

事前セットアップ

新しいコネクタープラグインを追加する場合は、Connect を再起動する必要があります。Confluent CLI コマンドを使用して Connect を再起動します。

confluent local services connect stop && confluent local services connect start

出力は以下のようになります。

Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]

Datadog プラグインが正しくインストールされ、プラグインローダーによって選択されていることを確認します。

curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep datadog

出力は以下のようになります。

"io.confluent.connect.datadog.metrics.DatadogMetricsSinkConnector"

シンクコネクターの構成

Confluent CLI を使用してサービスを開始します。

confluent local services start

datadog-metrics-sink-config.json という名前の構成ファイルを以下の内容で作成します。

 {
  "name": "datadog-metrics-sink",
  "config": {
    "topics": "datadog-metrics-topic",
    "connector.class": "io.confluent.connect.datadog.metrics.DatadogMetricsSinkConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.string.StringConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.json.JsonConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "datadog.api.key": "< your-api-key >"
    "datadog.domain": "COM"
    "behavior.on.error": "fail",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  }
}

このコマンドを実行して、Datadog Metrics Sink Connector を起動します。

confluent local services connect connector load datadog-metrics-sink --config datadog-metrics-sink-config.json

コネクターが正常に開始されたことを確認するには、以下を実行して Connect ワーカーのログを表示します。

confluent local services connect log

Confluent CLI confluent local services kafka produce コマンドを使用して、テストデータを Kafka の datadog-metrics-topic トピックに生成します。

  kafka-avro-console-producer \
--broker-list localhost:9092 --topic datadog-metrics-topic \
--property value.schema='{"name": "metric","type": "record","fields": [{"name": "name","type": "string"},{"name": "type","type": "string"},{"name": "timestamp","type": "long"}, {"name": "dimensions", "type": {"name": "dimensions", "type": "record", "fields": [{"name": "host", "type":"string"}, {"name":"interval", "type":"int"}, {"name": "tag1", "type":"string"}]}},{"name": "values","type": {"name": "values","type": "record","fields": [{"name":"doubleValue", "type": "double"}]}}]}'

重要

タイムスタンプは、Unix エポック秒形式としての "現在時刻" にする必要があります。現在時刻は、未来の 10 分以内、過去 1 時間以内と定義されます。

Unix エポック秒形式

{"name":"perf.metric", "type":"rate","timestamp": 1575875976, "dimensions": {"host": "metric.host1", "interval": 1, "tag1": "testing-data"},"values": {"doubleValue": 5.639623848362502}}

Datadog ダッシュボードを使用して、生成されるメトリクスを表示できます。このコネクター用に、AVRO および JSON データを Kafka のトピックに生成することができます。

完了したら、以下のコマンドを使用して Confluent サービスを停止します。

confluent local stop

プロパティベースの例

コネクターの構成ファイルを作成します。このファイルは、コネクターの etc/kafka-connect-datadog-metrics/datadog-metrics-sink-connector.properties に含まれています。この構成は通常、スタンドアロンワーカー で使用されます。

注釈

このコネクターを Kafka Connect Reporter と併せて使用する方法の詳細については、「Connect Reporter」を参照してください。

name=datadog-metrics-sink
topics=datadog-metrics-topic
connector.class=io.confluent.connect.datadog.metrics.DatadogMetricsSinkConnector
tasks.max=1
datadog.api.key=< Your Datadog Api key >
datadog.domain=< anyone of COM/EU >
behavior.on.error=< Optional Configuration >
reporter.bootstrap.servers=localhost:9092

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
confluent.license=

コネクターを起動する前に、datadog.properties の構成が正しく設定されていることを確認します。

注釈

datadog.api.keydatadog.domain、および behavior.on.error を指定して、コネクターを起動してください。

以下のコマンドを使用して構成を読み込んで、Datadog Metrics Sink Connector を起動します。

confluent local services connect connector load datadog-metrics-sink --config datadog-metrics-sink-connector.properties
{
 "name": "datadog-metrics-sink",
 "config": {
     "connector.class": "io.confluent.connect.datadog.metrics.DatadogMetricsSinkConnector",
     "tasks.max":"1",
     "topics":"datadog-metrics-topic",
     "datadog.api.key": "< your-api-key > "
     "datadog.domain": "COM"
     "behavior.on.error": "fail",
     "key.converter":"io.confluent.connect.avro.AvroConverter",
     "key.converter.schema.registry.url":"http://localhost:8081",
     "value.converter":"io.confluent.connect.avro.AvroConverter",
     "value.converter.schema.registry.url":"http://localhost:8081",
     "confluent.topic.bootstrap.servers":"localhost:9092",
     "confluent.topic.replication.factor":"1",
     "reporter.bootstrap.servers": "localhost:9092"
 },
  "tasks": []
}

REST ベースの例

この構成は通常、 分散ワーカー で使用されます。以下の JSON を connector.json に書き込み、すべての必要な値を構成します。それから以下のコマンドを使用して、分散されたコネクトワーカーのいずれかに構成をポストします。Kafka Connect REST インターフェイス の詳細については、こちらを参照してください。

注釈

このコネクターを Kafka Connect Reporter と併せて使用する方法の詳細については、「Connect Reporter」を参照してください。

{
  "name" : "datadog-metrics-sink-connector",
  "config" : {
   "connector.class": "io.confluent.connect.datadog.metrics.DatadogMetricsSinkConnector",
   "tasks.max": "1",
   "datadog.domain": "COM",
   "datadog.api.key": "< your-api-key >",
   "behavior.on.error": "fail",
   "reporter.bootstrap.servers": "localhost:9092",
   "confluent.topic.bootstrap.servers": "localhost:9092",
   "confluent.topic.replication.factor": "1"
  }
}

いずれかの Kafka Connect ワーカーに構成をポストするには、curl を使用します。http://localhost:8083/ を、いずれかの Kafka Connect ワーカーのエンドポイントに変更します。

curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json \
http://localhost:8083/connectors/datadog-metrics-sink-connector/config