InfluxDB Source Connector 構成プロパティ¶
このコネクターを使用するには、connector.class
構成プロパティで、コネクタークラスの名前を指定します。
connector.class=io.confluent.influxdb.source.InfluxdbSourceConnector
コネクター固有の構成プロパティについて、以降で説明します。
InfluxDB¶
influxdb.url
接続を確立するための完全修飾 InfluxDB API URL。
- 型: string
- 指定可能な値: 完全修飾 HTTP URL
- 重要度: 高
influxdb.username
database 接続を確立する必要がある InfluxDB ユーザー名。
- 型: string
- デフォルト: null
- 重要度: 中
influxdb.password
database 接続を確立するために使用する InfluxDB ユーザーのパスワード。
- 型: password
- デフォルト: [hidden]
- 重要度: 中
topic.prefix
データのパブリッシュ先である Apache Kafka® トピックの名前(カスタムクエリの場合には Kafka トピックのフルネーム)を指定するために measurement 名の先頭に付けるプレフィックス。
- 型: string
- デフォルト: ""
- 重要度: 低
topic.mapper
トピックのマップ方法を決定する構成。サポートされるオプションを以下に示します。
db
- トピック名はトピックのプレフィックス + database 名です。同じ database のレコードはすべて同じトピックになります。
measurement
またはseries
- トピック名はトピックのプレフィックス + measurement 名です。同じ measurement のレコードはすべて同じトピックになります。
- 型: string
- デフォルト: db
- 指定可能な値: [db、measurement、series]
- 重要度: 低
influxdb.db
レコードの読み取り元の InfluxDB database 名で、データを構成済み Apache Kafka® トピックにパブリッシュします。
- 型: string
- 重要度: 高
モード¶
mode
InfluxDB の measurement をポーリングする場合のモード。サポートされるモードを以下に示します。
bulk
- ポーリングのたびに目的の Kafka トピックに measurement 全体を一括読み込みします。
timestamp
- timestamp を使用して、新しく作成された行を検出し、目的の Kafka トピックに書き込みます。
- 型: string
- デフォルト: timestamp
- 指定可能な値: [timestamp、bulk]
- 重要度: 中
クエリ¶
query
指定した場合、このクエリが実行され、結果のレコードが目的の Kafka トピックにプッシュされます。フィールドまたはタグのサブセットを選択したり、集約を実行したり、データをフィルターしたりする必要がある場合は、この設定を使用します。
mode=bulk
の場合、コネクターはポーリングのたびにクエリをそのまま実行します。mode=timestamp
を使用する場合、このコネクターはオフセットからの適切なタイムスタンプ値を使用して(オフセットが存在する場合)、ポーリングするたびに新しいレコードをフェッチします。オフセットがない場合は、タイムスタンプ1970-01-01T00:00:00.000Z
からデータのフェッチを開始します。- 型: string
- デフォルト: null
- 重要度: 低
batch.size
新しいデータのポーリング時に単一のバッチに含める point の最大数。この設定を使用して、コネクターの内部にバッファリングするデータの量を制限できます。
- 型: int
- デフォルト: 5000
- 重要度: 低
timestamp.delay.interval.ms
特定の timestamp を持つレコードが出現してからそれを結果に含めるまで待機する時間。より早い timestamp を持つトランザクションが完了できるように、遅延を追加できます。最初の実行では、利用可能なすべてのレコードを(つまり、timestamp 0 から)、現在の時刻から遅延を引いた時刻までフェッチします。その後の各実行では、最後のフェッチ時刻から、現在の時刻から遅延を引いた時刻までデータを取得します。
- 型: long
- デフォルト: 0
- 重要度: 高
再試行¶
max.retries
エラー時に再試行する最大回数。これを超えるとタスクは失敗します。
- 型: int
- デフォルト: 10
- 指定可能な値: [0,…]
- 重要度: 中
retry.backoff.ms
再試行まで待機するバックオフ時間(ミリ秒)。
- 型: int
- デフォルト: 1000
- 指定可能な値: [0,…]
- 重要度: 中
ホワイトリストとブラックリスト¶
influxdb.measurement.whitelist
コピーに含める measurement のコンマ区切りリスト。指定した場合は、
influxdb.measurement.blacklist
を設定できません。空のままにすると、すべての measurement が含まれます。- 型: string
- デフォルト: ""
- 重要度: 中
influxdb.measurement.blacklist
コピーから除外する measurement のコンマ区切りリスト。指定した場合は、
influxdb.measurement.whitelist
を設定できません。- 型: string
- デフォルト: ""
- 重要度: 中
トピックの自動作成¶
トピックの自動作成の詳細については、「ソースコネクターのトピックの自動作成の構成 」を参照してください。
注釈
構成プロパティには、Java regex として定義されている正規表現(regex)を使用できます。
topic.creation.groups
一致するトピックにグループ別のトピック構成を定義するために使用するグループ別名のリスト。
default
グループは常に存在し、すべてのトピックに一致します。- 型: String 型のリスト
- デフォルト: 空
- 指定可能な値: このプロパティの値は、すべての追加グループを参照します。トピックの構成には常に
default
グループが定義されています。
topic.creation.$alias.replication.factor
ココネクターで作成する新規トピックのレプリケーション係数。この値は、Kafka クラスターのブローカーの数を超えてはなりません。この値が Kafka ブローカーの数よりも大きい場合、コネクターがトピックの生成を試行するとエラーが発生します。
default
グループの場合、これは 必須のプロパティ です。topic.creation.groups
で定義されている他のグループの場合、このプロパティは任意です。他のグループは、Kafka ブローカーのデフォルト値を使用します。- 型: int
- デフォルト: なし
- 指定可能な値 : 具体的な有効値を指定する場合は
>= 1
で指定し、Kafka ブローカーのデフォルト値を使用する場合は-1
を指定します。
topic.creation.$alias.partitions
このコネクターで作成するトピックのパーティション数。
default
グループの場合、これは 必須のプロパティ です。topic.creation.groups
で定義されている他のグループの場合、このプロパティは任意です。他のグループは、Kafka ブローカーのデフォルト値を使用します。- 型: int
- デフォルト: なし
- 指定可能な値 : 具体的な有効値を指定する場合は
>= 1
で指定し、Kafka ブローカーのデフォルト値を使用する場合は-1
を指定します。
topic.creation.$alias.include
トピック名に一致する正規表現を表す文字列のリスト。このリストは、一致する値のトピックを組み込み、このグループの特定の構成を一致するトピックに適用するために使用します。
topic.creation.groups
で定義されているすべてのグループに、$alias
が適用されます。default
グループには、このプロパティは適用されません。- 型: String 型のリスト
- デフォルト: 空
- 指定可能な値: 正確なトピック名、または正規表現のコンマ区切りのリスト。
topic.creation.$alias.exclude
トピック名に一致する正規表現を表す文字列のリスト。このリストは、一致する値のトピックを、グループの特定の構成の適用から除外するために使用します。
topic.creation.groups
で定義されているすべてのグループに、$alias
が適用されます。default
グループには、このプロパティは適用されません。トピックの除外ルールは、すべての組み込みルールをオーバーライドすることに注意してください。- 型: String 型のリスト
- デフォルト: 空
- 指定可能な値: 正確なトピック名、または正規表現のコンマ区切りのリスト。
topic.creation.$alias.${kafkaTopicSpecificConfigName}
レコードの書き込み先の Kafka ブローカーのバージョンに対するすべての ブローカー構成の動的な変更 。ルールに対して構成が指定されていない場合、ブローカーのトピックレベルの構成値が使用されます。
default
グループ、およびtopic.creation.groups
で定義されているすべてのグループに、$alias
が適用されます。- 型: プロパティ値
- デフォルト : Kafka ブローカー値
Confluent Platform ライセンス¶
confluent.topic.bootstrap.servers
ライセンス供与に使用される Kafka クラスターとの初期接続を確立するために使用するホストとポートのペアのリスト。初期接続で、クラスター内のすべてのサーバーが検出されます。このリストは、
host1:port1,host2:port2,...
という形式にする必要があります。これらのサーバーは、初期接続ですべてのクラスターメンバーシップを検出するためにのみ使用されます。これは動的に変わる可能性があるので、このリストにすべてのサーバーセットを含める必要はありません(ただし、サーバーの障害に備えて、複数指定しておくこともできます)。- 型: list
- 重要度: 高
confluent.topic
Confluent Platform の構成で使用される Kafka のトピックの名前。ライセンス情報を含みます。
- 型: string
- デフォルト: _confluent-command
- 重要度: 低
confluent.topic.replication.factor
Confluent Platform の構成(ライセンス情報など)で使用される Kafka のトピックのレプリケーション係数。これは、トピックがまだ存在しない場合にのみ使用されます。デフォルトの 3 は、本稼働環境での使用に適しています。ブローカー数が 3 未満の開発環境で使用する場合は、そのブローカー数をこのプロパティに設定する必要があります(多くの場合、1 です)。
- 型: int
- デフォルト: 3
- 重要度: 低
Confluent ライセンスのプロパティ¶
ちなみに
コネクターの構成にライセンス関連のプロパティを含めることができますが、Confluent Platform バージョン 6.0 以降では、各コネクターの構成ではなく、Connect ワーカーの構成 でライセンス関連のプロパティを設定できるようになりました。
注釈
このコネクターはプロプライエタリであり、ライセンスが必要です。ライセンス情報は、_confluent-command
トピックに保管されています。ブローカーが SSL 接続を必要とする場合は、以下で説明するセキュリティ関連の confluent.topic.*
プロパティを含める必要があります。
confluent.license
Confluent では、各契約者に Confluent エンタープライズライセンスキーを発行します。ライセンスキーはテキストであるため、コピーアンドペーストで
confluent.license
の値として使用できます。試用ライセンスでは、コネクターを 30 日間使用できます。開発者ライセンスでは、ブローカーが 1 つの開発環境で、コネクターを無期限に使用できます。既にご契約されている場合は、詳細について Confluent サポートにお問い合わせください。
- 型: string
- デフォルト: ""
- 指定可能な値: Confluent Platform ライセンス
- 重要度: 高
confluent.topic.ssl.truststore.location
トラストストアファイルの場所。
- 型: string
- デフォルト: null
- 重要度: 高
confluent.topic.ssl.truststore.password
トラストストアファイルのパスワード。パスワードが設定されていなくてもトラストストアにアクセスできますが、整合性チェックが無効になります。
- 型: password
- デフォルト: null
- 重要度: 高
confluent.topic.ssl.keystore.location
キーストアファイルの場所。クライアントでは省略可能です。クライアントの相互認証に使用できます。
- 型: string
- デフォルト: null
- 重要度: 高
confluent.topic.ssl.keystore.password
キーストアファイルのストアパスワード。クライアントでは省略可能です。ssl.keystore.location を構成した場合にのみ必要となります。
- 型: password
- デフォルト: null
- 重要度: 高
confluent.topic.ssl.key.password
キーストアファイル内のプライベートキーのパスワード。クライアントでは省略可能です。
- 型: password
- デフォルト: null
- 重要度: 高
confluent.topic.security.protocol
ブローカーとの通信に使用されるプロトコル。指定可能な値は、PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL です。
- 型: string
- デフォルト: "PLAINTEXT"
- 重要度: 中
ライセンストピックの構成¶
Confluent エンタープライズライセンスは、_confluent-command
トピックに保管されています。このトピックは、デフォルトで作成され、confluent.license
プロパティで指定されたライセンスキーに対応するライセンスが含まれます。
注釈
パブリックキーは Kafka のトピックには保管されません。
以下では、デフォルトの _confluent-command
トピックがさまざまなシナリオでどのように生成されるかを説明します。
confluent.license
プロパティを追加していない場合、またはこのプロパティを(confluent.license=
などで)空にした場合は、_confluent command
トピックに、30 日間の試用ライセンスが自動的に生成されます。- 有効なライセンスキーを(
confluent.license=<valid-license-key>
などで)追加すると、有効なライセンスが_confluent-command
トピックに追加されます。
以下は、開発およびテストのための最小限のプロパティの例です。
_confluent-command
トピックの名前は confluent.topic
プロパティを使用して変更できます(たとえば、環境に厳格な命名規則がある場合など)。以下の例は、この変更と、構成された Kafka ブートストラップサーバーを示しています。
confluent.topic=foo_confluent-command
confluent.topic.bootstrap.servers=localhost:9092
上の例は、開発およびテストで使用できるブートストラップサーバーの必要最小限のプロパティを示しています。本稼働環境の場合は、プレフィックスとして confluent.topic.
を付けて、通常のプロデューサー、コンシューマー、およびトピックの各構成プロパティをコネクターのプロパティに追加します。
ライセンストピックの ACL¶
_confluent-command
トピックには、confluent.license
プロパティで指定されたライセンスキーに対応するライセンスが含まれます。このトピックはデフォルトで作成されます。このトピックにアクセスするコネクターには、以下の ACL を構成する必要があります。
- コネクターでトピックの作成が必要な場合は、リソースクラスターでの CREATE および DESCRIBE。
_confluent-command
トピックでの DESCRIBE、READ、および WRITE。
ライセンスを使用する各プリンシパルに個別にアクセス許可を付与することができます。または、 ワイルドカード入力 を使用すると、すべてのクライアントを許可できます。以下の例は、リソースクラスターおよび _confluent-command
トピックの ACL を構成するために使用できるコマンドを示しています。
リソースクラスターの CREATE および DESCRIBE の ACL を設定します。
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \ --add --allow-principal User:<principal> \ --operation CREATE --operation DESCRIBE --cluster
_confluent-command
トピックの DESCRIBE、READ、および WRITE の ACL を設定します。kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \ --add --allow-principal User:<principal> \ --operation DESCRIBE --operation READ --operation WRITE --topic _confluent-command
デフォルトの構成プロパティのオーバーライド¶
confluent.topic.replication.factor
を使用することにより、レプリケーション係数をオーバーライドできます。たとえば、(開発およびテスト用の)ブローカー数 3 未満の環境で送信先として Kafka クラスターを使用する場合、confluent.topic.replication.factor
プロパティに 1
を設定する必要があります。
プロデューサー固有のプロパティは、confluent.topic.producer.
プレフィックスを使用することによりオーバーライドできます。コンシューマー固有のプロパティは、confluent.topic.consumer.
プレフィックスを使用することによりオーバーライドできます。
デフォルト値を使用することも、他のプロパティをカスタマイズすることもできます。たとえば、confluent.topic.client.id
プロパティのデフォルトは、コネクターの名前に -licensing
サフィックスを付けたものです。クライアント接続に SSL または SASL を必要とするブローカーの構成設定では、このプレフィックスを使用します。
トピックのクリーンアップのポリシーはオーバーライドできません。トピックは、常に 1 パーティションであり、圧縮されるからです。また、このプレフィックスを使用してシリアライザーおよびデシリアライザーを指定しないでください。追加されても無視されます。