重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

Oracle CDC Source Connector for Confluent Cloud

注釈

If you are installing the connector locally for Confluent Platform, see Oracle CDC Source for Confluent Platform.

フルマネージド型の Oracle CDC Source Connector for Confluent Cloud は、各変更をデータベースの行に取り込み、それらの変更を Apache Kafka® トピックの変更イベントレコードとして表します。このコネクターは Oracle LogMiner を使用してデータベースの redo ログを読み取ります。コネクターには、LogMiner を使用する権限と、コネクターによって取り込まれたすべてのテーブルから選択する権限を持つデータベースユーザーが必要です。

コネクターは、単一のデータベース内のテーブルのサブセットを取り込むように構成することができます。これは、ユーザーがアクセスできるすべてのテーブルのうち、include 正規表現に一致するものとして定義されます。また、別の exclude 正規表現に一致するテーブルを取り込まないように構成することもできます。

コネクターは各テーブルの変更を Kafka トピックに書き込みます。これらのトピックでは、table.topic.name.template コネクター 構成プロパティ により table-to-topic マッピングが指定されています。このプロパティのデフォルトは、テーブルのドット区切り完全修飾名です(database_name.schema_name.table_name など)。

コネクターはリテラルといくつかの変数(${tableName}${schemaName} など)を認識して table-to-topic マッピングをカスタマイズします。変数は実行時に解決されます。たとえば、次の構成プロパティにより ORCL.ADMIN.USERS テーブルへの変更が my-prefix.ORCL.ADMIN.USERS という名前の Kafka トピックに書き込まれます。

table.topic.name.template=my-prefix.${databaseName}.${schemaName}.${tableName}

テンプレート変数のリストについては、「テンプレート変数」を参照してください。

コネクターは未加工の Oracle redo ログレコードをすべて、"redo ログトピック" として論理参照される 1 つの Kafka トピックに書き込むように設計されています。redo.log.topic.name 構成プロパティはこのトピックの名前を決定します。コネクターは実際にこのトピックを消費して、テーブル固有のトピックに書き込まれるテーブル固有のすべてのイベントを識別し、生成します。コネクターは、table.topic.name.template プロパティを空の文字列に設定することで、テーブル固有イベントをテーブル固有のトピックに生成することなく、redo ログトピックにのみ書き込むように構成できます。

制限

以下の情報を確認してください。

クイックスタート

このクイックスタートを使用して、Confluent Cloud Oracle CDC Source Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択してから、Oracle データベースの既存データのスナップショットを取得して、それ以降に発生する行レベルの変更をすべてモニタリングして記録するようにコネクターを構成するための基本的な方法について説明します。

重要

コネクターを構成する前に、Oracle データベースの前提条件 を参照して Oracle データベースの構成情報と構成後の検証手順を確認してください。

前提条件
  • アマゾンウェブサービス (AWS)、Microsoft Azure (Azure)、または Google Cloud Platform (GCP)上の Confluent Cloud クラスターへのアクセスを許可されていること。
  • Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
  • スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
  • ネットワークに関する考慮事項については、「Networking and DNS Considerations」を参照してください。静的なエグレス IP を使用する方法については、「静的なエグレス IP アドレス」を参照してください。
  • Kafka クラスターの認証情報。次のいずれかの方法で認証情報を指定できます。
    • 既存の サービスアカウント のリソース ID を入力する。
    • コネクター用の Confluent Cloud サービスアカウント を作成する。サービスアカウントのドキュメント で、必要な ACL エントリを確認してください。一部のコネクターには固有の ACL 要件があります。
    • Confluent Cloud の API キーとシークレットを作成する。キーとシークレットを作成するには、confluent api-key create を使用するか、コネクターのセットアップ時に Cloud Console で直接 API キーとシークレットを自動生成します。

Confluent Cloud Console の使用

ステップ 1: Confluent Cloud クラスターを起動します。

インストール手順については、「Quick Start for Confluent Cloud」を参照してください。

ステップ 2: コネクターを追加します。

左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。

ステップ 3: コネクターを選択します。

Click the Oracle Database Source connector card.

Oracle CDC Source Connector Card

ステップ 4: 接続をセットアップします。

以下を実行して、Continue をクリックします。

注釈

  • すべての 前提条件 を満たしていることを確認してください。
  • アスタリスク( * )は必須項目であることを示しています。
  1. コネクターの 名前 を入力します。

  2. Kafka Cluster credentials で Kafka クラスターの認証情報の指定方法を選択します。サービスアカウントのリソース ID を選択するか、API キーとシークレットを入力できます(または、Cloud Console でこれらを生成します)。

  3. データベース接続の詳細情報 を追加します。

    • Oracle server: Oracle サーバーのホスト名またはアドレス。
    • Oracle port: Oracle に接続するために使用されるポート番号。デフォルトは 1521 です。
    • Oracle SID: Oracle システム ID(SID)。
    • Oracle PDB: Oracle PDB 名。マルチテナント CDB/PDB アーキテクチャを使用する場合にのみ設定します。デフォルトでは設定されていません。これは、キャプチャー対象のテーブルが CDB ルートに存在することを意味します。
    • Oracle service: Oracle サービス名。設定されている場合、コネクターは常にこのサービス名を使用してデータベースに接続します。
    • Oracle username: Oracle データベースユーザーの名前。
    • Oracle password: Oracle データベースユーザーのパスワード。
    • Trust store: サーバー CA 証明書情報を含むトラストストアファイル。SSL を使用してデータベースに接続する場合にのみ必要になります。
    • Trust store password: トラストストアのトラストストアパスワード。SSL を使用してデータベースに接続する場合にのみ必要になります。
    • Enable Oracle FAN events: 接続で Oracle RAC Fast Application Notification(FAN) イベントの使用を許可するかどうかを指定します。これはデフォルトでは無効になっており、FAN イベントがデータベースでサポートされていても使用されません。これを有効にするのは、FAN イベントで Oracle RAC をセットアップして使用する場合のみです。この機能を有効にすると、FAN イベントを使用するようにデータベースがセットアップされていない場合に、接続の問題が発生する可能性があります。
  4. データベースの詳細情報 を追加します。

    • Table inclusion regex: SID または PDB、および schema が含まれているテーブルの完全修飾名に一致する正規表現。たとえば、非コンテナーデータベースを使用する場合は、ORCLCDB[.]C##MYUSER[.](table1|orders|customers) または ORCLCDB.C##MYUSER.(table1|orders|customers) のようになります。マルチテナントデータベースを使用する場合は、ORCLPDB1[.]C##MYUSER[.](table1|orders|customers) または ORCLPDB1.C##MYUSER.(table1|orders|customers) のようになります。
    • Table exclusion regex: SID または PDB、および schema が含まれているテーブルの完全修飾名に一致する正規表現。たとえば、非コンテナーデータベースを使用する場合は、ORCLCDB[.]C##MYUSER[.](table1|orders|customers) または ORCLCDB.C##MYUSER.(table1|orders|customers) のようになります。マルチテナントデータベースを使用する場合は、ORCLPDB1[.]C##MYUSER[.](table1|orders|customers) または ORCLPDB1.C##MYUSER.(table1|orders|customers) のようになります。空白の正規表現(デフォルト)は除外がないことを意味します。
    • Start from: 初めて開始するときに、コネクターが実行すべきこと。値は、リテラル snapshot (デフォルト)、リテラル current、リテラル force_current、Oracle システム変更番号(SCN)、または DD-MON-YYYY HH24:MI:SS フォーマットのデータベースタイムスタンプです。snapshot リテラルを指定すると、コネクターは初めて開始するときにキャプチャーしたテーブルのスナップショットを取得し、スナップショットが取得された時点から redo ログイベントの処理を続行します。current リテラルを指定すると、コネクターはスナップショットを作成せずに現在の Oracle SCN から開始します。force_current リテラルは current と同じですが、コネクターが再起動されると、以前に保管されたオフセットが無視されます。このオプションは、オフセットに保管されている SCN が Oracle アーカイブログで利用できなくなり、コネクターを回復する場合にのみ使用する必要があります。
  5. Connector details に情報を追加します(省略可)。

    • Emit tombstone on delete: true に設定した場合、削除操作により null 値の tombstone レコードが出力されます。デフォルトは false です。
    • Behavior on dictionary mismatch: DDL ステートメントに起因するディクショナリの不一致が原因で、コネクターが列の値を解析できない場合の動作を指定します。こうした状況は、ディクショナリモード online が設定されているものの、DDL の変更が発生する前に記録された履歴データをコネクターがストリーミングしている場合に発生する可能性があります。このプロパティのデフォルト値は fail (コネクターのタスクが失敗する)です。log オプションは解析不能なレコードを記録し、問題のあるレコードをスキップします。コネクターのタスクは失敗しません。
    • Behavior on unparsable statement: 解析できなかった SQL ステートメントをコネクターが検出した場合に必要とする動作を指定します。デフォルトオプション fail では、コネクターのタスクは失敗します。log オプションは解析不能なステートメントを記録し、問題のあるレコードをスキップします。コネクターのタスクは失敗しません。
    • Database timezone: タイムゾーン情報が利用できない場合、コネクターで Oracle の DATE 型や TIMESTAMP 型を解析するときにこのプロパティはデフォルトで UTC に設定されます。たとえば、db.timezone=UTC の場合、DATE および TIMESTAMP のデータはどちらも UTC タイムゾーンを使用して解析されます。この値は、有効な java.util.TimeZone ID であることが必要です。
    • Database timezone for DATE type: タイムゾーン情報が利用できない場合、コネクターで Oracle の DATE 型を解析するときに使用されるデフォルトタイムゾーンです。このプロパティが設定されている場合、その値は DATE 型の db.timezone の値を上書きします。たとえば、db.timezone=UTC および db.timezone.date=America/Los_Angeles の場合、TIMESTAMP 型のデータは UTC タイムゾーンを使用して解析されます。DATE 型のデータは、America/Los_Angeles タイムゾーンを使用して解析されます。この値は、有効な java.util.TimeZone ID であることが必要です。
    • Redo log startup polling limit (ms): The amount of time to wait for the redo log to be present on connector startup. This is only relevant when the connector is configured to capture change events. Defaults to five minutes (or 300000 milliseconds). The maximum is one hour (or 3600000 milliseconds).
  6. Connection details に情報を追加します(省略可)。

    • Query timeout (ms): Oracle に送信される任意のクエリのタイムアウト(ミリ秒)。デフォルトは 5 分(300000 ミリ秒)です。負の値に設定した場合、コネクターはクエリにタイムアウトを適用しません。
    • Maximum batch size: コネクターから Kafka に返されるレコードの最大数。追加のレコードがない場合、コネクターはより少ないレコードを返す可能性があります。
    • Poll linger milliseconds: 空のバッチを返す前にレコードの到着を待機する最大時間。デフォルトは 5 秒(5000 ミリ秒)です。
    • Maximum buffer size: すべてのスナップショットスレッドおよび redo ログからバッチへとバッファリングできる最大レコード数。デフォルト値は 0 です。この場合、バッファサイズは最大バッチサイズとスレッド数から計算されます。
    • Database poll interval (ms): データベース redo ログイベントを取得するためのポーリング間の間隔。連続マイニングが使用可能で有効になっている場合、この値は効果がありません。デフォルトは 1 秒(1000 ミリ秒)です。指定可能な最小値は 500 ミリ秒です。
    • Snapshot row fetch size: スナップショットでテーブル行数をフェッチするときに JDBC ドライバにヒントとして提供される行数。デフォルトは 2000 行です。このヒントを無効にするには 0 を使用します。
    • Redo log fetch size: redo ログから行数をフェッチするときに JDBC ドライバにヒントとして提供される行数。デフォルトは 5000 行です。このヒントを無効にするには 0 を使用します。
    • Included fields: redo ログに含めるフィールドのコンマ区切りのリスト。
    • Excluded fields: redo ログから除外するフィールドのコンマ区切りのリスト。
  7. Output messages に詳細情報を追加します(省略可)。

    • Topic name template: コネクターによって変更イベントが書き込まれる Kafka トピックの名前を定義するテンプレート。この値は、コネクターがキャプチャーしたすべてのテーブルからすべてのレコードを 1 つのトピックに書き込む場合は、定数にすることができます。または、サポートされているテンプレート変数(${databaseName}${schemaName}${tableName}${connectorName} など)を値に含めることができます。デフォルトは空の文字列です。この場合、コネクターは変更イベントレコードを生成しません。特別な意味を持つ文字である \${} は、テンプレート変数の一部として意図されていない場合、\ でエスケープする必要があります。redo ログトピックが指定されている場合にのみ、このプロパティを空のままにすることができます。空のままにすると、コネクターは redo ログトピックにのみ書き込みます。

    • Redo log corruption topic: Oracle redo ログ内の破損に関する情報を記述し、欠落したデータを示すイベントをコネクターが記録する Kafka のトピックの名前。デフォルトは空の文字列です。このプロパティを空のままにすると、コネクターはこの情報を Kafka に書き込みません。

    • Redo log topic: コネクターがすべての未加工の redo ログイベントを記録する Kafka のトピックの名前。空のままにすると、このプロパティは、デフォルトで ${connectorName}-${databaseName}-redo-log に設定されます。たとえば、lcc-mycdcconnector-myoracledb-redo-log のように設定されます。

      注釈

      出力トピックの場合は、ACL を設定する必要があります。詳細については、Oracle CDC Source Connector の ACL を参照してください。

    • Key template: 各変更イベントの Kafka レコードキーを定義するテンプレート。デフォルトでは、キーにはプライマリキーフィールドを持つ STRUCT が含まれ、テーブルにプライマリキーまたは一意のキーがない場合は null になります。

    • Dictionary mode: コネクターで使用されるディクショナリの処理モード。autoonlineredo_log が指定できます。デフォルトは auto です。

      • auto: コネクターでは、テーブルスキーマを進化させる DDL ステートメントが検出されるまで、オンラインカタログのディクショナリを使用します。DDL ステートメントが検出されると、コネクターでは、アーカイブ済み redo ログのディクショナリの使用を開始します。DDL ステートメントが処理されると、コネクターでは online カタログの使用に戻ります。DDL ステートメントが想定される場合は、このモードを使用します。
      • online: コネクターでは、常にオンラインディクショナリカタログを使用します。DDL ステートメントが想定されない場合は、このモードを使用します。
      • redo_log: コネクターでは、常にアーカイブ済み redo ログのディクショナリカタログを使用します。オンライン redo ログにアクセスできない場合は、このモードを使用してください。CDC イベントは、オンラインログからアーカイブされるまで遅延され、コネクターで処理されません。
    • Output table name field: Kafka に書き込まれた変更レコード内のフィールドの名前。これには、影響を受ける Oracle テーブルのスキーマ修飾名が含まれます(dbo.Customers など)。空白値は、このフィールドが変更レコードに含まれないことを示します。

    • Output SCN field: Kafka に書き込まれた変更レコード内のフィールドの名前。これには、この変更が行われたときの Oracle システム変更番号(SCN)が含まれます。これを空のままにすると、このフィールドは変更レコードに含まれません。

    • Output operation type field: Kafka に書き込まれた変更レコード内のフィールドの名前。これには、この変更イベントの操作タイプが含まれます。これを空のままにすると、このフィールドは変更レコードに含まれません。

    • Output operation timestamp field: Kafka に書き込まれた変更レコード内のフィールドの名前。これには、この変更イベントの操作タイムスタンプが含まれます。これを空のままにすると、このフィールドは変更レコードに含まれません。

    • Output current timestamp field: Kafka に書き込まれた変更レコード内のフィールドの名前。これには、変更イベントがいつ処理されたかを示すコネクターのタイムスタンプが含まれます。これを空のままにすると、このフィールドは変更レコードに含まれません。

    • Output row ID field: Kafka に書き込まれた変更レコード内のフィールドの名前。これには、このイベントによって変更されたテーブルの行 ID が含まれます。これを空のままにすると、このフィールドは変更レコードに含まれません。

    • Output username field: Kafka に書き込まれた変更レコード内のフィールドの名前。これには、この変更の起因となったトランザクションを実行した Oracle ユーザーの名前が含まれます。これを空のままにすると、このフィールドは変更レコードに含まれません。

    • Output redo field: Kafka に書き込まれた変更レコード内のフィールドの名前。これには、この変更レコードが作成された元の redo DML ステートメントが含まれます。これを空のままにすると、このフィールドは変更レコードに含まれません。

    • Output undo field: Kafka に書き込まれた変更レコード内のフィールドの名前。これには、この変更を有効に取り消し、行の "以前の" ステートを表す元の undo DML ステートメントが含まれます。これを空のままにすると、このフィールドは変更レコードに含まれません。

    • Output operation type read value: 読み取り(スナップショット)変更イベントの操作タイプの値。デフォルトでは、このプロパティ値は R に設定されています。

    • Output operation type insert value: 挿入変更イベントの操作タイプの値。デフォルトでは、このプロパティ値は I に設定されています。

    • Output operation type update value: アップデート変更イベントの操作タイプの値。デフォルトでは、このプロパティ値は U に設定されています。

    • Output operation type delete value: 削除変更イベントの操作タイプの値。デフォルトでは、このプロパティ値は D に設定されています。

    • Output operation type truncate value: 切り捨て変更イベントの操作タイプの値。デフォルトでは、このプロパティ値は T に設定されています。

    • LOB topic template: コネクターによって LOB オブジェクトが書き込まれる Kafka トピックの名前を定義するテンプレート。値は、すべてのキャプチャー対象テーブルからすべての LOB オブジェクトが 1 つのトピックに書き込まれる場合は定数にすることができます。または、サポートされているテンプレート変数(${columnName}${databaseName}${schemaName}${tableName}${connectorName} など)を値に含めることができます。デフォルトは空で、キャプチャー対象テーブルに LOB タイプ列がある場合に、すべての LOB タイプ列を無視します。特別な意味を持つ文字である \${} は、テンプレート変数の一部として意図されていない場合、\ でエスケープする必要があります。

    • Snapshot by partitions: テーブルがパーティションを使用するよう定義されている場合に、コネクターが各テーブルパーティションでスナップショットを実行する必要があるかどうかを指定します。デフォルトでは false に設定されます。この場合、各テーブルの全体のスナップショットが 1 回実行されます。

    • Snapshot threads per task: スナップショットを実行するために各タスクで使用できるスレッド数。タスクに割り当てられているテーブル数よりこの値が大きい場合にのみ、各タスクで使用されます。デフォルトは 4 です。

    • Enable large LOB object support: true の場合、コネクターは、複数の redo ログレコードに分割される、サイズの大きな LOB オブジェクトをサポートします。コネクターは、redo ログトピックにコミットメッセージを送信し、これらのコミットメッセージを使用して、サイズの大きな LOB オブジェクトを LOB トピックに送信できるタイミングを追跡します。

    • Map numeric values...: NUMERIC 値を精度とスケール(省略可)に基づいて、プリミティブ型または小数型にマップします。none オプションがデフォルトですが、Connect の DECIMAL 型はそのバイナリ表現にマップされるため、シリアル化の問題につながる可能性があります。一般的には、best_fit_or... オプションのいずれかが望ましい選択となります。以下のオプションを使用できます。

      • すべての NUMERIC 列を Connect の DECIMAL 論理型で表す場合は、none を使用します。
      • NUMERIC 列を、列の精度とスケールに基づいて、Connect のプリミティブ型にキャストする必要がある場合は、best_fit_or_decimal を使用します。精度とスケールがいずれかのプリミティブ型の境界を超える場合は、代わりに Connect の DECIMAL 論理型が使用されます。
      • NUMERIC 列を、列の精度とスケールに基づいて、Connect のプリミティブ型にキャストする必要がある場合は、best_fit_or_double を使用します。精度とスケールがいずれかのプリミティブ型の境界を超える場合は、代わりに Connect の FLOAT64 型が使用されます。
      • NUMERIC 列を、列の精度とスケールに基づいて、Connect のプリミティブ型にキャストする必要がある場合は、best_fit_or_string を使用します。精度とスケールがいずれかのプリミティブ型の境界を超える場合は、代わりに Connect の STRING 型が使用されます。
      • 列の精度にのみ基づいて(列のスケールは 0 と仮定して)NUMERIC 列をマップするには、precision_only を使用します。
      • 後方互換性上の理由から、best_fit オプションも利用できます。その動作は、best_fit_or_decimal と同じです。
    • Numeric default scale: スケールを決定できない場合に、数値型に対して使用するデフォルトのスケール。

    • Map Oracle DATE type to Connect data type: Oracle DATE 値を Connect の型にマップします。

    • Output Kafka record key format: Kafka 出力レコードキーのフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、または JSON です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。

    • Output Kafka record value format: Kafka 出力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、または JSON です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。

  8. このコネクターで使用する タスク の数を入力します。タスクが多いほどパフォーマンスが向上する可能性があります。デフォルトのタスク数は 2 です。

  9. Transforms and Predicates: 詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。SMT のいくつかの例については、「SMT の例」を参照してください。

すべてのプロパティと定義については、「構成プロパティ」を参照してください。

ステップ 5: コネクターを起動します。

実行中の構成をプレビューして、接続の詳細情報を確認します。プロパティの構成に問題がないことが確認できたら、Launch をクリックします。

ちなみに

コネクターの出力のプレビューについては、「コネクターのデータプレビュー」を参照してください。

ステップ 6: コネクターのステータスを確認します。

コネクターのステータスが Provisioning から Running に変わります。ステータスが変わるまで数分かかる場合があります。

ステップ 7: Kafka トピックを確認します。

コネクターが実行中になったら、レコードが Kafka トピックに取り込まれていることを確認します。

Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../../_images/topology.ja.png

Confluent CLI の使用

以下の手順に従うと、Confluent CLI を使用してコネクターをセットアップし、実行できます。

注釈

  • すべての 前提条件 を満たしていることを確認してください。
  • コマンド例では Confluent CLI バージョン 2 を使用しています。詳細については、「Confluent CLI v2 への移行」を参照してください。

ステップ 1: 使用可能なコネクターをリスト表示します。

以下のコマンドを入力して、使用可能なコネクターをリスト表示します。

confluent connect plugin list

ステップ 2: コネクターの必須の構成プロパティを表示します。

以下のコマンドを実行して、コネクターの必須プロパティを表示します。

confluent connect plugin describe <connector-catalog-name>

例:

confluent connect plugin describe OracleCdcSource

出力例:

The following are required configs:
connector.class : OracleCdcSource
name
kafka.api.key : ["kafka.api.key" is required when "kafka.auth.mode==KAFKA_API_KEY"]
kafka.api.secret : ["kafka.api.secret" is required when "kafka.auth.mode==KAFKA_API_KEY"]
oracle.server
oracle.sid
oracle.username
table.inclusion.regex

ステップ 3: コネクターの構成ファイルを作成します。

コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。

{
  "name": "OracleCdcSourceConnector_0",
  "config": {
    "connector.class": "OracleCdcSource",
    "name": "OracleCdcSourceConnector_0",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "****************",
    "kafka.api.secret": "**************************************************",
    "oracle.server": "database-5.abcdefg12345.us-west-2.rds.amazonaws.com",
    "oracle.port": "1521",
    "oracle.sid": "ORCL",
    "oracle.username": "admin",
    "oracle.password": "**********",
    "table.inclusion.regex": "ORCL[.]ADMIN[.]CUSTOMERS",
    "start.from": "SNAPSHOT",
    "query.timeout.ms": "60000",
    "redo.log.row.fetch.size": "1",
    "table.topic.name.template": "${databaseName}.${schemaName}.${tableName}",
    "lob.topic.name.template":"${databaseName}.${schemaName}.${tableName}.${columnName}",
    "numeric.mapping": "BEST_FIT_OR_DOUBLE",
    "output.data.key.format": "JSON_SR",
    "output.data.value.format": "JSON_SR",
    "tasks.max": "2"
  }
}

以下のプロパティ定義に注意してください。

  • "name": 新しいコネクターの名前を設定します。
  • "connector.class": コネクターのプラグイン名を指定します。
  • "kafka.auth.mode": 使用するコネクターの認証モードを指定します。オプションは SERVICE_ACCOUNT または KAFKA_API_KEY (デフォルト)です。API キーとシークレットを使用するには、構成プロパティ kafka.api.keykafka.api.secret を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティ kafka.service.account.id=<service-account-resource-ID>リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。

    confluent iam service-account list
    

    例:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "table.inclusion.regex": SID または PDB、および schema が含まれているテーブルの完全修飾名に一致する正規表現。たとえば、上記の構成で示した例に加えて、非コンテナーデータベースを使用する場合は、ORCLCDB[.]C##MYUSER[.](table1|orders|customers) または ORCLCDB.C##MYUSER.(table1|orders|customers) のようになります。マルチテナントデータベースを使用する場合は、ORCLPDB1[.]C##MYUSER[.](table1|orders|customers) または ORCLPDB1.C##MYUSER.(table1|orders|customers) のようになります。

  • "start.from": 初めて開始するときに、コネクターが実行すべきこと。値は、リテラル snapshot (デフォルト)、リテラル current、リテラル force_current、Oracle システム変更番号(SCN)、または DD-MON-YYYY HH24:MI:SS フォーマットのデータベースタイムスタンプです。snapshot リテラルを指定すると、コネクターは初めて開始するときにキャプチャーしたテーブルのスナップショットを取得し、スナップショットが取得された時点から redo ログイベントの処理を続行します。current リテラルを指定すると、コネクターはスナップショットを作成せずに現在の Oracle SCN から開始します。force_current リテラルは current と同じですが、コネクターが再起動されると、以前に保管されたオフセットが無視されます。このオプションは、オフセットに保管されている SCN が Oracle アーカイブログで利用できなくなり、コネクターを回復する場合にのみ使用する必要があります。

  • "query.timeout.ms": Oracle に送信される任意のクエリのタイムアウト(ミリ秒)。デフォルトは 5 分(300000 ミリ秒)です。負の値に設定した場合、コネクターはクエリにタイムアウトを適用しません。

  • "redo.log.row.fetch.size": redo ログから行数をフェッチするときに JDBC ドライバにヒントとして提供される行数。デフォルトは 5000 行です。このヒントを無効にするには 0 を使用します。

  • "table.topic.name.template": コネクターによって変更イベントが書き込まれる Kafka トピックの名前を定義するテンプレート。この値は、コネクターがキャプチャーしたすべてのテーブルからすべてのレコードを 1 つのトピックに書き込む場合は、定数にすることができます。または、サポートされているテンプレート変数(${databaseName}${schemaName}${tableName}${connectorName} など)を値に含めることができます。デフォルトは空の文字列です。この場合、コネクターは変更イベントレコードを生成しません。特別な意味を持つ文字である \${} は、テンプレート変数の一部として意図されていない場合、\ でエスケープする必要があります。redo ログトピックが指定されている場合にのみ、このプロパティを空のままにすることができます。空のままにすると、コネクターは redo ログトピックにのみ書き込みます。

  • "numeric.mapping": NUMERIC 値を精度とスケール(省略可)に基づいて、プリミティブ型または小数型にマッピングします。none オプションがデフォルトですが、Connect の DECIMAL 型はそのバイナリ表現にマップされるため、シリアル化の問題につながる可能性があります。一般的には、best_fit_or... オプションのいずれかが望ましい選択となります。以下のオプションを使用できます。

    • すべての NUMERIC 列を Connect の DECIMAL 論理型で表す場合は、none を使用します。
    • NUMERIC 列を、列の精度とスケールに基づいて、Connect のプリミティブ型にキャストする必要がある場合は、best_fit_or_decimal を使用します。精度とスケールがいずれかのプリミティブ型の境界を超える場合は、代わりに Connect の DECIMAL 論理型が使用されます。
    • NUMERIC 列を、列の精度とスケールに基づいて、Connect のプリミティブ型にキャストする必要がある場合は、best_fit_or_double を使用します。精度とスケールがいずれかのプリミティブ型の境界を超える場合は、代わりに Connect の FLOAT64 型が使用されます。
    • NUMERIC 列を、列の精度とスケールに基づいて、Connect のプリミティブ型にキャストする必要がある場合は、best_fit_or_string を使用します。精度とスケールがいずれかのプリミティブ型の境界を超える場合は、代わりに Connect の STRING 型が使用されます。
    • 列の精度にのみ基づいて(列のスケールは 0 と仮定して)NUMERIC 列をマップするには、precision_only を使用します。
    • 後方互換性上の理由から、best_fit オプションも利用できます。その動作は、best_fit_or_decimal と同じです。
  • "output.data.key.format": Kafka 出力レコードキーのフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、または JSON です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。

  • "output.data.value.format": Kafka 出力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、または JSON です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。

  • "tasks.max": このコネクターで使用する タスク の最大数を設定します。タスクが多いほどパフォーマンスが向上する可能性があります。デフォルトのタスク数は 2 です。

Single Message Transforms: SMT 追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。SMT のいくつかの例については、「SMT の例」を参照してください。

すべてのプロパティと定義については、「構成プロパティ」を参照してください。

ステップ 4: プロパティファイルを読み込み、コネクターを作成します。

以下のコマンドを入力して、構成を読み込み、コネクターを起動します。

confluent connect create --config <file-name>.json

例:

confluent connect create --config oracle-cdc-source.json

出力例:

Created connector OracleCdcSource_0 lcc-ix4dl

ステップ 5: コネクターのステータスを確認します。

以下のコマンドを入力して、コネクターのステータスを確認します。

confluent connect list

出力例:

ID          |            Name       | Status  |  Type
+-----------+-----------------------+---------+-------+
lcc-ix4dl   | OracleCdcSource_0     | RUNNING | source

ステップ 6: Kafka トピックを確認します。

コネクターが実行中になったら、メッセージが Kafka トピックに取り込まれていることを確認します。

Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。

次のステップ

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../../_images/topology.ja.png

構成プロパティ

このコネクターでは、以下のコネクター構成プロパティが使用されます。

データへの接続方法(How should we connect to your data?)

name

コネクターの名前を設定します。

  • 型: string
  • 指定可能な値: 最大 64 文字の文字列
  • 重要度: 高

Kafka クラスターの認証情報(Kafka Cluster credentials)

kafka.auth.mode

Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。

  • 型: string
  • デフォルト: KAFKA_API_KEY
  • 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
  • 重要度: 高
kafka.api.key
  • 型: password
  • 重要度: 高
kafka.service.account.id

Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。

  • 型: string
  • 重要度: 高
kafka.api.secret
  • 型: password
  • 重要度: 高

データベースへの接続方法(How should we connect to your database?)

oracle.server

Oracle サーバーのホスト名またはアドレス。

  • 型: string
  • 指定可能な値: 正規表現 ^[^\?=%&\(\)]*$ に一致することが必要
  • 重要度: 高
oracle.port

Oracle に接続するために使用されるポート番号。

  • 型: int
  • デフォルト: 1521
  • 指定可能な値: [0,...,65535]
  • 重要度: 高
oracle.sid

Oracle システム ID(SID)。

  • 型: string
  • 指定可能な値: 正規表現 ^[a-zA-Z][a-zA-Z0-9$#_]*$ に一致することが必要
  • 重要度: 高
oracle.pdb.name

Oracle PDB 名。マルチテナント CDB/PDB アーキテクチャを使用する場合にのみ設定します。デフォルトでは設定されていません。これは、キャプチャー対象のテーブルが CDB ルートに存在することを意味します。

  • 型: string
  • Valid Values: Must match the regex ^([a-zA-Z][a-zA-Z0-9$#_]*)*$
  • 重要度: 高
oracle.service.name

Oracle サーバー名。設定されている場合、コネクターは常に提供されたサービス名を使用してデータベースに接続します。

  • 型: string
  • Valid Values: Must match the regex ^([a-zA-Z][a-zA-Z0-9$#_.]*)*$
  • 重要度: 低
oracle.username

Oracle データベースユーザーの名前。

  • 型: string
  • 指定可能な値: 正規表現 ^[^\?=%&\(\)]*$ に一致することが必要
  • 重要度: 高
oracle.password

Oracle データベースユーザーのパスワード。

  • 型: password
  • 重要度: 高
ssl.truststorefile

サーバー CA 証明書情報が含まれるトラストストア。SSL を使用してデータベースに接続する場合にのみ必要になります。

  • 型: password
  • デフォルト: [hidden]
  • 重要度: 低
ssl.truststorepassword

サーバー CA 証明書情報が含まれるトラストストアのパスワード。SSL を使用してデータベースに接続する場合にのみ必要になります。

  • 型: password
  • デフォルト: [hidden]
  • 重要度: 低
oracle.fan.events.enable

接続で Oracle RAC Fast Application Notification(FAN)イベントの使用を許可するかどうかを指定します。これはデフォルトでは無効になっており、FAN イベントがデータベースでサポートされていても使用されません。これを有効にするのは、FAN イベントで Oracle RAC をセットアップして使用する場合のみです。この機能を有効にすると、FAN イベントを使用するようにデータベースがセットアップされていない場合に、接続の問題が発生する可能性があります。

  • 型: boolean
  • デフォルト: false
  • 重要度: 低

データベースの詳細(Database details)

table.inclusion.regex

sid または pdb、および schema が含まれているテーブルの完全修飾名に一致する正規表現。たとえば、非コンテナーデータベースを使用する場合は、ORCLCDB[.]C##MYUSER[.](table1|orders|customers) または ORCLCDB.C##MYUSER.(table1|orders|customers) のようになります。マルチテナントデータベースを使用する場合は、ORCLPDB1[.]C##MYUSER[.](table1|orders|customers) または ORCLPDB1.C##MYUSER.(table1|orders|customers) のようになります。

  • 型: string
  • 重要度: 高
table.exclusion.regex

sid または pdb、および scheme が含まれているテーブルの完全修飾名に一致する正規表現。たとえば、非コンテナーデータベースを使用する場合は、ORCLCDB[.]C##MYUSER[.](table1|orders|customers) または ORCLCDB.C##MYUSER.(table1|orders|customers) のようになります。マルチテナントデータベースを使用する場合は、ORCLPDB1[.]C##MYUSER[.](table1|orders|customers) または ORCLPDB1.C##MYUSER.(table1|orders|customers) のようになります。空白の正規表現(デフォルト)は除外がないことを意味します。

  • 型: string
  • デフォルト: ""
  • 重要度: 高
oracle.supplemental.log.level

Database supplemental logging level for connector operation. If set to full, the connector validates the supplemental logging level on the database is FULL and then captures Snapshots and CDC events for the specified tables whenever table.topic.name.template is not set to "". When the level set to msl, then the connector only captures snapshots if table.topic.name.template is not set to "". Note that, this setting is irrelevant if the table.topic.name.template is set to "" as we only capture redo logs then. This setting defaults to full supplemental logging level mode.

  • 型: string
  • Default: full
  • Valid Values: full, msl
  • 重要度: 中
start.from

What the connector should do when it starts for the first time. The value is either the literal snapshot (default), the literal current, the literal force_current, an Oracle System Change Number (SCN), or a database timestamp in the form yyyy-MM-dd HH:mm:ss. The snapshot literal instructs the connector to snapshot captured tables the first time it is started, then continue processing redo log events from the point in time when the snapshot was taken. The current literal instructs the connector to start from the current Oracle SCN without snapshotting. The force_current literal is the same as current but it will ignore any previously stored offsets when the connector is restarted. This option should only be used to recover the connector when the SCN stored in offsets is no longer available in the Oracle archive logs.

  • 型: string
  • デフォルト: snapshot
  • 重要度: 中

コネクターの詳細(Connector details)

emit.tombstone.on.delete

true の場合、削除操作により null 値の tombstone レコードが出力されます。

  • 型: boolean
  • デフォルト: false
  • 重要度: 低
behavior.on.dictionary.mismatch

DDL ステートメントに起因するディクショナリの不一致が原因で、コネクターが列の値を解析できない場合に必要となる動作を指定します。こうした状況は、online ディクショナリモードが指定されているものの、DDL の変更が発生する前に記録された履歴データをコネクターがストリーミングしている場合に発生する可能性があります。デフォルトオプション fail では、コネクターのタスクは失敗します。log オプションは解析不能なレコードを記録し、問題のあるレコードをスキップします。コネクターのタスクは失敗しません。

  • 型: string
  • デフォルト: fail
  • 指定可能な値: fail、log
  • 重要度: 低
behavior.on.unparsable.statement

解析できなかった SQL ステートメントをコネクターが検出した場合に必要となる動作を指定します。デフォルトオプション fail では、コネクターのタスクは失敗します。log オプションは解析不能なステートメントを記録し、問題のあるレコードをスキップします。コネクターのタスクは失敗しません。

  • 型: string
  • デフォルト: fail
  • 指定可能な値: fail、log
  • 重要度: 低
db.timezone

タイムゾーン情報が利用できない場合に、Oracle の DATE 型や TIMESTAMP 型の解析で使用されるデフォルトタイムゾーンです。たとえば、db.timezone=UTC の場合、DATE および TIMESTAMP のデータはどちらも UTC タイムゾーンを想定して解析されます。値は、有効な java.util.TimeZone ID であることが必要です。

  • 型: string
  • デフォルト: UTC
  • 重要度: 低
db.timezone.date

タイムゾーン情報が利用できない場合に、Oracle の DATE 型の解析で使用されるデフォルトタイムゾーンです。このプロパティが設定されている場合、その値は DATE 型の db.timezone の値を上書きします。たとえば、db.timezone=UTC、db.timezone.date=America/Los_Angeles と設定されている場合、TIMESTAMP 型のデータは UTC タイムゾーンを想定して解析され、DATE 型のデータは America/Los_Angeles タイムゾーンを想定して解析されます。値は、有効な java.util.TimeZone ID であることが必要です。

  • 型: string
  • 重要度: 低
redo.log.startup.polling.limit.ms

The amount of time to wait for the redo log to be present on connector startup. This is only relevant when connector is configured to capture change events. The default is 5 minutes (or 300000 milliseconds). The maximum is 1 hour (or 3600000 milliseconds)

  • 型: long
  • デフォルト: 300000(5 分)
  • 指定可能な値: [0,…,3600000]
  • 重要度: 低

接続の詳細(Connection details)

query.timeout.ms

Oracle に送信される任意のクエリのタイムアウト(ミリ秒)。デフォルトは 5 分(300000 ミリ秒)です。負の値に設定した場合、コネクターはクエリにタイムアウトを適用しません。

  • 型: long
  • デフォルト: 300000(5 分)
  • 重要度: 中
max.batch.size

コネクターから Connect に返されるレコードの最大数。追加のレコードがない場合、コネクターはより少ないレコードを返す可能性もあります。

  • 型: int
  • デフォルト: 1000
  • 指定可能な値: [100,...,10000]
  • 重要度: 中
poll.linger.ms

レコードが空のバッチを返す前に待機する最大時間。デフォルトは 5 秒です。

  • 型: long
  • デフォルト: 5000(5 秒)
  • 指定可能な値: [0,…,300000]
  • 重要度: 中
max.buffer.size

すべてのスナップショットスレッドおよび redo ログからバッチへとバッファリングできる最大レコード数。デフォルトは 0 です。この場合、バッファサイズは最大バッチサイズとスレッド数から計算されます。

  • 型: int
  • デフォルト: 0
  • 指定可能な値: [0,...,10000]
  • 重要度: 低
redo.log.poll.interval.ms

データベース redo ログイベントを取得するためのポーリング間の間隔。連続マイニングが使用可能で有効になっている場合、この値は効果がありません。デフォルトは 1 秒(1000 ミリ秒)です。指定可能な最小値は 500 ミリ秒です。

  • 型: long
  • デフォルト: 500
  • 指定可能な値: [500,…,60000]
  • 重要度: 中
snapshot.row.fetch.size

スナップショットでテーブル行数をフェッチするときに JDBC ドライバにヒントとして提供される行数。値を 0 に設定すると、このヒントが無効になります。

  • 型: int
  • デフォルト: 2000
  • 指定可能な値: [0,...,10000]
  • 重要度: 中
redo.log.row.fetch.size

redo ログから行数をフェッチするときに JDBC ドライバにヒントとして提供される行数。値を 0 に設定すると、このヒントが無効になります。

  • 型: int
  • デフォルト: 5000
  • 指定可能な値: [0,...,10000]
  • 重要度: 中
redo.log.row.poll.fields.include

redo ログに含めるフィールドのコンマ区切りのリスト。

  • 型: list
  • デフォルト: ""
  • 重要度: 低
redo.log.row.poll.fields.exclude

redo ログから除外するフィールドのコンマ区切りのリスト。

  • 型: list
  • デフォルト: ""
  • 重要度: 低

出力メッセージ(Output messages)

table.topic.name.template

変更イベントが書き込まれる Kafka のトピックの名前を定義するテンプレート。値は、すべてのキャプチャー対象テーブルからすべてのレコードが 1 つのトピックに書き込まれる場合は定数にすることができます。または、サポートされているテンプレート変数(${databaseName}${schemaName}${tableName}${connectorName} など)を値に含めることができます。デフォルトは空の文字列です。この場合、コネクターは変更イベントレコードを生成しません。特別な意味を持つ文字である \${} は、テンプレート変数の一部として意図されていない場合、\ でエスケープする必要があります。redo ログトピックが指定されている場合にのみ空白にすることができます。その場合、コネクターは redo ログトピックにのみ書き込みます。

  • 型: string
  • デフォルト: ""
  • 重要度: 高
redo.log.corruption.topic

Oracle redo ログ内の破損に関する情報を記述し、欠落したデータを示すイベントをコネクターが記録する Kafka のトピックの名前。空白のトピック名(デフォルト)は、この情報が Kafka に書き込まれないことを指定します。

  • 型: string
  • デフォルト: ""
  • 重要度: 高
redo.log.topic.name

コネクターがすべての未加工の redo ログイベントを記録する Kafka のトピックの名前。

  • 型: string
  • デフォルト: ${connectorName}-${databaseName}-redo-log
  • 重要度: 高
key.template

各変更イベントの Kafka レコードキーを定義するテンプレート。デフォルトでは、キーにはプライマリキーフィールドを持つ STRUCT が含まれ、テーブルにプライマリキーまたは一意のキーがない場合は null になります。

  • 型: string
  • デフォルト: ${primaryKeyStructOrValue}
  • 重要度: 中
oracle.dictionary.mode

コネクターで使用されるディクショナリの処理モード。auto、online、redo_log のいずれか。auto の場合、コネクターでは、テーブルスキーマを進化させる DDL ステートメントが検出されるまで、オンラインカタログのディクショナリを使用します。DDL ステートメントが検出された時点で、コネクターでは、アーカイブ済み redo ログのディクショナリの使用を開始します。DDL ステートメントが処理されると、コネクターではオンラインカタログの使用に戻ります。DDL ステートメントが想定される場合は、このモードを使用します。online の場合、コネクターでは、常にオンラインディクショナリカタログを使用します。DDL ステートメントが想定されない場合は、このモードを使用します。redo_log の場合、コネクターでは、常にアーカイブ済み redo ログのディクショナリカタログを使用します。オンライン redo ログにアクセスできない場合は、このモードを使用してください。CDC イベントは、オンラインログからアーカイブされるまで遅延され、アーカイブ後にコネクターで処理されます。

  • 型: string
  • デフォルト: auto
  • 指定可能な値: auto、online、redo_log
  • 重要度: 低
output.table.name.field

Kafka に書き込まれた変更レコード内のフィールドの名前。これには、影響を受ける Oracle テーブルのスキーマ修飾名が含まれます(dbo.Customers など)。空白値は、このフィールドが変更レコードに含まれないことを示します。

  • 型: string
  • デフォルト: table
  • 重要度: 低
output.scn.field

Kafka に書き込まれた変更レコード内のフィールドの名前。これには、この変更が行われたときの Oracle システム変更番号(SCN)が含まれます。空白値は、このフィールドが変更レコードに含まれないことを示します。

  • 型: string
  • デフォルト: scn
  • 重要度: 低
output.op.type.field

Kafka に書き込まれた変更レコード内のフィールドの名前。これには、この変更イベントの操作タイプが含まれます。空白値は、このフィールドが変更レコードに含まれないことを示します。

  • 型: string
  • デフォルト: op_type
  • 重要度: 低
output.op.ts.field

Kafka に書き込まれた変更レコード内のフィールドの名前。これには、この変更イベントの操作タイムスタンプが含まれます。空白値は、このフィールドが変更レコードに含まれないことを示します。

  • 型: string
  • デフォルト: op_ts
  • 重要度: 低
output.current.ts.field

Kafka に書き込まれた変更レコード内のフィールドの名前。これには、この変更イベントが処理されたときのコネクターのタイムスタンプが含まれます。空白値は、このフィールドが変更レコードに含まれないことを示します。

  • 型: string
  • デフォルト: current_ts
  • 重要度: 低
output.row.id.field

Kafka に書き込まれた変更レコード内のフィールドの名前。これには、このイベントによって変更されたテーブルの行 ID が含まれます。空白値は、このフィールドが変更レコードに含まれないことを示します。

  • 型: string
  • デフォルト: row_id
  • 重要度: 低
output.username.field

Kafka に書き込まれた変更レコード内のフィールドの名前。これには、この変更の起因となったトランザクションを実行した Oracle ユーザーの名前が含まれます。空白値は、このフィールドが変更レコードに含まれないことを示します。

  • 型: string
  • デフォルト: username
  • 重要度: 低
output.redo.field

Kafka に書き込まれた変更レコード内のフィールドの名前。これには、この変更レコードが作成された元の redo DML ステートメントが含まれます。空白値は、このフィールドが変更レコードに含まれないことを示します。

  • 型: string
  • デフォルト: ""
  • 重要度: 低
output.undo.field

Kafka に書き込まれた変更レコード内のフィールドの名前。これには、この変更を有効に取り消し、行の "以前の" ステートを表す元の undo DML ステートメントが含まれます。空白値は、このフィールドが変更レコードに含まれないことを示します。

  • 型: string
  • デフォルト: ""
  • 重要度: 低
output.op.type.read.value

読み取り(スナップショット)変更イベントの操作タイプの値。デフォルトでは、"R" に設定されます。

  • 型: string
  • デフォルト: R
  • 重要度: 低
output.op.type.insert.value

挿入変更イベントの操作タイプの値。デフォルトでは、"I" に設定されます。

  • 型: string
  • デフォルト: I
  • 重要度: 低
output.op.type.update.value

アップデート変更イベントの操作タイプの値。デフォルトでは、"U" に設定されます。

  • 型: string
  • デフォルト: U
  • 重要度: 低
output.op.type.delete.value

削除変更イベントの操作タイプの値。デフォルトでは、"D" に設定されます。

  • 型: string
  • デフォルト: D
  • 重要度: 低
output.op.type.truncate.value

切り捨て変更イベントの操作タイプの値。デフォルトでは、"T" に設定されます。

  • 型: string
  • デフォルト: T
  • 重要度: 低
lob.topic.name.template

LOB オブジェクトが書き込まれる Kafka のトピックの名前を定義するテンプレート。値は、すべてのキャプチャー対象テーブルからすべての LOB オブジェクトが 1 つのトピックに書き込まれる場合は定数にすることができます。または、サポートされているテンプレート変数(${columnName}${databaseName}${schemaName}${tableName}${connectorName} など)を値に含めることができます。デフォルトは空で、キャプチャー対象テーブルに LOB タイプ列がある場合に、すべての LOB タイプ列を無視します。特別な意味を持つ文字である \${} は、テンプレート変数の一部として意図されていない場合、\ でエスケープする必要があります。

  • 型: string
  • デフォルト: ""
  • 重要度: 低
snapshot.by.table.partitions

テーブルがパーティションを使用するよう定義されている場合に、コネクターが各テーブルパーティションでスナップショットを実行する必要があるかどうかを指定します。これはデフォルトは false です。この場合、各テーブルの全体のスナップショットが 1 回実行されます。

  • 型: boolean
  • デフォルト: true
  • 重要度: 低
snapshot.threads.per.task

スナップショットを実行するために各タスクで使用できるスレッド数。これは、タスクに割り当てられているテーブル数より値が大きい場合のみ各タスクで使用されます。デフォルトは 4 です。

  • 型: int
  • デフォルト: 4
  • 指定可能な値: [1,...,10]
  • 重要度: 低
enable.large.lob.object.support

true の場合、コネクターは、複数の redo ログレコードに分割される、サイズの大きな LOB オブジェクトをサポートします。コネクターは、redo ログトピックにコミットメッセージを送信し、これらのコミットメッセージを使用して、サイズの大きな LOB オブジェクトを LOB トピックに送信できるタイミングを追跡します。

  • 型: boolean
  • デフォルト: false
  • 重要度: 低
numeric.mapping

NUMERIC 値を精度とスケール(オプション)に基づいて、プリミティブ型または小数型にマッピングします。すべての NUMERIC 列を Connect の DECIMAL 論理型で表す場合は、none を使用します。NUMERIC 列を、列の精度とスケールに基づいて、Connect のプリミティブ型にキャストする必要がある場合は、best_fit_or_decimal を使用します。精度とスケールがいずれかのプリミティブ型の境界を超える場合は、代わりに Connect の DECIMAL 論理型が使用されます。NUMERIC 列を、列の精度とスケールに基づいて、Connect のプリミティブ型にキャストする必要がある場合は、best_fit_or_double を使用します。精度とスケールがいずれかのプリミティブ型の境界を超える場合は、代わりに Connect の FLOAT64 型が使用されます。NUMERIC 列を、列の精度とスケールに基づいて、Connect のプリミティブ型にキャストする必要がある場合は、best_fit_or_string を使用します。精度とスケールがいずれかのプリミティブ型の境界を超える場合は、代わりに Connect の STRING 型が使用されます。列の精度のみに基づいて(列のスケールが 0 であることを前提として)NUMERIC 列をマップするには、precision_only を使用します。none オプションがデフォルトですが、Connect の DECIMAL 型はそのバイナリ表現にマップされるため、シリアル化の問題を招く可能性があります。多くの場合、best_fit_or オプションのいずれかが望ましい選択となります。後方互換性上の理由から、best_fit オプションも利用できます。その動作は、best_fit_or_decimal と同じです。

  • 型: string
  • デフォルト: none
  • 指定可能な値: best_fit、best_fit_or_decimal、best_fit_or_double、best_fit_or_string、none、precision_only
  • 重要度: 低
numeric.default.scale

スケールを決定できない場合に、数値型に対して使用するデフォルトのスケール。

  • 型: int
  • デフォルト: 127
  • 指定可能な値: [-127,...,127]
  • 重要度: 低
oracle.date.mapping

Oracle DATE 値を Connect の型にマッピングします。すべての DATE 列を Connect の Date 論理型で表す場合は、date を使用します。DATE 列を Connect のタイムスタンプにキャストする場合は、timestamp を使用します。後方互換性のため、date オプションがデフォルトとなっています。名前は似ていますが、Oracle DATE 型は Connect Date とはセマンティクスが異なります。セマンティクスの類似性から、多くの場合、timestamp が望ましい選択となります。

  • 型: string
  • デフォルト: timestamp
  • 指定可能な値: date、timestamp
  • 重要度: 低
output.data.key.format

Kafka 出力レコードキーのフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、または JSON です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。

  • 型: string
  • デフォルト: JSON
  • 指定可能な値: AVRO、JSON、JSON_SR、PROTOBUF、STRING
  • 重要度: 高
output.data.value.format

Kafka 出力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、または JSON です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。

  • 型: string
  • デフォルト: JSON
  • 指定可能な値: AVRO、JSON、JSON_SR、PROTOBUF、STRING
  • 重要度: 高

このコネクターのタスク数(Number of tasks for this connector)

tasks.max

このコネクターに使用するタスクの最大数。

  • 型: int
  • デフォルト: 2
  • 指定可能な値: [1,...,1000]
  • 重要度: 高

テンプレート変数

コネクターはテンプレート変数を使用して Kafka トピックの名前と各変更イベントのレコードキーを作成します。これらの変数は Oracle GoldenGate Kafka Connect テンプレート変数 に類似しており、Oracle GoldenGate からこのコネクターへのマイグレーションを簡素化します。変数はタスクレベルとテーブルレベルで解決されます。

コネクターとタスクの変数

変数のキーワード 説明
${connectorName} コネクターの名前に解決します。
${databaseName} データベース名に解決します。
${emptyString} 空の文字列に解決します。
${staticMap[]} 静的な値(キーは完全修飾テーブル名)に解決します。角かっこ内のキーと値は、${staticMap[dbo.table1=value1,dbo.table2=value2]} の形式で指定します。
${currentTimestamp} または ${currentTimestamp[]} 現在のタイムスタンプに解決します。現在のタイムスタンプのフォーマットは Java ベースの書式設定を使用して制御できます(SimpleDateFormat クラスのドキュメント を参照)。例: ${currentDate}${currentDate[yyyy-mm-dd hh:MM:ss.SSS]}
${connectorName} コネクターの名前に解決します。

テーブルの変数

変数のキーワード 説明
${schemaName} テーブルのスキーマ名に解決します。
${tableName} 短いテーブル名に解決します。
${fullyQualifiedTableName} 完全修飾テーブル名に解決します(スキーマ名とテーブル名の間のピリオド(.)を含む)。例: dbo.table1

列の変数

変数のキーワード 説明
${columnName} 列名に解決します。

レコードの変数

変数のキーワード 説明
${opType} 操作のタイプ(READ、INSERT、UPDATE、DELETE、TRUNCATE)に解決します。
${opTimestamp} redo ログの操作のタイムスタンプに解決します。
${rowId} 変更した行の ID に解決します。
${primaryKey} アンダースコア(_)文字で区切られた、連結されたプライマリキーの値に解決します。
${primaryKeyStruct} プライマリキー列のそれぞれの値のフィールドを持つ STRUCT に解決します。
${primaryKeyStructOrValue} 2 つ以上のプライマリキー列のそれぞれの値のフィールドを持つ STRUCT に解決されるか、または、プライマリキーが単一列を含む場合は列の値に解決されます。
${scn} 変更が行われた場合はシステム変更番号(SCN)に解決されます。
${cscn} 変更がコミットされた場合はシステム変更番号(SCN)に解決されます。
${rbaseq} 変更に関連付けられた redo レコードの Redo ブロックアドレス(RBA)に関連付けられたシーケンス番号に解決されます。
${rbablk} ログファイル内の RBA ブロック番号に解決されます。
${rbabyte} ブロック内の RBA バイトオフセットに解決されます。
${currentTimestamp} または ${currentTimestamp[]} 現在のタイムスタンプに解決します。現在のタイムスタンプのフォーマットは Java ベースの書式設定を使用して制御できます(SimpleDateFormat クラスのドキュメント を参照)。例: ${currentDate}${currentDate[yyyy-mm-dd hh:MM:ss.SSS]}

サポートされるデータ型

以下の表は、データ型および関連する Connect マッピングの一覧です。

Oracle データ型 SQL タイプのコード 接続マッピング
CHAR または CHARACTER 1 STRING
LONG 1 STRING
VARCHAR 12 STRING
VARCHAR 12 STRING
NCHAR -15 STRING
NVARCHAR2 -9 STRING
RAW -3 BYTES
LONG RAW -1 BYTES
INT または INTEGER 2 DECIMAL
SMALLINT 2 DECIMAL
DEC または DECIMAL 2 DECIMAL
NUMBER 2 DECIMAL
NUMERIC 2 DECIMAL
DOUBLE PRECISION 6 FLOAT64
FLOAT 6 FLOAT64
REAL 6 FLOAT64
TIMESTAMP WITH TIMEZONE -101 TIMESTAMP
TIMESTAMP WITH LOCAL TIME ZONE -102 TIMESTAMP
DATE 91 DATE
BLOB 2004 BYTES
CLOB 2005 BYTES
NCLOB 2011 BYTES
XMLTYPE 2009 BYTES

注釈

-101 コードと -102 コード(TIMESTAMP WITH TIMEZONE および TIMESTAMP WITH LOCAL TIMEZONE の場合)は Oracle 固有です。BLOB、CLOB、NCLOB、XMLTYPE は、個別の LOB トピックを使用して別途処理されます。

トラブルシューティングとヒント

以下に情報を示します。

19c 以前のデータベース

レコードをキャプチャーするテーブルでのアクティビティが少ない場合は、query.timeout.ms を小さくすることを検討してください。

Error while polling for records による機能低下

このコネクターが作成される際に、以下のエラーイベントによってコネクターに機能低下が発生することがあります。

{
  "datacontenttype": "application/json",
  "data": {
    "level": "ERROR",
    "context": {
      "connectorId": "lcc-12abc3"
    },
    "summary": {
      "connectorErrorSummary": {
        "message": "Error while polling for records",
        "rootCause": "Failed to subscribe to the redo log topic 'lcc-22rwg2-ORCL-redo-log' even after waiting PT5M. Verify that this redo log topic exists in the brokers at SASL_SSL://<address>, and that the redo log reading task is able to produce to that topic."
      }
    }
  },
  "subject": "lcc-12abc3-llcc-12abc3-1",
  "specversion": "1.0",
  "id": "a12345bc-6d78-91e0-fg11-c3ac4f20220b",
  "source": "crn://confluent.cloud/connector=lcc-12abc3",
  "time": "2022-03-28T18:25:29.631Z",
  "type": "io.confluent.logevents.connect.TaskFailed"
}

新たに挿入、アップデート、または削除が行われた場合に、このエラーメッセージによって自己修正することがあります。または、コネクターに対して redo.log.startup.polling.limit.ms を大きな値に設定できます。これにより、コネクターは redo ログトピックが作成されるまで待機します。

別の方法として、1 つのパーティションを持つ redo-log-topic を作成し、コネクターを作成するときに "redo.log.topic.name" : "redo-log-topic" で指定できます。