JDBC Connector Source Connector 構成プロパティ

このコネクターを使用するには、connector.class 構成プロパティでこのコネクタークラスの名前を指定します。

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector

コネクター固有の構成プロパティについて、以降で説明します。

データベース接続のセキュリティ

コネクターの構成にセキュリティのパラメーターがないことに気づかれるでしょう。これは、SSL が JDBC 標準に含まれておらず、使用中の JDBC ドライバに依存するからです。通常は、SSL を connection.url パラメーターで構成する必要があります。たとえば、MySQL では以下のように構成します。

connection.url="jdbc:mysql://127.0.0.1:3306/sample?verifyServerCertificate=false&useSSL=true&requireSSL=true"

サポートおよび構成については、ご使用の JDBC ドライバのドキュメントを確認してください。

データベース

connection.url

JDBC 接続 URL。

jdbc:oracle:thin:@localhost:1521:orclpdb1jdbc:mysql://localhost/db_namejdbc:sqlserver://localhost;instance=SQLEXPRESS;databaseName=db_name

  • 型: string
  • 重要度: 高
  • 依存関係 : table.whitelisttable.blacklist
connection.user

JDBC 接続ユーザー。

  • 型: string
  • デフォルト: null
  • 重要度: 高
connection.password

JDBC 接続パスワード。

  • 型: password
  • デフォルト: null
  • 重要度: 高
connection.attempts

有効な JDBC 接続の取得を再試行する最大回数。正の整数でなければなりません。

  • 型: int
  • デフォルト: 3
  • 指定可能な値: [1,…]
  • 重要度: 低
connection.backoff.ms

接続再試行の間のバックオフ時間(ミリ秒)。

  • 型: long
  • デフォルト: 10000
  • 重要度: 低
catalog.pattern

データベースからテーブルメタデータをフェッチするためのカタログパターン。

  • 型: string
  • デフォルト: null
    • "" の場合、カタログなしでテーブルメタデータが取得されます。
    • null(デフォルト)の場合、検索の絞り込みにスキーマ名は使用されず、カタログに関係なくすべてのテーブルメタデータがフェッチされます。
  • 重要度: 中
table.whitelist

コピーに含めるテーブルのリスト。指定した場合、table.blacklist を設定できません。複数のテーブルを指定するには、コンマ区切りのリストを使用します(たとえば、table.whitelist: "User, Address, Email" )。

  • 型: list
  • デフォルト: ""
  • 重要度: 中
table.blacklist

コピーから除外するテーブルのリスト。指定した場合、table.whitelist を設定できません。複数のテーブルを指定するには、コンマ区切りのリストを使用します(たとえば、table.blacklist: "User, Address, Email" )。

  • 型: list
  • デフォルト: ""
  • 重要度: 中
schema.pattern

データベースからテーブルメタデータをフェッチするためのスキーマパターン。

  • 型: string

  • デフォルト: null

    • "" の場合、スキーマなしでテーブルメタデータが取得されます。
    • null(デフォルト)の場合、検索の絞り込みにスキーマ名は使用されず、スキーマに関係なくすべてのテーブルメタデータがフェッチされます。
  • 重要度: 高

    重要

    これをデフォルトの null 設定のままにすると、取得するテーブルメタデータの量が多すぎるため、コネクターはタイムアウトして失敗する可能性があります。大規模なデータベースの場合は、このパラメーターを設定してください。

numeric.precision.mapping

NUMERIC 値を精度に基づいて整数型にマップするかどうかを指定します。このオプションは、現在は非推奨となっています。将来のバージョンで完全に削除される可能性があります。代わりに numeric.mapping を使用してください。

  • 型: boolean
  • デフォルト: false
  • 重要度: 低
numeric.mapping

NUMERIC 値を精度およびオプションでスケールに基づいて整数型または小数型にマップします。

  • 型: string
  • デフォルト: null
  • 指定可能な値: [none、precision_only、best_fit]
    • すべての NUMERIC 列を Connect の DECIMAL 論理型で表す場合は、none を使用します。
    • 列の精度とスケールに基づいて NUMERIC 列を Connect の INT8、INT16、INT32、INT64、または FLOAT64 にキャストする必要がある場合は、best_fit を使用します。このオプションを指定しても、精度を失わずにネイティブ型にキャストすることができない場合には、NUMERIC 値を Connect DECIMAL に変換します。たとえば、精度 20 の NUMERIC(20) 型は、オーバーフローせずにネイティブの INT64 に変換することができません。したがって、DECIMAL として保持されます。
    • 前記の best_fit のプロパティに加えて、精度が失われる可能性があっても、スケールが指定された NUMERIC 列を常に Connect FLOAT64 型にキャストする必要がある場合は、best_fit_eager_double を使用します。
    • 列の精度にのみ基づいて(列のスケールは 0 と仮定して)NUMERIC 列をマップするには、precision_only を使用します。
    • none オプションがデフォルトですが、Connect の DECIMAL 型はそのバイナリ表現にマップされるため、Avro のシリアル化の問題につながる可能性があります。best_fit は、最も適切なプリミティブ型にマップするため、多くの場合に選択されます。
  • 重要度: 低
dialect.name

このコネクターで使用する必要があるデータベース言語の名前。デフォルトでは、これは空です。言語は、JDBC 接続 URL に基づいて、コネクターによって自動的に決定されます。これは、動作をオーバーライドしたり特定の言語を使用したりする必要がある場合に使用します。JDBC 接続プラグインの適切にパッケージ化されたすべての言語を使用できます。

  • 型: string
  • デフォルト: ""
  • 指定可能な値 : [, Db2DatabaseDialect, MySqlDatabaseDialect, SybaseDatabaseDialect, GenericDatabaseDialect, OracleDatabaseDialect ,SqlServerDatabaseDialect, PostgreSqlDatabaseDialect, SqliteDatabaseDialect, DerbyDatabaseDialect , SapHanaDatabaseDialect, MockDatabaseDialect, VerticaDatabaseDialect]
  • 重要度: 低

モード

mode

ポーリングのたびにテーブルをアップデートするためのモード。オプションには、次のものがあります。

  • 型: string
  • デフォルト: ""
  • 指定可能な値: [、bulk、timestamp、incrementing、timestamp+incrementing]
    • bulk: ポーリングのたびにテーブル全体を一括読み込みします
    • incrementing: 各テーブルの厳密な増分列を使用して、新しい行のみを検出します。既存の行の変更または削除は検出されません。
    • timestamp: タイムスタンプ(またはそれに準ずる)列を使用して、新規の行と変更済みの行を検出します。この列は書き込まれるたびにアップデートされる列であること、また、その値は単純増加する値であっても必ずしも一意の値ではないことが想定されます。
    • timestamp+incrementing: 新規および変更済み行を検出するタイムスタンプ列と、アップデートに対するグローバルに一意の ID を提供する厳密な増分列の 2 つの列を使用します。これにより、各行を一意のストリームオフセットに割り当てることができます。
  • 重要度: 高
  • 依存関係 : incrementing.column.nametimestamp.column.namevalidate.non.null
incrementing.column.name

新しい行の検出に使用する厳密な増分列の名前。空の値は、自動増分列を探すことによって、列を自動検出する必要があることを示します。この列を Null 許容にすることはできません。

  • 型: string
  • デフォルト: ""
  • 重要度: 中
timestamp.column.name

COALESCE SQL 関数を使用して、新規または変更済みの行を検出するための 1 つ以上のタイムスタンプ列のコンマ区切りリスト。ポーリングのたびに、最初の null 以外のタイムスタンプ値が前回見つかった最大のタイムスタンプ値より大きい行が検出されます。少なくとも 1 つの列は NULL 許容でないことが必要です。

  • 型: string
  • デフォルト: ""
  • 重要度: 中
timestamp.initial

タイムスタンプ基準を使用する最初のクエリで使用するエポックタイムスタンプ。現在の時刻を使用するには、-1 を使用します。指定しない場合、すべてのデータが取得されます。

  • 型: long
  • デフォルト: null
  • 重要度: 低
validate.non.null

デフォルトでは、JDBC コネクターは、すべての増分テーブルとタイムスタンプテーブルで、ID/タイムスタンプとして使用する列に NOT NULL が設定されていることを検証します。設定されていないテーブルがあった場合、JDBC コネクターは起動に失敗します。これを false に設定すると、このチェックは無効になります。

  • 型: boolean
  • デフォルト値: true
  • 重要度: 低
query

指定した場合は、query で新規行または更新行を選択できます。テーブルを結合したり、テーブル内の列のサブセットを選択したり、データをフィルターしたりする場合は、この設定を使用します。これを使用している場合は、このクエリを使用してコネクターがデータをコピーし、テーブル全体のコピーは無効になります。

  • 型: string
  • デフォルト: ""
  • 重要度: 中

ちなみに

mode=bulk を有効にした場合は、WHERE 句を含む クエリ を使用できます。これにより、各繰り返し処理でテーブルからすべての行が読み込まれます。その他のクエリモード で WHERE 句を指定するには、クエリへの WHERE 句の追加が可能である必要があります。詳しい説明については、Specifying a WHERE clause with query modes を参照してください。

quote.sql.identifiers

SQL ステートメントで、テーブル名、列名、その他の識別子をいつクォートするかを指定します。後方互換性のため、デフォルトは always となっています。

  • 型: string
  • デフォルト: always
  • 重要度: 中
query.suffix

生成されたクエリの最後に追加するサフィックス。

  • 型: string
  • デフォルト: ""
  • 重要度: 低
connection.attempts

The number of times to retry SQL exceptions encountered when executing queries.

  • 型: int
  • デフォルト: 100
  • 重要度: 低
transaction.isolation.mode

The mode to control which transaction isolation level is used when running queries against the database. By default, no explicit transaction isolation mode is set. SQL_SERVER_SNAPSHOT will only work against a connector configured to write to SQL Server. Options include: DEFAULT, READ_UNCOMMITTED, READ_COMMITTED, REPEATABLE_READ, SERIALIZABLE, and SQL_SERVER_SNAPSHOT.

  • 型: string
  • デフォルト: false
  • Valid Values: [DEFAULT, READ_UNCOMMITED, READ_COMMITED, REPEATABLE_READ, SERIALIZABLE, SQL_SERVER_SNAPSHOT]
  • 重要度: 低

コネクター

table.types

デフォルトでは、JDBC コネクターはソースデータベースからタイプが TABLE のテーブルのみを検出します。この構成では、抽出するテーブルタイプをコンマ区切りのリストで指定できます。

  • 型: list。

    • TABLE
    • VIEW
    • SYSTEM TABLE
    • GLOBAL TEMPORARY
    • LOCAL TEMPORARY
    • ALIAS
    • SYNONYM

    ほとんどの場合、TABLE または VIEW のみ設定するのが合理的です。

  • デフォルト: TABLE

  • 重要度: 低

poll.interval.ms

各テーブルの新しいデータをポーリングする頻度(ミリ秒)。

  • 型: int
  • デフォルト: 5000
  • 重要度: 高
batch.max.rows

新しいデータのポーリング時に単一のバッチに含める最大行数。この設定を使用して、コネクターの内部にバッファリングするデータの量を制限できます。

  • 型: int
  • デフォルト: 100
  • 重要度: 低
table.poll.interval.ms

新しいテーブルや削除されたテーブルをポーリングする頻度(ミリ秒)。これにより、追加されたテーブル内のデータのポーリングの開始、または削除されたテーブル内のデータのポーリングの停止のタスクの構成がアップデートされる可能性があります。

  • 型: long
  • デフォルト: 60000
  • 重要度: 低
topic.prefix

データのパブリッシュ先である Apache Kafka® トピックの名前、またはカスタムクエリの場合にパブリッシュ先トピックのフルネームを生成するためにテーブル名の先頭に付けるプレフィックス。

  • 型: string
  • 重要度: 高
timestamp.delay.interval.ms

特定のタイムスタンプを持つ行が出現してからそれを結果に含めるまでに待機する時間。より早いタイムスタンプを持つトランザクションが完了できるように、遅延を追加できます。最初の実行では、利用可能なすべてのレコードを(つまり、タイムスタンプ 0 から)、現在の時刻から遅延を引いた時刻までフェッチします。その後の各実行では、最後のフェッチ時刻から、現在の時刻から遅延を引いた時刻までのデータを取得します。

  • 型: long
  • デフォルト: 0
  • 重要度: 高
db.timezone

Name of the JDBC timezone used in the connector when querying with time-based criteria. Defaults to UTC.

  • 型: string
  • Default: "UTC"
  • 指定可能な値: 有効な JDK タイムゾーン
  • 重要度: 中
timestamp.granularity

タイムスタンプ列の粒度を定義します。オプションには、次のものがあります。

  • 型: string
  • デフォルト: connect_logical
  • 指定可能な値: [connect_logical、nanos_long、nanos_string、nanos_iso_datetime_string]
    • connect_logical(デフォルト): Kafka Connect に組み込まれている表現を使用してタイムスタンプの値を表します。
    • nanos_long: エポックからのナノ秒としてタイムスタンプの値を表します。
    • nanos_string: エポックからのナノ秒としてタイムスタンプの値を文字列で表します。
    • nanos_iso_datetime_string: ISO フォーマット「yyyy-MM-dd'T'HH:mm:ss.n」を使用します。
  • 重要度: 低

トピックの自動作成

自動トピック作成の詳細については、「ソースコネクターの自動トピック作成の構成|userguide.html#connect-source-auto-topic-creation」を参照してください。