JDBC Sink Connector 構成プロパティ¶
このコネクターを使用するには、connector.class
構成プロパティでこのコネクタークラスの名前を指定します。
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
コネクター固有の構成プロパティについて、以降で説明します。
データベース接続のセキュリティ¶
コネクターの構成にセキュリティのパラメーターがないことに気づかれるでしょう。これは、SSL が JDBC 標準に含まれておらず、使用中の JDBC ドライバに依存するからです。通常は、SSL を connection.url
パラメーターで構成する必要があります。たとえば、MySQL では以下のように構成します。
connection.url="jdbc:mysql://127.0.0.1:3306/sample?verifyServerCertificate=false&useSSL=true&requireSSL=true"
サポートおよび構成については、ご使用の JDBC ドライバのドキュメントを確認してください。
接続¶
connection.user
有効な JDBC 接続の取得を再試行する最大回数。値は正の整数でなければなりません。
- 型: int
- デフォルト: 3000
- 重要度: 低
retry.backoff.ms
接続再試行の間のバックオフ時間(ミリ秒)。
- 型: string
- デフォルト: 3000
- 重要度: 低
connection.url
JDBC 接続 URL。
例 :
jdbc:oracle:thin:@localhost:1521:orclpdb1
、jdbc:mysql://localhost/db_name
、jdbc:sqlserver://localhost;instance=SQLEXPRESS;databaseName=db_name
- 型: string
- 重要度: 高
connection.user
JDBC 接続ユーザー。
- 型: string
- デフォルト: null
- 重要度: 高
connection.password
JDBC 接続パスワード。
- 型: password
- デフォルト: null
- 重要度: 高
dialect.name
このコネクターで使用する必要があるデータベース言語の名前。デフォルトでは、これは空です。言語は、JDBC 接続 URL に基づいて、コネクターによって自動的に決定されます。これは、動作をオーバーライドしたり特定の言語を使用したりする必要がある場合に使用します。JDBC 接続プラグインの適切にパッケージ化されたすべての言語を使用できます。
- 型: string
- デフォルト: ""
- 指定可能な値: [、Db2DatabaseDialect、MySqlDatabaseDialect、SybaseDatabaseDialect、GenericDatabaseDialect、OracleDatabaseDialect、SqlServerDatabaseDialect、PostgreSqlDatabaseDialect、SqliteDatabaseDialect、DerbyDatabaseDialect、SapHanaDatabaseDialect、MockDatabaseDialect、VerticaDatabaseDialect]
- 重要度: 低
書き込み¶
insert.mode
使用する挿入モード。
- 型: string
- デフォルト: insert
- 指定可能な値: [insert、upsert、update]
- 重要度: 高
サポートされているモードは以下のとおりです。
insert
標準の SQL
INSERT
ステートメントを使用します。upsert
INSERT OR IGNORE
など、ターゲットデータベースに適切なアップサートセマンティクスを使用します(コネクターでサポートされている場合)。upsert
モードを使用する場合は、コネクター構成にpk.mode
プロパティとpk.fields
プロパティを追加し、定義する必要があります。以下に例を示します。{ ... "pk.mode": "record_value", "pk.fields": "id" ... }
この例の
pk.fields
には、プライマリキーを含める必要があります。update
UPDATE
など、ターゲットデータベースに適したアップデートセマンティクスがコネクターでサポートされている場合は、それを使用します。
batch.size
送信先テーブルへの挿入で、バッチ処理を試みる際のレコード数を指定します(可能な場合)。
- 型: int
- デフォルト: 3000
- 指定可能な値: [0,…]
- 重要度: 中
delete.enabled
null
レコード値を削除として扱うかどうか。pk.mode
をrecord_key
に設定する必要があります。- 型: boolean
- デフォルト: false
- 重要度: 中
データマッピング¶
table.name.format
マップ先テーブル名のフォーマット制御文字列。マップ元のトピック名を表すプレースホルダーとして「${topic}」を含めることができます。
たとえば、トピック「orders」の
kafka_${topic}
は、テーブル名「kafka_orders」にマップされます。- 型: string
- デフォルト: ${topic}
- 重要度: 中
pk.mode
プライマリキーモード。相互作用について
pk.fields
のドキュメントも参照してください。サポートされるモードを以下に示します。none
- キーを使用しません。
kafka
Apache Kafka® 座標を PK として使用します。
重要
一部の JDBC 言語(Oracle 言語や MySQL 言語など)では、
pk.mode
をkafka
に、auto.create
をtrue
に設定した場合に例外が発生する可能性があります。例外が発生するのは、STRING がコネクターにより、固定長の文字列(VARCHAR(256) など)ではなく、可変長の文字列(TEXT など)にマップされるためです。プライマリキーは固定長である必要があります。この例外を回避するには、以下の事項を考慮します。auto.create
をtrue
に設定しないようにします。- データベーステーブルとプライマリキーのデータ型を事前に作成します。
record_key
- レコードキーのフィールドを使用します。これらはプリミティブ型または構造体のどちらかにすることができます。
record_value
- レコード値のフィールドを使用します。これは構造体である必要があります。
- 型: string
- デフォルト: none
- 指定可能な値: [none、kafka、record_key、record_value]
- 重要度: 高
pk.fields
プライマリキーのフィールド名のコンマ区切りのリスト。この構成の実行時の解釈は、
pk.mode
に依存します。none
- このモードでは、プライマリキーとして使用されるフィールドはないため、無視されます。
kafka
- Kafka 座標を表す 3 つの値である必要があります。空の場合はデフォルトで
__connect_topic,__connect_partition,__connect_offset
になります。このモードで設定されたカスタムフィールド名によって、デフォルト列名が変更されますが、Kafka 座標がプライマリキーとして維持されます。 record_key
- 空の場合、キー構造体のすべてのフィールドが利用されます。指定されている場合は、目的のフィールドの抽出に使用されます。プリミティブキーの場合は、単一のフィールド名のみが構成されている必要があります。
record_value
- 空の場合は、値構造体からすべてのフィールドが利用されます。指定されている場合は、目的のフィールドの抽出に使用されます。
- 型: list
- デフォルト: none
- 重要度: 中
fields.whitelist
レコード値フィールド名のコンマ区切りのリスト。空の場合は、レコード値からすべてのフィールドが利用されます。リストを設定した場合は、目的のフィールドのフィルター処理に使用されます。
pk.fields
はどのフィールドで送信先データベースのプライマリキー列を構成するかというコンテキストで独立して適用されますが、この構成はその他の列に適用されることに注意してください。- 型: list
- デフォルト: ""
- 重要度: 中
db.timezone
コネクターで時間ベースの値を挿入する場合に使用する必要がある JDBC タイムゾーンの名前。デフォルトは UTC です。
- 型: string
- デフォルト: "UTC"
- 指定可能な値: 有効な JDK タイムゾーン
- 重要度: 中
DDL サポート¶
auto.create
送信先テーブルが存在しない場合に、
CREATE
を発行して、レコードスキーマに基づくテーブルを自動的に作成するかどうかを指定します。- 型: boolean
- デフォルト: false
- 重要度: 中
重要
- Kafka Connect によってテーブルが自動作成され、ターゲットデータベースに対する効率の低いデータ型が使用されていると、データベースのパフォーマンスが低下する可能性があります。Confluent では、データベース管理者との連携により、使用するデータ型を確認するか、テーブルを読み込む前に事前作成しておくことをお勧めしています。
- 一部の JDBC 言語(Oracle 言語や MySQL 言語など)では、
pk.mode
をkafka
に、auto.create
をtrue
に設定すると例外が発生する可能性があります。例外が発生するのは、STRING がコネクターにより、固定長の文字列(VARCHAR(256) など)ではなく、可変長の文字列(TEXT など)にマッピングされるためです。プライマリキーは固定長である必要があります。この例外を回避するには、以下を考慮してください。auto.create
をtrue
に設定しないようにします。- データベーステーブルとプライマリキーのデータ型を事前に作成します。
auto.evolve
レコードスキーマに関連する列がテーブルスキーマに存在しない場合に、
ALTER
を発行してレコードスキーマに関連する列を自動的に追加するかどうかを指定します。- 型: boolean
- デフォルト: false
- 重要度: 中
quote.sql.identifiers
SQL ステートメントで、テーブル名、列名、その他の識別子をいつクォートするかを指定します。後方互換性のため、デフォルトは
always
となっています。- 型: string
- デフォルト: always
- 重要度: 中
再試行¶
max.retries
エラー時に再試行する最大回数。これを超えるとタスクは失敗します。
- 型: int
- デフォルト: 10
- 指定可能な値: [0,…]
- 重要度: 中
retry.backoff.ms
エラーの後、再試行するまでの待ち時間(ミリ秒)。
- 型: int
- デフォルト: 3000
- 指定可能な値: [0,…]
- 重要度: 中