Vertica Sink Connector 構成プロパティ¶
このコネクターを使用するには、connector.class
構成プロパティでコネクタークラスの名前を指定します。
connector.class=io.confluent.vertica.VerticaSinkConnector
コネクター固有の構成プロパティについて、以降で説明します。
接続¶
vertica.database
Vertica システム上のデータベース。これは、JDBC URL のビルドに使用されます。
- 型: string
- 重要度: 高
vertica.host
接続先 Vertica ホスト。これは、JDBC URL のビルドに使用されます。
- 型: string
- 重要度: 高
vertica.password
Vertica への認証で使用するパスワード。
- 型: password
- 重要度: 高
vertica.username
Vertica への認証で使用するユーザー名。
- 型: string
- 重要度: 高
vertica.port
接続先 Vertica ポート。JDBC URL のビルドに使用されます。
- 型: int
- デフォルト: 5433
- 指定可能な値: ValidPort{start=1025、end=65535}
- 重要度: 中
max.hikari.connection.pool.size
HikariCp プールに含まれる接続の最大数。
- 型: int
- デフォルト: 10
- 指定可能な値: [1,…]
- 重要度: 中
書き込み¶
stream.builder.cache.ms
テーブル構造の定義に使用されるストリームビルダーオブジェクトがキャッシュされる時間(ミリ秒)。
- 型: int
- デフォルト: 300000
- 指定可能な値: [1000,…,2147483647]
- 重要度: 高
vertica.buffer.size.bytes
Vertica Copy Stream によって使用される入力ストリームのバッファ。
- 型: int
- デフォルト: 1048576
- 重要度: 高
vertica.timeout.ms
Vertica への書き込み完了に対するタイムアウト。
- 型: int
- デフォルト: 60000
- 指定可能な値: [10000,…,2147483647]
- 重要度: 高
vertica.compression
データの読み込みに使用される圧縮のタイプ。
- 型: string
- デフォルト: UNCOMPRESSED
- 指定可能な値:
UNCOMPRESSED
、BZIP
、GZIP
、LZO
- 重要度: 中
rejected.record.logging.mode
Vertica によってレコードが拒否された場合のログ記録モード。以下のいずれかに構成する必要があります。
log
- 拒否されたレコードが記録されます。Connect ログで確認できます。
file
- 構成されたファイルに、拒否されたレコードと例外が書き込まれます。
rejected.record.path
とrejected.record.exception.path
を所定のファイルパスに構成する必要があります。 table
- Vertica テーブルの拒否されたレコードと例外がそれぞれ、同じテーブル名の末尾に対応する
rejected.record.table.suffix
値が付加された拒否テーブルに書き込まれます。
- 型: string
- デフォルト: log
- 指定可能な値: [log、file、table]
- 重要度: 中
rejected.record.path
Vertica によって拒否されたレコードが格納されるローカルディレクトリパス。この構成が必要となるのは、
rejected.record.logging.mode
がFILE
に設定された場合のみです。- 型: string
- デフォルト: ""
- 重要度: 中
rejected.record.exception.path
Vertica によって拒否されたレコードの例外が格納されるローカルディレクトリパス。この構成が必要となるのは、
rejected.record.logging.mode
がFILE
に設定された場合のみです。- 型: string
- デフォルト: ""
- 重要度: 中
rejected.record.table.suffix
Vertica によって拒否されたレコードのテーブル名に使用されるサフィックス値。この構成を使用すると、任意のサフィックスを日付フォーマットのプレースホルダーと組み合わせて、たとえば
_rejected_${yyyyddMM}
のように指定できます。プレースホルダーは、指定された日付フォーマットによる現在のタイムスタンプに置き換えられ、テーブル名の末尾に付加されます。デフォルトで、この構成の値は_rejected
で、これが Vertica テーブル名にサフィックスとして追加されます。この構成が必要となるのは、rejected.record.logging.mode
がTABLE
に設定された場合のみです。- 型: string
- デフォルト: _rejected
- 重要度: 中
rejected.record.table.schema
Vertica によって拒否されたレコードのテーブル名に使用されるスキーマ名。この構成が必要となるのは、
rejected.record.logging.mode
がTABLE
に設定された場合のみです。- 型: string
- デフォルト: ""
- 重要度: 中
vertica.load.method
データの読み込み方式です。
- 型: string
- デフォルト: AUTO
- 指定可能な値:
AUTO
、DIRECT
、TRICKLE
- 重要度: 中
expected.records
コネクターによって毎回処理されるレコードの想定数。
- 型: int
- デフォルト: 10000
- 重要度: 低
expected.topics
コネクターによって 1 回のポーリングで処理されるトピックの想定数。
- 型: int
- デフォルト: 500
- 重要度: 低
delete.enabled
tombstone レコードの場合、すなわち Kafka のレコード値が null だった場合に削除リクエストを処理する場合は、この値を
true
に設定します。削除リクエストが Kafka のレコードのキーに存在するフィールドに基づいて処理されるようにする場合は、加えてpk.mode
構成をrecord_key
に設定します。- 型: boolean
- デフォルト: false
- 重要度: 中
table.name.format
送信先スキーマテーブル名のフォーマット制御文字列。送信元のトピック名を表すプレースホルダーとして
${topic}
を含めることができます。たとえば、トピック
orders
のkafka_${topic}
は、テーブル名kafka_orders
とデフォルトスキーマ名public
にマップされます。これをスキーマ名の構成に使用することもできます。たとえば、トピックorders
のkafka_schema.kafka_${topic}
は、テーブル名kafka_orders
とデフォルトスキーマ名kafka_schema
にマップされます。- 型: string
- デフォルト: ${topic}
- 重要度: 中
DDL サポート¶
auto.create
CREATE
を発行して、存在しない送信先テーブルを(レコードスキーマに基づき)自動的に作成するかどうかを指定します。- 型: boolean
- デフォルト: false
- 重要度: 中
auto.evolve
ALTER
を発行して、レコードスキーマに照らして存在しない列をテーブルスキーマに自動的に追加するかどうかを指定します。- 型: boolean
- デフォルト: false
- 重要度: 中
データマッピング¶
pk.mode
プライマリキーモード。相互作用について
pk.fields
の説明も参照してください。サポートされるモードを以下に示します。none
- キーを使用しません。
kafka
- Kafka 座標を PK として使用します。
record_key
- レコードキーのフィールドを使用します。これらはプリミティブ型または構造体のどちらかにすることができます。
record_value
- レコード値のフィールドを使用します。これは構造体である必要があります。
- 型: string
- デフォルト: none
- 指定可能な値: [none、kafka、record_key、record_value]
- 重要度: 高
pk.fields
プライマリキーのフィールド名のコンマ区切りのリスト。この構成の実行時の解釈は、
pk.mode
に依存します。none
- このモードでは、プライマリキーとして使用されるフィールドはないため、無視されます。
kafka
- Kafka 座標を表す 3 つの値である必要があります。空の場合はデフォルトで
__connect_topic,__connect_partition,__connect_offset
になります。 record_key
- 空の場合、キー構造体からすべてのフィールドが利用されます。そうでない場合は、目的のフィールドの抽出に使用されます。プリミティブキーの場合は、単一のフィールド名のみが構成されている必要があります。
record_value
- 空の場合は、値構造体からすべてのフィールドが利用されます。そうでない場合は、目的のフィールドの抽出に使用されます。
- 型: list
- デフォルト: ""
- 重要度: 中
Confluent Platform ライセンス¶
confluent.topic.bootstrap.servers
ライセンス供与に使用される Kafka クラスターとの初期接続を確立するために使用するホストとポートのペアのリスト。初期接続で、クラスター内のすべてのサーバーが検出されます。このリストは、
host1:port1,host2:port2,…
という形式にする必要があります。これらのサーバーは、初期接続ですべてのクラスターメンバーシップを検出するためにのみ使用されます。これは動的に変わる可能性があるので、このリストにすべてのサーバーセットを含める必要はありません(ただし、サーバーの障害に備えて、複数指定しておくこともできます)。- 型: list
- 重要度: 高
confluent.topic
Confluent Platform の構成(ライセンス情報など)で使用される Kafka のトピックの名前。
- 型: string
- デフォルト: _confluent-command
- 重要度: 低
confluent.topic.replication.factor
Confluent Platform の構成(ライセンス情報など)で使用される Kafka のトピックのレプリケーション係数。これは、トピックがまだ存在しない場合にのみ使用されます。デフォルトの 3 は、本稼働環境での使用に適しています。ブローカー数が 3 未満の開発環境で使用する場合は、そのブローカー数をこのプロパティに設定する必要があります(多くの場合、1 です)。
- 型: int
- デフォルト: 3
- 重要度: 低
Confluent ライセンスのプロパティ¶
ちなみに
コネクターの構成にライセンス関連のプロパティを含めることができますが、Confluent Platform バージョン 6.0 以降では、各コネクターの構成ではなく、Connect ワーカーの構成 でライセンス関連のプロパティを設定できるようになりました。
注釈
このコネクターはプロプライエタリであり、ライセンスが必要です。ライセンス情報は、_confluent-command
トピックに保管されています。ブローカーが SSL 接続を必要とする場合は、以下で説明するセキュリティ関連の confluent.topic.*
プロパティを含める必要があります。
confluent.license
Confluent では、各契約者にエンタープライズライセンスキーを発行します。ライセンスキーはテキストであるため、コピーアンドペーストで
confluent.license
の値として使用できます。試用ライセンスでは、コネクターを 30 日間使用できます。開発者ライセンスでは、ブローカーが 1 つの開発環境で、コネクターを無期限に使用できます。既にご契約されている場合は、詳細について Confluent サポートにお問い合わせください。
- 型: string
- デフォルト: ""
- 指定可能な値: Confluent Platform ライセンス
- 重要度: 高
confluent.topic.ssl.truststore.location
トラストストアファイルの場所。
- 型: string
- デフォルト: null
- 重要度: 高
confluent.topic.ssl.truststore.password
トラストストアファイルのパスワード。パスワードが設定されていなくてもトラストストアにアクセスできますが、整合性チェックが無効になります。
- 型: password
- デフォルト: null
- 重要度: 高
confluent.topic.ssl.keystore.location
キーストアファイルの場所。クライアントでは省略可能です。クライアントの相互認証に使用できます。
- 型: string
- デフォルト: null
- 重要度: 高
confluent.topic.ssl.keystore.password
キーストアファイルのストアパスワード。クライアントでは省略可能です。ssl.keystore.location を構成した場合にのみ必要となります。
- 型: password
- デフォルト: null
- 重要度: 高
confluent.topic.ssl.key.password
キーストアファイル内のプライベートキーのパスワード。クライアントでは省略可能です。
- 型: password
- デフォルト: null
- 重要度: 高
confluent.topic.security.protocol
ブローカーとの通信に使用されるプロトコル。指定可能な値は、PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL です。
- 型: string
- デフォルト: "PLAINTEXT"
- 重要度: 中
ライセンストピックの構成¶
Confluent エンタープライズライセンスは、_confluent-command
トピックに格納されます。このトピックは、デフォルトで作成され、confluent.license
プロパティで指定されたライセンスキーに対応するライセンスが格納されます。
注釈
パブリックキーは Kafka のトピックには保管されません。
以下では、デフォルトの _confluent-command
トピックがさまざまなシナリオでどのように生成されるかを説明します。
confluent.license
プロパティを追加していない場合、またはこのプロパティを(confluent.license=
などで)空にした場合は、_confluent command
トピックに、30 日間の試用ライセンスが自動的に生成されます。- 有効なライセンスキーを(
confluent.license=<valid-license-key>
などで)追加すると、有効なライセンスが_confluent-command
トピックに追加されます。
以下は、開発およびテストのための最小限のプロパティの例です。
_confluent-command
トピックの名前は confluent.topic
プロパティを使用して変更できます(たとえば、環境に厳格な命名規則がある場合など)。以下の例は、この変更と、構成される Kafka ブートストラップサーバーを示しています。
confluent.topic=foo_confluent-command
confluent.topic.bootstrap.servers=localhost:9092
上の例は、開発およびテストで使用できるブートストラップサーバーの必要最小限のプロパティを示しています。本稼働環境の場合は、プレフィックスとして confluent.topic.
を付けて、通常のプロデューサー、コンシューマー、およびトピックの各構成プロパティをコネクターのプロパティに追加します。
ライセンストピックの ACL¶
_confluent-command
トピックには、confluent.license
プロパティで指定されたライセンスキーに対応するライセンスが格納されます。このトピックはデフォルトで作成されます。このトピックにアクセスするコネクターには、以下の ACL を構成する必要があります。
- コネクターでトピックの作成が必要な場合は、リソースクラスターでの CREATE および DESCRIBE。
_confluent-command
トピックでの DESCRIBE、READ、および WRITE。
ライセンスを使用する各プリンシパルに個々にアクセス許可を付与することも、 ワイルドカード入力 を使用してすべてのクライアントを許可することもできます。以下の例は、リソースクラスターおよび _confluent-command
トピックの ACL を構成するために使用できるコマンドを示しています。
リソースクラスターの CREATE および DESCRIBE の ACL を設定します。
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \ --add --allow-principal User:<principal> \ --operation CREATE --operation DESCRIBE --cluster
_confluent-command
トピックでの DESCRIBE、READ、および WRITE の ACL を設定します。kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \ --add --allow-principal User:<principal> \ --operation DESCRIBE --operation READ --operation WRITE --topic _confluent-command
デフォルトの構成プロパティのオーバーライド¶
confluent.topic.replication.factor
を使用することにより、レプリケーション係数をオーバーライドできます。たとえば、(開発およびテスト用の)ブローカー数 3 未満の環境で送信先として Kafka クラスターを使用する場合、confluent.topic.replication.factor
プロパティに 1
を設定する必要があります。
プロデューサー固有のプロパティは、confluent.topic.producer.
プレフィックスを使用してオーバーライドできます。コンシューマー固有のプロパティは、confluent.topic.consumer.
プレフィックスを使用してオーバーライドできます。
デフォルト値を使用することも、他のプロパティをカスタマイズすることもできます。たとえば、confluent.topic.client.id
プロパティのデフォルトは、コネクターの名前に -licensing
サフィックスを付けたものです。クライアント接続に SSL または SASL を必要とするブローカーの構成設定では、このプレフィックスを使用できます。
トピックのクリーンアップのポリシーはオーバーライドできません。トピックは、常にパーティションが単一で、圧縮されるからです。また、このプレフィックスを使用してシリアライザーおよびデシリアライザーを指定しないでください。追加しても無視されます。