重要
このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。
InfluxDB 2 Sink Connector for Confluent Cloud¶
注釈
Confluent Platform 用にコネクターをローカルにインストールする場合は、「InfluxDB Sink Connector for Confluent Platform」を参照してください。
フルマネージド型の Kafka Connect InfluxDB 2 Sink Connector は、データを Apache Kafka® トピックから InfluxDB バケットに書き込みます。
機能¶
InfluxDB 2 Sink Connector は、次の機能をサポートしています。
- 少なくとも 1 回のデリバリー: コネクターによって、Kafka のトピックからのレコードが少なくとも 1 回は配信されることが保証されます。
- 複数のタスクのサポート: 1 つまたは複数のタスクの実行をサポートしています。タスクが多いほどパフォーマンスが向上する可能性があります。
Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。
制限¶
以下の情報を確認してください。
- コネクターの制限事項については、InfluxDB 2 Sink Connector の制限事項を参照してください。
- 1 つ以上の Single Message Transforms(SMT)を使用する場合は、「SMT の制限」を参照してください。
- Confluent Cloud Schema Registry を使用する場合は、「スキーマレジストリ Enabled Environments」を参照してください。
レコード構造¶
各レコードは JSON フォーマットです。複数の InfluxdDB フィールドと、タグセクション("tags"
)1 つ、measurement セクション("measurement"
)1 つを含めることができます。以下の例は、コネクター用に必要なレコード構造を示しています。
{
"measurement":"measurement-name",
"tags": {
"tag1":"value1",
"tag2":"value2"
},
"time-field":<timestamp-in-epochs>,
"field1":<value>,
"field2":<value>,
...
}
以下の点に注意してください。
"tags"
セクションは省略可能です。このセクションには、一連のフィールドと関連付けられているタグのリストを指定します。各タグは、文字列型のキーと値ペアである必要があります。"measurement"
フィールドには、InfluxDB measurement の名前を指定します。このフィールドは省略可能です。ただし、ここで measurement の名前を指定しない場合は、measurement.name.format
構成プロパティ で measurement の名前を指定する必要があります。また、このフィールドを指定すると、Kafka レコードの指定はすべてオーバーライドされます。- 1 つのレコードに複数のフィールドを含めることができます。フィールドには、int、float、boolean、string のいずれかの型を使用できます。
event.time.fieldname
構成プロパティ を使用して、レコードのタイムスタンプ情報が格納されるフィールドを 1 つ指定することができます。未指定の場合、使用されるタイムスタンプは Kafka レコードのタイムスタンプになります。- AVRO、PROTOBUF、JSON_SR の構造は同じです。対応するスキーマが スキーマレジストリ に存在する必要があることに注意してください。
クイックスタート¶
このクイックスタートを使用して、Confluent Cloud InfluxDB 2 Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、InfluxDB バケットにイベントをストリーミングするようにコネクターを構成するための基本的な方法について示します。
- 前提条件
アマゾンウェブサービス (AWS)、Microsoft Azure (Azure)、または Google Cloud Platform (GCP)上の Confluent Cloud クラスターへのアクセスを許可されていること。
Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
InfluxDB へのデータの書き込みアクセスが許可されていること。詳細については、InfluxDB へのデータの書き込み を参照してください。
注釈
コネクターには、データを送信するバケットに対する
--read-bucket
および--write-bucket
のアクセス許可が必要です。詳細については、「influx auth create」を参照してください。スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。
シンクコネクターを作成する前に、Confluent Cloud クラスター上にソース Kafka トピックが 1 つ以上存在している必要があります。
Confluent Cloud Console の使用¶
ステップ 1: Confluent Cloud クラスターを起動します。¶
インストール手順については、「Quick Start for Confluent Cloud」を参照してください。
ステップ 2: コネクターを追加します。¶
左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。
Step 4: Enter the connector details.¶
注釈
- すべての 前提条件 を満たしていることを確認してください。
- アスタリスク( * )は必須項目であることを示しています。
At the Add InfluxDB 2 Sink Connector screen, complete the following:
If you've already populated your Kafka topics, select the topic(s) you want to connect from the Topics list.
To create a new topic, click +Add new topic.
- Select the way you want to provide Kafka Cluster credentials. You can
choose one of the following options:
- Global Access: Allows your connector to access everything you have access to. With global access, connector access will be linked to your account. This option is not recommended for production.
- Granular access: Limits the access for your connector. You will be able to manage connector access through a service account. This option is recommended for production.
- Use an existing API key: Allows you to enter an API key and secret part you have stored. You can enter an API key and secret (or generate these in the Cloud Console).
- Click Continue.
- Add the following database connection details:
- InfluxDB API URL: 接続の確立に使用する完全修飾 InfluxDB API URL。たとえば、
http://influxdb-test.com:8086
です。 - InfluxDB Token: InfluxDB ホストでの認証に使用するトークン。
- InfluxDB Organization ID: InfluxDB 組織 ID。
- InfluxDB API URL: 接続の確立に使用する完全修飾 InfluxDB API URL。たとえば、
- Click Continue.
- Select an Input Kafka record value format (data coming from the Kafka topic): AVRO, PROTOBUF, JSON_SR (JSON Schema), or JSON (schemaless). A valid schema must be available in Schema Registry to use a schema-based message format (for example, Avro, JSON_SR, or Protobuf).
- In the Bucket Name field, enter the bucket where the connector sends data.
Show advanced configurations
Backoff Time: コネクターが再試行まで待機するバックオフ時間(ミリ秒)。デフォルトは
1000
ミリ秒です。Max retries: エラーの発生時にタスクを再試行する最大回数。これを超えるとタスクは失敗となります。デフォルトは
10
です。Enable compression: gzip を有効にするかどうかを指定します。デフォルトは
false
です。Event Time field name: コネクターが InfluxDB データポイントへの書き込みの際に使用するイベント時刻が格納されている Kafka レコードのフィールドの名前。何も入力されていない場合、使用されるデフォルト値は、Kafka レコードが作成された時刻を示す Kafka レコードのタイムスタンプになります。これは、イベントが処理された時刻と対応します。
Measurement Name Format: A format string for the destination measurement name, which may contain
${topic}
as a placeholder for the originating topic name.Write Precision: Influx DB タイムスタンプの書き込みの精度。指定可能な値は、
microseconds
、milliseconds
、nanoseconds
、seconds
です。デフォルト値はmilliseconds
です。Transforms and Predicates: For details, see the Single Message Transforms (SMT) documentation.
For all property values and definitions, see 構成プロパティ.
- Click Continue.
Based on the number of topic partitions you select, you will be provided with a recommended number of tasks.
- To change the number of recommended tasks, enter the number of tasks for the connector to use in the Tasks field.
- Click Continue.
Verify the connection details.
Click Launch.
コネクターのステータスが Provisioning から Running に変わります。
ステップ 5: ファイルを確認します。¶
InfluxDB ホストでデータが生成されていることを確認します。
Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
Confluent CLI の使用¶
Confluent CLI でコネクターをセットアップして実行するには、次の手順を実行します。
注釈
すべての 前提条件 を満たしていることを確認してください。
ステップ 2: コネクターの必須の構成プロパティを表示します。¶
以下のコマンドを実行して、コネクターの必須プロパティを表示します。
confluent connect plugin describe <connector-catalog-name>
例:
confluent connect plugin describe InfluxDB2Sink
出力例:
The following are required configs:
connector.class : InfluxDB2Sink
topics
input.data.format
name
kafka.api.key
kafka.api.secret
influxdb.url
influxdb.token
influxdb.org.id
influxdb.bucket
ステップ 3: コネクターの構成ファイルを作成します。¶
コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。
{
"connector.class": "InfluxDB2Sink",
"topics": "orders",
"input.data.format": "JSON",
"name": "InfluxDB2Sink_0",
"kafka.api.key": "****************",
"kafka.api.secret": "*********************************",
"influxdb.url": "http://influxdb-test.com:8086",
"influxdb.token": "***************************",
"influxdb.org.id": "<organization-id>",
"influxdb.bucket": "<bucket-name>",
"tasks.max": "1",
}
以下のプロパティ定義に注意してください。
"connector.class"
: コネクターのプラグイン名を指定します。"topics"
: 特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。input.data.format
(Kafka トピックから送られるデータ): AVRO、PROTOBUF、JSON_SR(JSON スキーマ)、JSON(スキーマレス)がサポートされています。スキーマベースのメッセージフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、有効なスキーマが Schema Registry に存在する必要があります。"name"
: 新しいコネクターの名前を設定します。
"kafka.auth.mode"
: 使用するコネクターの認証モードを指定します。オプションはSERVICE_ACCOUNT
またはKAFKA_API_KEY
(デフォルト)です。API キーとシークレットを使用するには、構成プロパティkafka.api.key
とkafka.api.secret
を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティkafka.service.account.id=<service-account-resource-ID>
に リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。confluent iam service-account list
例:
confluent iam service-account list Id | Resource ID | Name | Description +---------+-------------+-------------------+------------------- 123456 | sa-l1r23m | sa-1 | Service account 1 789101 | sa-l4d56p | sa-2 | Service account 2
"influxdb.url"
: 接続の確立に使用する完全修飾 InfluxDB API URL。たとえば、http://influxdb-test.com:8086
です"influxdb.token"
: InfluxDB ホストでの認証に使用するトークン。"influxdb.org.id"
: InfluxDB 組織 ID。注釈
コネクターには、データを送信するバケットに対する
--read-bucket
およびwrite-bucket
のアクセス許可が必要です。詳細については、「influx auth create」を参照してください。詳細については、InfluxDB へのデータの書き込み を参照してください。
"influxdb.bucket"
: コネクターからデータを送信するバケット。"tasks.max"
: このコネクターで使用できる タスク の最大数を入力します。タスクが多いほどパフォーマンスが向上する可能性があります。
Single Message Transforms: CLI を使用した SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
すべてのプロパティの値と説明については、「構成プロパティ」を参照してください。
ステップ 3: プロパティファイルを読み込み、コネクターを作成します。¶
以下のコマンドを入力して、構成を読み込み、コネクターを起動します。
confluent connect create --config <file-name>.json
例:
confluent connect create --config influxdb2-sink-config.json
出力例:
Created connector InfluxDB2Sink_0 lcc-do6vzd
ステップ 4: コネクターのステータスを確認します。¶
以下のコマンドを入力して、コネクターのステータスを確認します。
confluent connect list
出力例:
ID | Name | Status | Type | Trace
+------------+---------------------------+---------+------+-------+
lcc-do6vzd | InfluxDB2Sink_0 | RUNNING | sink | |
ステップ 5: ファイルを確認します。¶
InfluxDB 2 ホストでデータが生成されていることを確認します。
Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
構成プロパティ¶
このコネクターでは、以下のコネクター構成プロパティを使用します。
データの取得元とするトピック(Which topics do you want to get data from?)¶
topics
特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。
- 型: list
- 重要度: 高
入力メッセージ(Input messages)¶
input.data.format
Kafka 入力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、またはプレーン JSON です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。
- 型: string
- 重要度: 高
データへの接続方法(How should we connect to your data?)¶
name
コネクターの名前を設定します。
- 型: string
- 指定可能な値: 最大 64 文字の文字列
- 重要度: 高
Kafka クラスターの認証情報(Kafka Cluster credentials)¶
kafka.auth.mode
Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。
- 型: string
- デフォルト: KAFKA_API_KEY
- 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
- 重要度: 高
kafka.api.key
- 型: password
- 重要度: 高
kafka.service.account.id
Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。
- 型: string
- 重要度: 高
kafka.api.secret
- 型: password
- 重要度: 高
InfluxDB¶
influxdb.url
接続の確立に使用する完全修飾 InfluxDB API URL。
- 型: string
- 重要度: 高
influxdb.token
influx db での認証に使用するトークン。
- 型: password
- 重要度: 高
influxdb.org.id
組織 ID。
- 型: string
- 重要度: 高
書き込みの構成(Write Configuration)¶
influxdb.bucket
このコネクターからのデータの送信先となるバケット。
- 型: string
- 重要度: 高
write.precision
InfluxDB タイムスタンプの書き込みの精度。指定可能な値は、Seconds、Milliseconds、Microseconds、Nanoseconds です。'event.time.fieldname' を使用してタイムスタンプフィールドを指定せず、Kafka レコードのタイムスタンプが使用される場合、Kafka のタイムスタンプ(ミリ秒)はここで指定された精度に変換されます。ここで精度を指定しない場合は、「event.time.fieldname」で適切な時間の単位を指定する必要があります。
- 型: string
- デフォルト: Milliseconds
- 重要度: 中
event.time.fieldname
InfluxDB データポイントに書き込まれるイベント時刻が含まれている Kafka レコードのフィールドの名前。デフォルトでは(この構成が未指定の場合)、InfluxDB に書き込まれるタイムスタンプは、イベントが処理された時刻に対応する Kafka レコードのタイムスタンプ(Kafka レコードが作成された時刻)です。
- 型: string
- 重要度: 中
measurement.name.format
マップ先 measurement 名のフォーマット制御文字列。マップ元のトピック名を表すプレースホルダーとして「${topic}」を含めることができます。
たとえば、トピック「orders」の
kafka_${topic}
は、measurement 名「kafka_orders」にマップされます。measurement 名のフォーマットが指定されていない場合、コネクターでは、Kafka メッセージ内にある「measurement」フィールドの値が使用されます。対応するフィールドがメッセージ内に存在しない場合、そのメッセージは dlq に送信されます。- 型: string
- 重要度: 中
influxdb.gzip.enable
gzip を有効にするかどうかを指定するフラグ。
- 型: boolean
- デフォルト: false
- 重要度: 低
再試行¶
retry.backoff.ms
再試行まで待機するバックオフ時間。
- 型: int
- デフォルト: 1000(1 秒)
- 重要度: 中
max.retries
エラー時に再試行する最大回数。これを超えるとタスクは失敗します。
- 型: int
- デフォルト: 10
- 重要度: 中
このコネクターのタスク数(Number of tasks for this connector)¶
tasks.max
- 型: int
- 指定可能な値: [1,...]
- 重要度: 高
次のステップ¶
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。