重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

InfluxDB 2 Sink Connector for Confluent Cloud

注釈

Confluent Platform 用にコネクターをローカルにインストールする場合は、「InfluxDB Sink Connector for Confluent Platform」を参照してください。

フルマネージド型の Kafka Connect InfluxDB 2 Sink Connector は、データを Apache Kafka® トピックから InfluxDB バケットに書き込みます。

機能

InfluxDB 2 Sink Connector は、次の機能をサポートしています。

  • 少なくとも 1 回のデリバリー: コネクターによって、Kafka のトピックからのレコードが少なくとも 1 回は配信されることが保証されます。
  • 複数のタスクのサポート: 1 つまたは複数のタスクの実行をサポートしています。タスクが多いほどパフォーマンスが向上する可能性があります。

Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。

制限

以下の情報を確認してください。

レコード構造

各レコードは JSON フォーマットです。複数の InfluxdDB フィールドと、タグセクション("tags")1 つ、measurement セクション("measurement")1 つを含めることができます。以下の例は、コネクター用に必要なレコード構造を示しています。

{
 "measurement":"measurement-name",
  "tags": {
    "tag1":"value1",
    "tag2":"value2"
  },
 "time-field":<timestamp-in-epochs>,
 "field1":<value>,
 "field2":<value>,
 ...
}

以下の点に注意してください。

  • "tags" セクションは省略可能です。このセクションには、一連のフィールドと関連付けられているタグのリストを指定します。各タグは、文字列型のキーと値ペアである必要があります。
  • "measurement" フィールドには、InfluxDB measurement の名前を指定します。このフィールドは省略可能です。ただし、ここで measurement の名前を指定しない場合は、measurement.name.format 構成プロパティ で measurement の名前を指定する必要があります。また、このフィールドを指定すると、Kafka レコードの指定はすべてオーバーライドされます。
  • 1 つのレコードに複数のフィールドを含めることができます。フィールドには、int、float、boolean、string のいずれかの型を使用できます。
  • event.time.fieldname 構成プロパティ を使用して、レコードのタイムスタンプ情報が格納されるフィールドを 1 つ指定することができます。未指定の場合、使用されるタイムスタンプは Kafka レコードのタイムスタンプになります。
  • AVRO、PROTOBUF、JSON_SR の構造は同じです。対応するスキーマが スキーマレジストリ に存在する必要があることに注意してください。

クイックスタート

このクイックスタートを使用して、Confluent Cloud InfluxDB 2 Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、InfluxDB バケットにイベントをストリーミングするようにコネクターを構成するための基本的な方法について示します。

前提条件
  • アマゾンウェブサービス (AWS)、Microsoft Azure (Azure)、または Google Cloud Platform (GCP)上の Confluent Cloud クラスターへのアクセスを許可されていること。

  • Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。

  • InfluxDB へのデータの書き込みアクセスが許可されていること。詳細については、InfluxDB へのデータの書き込み を参照してください。

    注釈

    コネクターには、データを送信するバケットに対する --read-bucket および --write-bucket のアクセス許可が必要です。詳細については、「influx auth create」を参照してください。

  • スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。

  • シンクコネクターを作成する前に、Confluent Cloud クラスター上にソース Kafka トピックが 1 つ以上存在している必要があります。

Confluent Cloud Console の使用

ステップ 1: Confluent Cloud クラスターを起動します。

インストール手順については、「Quick Start for Confluent Cloud」を参照してください。

ステップ 2: コネクターを追加します。

左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。

ステップ 3: コネクターを選択します。

Click the InfluxDB 2 Sink connector card.

InfluxDB 2 Sink Connector Card

Step 4: Enter the connector details.

注釈

  • すべての 前提条件 を満たしていることを確認してください。
  • アスタリスク( * )は必須項目であることを示しています。

At the Add InfluxDB 2 Sink Connector screen, complete the following:

If you've already populated your Kafka topics, select the topic(s) you want to connect from the Topics list.

To create a new topic, click +Add new topic.

ステップ 5: ファイルを確認します。

InfluxDB ホストでデータが生成されていることを確認します。

Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

Confluent CLI の使用

Confluent CLI でコネクターをセットアップして実行するには、次の手順を実行します。

注釈

すべての 前提条件 を満たしていることを確認してください。

ステップ 1: 使用可能なコネクターをリスト表示します。

以下のコマンドを入力して、使用可能なコネクターをリスト表示します。

confluent connect plugin list

ステップ 2: コネクターの必須の構成プロパティを表示します。

以下のコマンドを実行して、コネクターの必須プロパティを表示します。

confluent connect plugin describe <connector-catalog-name>

例:

confluent connect plugin describe InfluxDB2Sink

出力例:

The following are required configs:
connector.class : InfluxDB2Sink
topics
input.data.format
name
kafka.api.key
kafka.api.secret
influxdb.url
influxdb.token
influxdb.org.id
influxdb.bucket

ステップ 3: コネクターの構成ファイルを作成します。

コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。

{
  "connector.class": "InfluxDB2Sink",
  "topics": "orders",
  "input.data.format": "JSON",
  "name": "InfluxDB2Sink_0",
  "kafka.api.key": "****************",
  "kafka.api.secret": "*********************************",
  "influxdb.url": "http://influxdb-test.com:8086",
  "influxdb.token": "***************************",
  "influxdb.org.id": "<organization-id>",
  "influxdb.bucket": "<bucket-name>",
  "tasks.max": "1",
}

以下のプロパティ定義に注意してください。

  • "connector.class": コネクターのプラグイン名を指定します。
  • "topics": 特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。
  • input.data.format (Kafka トピックから送られるデータ): AVRO、PROTOBUF、JSON_SR(JSON スキーマ)、JSON(スキーマレス)がサポートされています。スキーマベースのメッセージフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、有効なスキーマが Schema Registry に存在する必要があります。
  • "name": 新しいコネクターの名前を設定します。
  • "kafka.auth.mode": 使用するコネクターの認証モードを指定します。オプションは SERVICE_ACCOUNT または KAFKA_API_KEY (デフォルト)です。API キーとシークレットを使用するには、構成プロパティ kafka.api.keykafka.api.secret を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティ kafka.service.account.id=<service-account-resource-ID>リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。

    confluent iam service-account list
    

    例:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "influxdb.url": 接続の確立に使用する完全修飾 InfluxDB API URL。たとえば、http://influxdb-test.com:8086 です

  • "influxdb.token": InfluxDB ホストでの認証に使用するトークン。

  • "influxdb.org.id": InfluxDB 組織 ID。

    注釈

    コネクターには、データを送信するバケットに対する --read-bucket および write-bucket のアクセス許可が必要です。詳細については、「influx auth create」を参照してください。

    詳細については、InfluxDB へのデータの書き込み を参照してください。

  • "influxdb.bucket": コネクターからデータを送信するバケット。

  • "tasks.max": このコネクターで使用できる タスク の最大数を入力します。タスクが多いほどパフォーマンスが向上する可能性があります。

Single Message Transforms: CLI を使用した SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。

すべてのプロパティの値と説明については、「構成プロパティ」を参照してください。

ステップ 3: プロパティファイルを読み込み、コネクターを作成します。

以下のコマンドを入力して、構成を読み込み、コネクターを起動します。

confluent connect create --config <file-name>.json

例:

confluent connect create --config influxdb2-sink-config.json

出力例:

Created connector InfluxDB2Sink_0 lcc-do6vzd

ステップ 4: コネクターのステータスを確認します。

以下のコマンドを入力して、コネクターのステータスを確認します。

confluent connect list

出力例:

ID           |             Name          | Status  | Type | Trace
+------------+---------------------------+---------+------+-------+
lcc-do6vzd   | InfluxDB2Sink_0           | RUNNING | sink |       |

ステップ 5: ファイルを確認します。

InfluxDB 2 ホストでデータが生成されていることを確認します。

Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

構成プロパティ

このコネクターでは、以下のコネクター構成プロパティを使用します。

データの取得元とするトピック(Which topics do you want to get data from?)

topics

特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。

  • 型: list
  • 重要度: 高

入力メッセージ(Input messages)

input.data.format

Kafka 入力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、またはプレーン JSON です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。

  • 型: string
  • 重要度: 高

データへの接続方法(How should we connect to your data?)

name

コネクターの名前を設定します。

  • 型: string
  • 指定可能な値: 最大 64 文字の文字列
  • 重要度: 高

Kafka クラスターの認証情報(Kafka Cluster credentials)

kafka.auth.mode

Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。

  • 型: string
  • デフォルト: KAFKA_API_KEY
  • 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
  • 重要度: 高
kafka.api.key
  • 型: password
  • 重要度: 高
kafka.service.account.id

Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。

  • 型: string
  • 重要度: 高
kafka.api.secret
  • 型: password
  • 重要度: 高

InfluxDB

influxdb.url

接続の確立に使用する完全修飾 InfluxDB API URL。

  • 型: string
  • 重要度: 高
influxdb.token

influx db での認証に使用するトークン。

  • 型: password
  • 重要度: 高
influxdb.org.id

組織 ID。

  • 型: string
  • 重要度: 高

書き込みの構成(Write Configuration)

influxdb.bucket

このコネクターからのデータの送信先となるバケット。

  • 型: string
  • 重要度: 高
write.precision

InfluxDB タイムスタンプの書き込みの精度。指定可能な値は、Seconds、Milliseconds、Microseconds、Nanoseconds です。'event.time.fieldname' を使用してタイムスタンプフィールドを指定せず、Kafka レコードのタイムスタンプが使用される場合、Kafka のタイムスタンプ(ミリ秒)はここで指定された精度に変換されます。ここで精度を指定しない場合は、「event.time.fieldname」で適切な時間の単位を指定する必要があります。

  • 型: string
  • デフォルト: Milliseconds
  • 重要度: 中
event.time.fieldname

InfluxDB データポイントに書き込まれるイベント時刻が含まれている Kafka レコードのフィールドの名前。デフォルトでは(この構成が未指定の場合)、InfluxDB に書き込まれるタイムスタンプは、イベントが処理された時刻に対応する Kafka レコードのタイムスタンプ(Kafka レコードが作成された時刻)です。

  • 型: string
  • 重要度: 中
measurement.name.format

マップ先 measurement 名のフォーマット制御文字列。マップ元のトピック名を表すプレースホルダーとして「${topic}」を含めることができます。

たとえば、トピック「orders」の kafka_${topic} は、measurement 名「kafka_orders」にマップされます。measurement 名のフォーマットが指定されていない場合、コネクターでは、Kafka メッセージ内にある「measurement」フィールドの値が使用されます。対応するフィールドがメッセージ内に存在しない場合、そのメッセージは dlq に送信されます。

  • 型: string
  • 重要度: 中
influxdb.gzip.enable

gzip を有効にするかどうかを指定するフラグ。

  • 型: boolean
  • デフォルト: false
  • 重要度: 低

再試行

retry.backoff.ms

再試行まで待機するバックオフ時間。

  • 型: int
  • デフォルト: 1000(1 秒)
  • 重要度: 中
max.retries

エラー時に再試行する最大回数。これを超えるとタスクは失敗します。

  • 型: int
  • デフォルト: 10
  • 重要度: 中

このコネクターのタスク数(Number of tasks for this connector)

tasks.max
  • 型: int
  • 指定可能な値: [1,...]
  • 重要度: 高

次のステップ

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png