JDBC Sink Connector 構成プロパティ

このコネクターを使用するには、connector.class 構成プロパティでこのコネクタークラスの名前を指定します。

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector

コネクター固有の構成プロパティについて、以降で説明します。

データベース接続のセキュリティ

コネクターの構成にセキュリティのパラメーターがないことに気づかれるでしょう。これは、SSL が JDBC 標準に含まれておらず、使用中の JDBC ドライバに依存するからです。通常は、SSL を connection.url パラメーターで構成する必要があります。たとえば、MySQL では以下のように構成します。

connection.url="jdbc:mysql://127.0.0.1:3306/sample?verifyServerCertificate=false&useSSL=true&requireSSL=true"

サポートおよび構成については、ご使用の JDBC ドライバのドキュメントを確認してください。

接続

connection.user

有効な JDBC 接続の取得を再試行する最大回数。値は正の整数でなければなりません。

  • 型: int
  • デフォルト: 3000
  • 重要度: 低
retry.backoff.ms

接続再試行の間のバックオフ時間(ミリ秒)。

  • 型: string
  • デフォルト: 3000
  • 重要度: 低
connection.url

JDBC 接続 URL。

例 : jdbc:oracle:thin:@localhost:1521:orclpdb1jdbc:mysql://localhost/db_namejdbc:sqlserver://localhost;instance=SQLEXPRESS;databaseName=db_name

  • 型: string
  • 重要度: 高
connection.user

JDBC 接続ユーザー。

  • 型: string
  • デフォルト: null
  • 重要度: 高
connection.password

JDBC 接続パスワード。

  • 型: password
  • デフォルト: null
  • 重要度: 高
dialect.name

このコネクターで使用する必要があるデータベース言語の名前。デフォルトでは、これは空です。言語は、JDBC 接続 URL に基づいて、コネクターによって自動的に決定されます。これは、動作をオーバーライドしたり特定の言語を使用したりする必要がある場合に使用します。JDBC 接続プラグインの適切にパッケージ化されたすべての言語を使用できます。

  • 型: string
  • デフォルト: ""
  • 指定可能な値: [、Db2DatabaseDialect、MySqlDatabaseDialect、SybaseDatabaseDialect、GenericDatabaseDialect、OracleDatabaseDialect、SqlServerDatabaseDialect、PostgreSqlDatabaseDialect、SqliteDatabaseDialect、DerbyDatabaseDialect、SapHanaDatabaseDialect、MockDatabaseDialect、VerticaDatabaseDialect]
  • 重要度: 低

書き込み

insert.mode

使用する挿入モード。

  • 型: string
  • デフォルト: insert
  • 指定可能な値: [insert、upsert、update]
  • 重要度: 高

サポートされているモードは以下のとおりです。

  • insert

    標準の SQL INSERT ステートメントを使用します。

  • upsert

    INSERT OR IGNORE など、ターゲットデータベースに適切なアップサートセマンティクスを使用します(コネクターでサポートされている場合)。upsert モードを使用する場合は、コネクター構成に pk.mode プロパティと pk.fields プロパティを追加し、定義する必要があります。以下に例を示します。

    {
    
         ...
    
         "pk.mode": "record_value",
         "pk.fields": "id"
    
         ...
    
     }
    

    この例の pk.fields には、プライマリキーを含める必要があります。

  • update

    UPDATE など、ターゲットデータベースに適したアップデートセマンティクスがコネクターでサポートされている場合は、それを使用します。

batch.size

送信先テーブルへの挿入で、バッチ処理を試みる際のレコード数を指定します(可能な場合)。

  • 型: int
  • デフォルト: 3000
  • 指定可能な値: [0,…]
  • 重要度: 中
delete.enabled

null レコード値を削除として扱うかどうか。pk.moderecord_key に設定する必要があります。

  • 型: boolean
  • デフォルト: false
  • 重要度: 中

データマッピング

table.name.format

マップ先テーブル名のフォーマット制御文字列。マップ元のトピック名を表すプレースホルダーとして「${topic}」を含めることができます。

たとえば、トピック「orders」の kafka_${topic} は、テーブル名「kafka_orders」にマップされます。

  • 型: string
  • デフォルト: ${topic}
  • 重要度: 中
pk.mode

プライマリキーモード。相互作用について pk.fields のドキュメントも参照してください。サポートされるモードを以下に示します。

none
キーを使用しません。
kafka

Apache Kafka® 座標を PK として使用します。

重要

一部の JDBC 言語(Oracle 言語や MySQL 言語など)では、pk.modekafka に、auto.createtrue に設定した場合に例外が発生する可能性があります。例外が発生するのは、STRING がコネクターにより、固定長の文字列(VARCHAR(256) など)ではなく、可変長の文字列(TEXT など)にマップされるためです。プライマリキーは固定長である必要があります。この例外を回避するには、以下の事項を考慮します。

  • auto.createtrue に設定しないようにします。
  • データベーステーブルとプライマリキーのデータ型を事前に作成します。
record_key
レコードキーのフィールドを使用します。これらはプリミティブ型または構造体のどちらかにすることができます。
record_value
レコード値のフィールドを使用します。これは構造体である必要があります。
  • 型: string
  • デフォルト: none
  • 指定可能な値: [none、kafka、record_key、record_value]
  • 重要度: 高
pk.fields

プライマリキーのフィールド名のコンマ区切りのリスト。この構成の実行時の解釈は、pk.mode に依存します。

none
このモードでは、プライマリキーとして使用されるフィールドはないため、無視されます。
kafka
Kafka 座標を表す 3 つの値である必要があります。空の場合はデフォルトで __connect_topic,__connect_partition,__connect_offset になります。このモードで設定されたカスタムフィールド名によって、デフォルト列名が変更されますが、Kafka 座標がプライマリキーとして維持されます。
record_key
空の場合、キー構造体のすべてのフィールドが利用されます。指定されている場合は、目的のフィールドの抽出に使用されます。プリミティブキーの場合は、単一のフィールド名のみが構成されている必要があります。
record_value
空の場合は、値構造体からすべてのフィールドが利用されます。指定されている場合は、目的のフィールドの抽出に使用されます。
  • 型: list
  • デフォルト: none
  • 重要度: 中
fields.whitelist

レコード値フィールド名のコンマ区切りのリスト。空の場合は、レコード値からすべてのフィールドが利用されます。リストを設定した場合は、目的のフィールドのフィルター処理に使用されます。

pk.fields はどのフィールドで送信先データベースのプライマリキー列を構成するかというコンテキストで独立して適用されますが、この構成はその他の列に適用されることに注意してください。

  • 型: list
  • デフォルト: ""
  • 重要度: 中
db.timezone

コネクターで時間ベースの値を挿入する場合に使用する必要がある JDBC タイムゾーンの名前。デフォルトは UTC です。

  • 型: string
  • デフォルト: "UTC"
  • 指定可能な値: 有効な JDK タイムゾーン
  • 重要度: 中

DDL サポート

auto.create

送信先テーブルが存在しない場合に、CREATE を発行して、レコードスキーマに基づくテーブルを自動的に作成するかどうかを指定します。

  • 型: boolean
  • デフォルト: false
  • 重要度: 中

重要

  • Kafka Connect によってテーブルが自動作成され、ターゲットデータベースに対する効率の低いデータ型が使用されていると、データベースのパフォーマンスが低下する可能性があります。Confluent では、データベース管理者との連携により、使用するデータ型を確認するか、テーブルを読み込む前に事前作成しておくことをお勧めしています。
  • 一部の JDBC 言語(Oracle 言語や MySQL 言語など)では、pk.modekafka に、auto.createtrue に設定すると例外が発生する可能性があります。例外が発生するのは、STRING がコネクターにより、固定長の文字列(VARCHAR(256) など)ではなく、可変長の文字列(TEXT など)にマッピングされるためです。プライマリキーは固定長である必要があります。この例外を回避するには、以下を考慮してください。
    • auto.createtrue に設定しないようにします。
    • データベーステーブルとプライマリキーのデータ型を事前に作成します。
auto.evolve

レコードスキーマに関連する列がテーブルスキーマに存在しない場合に、ALTER を発行してレコードスキーマに関連する列を自動的に追加するかどうかを指定します。

  • 型: boolean
  • デフォルト: false
  • 重要度: 中
quote.sql.identifiers

SQL ステートメントで、テーブル名、列名、その他の識別子をいつクォートするかを指定します。後方互換性のため、デフォルトは always となっています。

  • 型: string
  • デフォルト: always
  • 重要度: 中

再試行

max.retries

エラー時に再試行する最大回数。これを超えるとタスクは失敗します。

  • 型: int
  • デフォルト: 10
  • 指定可能な値: [0,…]
  • 重要度: 中
retry.backoff.ms

エラーの後、再試行するまでの待ち時間(ミリ秒)。

  • 型: int
  • デフォルト: 3000
  • 指定可能な値: [0,…]
  • 重要度: 中