Prometheus Metrics Sink Connector for Confluent Platform¶
注意
プレビューコネクターは現在サポート対象外であり、本稼働環境での使用は推奨されていません。
Prometheus Metrics Sink Connector は、Apache Kafka® の複数のトピックからデータをエクスポートし、Prometheus サーバーによりスクレイピングされるエンドポイントにデータを提供します。コネクターは、構造体およびスキーマレス JSON を Kafka レコードの値として受け取ります。name
および values
の各フィールドは必須です。
values
フィールドでは、メトリックの値が参照されます。このフィールドは、Kafka レコードの値が構造体型である場合は構造体型オブジェクト、Kafka レコードの値がスキーマレス JSON 型である場合はネスト化された JSON であることが想定されます。入力された構造体オブジェクトまたはスキーマレス JSON オブジェクトは、レコードの値として使用され、以下のようになります。
{
"name": string,
"type": string,
"timestamp": long,
"dimensions": {
"<dimension-1>": string,
...
},
"values": {
"<datapoint-1>": double,
"<datapoint-2>": double,
...
}
}
機能¶
Prometheus Sink Connector には、以下の機能があります。
少なくとも 1 回のデリバリー¶
このコネクターによって、レコードが Kafka のトピックに少なくとも 1 回は配信されることが保証されます。コネクターを再起動した場合、Kafka のトピックに重複レコードが存在している可能性があります。
複数のタスク¶
Prometheus Metrics Sink Connector は、1 つまたは複数のタスクの実行をサポートしています。タスクの数は tasks.max
構成パラメーターで指定できます。大容量のデータを移動する際には、複数のタスクがあるとパフォーマンスが向上する場合があります。
メトリクスとスキーマ¶
コネクターでは、Gauge、Meter、Histogram、Timer タイプのメトリクスがサポートされています。それぞれのメトリックタイプのスキーマは異なります。これらのメトリクスを含む Kafka のトピックは、これらのスキーマに準拠するレコードを保持する必要があります。
Gauge スキーマ¶
{
"doubleValue": double
}
Meter スキーマ¶
{
"count": double,
"oneMinuteRate": double,
"fiveMinuteRate": double,
"fifteenMinuteRate": double,
"meanRate": double
}
Histogram スキーマ¶
{
"count": double,
"max": double,
"min": double,
"mean": double,
"stdDev": double,
"sum": double,
"median": double,
"percentile75th": double,
"percentile95th": double,
"percentile98th": double,
"percentile99th": double,
"percentile999th": double,
}
Timer スキーマ¶
{
"count": double,
"oneMinuteRate": double,
"fiveMinuteRate": double,
"fifteenMinuteRate": double,
"meanRate": double,
"max": double,
"min": double,
"mean": double,
"stdDev": double,
"sum": double,
"median": double,
"percentile75th": double,
"percentile95th": double,
"percentile98th": double,
"percentile99th": double,
"percentile999th": double
}
レコードマッピング¶
values
構造体またはネスト化された JSON のオブジェクトのそれぞれの値は、Prometheus で読み取り可能な形式に変換されます。たとえば、以下に元の形式を示します。
{
"name": "sample_meter_metric",
"type": "meter",
"timestamp": 23480239402348234,
"dimensions": {
"service": "ec2-2312",
"method": "update"
},
"values": {
"count": 12,
"oneMinuteRate": 5.2,
"fiveMinuteRate": 4.7,
"fifteenMinuteRate": 4.9,
"meanRate": 5.1"
}
}
以下の例は、Prometheus で読み取り可能な変換済みの形式を示します。
# HELP sample_meter_metric_count
# TYPE sample_meter_metric_count counter
sample_meter_metric_count{service="ec2-2312",method="update"} 12
# HELP sample_meter_metric_oneMinuteRate
# TYPE sample_meter_metric_oneMinuteRate gauge
sample_meter_metric_oneMinuteRate{service="ec2-2312",method="update"} 5.2
# HELP sample_meter_metric_fiveMinuteRate
# TYPE sample_meter_metric_fiveMinuteRate gauge
sample_meter_metric_fiveMinuteRate{service="ec2-2312",method="update"} 4.7
# HELP sample_meter_metric_fifteenMinuteRate
# TYPE sample_meter_metric_fifteenMinuteRate gauge
sample_meter_metric_fifteenMinuteRate{service="ec2-2312",method="update"} 4.9
# HELP sample_meter_metric_meanRate
# TYPE sample_meter_metric_meanRate gauge
sample_meter_metric_meanRate{service="ec2-2312",method="update"} 5.1
前提条件¶
Prometheus Metrics Sink Connector を実行するには、以下が必要です。
- Kafka ブローカー : Confluent Platform 3.3.0 以上
- Connect: Confluent Platform 4.1.0 以上
- Java 1.8
- Prometheus がインストール済みで、 prometheus.yml が構成済みである。
制限¶
Prometheus コネクターには、以下の制限があります。
- タイムスタンプ非対応 : Prometheus ではメトリクスのスクレイピング時にタイムスタンプが使用されます。Kafka レコードのタイムスタンプは無視されます。
- プルベースのコネクター : Prometheus はプルベースのシステムです。コネクターによって、HTTP サーバーがワーカーノード上で起動されます。コネクターによって処理されるメトリクスはすべて、ワーカー HTTP エンドポイントで利用可能になります。エンドポイントは、
prometheus.listener.url
プロパティを使用して構成されます。prometheus.yml
構成ファイルにprometheus.listener.url
HTTP エンドポイントを追加する必要があります。 - メトリックのタイプ : ほぼすべてのメトリクスが
gauge
タイプと解釈されます。例外は、増分値に使用されるcounter
タイプです。以下のセクションで、これらのタイプの例を示します。詳細については、『Prometheus Metric Types 』を参照してください。 - バッファ制限 : コネクターでは、Kafka トピックからのメトリクスをバッファし、Prometheus による HTTP サーバーエンドポイントのスクレイピングで利用できるようにします。バッファサイズは 300 万メトリック項目で設定され、Prometheus で連続するスクレイピングの間のメトリクスを読み取って処理するのに十分な時間が確保されています。
Prometheus メトリクスコネクターのインストール¶
このコネクターは、Confluent Hub クライアントのインストール手順 に従うか、手作業で ZIP ファイルをダウンロードしてインストールします。
前提条件¶
注釈
このコネクターは、Connect が実行されるすべてのマシンにインストールする必要があります。
Confluent Hub クライアント のインストール。
注釈
これは、Confluent Enterprise とともにデフォルトでインストールされます。
コネクターの最新(
latest
)バージョンのインストール。コネクターの
latest
バージョンをインストールするには、Confluent Platform のインストールディレクトリに移動し、次のコマンドを実行します。confluent-hub install confluentinc/kafka-connect-prometheus-metrics:latest
特定のバージョンをインストールするには、次の例に示すように
latest
をバージョン番号に置き換えます。confluent-hub install confluentinc/kafka-connect-prometheus-metrics:1.1.1-preview
コネクターの手動インストール¶
コネクターの ZIP ファイル をダウンロードして展開し、コネクターの手動インストール 手順 に従ってください。
ライセンス¶
このコネクターは、ライセンスキーがなくても 30 日間試用できます。
30 日間経過後は、コネクターのサブスクリプションを購入する必要があります。これには、サブスクライバー用の Confluent エンタープライズライセンス キーと、Confluent Platform およびコネクターに対する `エンタープライズレベルのサポート<https://www.confluent.io/subscription/>`__ が含まれています。サブスクリプションをご購入済みの場合、詳細については Confluent サポート(support@confluent.io)にお問い合わせください。
ライセンスのプロパティとライセンストピックの詳細については、「Confluent ライセンスのプロパティ 」を参照してください。
構成プロパティ¶
このコネクターの構成プロパティの網羅的なリストについては、「Prometheus Metrics Sink Connector 構成プロパティ」を参照してください。
注釈
Kafka Connect を Confluent Cloud に接続する方法の例については、「分散クラスター」を参照してください。
クイックスタート¶
このクイックスタートでは、Kafka のトピックからレコードを読み取り、HTTP サーバーエンドポイントからアクセス可能になるように、Prometheus Metrics Sink Connector を構成します。アクセス可能になると、Prometheus によってサーバーエンドポイントがスクレイピングされます。
前提条件¶
- Confluent Platform がインストールされている。
- Confluent CLI がインストールされている。
- Prometheus がインストールされている。「Prometheus Installation」を参照してください。
- prometheus.yml ファイル が、 Connect ワーカーによって公開された HTTP サーバーエンドポイントをモニタリングして構成済みである。
Confluent の開始¶
以下の Confluent CLI コマンドを使用して Confluent サービスを開始します。
confluent local start
重要
Confluent CLI は、本稼働環境では使用しないでください。
プロパティベースの例¶
構成ファイル prometheus-metrics-sink.properties
を以下の内容で作成します。このファイルは、Confluent Platform のインストールディレクトリ内に配置します。この構成は、通常、スタンドアロンワーカー で使用されます。
注釈
このコネクターを Kafka Connect Reporter と併せて使用する方法の詳細については、「Connect Reporter」を参照してください。
name=prometheus-connector
topics=test-topic
tasks.max=1
connector.class=io.confluent.connect.prometheus.PrometheusMetricsSinkConnector
confluent.topic.bootstrap.servers=localhost:9092
prometheus.listener.url=http://localhost:8889/metrics
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
reporter.result.topic.replication.factor=1
reporter.error.topic.replication.factor=1
behavior.on.error=log
この構成プロパティファイルを使用してコネクターを実行します。
confluent local load prometheus-connector --config prometheus-metrics-sink.properties
出力は以下のようになります。
{
"name": "prometheus-connector",
"config": {
"topics": "test-topic",
"tasks.max": "1",
"connector.class": "io.confluent.connect.prometheus.PrometheusMetricsSinkConnector",
"confluent.topic.bootstrap.servers": "localhost:9092",
"prometheus.listener.url": "http://localhost:8889/metrics",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"reporter.result.topic.replication.factor": "1",
"reporter.error.topic.replication.factor": "1",
"behavior.on.error": "log",
"name": "prometheus-connector"
},
"tasks": [],
"type": "sink"
}
コネクターが RUNNING
ステートであることを確認します。
confluent local status prometheus-connector
出力は以下のようになります。
{
"name": "prometheus-connector",
"connector": {
"state": "RUNNING",
"worker_id": "127.0.1.1:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "127.0.1.1:8083",
}
],
"type": "sink"
}
REST ベースの例¶
REST ベースの構成は、 分散ワーカー に使用されます。詳細については、Kafka Connect REST API のドキュメントを参照してください。
注釈
このコネクターを Kafka Connect Reporter と併せて使用する方法の詳細については、「Connect Reporter」を参照してください。
次の JSON を
config.json
に書き込み、すべての必要な値を構成します。{ "name" : "prometheus-connector", "config" : { "topics":"test-topic", "connector.class" : "io.confluent.connect.prometheus.PrometheusMetricsSinkConnector", "tasks.max" : "1", "confluent.topic.bootstrap.servers":"localhost:9092", "prometheus.listener.url": "http://localhost:8889/metrics", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://localhost:8081", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://localhost:8081", "reporter.result.topic.replication.factor": "1", "reporter.error.topic.replication.factor": "1", "behavior.on.error": "log" } }
注釈
confluent.topic.bootstrap.servers
プロパティを変更してブローカーのアドレスを含めます。また、本稼働環境での使用では、confluent.topic.replication.factor
を3
に変更します。いずれかの Kafka Connect ワーカーに構成をポストするには、以下の curl コマンドを入力します。
http://localhost:8083/
を、Kafka Connect ワーカーのいずれかのエンドポイントに変更します。curl -sS -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors
以下の curl コマンドを入力して、コネクターの構成をアップデートします。
curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/prometheus-connector/config
以下の curl コマンドを入力して、コネクターが
RUNNING
ステートであることを確認します。curl http://localhost:8083/connectors/prometheus-connector/status | jq
出力は以下のようになります。
{ "name": "prometheus-connector", "connector": { "state": "RUNNING", "worker_id": "127.0.1.1:8083" }, "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "127.0.1.1:8083", } ], "type": "sink" }
エンドポイント
/connectors/prometheus-connector/status
を検索します。コネクターおよびタスクのステートはRUNNING
である必要があります。Kafka トピック
test-topic
に Avro データを生成するには、以下のコマンドを使用します。./bin/kafka-avro-console-producer \ --broker-list localhost:9092 --topic test-topic \ --property value.schema='{"name": "metric","type": "record","fields": [{"name": "name","type": "string"},{"name": "type","type": "string"},{"name": "timestamp","type": "long"},{"name": "values","type": {"name": "values","type": "record","fields": [{"name":"doubleValue", "type": "double"}]}}]}'
コンソールが入力待機中に、以下の 3 つのレコードを使用し、それぞれをコンソールに貼り付けます。
{"name":"kafka_gaugeMetric1", "type":"gauge","timestamp": 1576236481,"values": {"doubleValue": 5.639623848362502}} {"name":"kafka_gaugeMetric1", "type":"gauge","timestamp": 1576236481,"values": {"doubleValue": 5.639623848362502}} {"name":"kafka_gaugeMetric2", "type":"gauge","timestamp": 1576236481,"values": {"doubleValue": 5.639623848362502}} {"name":"kafka_gaugeMetric3", "type":"gauge","timestamp": 1576236481,"values": {"doubleValue": 5.639623848362502}}
localhost:9090
の Prometheus ポータルをチェックし、メトリクスが作成されたことを検証します。
複数のタスク用の Prometheus 構成ファイル¶
このコネクターでは、複数のタスクを処理できます。1 つのワーカーとして発生するタスクでは、そのメトリクスが、prometheus.listener.url
構成で指定された 1 つのエンドポイントを介して公開されます。コネクターのタスクをホストしないワーカーがある場合、Prometheus は、そのワーカーのエンドポイントからのメトリクスの収集をスキップします。
マルチクラスター環境では、Connect クラスターがホスト c1.svc.local
、c2.svc.local
、および c3.svc.local
上にあるとし、prometheus.listener.url
が http://localhost:8889/metrics
であるとすると、prometheus.yml
構成ファイルは、次のようになります。
# global config
global:
scrape_interval: 10s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
evaluation_interval: 20s # Evaluate rules every 15 seconds. The default is every 1 minute.
# scrape_timeout is set to the global default (10s).
scrape_configs:
# The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
- job_name: 'prometheus'
static_configs:
- targets: ['localhost:9090']
# The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
- job_name: 'connect'
static_configs:
- targets: ['http://c1.svc.local:8889/metrics', 'http://c2.svc.local:8889/metrics','http://c3.svc.local:8889/metrics']
セキュリティ¶
Prometheus コネクターでセキュアなサーバーエンドポイントを作成できます。手順については、「セキュリティ付き Prometheus コネクターの構成」を参照してください。