重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

InfluxDB 2 Source Connector for Confluent Cloud

注釈

Confluent Platform 用にコネクターをローカルにインストールする場合は、「InfluxDB Source Connector for Confluent Platform」を参照してください。

フルマネージド型の Kafka Connect InfluxDB 2 Source Connector を使用すると、データを InfluxDB ホストから Apache Kafka® トピックにインポートできます。

コネクターは、InfluxDB クエリを定期的に実行し、結果セットの各行の出力レコードを作成することによって、データを読み込みます。デフォルトでは、データベース内のすべての measurement は、それぞれの出力トピックにコピーされます。コネクターはデータベースで新しい measurement をモニタリングし、自動的に調整します。measurement からデータをコピーする際、コネクターは新しいレコードのみを読み込みます。

機能

InfluxDB 2 Source Connector は、次の機能をサポートしています。

  • 少なくとも 1 回のデリバリー: コネクターによって、Kafka のトピックからのレコードが少なくとも 1 回は配信されることが保証されます。
  • Supports one task: コネクターは単一のタスクの実行をサポートしており、このタスクは、クエリモードの場合に開始されます。それ以外の場合は、構成されている measurement または max-tasks の最小数に基づいてタスクが開始されます。

Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。

制限

以下の情報を確認してください。

クイックスタート

このクイックスタートを使用して、Confluent Cloud InfluxDB 2 Source Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、Apache Kafka® にイベントをストリーミングするようにコネクターを構成するための基本的な方法について説明します。

前提条件
  • アマゾンウェブサービス (AWS)、Microsoft Azure (Azure)、または Google Cloud Platform (GCP)上の Confluent Cloud クラスターへのアクセスを許可されていること。

  • Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。

  • InfluxDB バケットに対してクエリを実行するためのアクセスが許可されていること。詳しくは、「Query data」を参照してください。

    注釈

    コネクターには、データを送信するバケットに対する --read-bucket のアクセス許可が必要です。詳しくは、「Query data」を参照してください。

  • スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。

Confluent Cloud Console を使用する場合

ステップ 1: Confluent Cloud クラスターを起動します。

インストール手順については、「Quick Start for Confluent Cloud」を参照してください。

ステップ 2: コネクターを追加します。

左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。

ステップ 3: コネクターを選択します。

Click the InfluxDB 2 Source connector card.

InfluxDB 2 Source Connector Card

Step 4: Enter the connector details.

注釈

  • すべての 前提条件 を満たしていることを確認してください。
  • アスタリスク( * )は必須項目であることを示しています。

At the Add InfluxDB 2 Source Connector screen, complete the following:

In the Kafka Topic Name Prefix field, define a topic prefix your connector will use to publish to Kafka topics. The connector publishes Kafka topics using the following naming convention: <topic.prefix><tableName>.

ステップ 5: ファイルを確認します。

InfluxDB ホストでデータが生成されていることを確認します。

Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

Confluent CLI の使用

Confluent CLI でコネクターをセットアップして実行するには、次の手順を実行します。

注釈

すべての 前提条件 を満たしていることを確認してください。

ステップ 1: 使用可能なコネクターをリスト表示します。

以下のコマンドを入力して、使用可能なコネクターをリスト表示します。

confluent connect plugin list

ステップ 2: コネクターの必須の構成プロパティを表示します。

以下のコマンドを実行して、コネクターの必須プロパティを表示します。

confluent connect plugin describe <connector-catalog-name>

例:

confluent connect plugin describe InfluxDB2Source

出力例:

The following are required configs:
connector.class : InfluxDB2Source
name
kafka.api.key : ["kafka.api.key" is required when "kafka.auth.mode==KAFKA_API_KEY"]
kafka.api.secret : ["kafka.api.secret" is required when "kafka.auth.mode==KAFKA_API_KEY"]
influxdb.url
influxdb.token
influxdb.org.id
influxdb.bucket
topic.prefix
output.data.format
tasks.max

ステップ 3: コネクターの構成ファイルを作成します。

コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。

{
  "connector.class": "InfluxDB2Source",
  "name": "InfluxDB2Source_0",
  "kafka.api.key": "****************",
  "kafka.api.secret": "*********************************",
  "influxdb.url": "http://influxdb-test.com:8086",
  "influxdb.token": "***************************",
  "influxdb.org.id": "<organization-id>",
  "influxdb.bucket": "<bucket-name>",
  "topic.prefix": "<topic-prefix>",
  "output.data.format": "JSON",
  "tasks.max": "1",
}

以下のプロパティ定義に注意してください。

  • "connector.class": コネクターのプラグイン名を指定します。
  • "name": 新しいコネクターの名前を設定します。
  • "kafka.auth.mode": 使用するコネクターの認証モードを指定します。オプションは SERVICE_ACCOUNT または KAFKA_API_KEY (デフォルト)です。API キーとシークレットを使用するには、構成プロパティ kafka.api.keykafka.api.secret を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティ kafka.service.account.id=<service-account-resource-ID>リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。

    confluent iam service-account list
    

    例:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "influxdb.url": 接続の確立に使用する完全修飾 InfluxDB API URL。たとえば、http://influxdb-test.com:8086 です

  • "influxdb.token": InfluxDB ホストでの認証に使用するトークン。

  • "influxdb.org.id": InfluxDB 組織 ID。

    注釈

    コネクターには、データを送信するバケットに対する --read-bucket のアクセス許可が必要です。詳しくは、「Query data」を参照してください。

    詳細については、InfluxDB へのデータの書き込み を参照してください。

  • "influxdb.bucket": コネクターからデータを送信するバケット。

  • "output.data.format": サポートされているフォーマットは、AVRO、PROTOBUF、JSON_SR(JSON スキーマ)、JSON(スキーマレス)です。スキーマベースのメッセージフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、有効なスキーマが Schema Registry に存在する必要があります。

  • "tasks.max": このコネクターで使用する タスク の数を入力します。コネクターは、単一のタスクの実行をサポートしており、このタスクは、クエリモードの場合に開始されます。それ以外の場合は、構成されている measurement または max-tasks の最小数に基づいてタスクが開始されます。

Single Message Transforms: CLI を使用した SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。

すべてのプロパティの値と説明については、「構成プロパティ」を参照してください。

ステップ 3: プロパティファイルを読み込み、コネクターを作成します。

以下のコマンドを入力して、構成を読み込み、コネクターを起動します。

confluent connect create --config <file-name>.json

例:

confluent connect create --config influxdb2-source-config.json

出力例:

Created connector InfluxDB2Source_0 lcc-do6vzd

ステップ 4: コネクターのステータスを確認します。

以下のコマンドを入力して、コネクターのステータスを確認します。

confluent connect list

出力例:

ID           |             Name        | Status  | Type   | Trace
+------------+-------------------------+---------+--------+-------+
lcc-do6vzd   | InfluxDB2Source_0       | RUNNING | Source |       |

ステップ 5: ファイルを確認します。

Kafka トピックでデータが生成されていることを確認します。

Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

構成プロパティ

このコネクターでは、以下のコネクター構成プロパティを使用します。

データへの接続方法(How should we connect to your data?)

name

コネクターの名前を設定します。

  • 型: string
  • 指定可能な値: 最大 64 文字の文字列
  • 重要度: 高

Kafka クラスターの認証情報(Kafka Cluster credentials)

kafka.auth.mode

Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。

  • 型: string
  • デフォルト: KAFKA_API_KEY
  • 指定可能な値: SERVICE_ACCOUNT、KAFKA_API_KEY
  • 重要度: 高
kafka.api.key
  • 型: password
  • 重要度: 高
kafka.service.account.id

Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。

  • 型: string
  • 重要度: 高
kafka.api.secret
  • 型: password
  • 重要度: 高

InfluxDB

influxdb.url

接続の確立に使用する完全修飾 InfluxDB API URL。

  • 型: string
  • 重要度: 高
influxdb.token

influx db での認証に使用するトークン。

  • 型: password
  • 重要度: 高
influxdb.org.id

組織 ID。

  • 型: string
  • 重要度: 高
influxdb.bucket

このコネクターがデータを読み取るバケット。

  • 型: string
  • 重要度: 中

読み取りの構成(Read Configuration)

query

指定した場合、このクエリが実行され、結果のレコードが目的の Apache Kafka トピックにプッシュされます。フィールドまたはタグのサブセットを選択したり、集約を実行したり、データをフィルター処理したりする必要がある場合は、この設定を使用します。クエリはテンプレートに従う必要があります(import "influxdata/influxdb/schema"\n from(bucket:$influxdb.bucket)\n |> range(start: $startTimestamp, stop: $endTimestamp)\n |> <Your custom query criteria here>\n |> schema.fieldsAsCols()\n |> limit(n: $batch.size)\n)。mode=bulk の場合、コネクターはポーリングのたびにクエリをそのまま実行します。範囲の条件は、ユーザーが指定する必要があります。Flux ではクエリを無制限に実行することはできません。無制限のクエリはリソース使用率が高くなるためです。mode=timestamp を使用すると、$startTimestamp$endTimestamp の値は、適切なソースオフセットを使用してコネクターによって自動的に入力されます。ユーザーは記載されているその他の条件を <Your custom query criteria here> の部分と置き換える必要があります。コネクターによって $influxdb.bucket$batch.size が、対応する構成の値に置き換えられます。

  • 型: string
  • 重要度: 中
mode

InfluxDB の measurement をポーリングする場合のモード。サポートされているモードには次のものがあります。bulk では、ポーリングのたびに、目的の Apache Kafka トピックへの measurement 全体の一括読み込みが実行されます。timestamp では、タイムスタンプを使用して、新しく作成された行が検出され、それらの行が目的の Apache Kafka トピックに書き込まれます。

  • 型: string
  • デフォルト: timestamp
  • 重要度: 中
topic.mapper

トピックのマップ方法を指定する構成。サポートされているオプションには次のものがあります。bucket の場合、トピック名は「トピックのプレフィックス + バケット名」になります。すべてのレコードが同じトピックにマッピングされます。または measurement の場合、トピック名は「トピックのプレフィックス + measurement 名」になります。同じ measurement のレコードがすべて同じトピックにマッピングされます。

  • 型: string
  • デフォルト: bucket
  • 重要度: 中
topic.prefix

データのパブリッシュ先である Apache Kafka トピックの名前(カスタムクエリの場合には Apache Kafka トピックのフルネーム)を指定するために measurement 名の先頭に付けるプレフィックス。

  • 型: string
  • 重要度: 中
batch.size

新しいデータのポーリング時に単一のバッチに含める point の最大数。この設定を使用して、コネクターの内部にバッファリングするデータの量を制限できます。

  • 型: int
  • デフォルト: 5000
  • 重要度: 中
timestamp.delay.interval.ms

特定の timestamp を持つレコードが出現してからそれを結果に含めるまで待機する時間。より早いタイムスタンプを持つトランザクションが完了できるように、遅延を追加できます。最初の実行では、現在の時刻から遅延を引いた時刻までの利用可能なすべての(Unix エポック時間からの)レコードをフェッチします。それ以降の各実行では、前のバッチでフェッチされた最後のレコードの時刻を起点に、現在の時刻から遅延を引いた時刻までのデータを取得します。

  • 型: int
  • デフォルト: 0
  • 重要度: 中
influxdb.measurement.whitelist

コピーに含める measurement のコンマ区切りのリスト。指定した場合は、Measurements Excluded を設定できません。空白のままにすると、すべての measurement が対象に含まれます。

  • 型: string
  • 重要度: 中
influxdb.measurement.blacklist

コピーから除外する measurement のコンマ区切りのリスト。指定した場合は、Measurements Included を設定できません。

  • 型: string
  • 重要度: 中

再試行(Retries)

retry.backoff.ms

再試行まで待機するバックオフ時間。

  • 型: int
  • デフォルト: 1000(1 秒)
  • 重要度: 中
max.retries

エラー時に再試行する最大回数。これを超えるとタスクは失敗します。

  • 型: int
  • デフォルト: 10
  • 重要度: 中

出力メッセージ(Output messages)

output.data.format

Kafka 出力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、または JSON です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください

  • 型: string
  • 重要度: 高

このコネクターのタスク数(Number of tasks for this connector)

tasks.max
  • 型: int
  • 指定可能な値: [1,...]
  • 重要度: 高

次のステップ

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png