InfluxDB Sink Connector for Confluent Platform¶
Kafka Connect InfluxDB Sink Connector は、データを Apache Kafka® トピックから InfluxDB ホストに書き込みます。バッチ内の複数のレコードに同じ measurement、時間、およびタグがある場合、それらは結合されて InfluxDB に書き込まれます。
InfluxDB Sink Connector には、以下の機能があります。
少なくとも 1 回のデリバリー¶
このコネクターによって、Kafka のトピックのレコードが少なくとも 1 回は配信されることが保証されます。
複数のタスク¶
InfluxDB Sink Connector は、1 つまたは複数のタスクの実行をサポートしています。タスクの数は tasks.max
構成パラメーターで指定できます。これにより、複数のファイルを解析する必要がある場合に、パフォーマンスが大幅に向上する可能性があります。
構成プロパティ¶
このコネクターの構成プロパティの網羅的なリストについては、「InfluxDB Sink Connector 構成プロパティ」を参照してください。
クイックスタート¶
このクイックスタートでは、単一の Kafka トピックから、Docker で実行されているローカル Influx database 上の measurement にデータをコピーします。
この例では、Kafka および スキーマレジストリ がデフォルトのポートでローカルに実行されていることを前提としています。また、Docker がインストールされ実行されていることも前提としています。
注釈
InfluxDB Docker は、インストールされている InfluxDB サーバーに置き換えることができます。
まず、以下の Docker コマンドを実行して、Influx database を開始します。
docker run -d -p 8086:8086 --name influxdb-local influxdb:1.7.7
Influx database が開始され、localhost
上のポート 8086 にマップされます。デフォルトでは、ユーザー名とパスワードは空白です。database 接続 URL は http://localhost:8086
です。
以下の Confluent CLI コマンドを使用して Confluent Platform を開始します。
ちなみに
Confluent CLI 開発用コマンドのコマンド構文は、5.3.0 で変更されました。該当するコマンドは confluent local
に移動されました。たとえば、confluent start
の構文は、confluent local services start
になりました。詳細については、「confluent local 」を参照してください。
confluent local services start
プロパティベースの例¶
次に、コネクターの構成ファイルを作成します。この構成は、通常、 スタンドアロンワーカー と併せて使用されます。このファイルは、コネクターの ./etc/kafka-connect-influxdb/influxdb-sink-connector.properties
に含まれ、以下の設定があります。
name=InfluxDBSinkConnector
connector.class=io.confluent.influxdb.InfluxDBSinkConnector
tasks.max=1
topics=orders
influxdb.url=http://localhost:8086
influxdb.db=influxTestDB
measurement.name.format=${topic}
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
この例のようにシンクコネクターに固有のトピックを除き、最初のいくつかの設定は、すべてのコネクターに対して指定する共通の設定です。
influxdb.url
では、influxDB サーバーの接続 URL を指定します。influxdb.db
、influxdb.username
、および influxdb.password
では、それぞれ InfluxDB サーバーの database 名、ユーザー名、パスワードを指定します。上記の InfluxDB サーバーのユーザー名とパスワードは、デフォルトで空白であるため、構成に追加されていません。
この構成でコネクターを実行します。
confluent local services connect connector load InfluxDBSinkConnector --config etc/kafka-connect-influxdb/influxdb-sink-connector.properties
REST ベースの例¶
この構成は通常、 分散ワーカー と併せて使用されます。以下の JSON を influxdb-sink-connector.json
に書き込み、すべての必要な値を構成し、以下のコマンドを使用して構成を分散された Connect ワーカーのいずれかにポストします。詳細については、Kafka Connect REST API を参照してください。
{
"name" : "InfluxDBSinkConnector",
"config" : {
"connector.class" : "io.confluent.influxdb.InfluxDBSinkConnector",
"tasks.max" : "1",
"topics" : "orders",
"influxdb.url" : "http://localhost:8086",
"influxdb.db" : "influxTestDB",
"measurement.name.format" : "${topic}",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081"
}
}
いずれかの Kafka Connect ワーカーに構成をポストするには、curl を使用します。http://localhost:8083/
を、Kafka Connect ワーカーのいずれかのエンドポイントに変更します。
この構成でコネクターを実行します。
curl -X POST -d @influxdb-sink-connector.json http://localhost:8083/connectors -H "Content-Type: application/json"
次に、orders
トピックにレコードを作成します。
bin/kafka-avro-console-producer \
--broker-list localhost:9092 --topic orders \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product", "type": "string"}, {"name":"quantity", "type": "int"}, {"name":"price",
"type": "float"}]}'
コンソールプロデューサーは、入力を待機しています。以下のレコードをターミナルにコピーアンドペーストします。
{"id": 999, "product": "foo", "quantity": 100, "price": 50}
InfluxDB のデータを検証するには、以下のコマンドを使用して Docker コンテナーにログインします。
docker exec -it <containerid> bash
ちなみに
コンテナー ID を調べるには、docker ps
コマンドを使用します。
Docker コンテナーにログインしたら、InfluxDB シェルにログインします。
influx
出力は以下のようになります。
Connected to http://localhost:8086 version 1.7.7
InfluxDB shell version: 1.7.7
最後に、以下のクエリを実行して、レコードを検証します。
> USE influxTestDB;
Using database influxTestDB
> SELECT * FROM orders;
name: orders
time id price product quantity
---- -- ----- ------- --------
1567164248415000000 999 50 foo 100
スキーマレス JSON タグの例¶
この例では、以下のコネクター構成を使用します。
name=InfluxDBSinkConnector
connector.class=io.confluent.influxdb.InfluxDBSinkConnector
tasks.max=1
topics=test
influxdb.url=http://localhost:8086
influxdb.db=influxTestDB
measurement.name.format=${topic}
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
test
というトピックのスキーマレス JSON タグを作成するプロデューサーコマンドを以下に示します。
kafka-console-producer \
--broker-list localhost:9092 \
--topic test
コンソールプロデューサーは、入力を待機しています。以下のレコードをターミナルにコピーアンドペーストします。
{"name":"influx","age":23,"tags":{"id":"5"}}
以下のクエリは、結果にタグとして id
を示しています。これは、プロデューサーコマンドの "payload":{"tags":{"id":"5"}
に基づいています。
> select * from test;
name: test
time age id name
---- --- -- ----
1579307684366000000 23 5 influx
> show tag keys from test;
name: test
tagKey
------
id
- Kafka トピックのレコードに、既存の InfluxDB measurement に存在しないフィールドが含まれている場合、それらのフィールドは measurement に作成されます。
- Kafka トピックのレコードに、既存の InfluxDB measurement に既に存在するフィールドが含まれていない場合、それらのフィールドの値は空になります。
JSON タグの例¶
この例では、以下のコネクター構成を使用します。
name=InfluxDBSinkConnector
connector.class=io.confluent.influxdb.InfluxDBSinkConnector
tasks.max=1
topics=test
influxdb.url=http://localhost:8086
influxdb.db=influxTestDB
measurement.name.format=${topic}
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
test
というトピックの JSON タグを作成するプロデューサーコマンドを以下に示します。
kafka-console-producer \
--broker-list localhost:9092 \
--topic test
--property value.schema='{"schema":{"type":"struct","fields":[{"type":"map","keys":{"type":"string","optional":false},"values":{"type":"string","optional":false},"optional":false,"field":"tags"},{"type":"string","optional":false,"field":"time"},{"type":"double","optional":true,"field":"value"}],"optional":false,"version":1},"payload":{"tags":{"id":"5"},"time":"2019-07-24T11:43:19.201040841Z","value":500.0}}'
以下のクエリは、結果にタグとして id
を示しています。これは、プロデューサーコマンドの "payload":{"tags":{"id":"5"}
に基づいています。
> select * from test;
name: test
time id value
---- -- -----
1579307684366000000 5 500
1579307701088000000 5 500
> show tag keys from test;
name: test
tagKey
------
id
Avro タグの例¶
この例では、以下のコネクター構成を使用します。
name=InfluxDBSinkConnector
connector.class=io.confluent.influxdb.InfluxDBSinkConnector
tasks.max=1
topics=products
influxdb.url=http://localhost:8086
influxdb.db=influxTestDB
measurement.name.format=${topic}
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
products
というトピックの Avro タグを作成するプロデューサーコマンドを以下に示します。
kafka-avro-console-producer \
--broker-list localhost:9092 \
--topic products \
--property value.schema='{"name": "myrecord","type": "record","fields": [{"name":"id","type":"int"}, {"name": "product","type": "string"}, {"name": "quantity","type": "int"},{"name": "price","type": "float"}, {"name": "tags","type": {"name": "tags","type": "record","fields": [{"name": "DEVICE","type": "string"},{"name": "location","type": "string"}]}}]}'
コンソールプロデューサーは、入力を待機しています。以下のレコードをターミナルにコピーアンドペーストします。
{"id": 1, "product": "pencil", "quantity": 100, "price": 50, "tags" : {"DEVICE": "living", "location": "home"}}
{"id": 2, "product": "pen", "quantity": 200, "price": 60, "tags" : {"DEVICE": "living", "location": "home"}}
データが InfluxDB に存在することを確認します。
トピックと database 間の例¶
measurement.name.format
が構成に存在しない場合、コネクターでは、database 名として Kafka トピック名が使用され、measurement 名はメッセージ内のフィールドから取得されます。
この例では、以下のコネクター構成を使用します。
name=InfluxDBSinkConnector
connector.class=io.confluent.influxdb.InfluxDBSinkConnector
tasks.max=1
topics=products
influxdb.url=http://localhost:8086
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
products
というトピックの Avro レコードを作成するプロデューサーコマンドを以下に示します。
kafka-avro-console-producer \
--broker-list localhost:9092 \
--topic products \
--property value.schema='{"name": "myrecord","type": "record","fields": [{"name":"id","type":"int"}, {"name": "measurement","type":"string"}]}'
コンソールプロデューサーは、入力を待機します。
{"id": 1, "measurement": "test"}
{"id": 2, "measurement": "test2"}
以下のクエリは、InfluxDB に書き込まれている measurement と point を示しています。
> use products;
> show measurements;
name: measurements
name
----
test
test2
> select * from test;
name: test
time id
---- --
1601464614638 1
カスタム timestamp の例¶
この例では、以下のコネクター構成を使用します。
name=InfluxDBSinkConnector
connector.class=io.confluent.influxdb.InfluxDBSinkConnector
tasks.max=1
topics=products
influxdb.url=http://localhost:8086
influxdb.db=influxTestDB
measurement.name.format=${topic}
event.time.fieldname=time
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
products
というトピックの Avro レコードを作成するプロデューサーコマンドを以下に示します。
kafka-avro-console-producer \
--broker-list localhost:9092 \
--topic products \
--property value.schema='{"name": "myrecord","type": "record","fields": [{"name":"id","type":"int"}, {"name": "time","type":"long"}]}'
コンソールプロデューサーは、入力を待機します。timestamp は、UNIX エポックからの経過時間(ミリ秒)(UNIX 時間)である必要があります。
{"id": 1, "time": 123412341234}
以下のクエリは、InfluxDB に書き込まれている timestamp スタンプを示しています。
> precision ms
> select * from products;
name: products
time id
---- --
123412341234 1
レコード構造¶
各 InfluxDB レコードは、measurement
、tags
(オプション)、定義した値フィールド、および timestamp で構成されます。
{
"measurement": "cpu",
"tags": {
"hostname": "test",
"ip": "10.2.3.4"
},
"cpu1": 10,
"cpu2": 5,
"cpu3": 15
}
measurement
は必須フィールドで、String
型である必要があります。ただし、コネクターのmeasurement.name.format
およびinfluxdb.db
が指定されている場合、measurement
はオプションです。つまり、レコードに必須ではありません。tags
はオプションのフィールドで、map
型(Avro ではrecords
)である必要があります。- 他のすべてのフィールドは値フィールドと見なされ、
Float
、Integer
、String
、またはBoolean
型にすることができます。 - レコードには少なくとも 1 つの値フィールドが必要です。
- レコードのヘッダーのタイムスタンプは、InfluxDB の timestamp として使用されます。
詳細については、InfluxDB のドキュメント を参照してください。