InfluxDB Source Connector for Confluent Platform¶
Kafka Connect InfluxDB Source Connector を使用すると、データを InfluxDB ホストから Apache Kafka® トピックにインポートできます。
Influx クエリを定期的に実行し、結果セットの各行の出力レコードを作成することによって、データが読み込まれます。デフォルトでは、database 内のすべての measurement は、それぞれの出力トピックにコピーされます。新しい measurement の場合は database がモニタリングされ、自動的に調整されます。データを measurement からコピーすると、コネクターは新しいレコードのみを読み込みます。
機能¶
ソースコネクターでは、さまざまな InfluxDB データ型の measurement のコピー、database からの measurement の動的な追加、ホワイトリストとブラックリスト、さまざまなポーリング間隔などの設定がサポートされています。ただし、ほとんどのユーザーにとって最も重要な機能は、データを database から増分コピーする方法を制御する設定です。
Kafka Connect では、各 measurement から取得した最新のレコードを追跡するため、次回の反復時(またはクラッシュ時)に正しい場所から開始できます。ソースコネクターでは、この機能を使用して、反復ごとに measurement (またはカスタムクエリの出力)からアップデートされたレコードのみを取得します。いくつかのモードがサポートされており、それぞれで、変更された行の検出方法が異なります。
InfluxDB Source Connector には、以下の機能があります。
少なくとも 1 回のデリバリー¶
このコネクターによって、レコードが Kafka のトピックに少なくとも 1 回は配信されることが保証されます。コネクターを再起動した場合、Kafka のトピックに重複レコードが存在している可能性があります。
1 つのタスクをサポート¶
InfluxDB Source Connector は、1 つのタスクの実行をサポートしており、このタスクは、クエリモードのときに開始されます。それ以外のときは、構成されている measurement または max-tasks
の最小数に基づいてタスクが開始されます。
構成プロパティ¶
このコネクターの構成プロパティの網羅的なリストについては、「InfluxDB Source Connector 構成プロパティ」を参照してください。
クイックスタート¶
このクイックスタートでは、単一の measurement のデータを、Docker で実行されているローカル Influx database から Kafka トピックにコピーします。
This example assumes you are running Kafka and Schema Registry locally on the default ports. It also assumes your have Docker installed and running.
まず、以下の Docker コマンドを実行して、Influx database を開始します。
docker run -d -p 8086:8086 --name influxdb-local influxdb:1.7.7
Influx database が開始され、localhost
上のポート 8086 にマップされます。デフォルトでは、ユーザー名とパスワードは空白です。database 接続 URL は http://localhost:8086
です。
Influx database にサンプルデータを作成するには、以下のコマンドを使用して Docker コンテナーにログインします。
docker exec -it <containerid> bash
ちなみに
コンテナー ID を調べるには、docker ps
コマンドを使用します。
Docker コンテナーにログインしたら、InfluxDB シェルにログインします。
influx
出力は以下のようになります。
Connected to http://localhost:8086 version 1.7.7
InfluxDB shell version: 1.7.7
Influx database の作成とデータの読み込み¶
以下のコマンドで Influx database を作成します。
> create database testdb;
以下を実行して新しい database を確認します。
> show databases;
InfluxDB コマンドプロンプトで、measurement を作成していくつかのデータを格納します。
> use testdb; Using database testdb > INSERT coin,id=1 value=100
ちなみに
SELECT * from coin;
を実行して、measurement データを検証できます。
InfluxDB Source Connector の開始¶
以下の Confluent CLI コマンドを使用して Confluent Platform を開始します。
ちなみに
Confluent CLI 開発用コマンドのコマンド構文は、5.3.0 で変更されています。該当するコマンドは confluent local
に移行されています。たとえば、confluent start
の構文は、confluent local services start
に変わりました。詳しくは、「confluent local」を参照してください。
confluent local services start
次に、コネクターの構成ファイルを作成します。このファイルは、コネクターの ./etc/kafka-connect-influxdb/influxdb-source-connector.properties
に含まれ、以下の設定があります。
name=InfluxDBSourceConnector
connector.class=io.confluent.influxdb.source.InfluxdbSourceConnector
tasks.max=1
topic.prefix=influx_
influxdb.url=http://localhost:8086
influxdb.db=testdb
mode=timestamp
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
この構成で InfluxDB Source Connector を読み込みます。
confluent local services connect connector load influx-source-connector --config etc/kafka-connect-influxdb/influxdb-source-connector.properties
Kafka Connect を開始したときにあったデータがコピーされていることを確認するには、トピックの最初から読み取ってコンソールコンシューマーを起動します。
kafka-console-consumer --bootstrap-server localhost:9092 --topic influx_coin --from-beginning
出力は以下のようになります。
{"measurement":"coin","tags":{"id":"1"},"time":"2019-07-24T10:14:04.979737851Z","value":100.0}
デフォルトのポーリング間隔は 5 秒であるため、表示されるまで数秒かかる場合があります。想定されるアップデートレートまたは必要なレイテンシに応じて、短いポーリング間隔を使用して、アップデートをより早く配信することもできます。
オフセット管理やフォールトトレランスなど、Kafka Connect のすべての機能 はソースコネクターで動作します。プロセスを再開したり強制終了したりすることができます。プロセスは、中断したところから再開して、新しいデータのみをコピーします( mode
設定で定義)。
クエリモード¶
各増分クエリモードでは、処理済みのレコードと新規のレコードを追跡するために使用する各 point の一連のフィールドを追跡します。この動作は、mode
設定で制御します。以下のオプションがサポートされています。
- Timestamp: このモードでは、timestamp を含む単一のレコードを使用して、データが最後に処理された時間を追跡し、その時間以降に追加されたレコードに対してのみクエリを実行します。
- カスタムクエリ: ソースコネクターでは、measurement 全体をコピーする代わりに、カスタムクエリを使用することができます。
WHERE
句がクエリに正しく追加されている限り、カスタムクエリとともに他のいずれかの自動アップデートモードを使用できます。または、指定したクエリ自体で新規アップデートに対するフィルターを処理できます。ただし、(レコードごとにtimestamp
列の値が記録される自動モードとは異なり)オフセットトラッキングは実行されないため、クエリ自体でオフセットを追跡する必要があります。 - Bulk: このモードはフィルター処理されないため、増分ではありません。反復ごとに measurement からすべてのレコードが読み込まれます。これは、エントリが最終的に削除され、ダウンストリームシステムで安全に重複を処理できる measurement 全体を定期的にダンプする場合に役に立ちます。
構成¶
ソースコネクターには、データのインポート元とデータのインポート方法についてかなり柔軟性があります。すべての構成オプションは InfluxDB Source Connector 構成プロパティ に記載されていますが、ここでは、一般的な使用シナリオを示すいくつかのテンプレート構成について説明します。
ホワイトリストを使用して、Influx database 内の measurement のサブセットに対する変更を制限します。ホワイトリストに登録されているすべての measurement に対して標準となっている timestamp
フィールドを使用して、作成されたレコードを検出します。このモードは、timestamp を使用して、プロセスがクエリの途中で終了しても変更が失われないようにすることができるため、最も堅牢性があります。
name=influx-whitelist-timestamp-source
connector.class=io.confluent.influxdb.source.InfluxdbSourceConnector
tasks.max=10
influxdb.url=http://localhost:8086
influxdb.db=testdb
influxdb.measurement.whitelist=users,products,transactions
mode=timestamp
topic.prefix=influx_
ブラックリストを使用して、Influx database からのデータのコピーから measurement を除外します。ブラックリストに登録されている measurement を除くすべての measurement をモニタリングします。
name=influx-blacklist-timestamp-source
connector.class=io.confluent.influxdb.source.InfluxdbSourceConnector
tasks.max=10
influxdb.url=http://localhost:8086
influxdb.db=testdb
influxdb.measurement.blacklist=users,products
mode=timestamp
topic.prefix=influx_
measurement を読み込むのではなくカスタムクエリを使用して、複数の measurement からデータを読み込むことができます。
name=influx-whitelist-timestamp-source
connector.class=io.confluent.influxdb.source.InfluxdbSourceConnector
tasks.max=10
influxdb.url=http://localhost:8086
influxdb.db=testdb
topic.prefix=influx_
query=Your-Custom-query