重要
このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。
Oracle CDC Source Connector for Confluent Cloud¶
注釈
If you are installing the connector locally for Confluent Platform, see Oracle CDC Source for Confluent Platform.
フルマネージド型の Oracle CDC Source Connector for Confluent Cloud は、各変更をデータベースの行に取り込み、それらの変更を Apache Kafka® トピックの変更イベントレコードとして表します。このコネクターは Oracle LogMiner を使用してデータベースの redo ログを読み取ります。コネクターには、LogMiner を使用する権限と、コネクターによって取り込まれたすべてのテーブルから選択する権限を持つデータベースユーザーが必要です。
コネクターは、単一のデータベース内のテーブルのサブセットを取り込むように構成することができます。これは、ユーザーがアクセスできるすべてのテーブルのうち、include 正規表現に一致するものとして定義されます。また、別の exclude 正規表現に一致するテーブルを取り込まないように構成することもできます。
コネクターは各テーブルの変更を Kafka トピックに書き込みます。これらのトピックでは、table.topic.name.template
コネクター 構成プロパティ により table-to-topic マッピングが指定されています。このプロパティのデフォルトは、テーブルのドット区切り完全修飾名です(database_name.schema_name.table_name
など)。
コネクターはリテラルといくつかの変数(${tableName}
、${schemaName}
など)を認識して table-to-topic マッピングをカスタマイズします。変数は実行時に解決されます。たとえば、次の構成プロパティにより ORCL.ADMIN.USERS
テーブルへの変更が my-prefix.ORCL.ADMIN.USERS
という名前の Kafka トピックに書き込まれます。
table.topic.name.template=my-prefix.${databaseName}.${schemaName}.${tableName}
テンプレート変数のリストについては、「テンプレート変数」を参照してください。
コネクターは未加工の Oracle redo ログレコードをすべて、"redo ログトピック" として論理参照される 1 つの Kafka トピックに書き込むように設計されています。redo.log.topic.name
構成プロパティはこのトピックの名前を決定します。コネクターは実際にこのトピックを消費して、テーブル固有のトピックに書き込まれるテーブル固有のすべてのイベントを識別し、生成します。コネクターは、table.topic.name.template
プロパティを空の文字列に設定することで、テーブル固有イベントをテーブル固有のトピックに生成することなく、redo ログトピックにのみ書き込むように構成できます。
制限¶
以下の情報を確認してください。
- コネクターの制限事項については、Oracle CDC Source Connector の制限事項を参照してください。
- Confluent Cloud Schema Registry を使用する場合は、「スキーマレジストリ Enabled Environments」を参照してください。
- 1 つ以上の Single Message Transforms(SMT)を使用する場合は、「SMT の制限」を参照してください。
クイックスタート¶
このクイックスタートを使用して、Confluent Cloud Oracle CDC Source Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択してから、Oracle データベースの既存データのスナップショットを取得して、それ以降に発生する行レベルの変更をすべてモニタリングして記録するようにコネクターを構成するための基本的な方法について説明します。
重要
コネクターを構成する前に、Oracle データベースの前提条件 を参照して Oracle データベースの構成情報と構成後の検証手順を確認してください。
- 前提条件
- アマゾンウェブサービス (AWS)、Microsoft Azure (Azure)、または Google Cloud Platform (GCP)上の Confluent Cloud クラスターへのアクセスを許可されていること。
- Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
- スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
- ネットワークに関する考慮事項については、「Networking and DNS Considerations」を参照してください。静的なエグレス IP を使用する方法については、「静的なエグレス IP アドレス」を参照してください。
- Kafka クラスターの認証情報。次のいずれかの方法で認証情報を指定できます。
- 既存の サービスアカウント のリソース ID を入力する。
- コネクター用の Confluent Cloud サービスアカウント を作成する。サービスアカウントのドキュメント で、必要な ACL エントリを確認してください。一部のコネクターには固有の ACL 要件があります。
- Confluent Cloud の API キーとシークレットを作成する。キーとシークレットを作成するには、confluent api-key create を使用するか、コネクターのセットアップ時に Cloud Console で直接 API キーとシークレットを自動生成します。
Confluent Cloud Console の使用¶
ステップ 1: Confluent Cloud クラスターを起動します。¶
インストール手順については、「Quick Start for Confluent Cloud」を参照してください。
ステップ 2: コネクターを追加します。¶
左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。
ステップ 4: 接続をセットアップします。¶
以下を実行して、Continue をクリックします。
注釈
- すべての 前提条件 を満たしていることを確認してください。
- アスタリスク( * )は必須項目であることを示しています。
コネクターの 名前 を入力します。
Kafka Cluster credentials で Kafka クラスターの認証情報の指定方法を選択します。サービスアカウントのリソース ID を選択するか、API キーとシークレットを入力できます(または、Cloud Console でこれらを生成します)。
データベース接続の詳細情報 を追加します。
- Oracle server: Oracle サーバーのホスト名またはアドレス。
- Oracle port: Oracle に接続するために使用されるポート番号。デフォルトは
1521
です。 - Oracle SID: Oracle システム ID(SID)。
- Oracle PDB: Oracle PDB 名。マルチテナント CDB/PDB アーキテクチャを使用する場合にのみ設定します。デフォルトでは設定されていません。これは、キャプチャー対象のテーブルが CDB ルートに存在することを意味します。
- Oracle service: Oracle サービス名。設定されている場合、コネクターは常にこのサービス名を使用してデータベースに接続します。
- Oracle username: Oracle データベースユーザーの名前。
- Oracle password: Oracle データベースユーザーのパスワード。
- Trust store: サーバー CA 証明書情報を含むトラストストアファイル。SSL を使用してデータベースに接続する場合にのみ必要になります。
- Trust store password: トラストストアのトラストストアパスワード。SSL を使用してデータベースに接続する場合にのみ必要になります。
- Enable Oracle FAN events: 接続で Oracle RAC Fast Application Notification(FAN) イベントの使用を許可するかどうかを指定します。これはデフォルトでは無効になっており、FAN イベントがデータベースでサポートされていても使用されません。これを有効にするのは、FAN イベントで Oracle RAC をセットアップして使用する場合のみです。この機能を有効にすると、FAN イベントを使用するようにデータベースがセットアップされていない場合に、接続の問題が発生する可能性があります。
データベースの詳細情報 を追加します。
- Table inclusion regex: SID または PDB、および schema が含まれているテーブルの完全修飾名に一致する正規表現。たとえば、非コンテナーデータベースを使用する場合は、
ORCLCDB[.]C##MYUSER[.](table1|orders|customers)
またはORCLCDB.C##MYUSER.(table1|orders|customers)
のようになります。マルチテナントデータベースを使用する場合は、ORCLPDB1[.]C##MYUSER[.](table1|orders|customers)
またはORCLPDB1.C##MYUSER.(table1|orders|customers)
のようになります。 - Table exclusion regex: SID または PDB、および schema が含まれているテーブルの完全修飾名に一致する正規表現。たとえば、非コンテナーデータベースを使用する場合は、
ORCLCDB[.]C##MYUSER[.](table1|orders|customers)
またはORCLCDB.C##MYUSER.(table1|orders|customers)
のようになります。マルチテナントデータベースを使用する場合は、ORCLPDB1[.]C##MYUSER[.](table1|orders|customers)
またはORCLPDB1.C##MYUSER.(table1|orders|customers)
のようになります。空白の正規表現(デフォルト)は除外がないことを意味します。 - Start from: 初めて開始するときに、コネクターが実行すべきこと。値は、リテラル
snapshot
(デフォルト)、リテラルcurrent
、リテラルforce_current
、Oracle システム変更番号(SCN)、またはDD-MON-YYYY HH24:MI:SS
フォーマットのデータベースタイムスタンプです。snapshot
リテラルを指定すると、コネクターは初めて開始するときにキャプチャーしたテーブルのスナップショットを取得し、スナップショットが取得された時点から redo ログイベントの処理を続行します。current
リテラルを指定すると、コネクターはスナップショットを作成せずに現在の Oracle SCN から開始します。force_current
リテラルはcurrent
と同じですが、コネクターが再起動されると、以前に保管されたオフセットが無視されます。このオプションは、オフセットに保管されている SCN が Oracle アーカイブログで利用できなくなり、コネクターを回復する場合にのみ使用する必要があります。
- Table inclusion regex: SID または PDB、および schema が含まれているテーブルの完全修飾名に一致する正規表現。たとえば、非コンテナーデータベースを使用する場合は、
Connector details に情報を追加します(省略可)。
- Emit tombstone on delete:
true
に設定した場合、削除操作により null 値の tombstone レコードが出力されます。デフォルトはfalse
です。 - Behavior on dictionary mismatch: DDL ステートメントに起因するディクショナリの不一致が原因で、コネクターが列の値を解析できない場合の動作を指定します。こうした状況は、ディクショナリモード
online
が設定されているものの、DDL の変更が発生する前に記録された履歴データをコネクターがストリーミングしている場合に発生する可能性があります。このプロパティのデフォルト値はfail
(コネクターのタスクが失敗する)です。log
オプションは解析不能なレコードを記録し、問題のあるレコードをスキップします。コネクターのタスクは失敗しません。 - Behavior on unparsable statement: 解析できなかった SQL ステートメントをコネクターが検出した場合に必要とする動作を指定します。デフォルトオプション
fail
では、コネクターのタスクは失敗します。log
オプションは解析不能なステートメントを記録し、問題のあるレコードをスキップします。コネクターのタスクは失敗しません。 - Database timezone: タイムゾーン情報が利用できない場合、コネクターで Oracle の DATE 型や TIMESTAMP 型を解析するときにこのプロパティはデフォルトで UTC に設定されます。たとえば、
db.timezone=UTC
の場合、DATE および TIMESTAMP のデータはどちらも UTC タイムゾーンを使用して解析されます。この値は、有効な java.util.TimeZone ID であることが必要です。 - Database timezone for DATE type: タイムゾーン情報が利用できない場合、コネクターで Oracle の DATE 型を解析するときに使用されるデフォルトタイムゾーンです。このプロパティが設定されている場合、その値は DATE 型の
db.timezone
の値を上書きします。たとえば、db.timezone=UTC
およびdb.timezone.date=America/Los_Angeles
の場合、TIMESTAMP 型のデータは UTC タイムゾーンを使用して解析されます。DATE 型のデータは、America/Los_Angeles タイムゾーンを使用して解析されます。この値は、有効な java.util.TimeZone ID であることが必要です。 - Redo log startup polling limit (ms): The amount of time to wait for the redo log to be present on connector startup. This is only relevant when the connector is configured to capture change events. Defaults to five minutes (or
300000
milliseconds). The maximum is one hour (or3600000
milliseconds).
- Emit tombstone on delete:
Connection details に情報を追加します(省略可)。
- Query timeout (ms): Oracle に送信される任意のクエリのタイムアウト(ミリ秒)。デフォルトは 5 分(
300000
ミリ秒)です。負の値に設定した場合、コネクターはクエリにタイムアウトを適用しません。 - Maximum batch size: コネクターから Kafka に返されるレコードの最大数。追加のレコードがない場合、コネクターはより少ないレコードを返す可能性があります。
- Poll linger milliseconds: 空のバッチを返す前にレコードの到着を待機する最大時間。デフォルトは 5 秒(
5000
ミリ秒)です。 - Maximum buffer size: すべてのスナップショットスレッドおよび redo ログからバッチへとバッファリングできる最大レコード数。デフォルト値は
0
です。この場合、バッファサイズは最大バッチサイズとスレッド数から計算されます。 - Database poll interval (ms): データベース redo ログイベントを取得するためのポーリング間の間隔。連続マイニングが使用可能で有効になっている場合、この値は効果がありません。デフォルトは 1 秒(
1000
ミリ秒)です。指定可能な最小値は500
ミリ秒です。 - Snapshot row fetch size: スナップショットでテーブル行数をフェッチするときに JDBC ドライバにヒントとして提供される行数。デフォルトは
2000
行です。このヒントを無効にするには0
を使用します。 - Redo log fetch size: redo ログから行数をフェッチするときに JDBC ドライバにヒントとして提供される行数。デフォルトは
5000
行です。このヒントを無効にするには0
を使用します。 - Included fields: redo ログに含めるフィールドのコンマ区切りのリスト。
- Excluded fields: redo ログから除外するフィールドのコンマ区切りのリスト。
- Query timeout (ms): Oracle に送信される任意のクエリのタイムアウト(ミリ秒)。デフォルトは 5 分(
Output messages に詳細情報を追加します(省略可)。
Topic name template: コネクターによって変更イベントが書き込まれる Kafka トピックの名前を定義するテンプレート。この値は、コネクターがキャプチャーしたすべてのテーブルからすべてのレコードを 1 つのトピックに書き込む場合は、定数にすることができます。または、サポートされているテンプレート変数(
${databaseName}
、${schemaName}
、${tableName}
、${connectorName}
など)を値に含めることができます。デフォルトは空の文字列です。この場合、コネクターは変更イベントレコードを生成しません。特別な意味を持つ文字である\
、$
、{
、}
は、テンプレート変数の一部として意図されていない場合、\
でエスケープする必要があります。redo ログトピックが指定されている場合にのみ、このプロパティを空のままにすることができます。空のままにすると、コネクターは redo ログトピックにのみ書き込みます。Redo log corruption topic: Oracle redo ログ内の破損に関する情報を記述し、欠落したデータを示すイベントをコネクターが記録する Kafka のトピックの名前。デフォルトは空の文字列です。このプロパティを空のままにすると、コネクターはこの情報を Kafka に書き込みません。
Redo log topic: コネクターがすべての未加工の redo ログイベントを記録する Kafka のトピックの名前。空のままにすると、このプロパティは、デフォルトで
${connectorName}-${databaseName}-redo-log
に設定されます。たとえば、lcc-mycdcconnector-myoracledb-redo-log
のように設定されます。注釈
出力トピックの場合は、ACL を設定する必要があります。詳細については、Oracle CDC Source Connector の ACL を参照してください。
Key template: 各変更イベントの Kafka レコードキーを定義するテンプレート。デフォルトでは、キーにはプライマリキーフィールドを持つ STRUCT が含まれ、テーブルにプライマリキーまたは一意のキーがない場合は null になります。
Dictionary mode: コネクターで使用されるディクショナリの処理モード。
auto
、online
、redo_log
が指定できます。デフォルトはauto
です。auto
: コネクターでは、テーブルスキーマを進化させる DDL ステートメントが検出されるまで、オンラインカタログのディクショナリを使用します。DDL ステートメントが検出されると、コネクターでは、アーカイブ済み redo ログのディクショナリの使用を開始します。DDL ステートメントが処理されると、コネクターではonline
カタログの使用に戻ります。DDL ステートメントが想定される場合は、このモードを使用します。online
: コネクターでは、常にオンラインディクショナリカタログを使用します。DDL ステートメントが想定されない場合は、このモードを使用します。redo_log
: コネクターでは、常にアーカイブ済み redo ログのディクショナリカタログを使用します。オンライン redo ログにアクセスできない場合は、このモードを使用してください。CDC イベントは、オンラインログからアーカイブされるまで遅延され、コネクターで処理されません。
Output table name field: Kafka に書き込まれた変更レコード内のフィールドの名前。これには、影響を受ける Oracle テーブルのスキーマ修飾名が含まれます(
dbo.Customers
など)。空白値は、このフィールドが変更レコードに含まれないことを示します。Output SCN field: Kafka に書き込まれた変更レコード内のフィールドの名前。これには、この変更が行われたときの Oracle システム変更番号(SCN)が含まれます。これを空のままにすると、このフィールドは変更レコードに含まれません。
Output operation type field: Kafka に書き込まれた変更レコード内のフィールドの名前。これには、この変更イベントの操作タイプが含まれます。これを空のままにすると、このフィールドは変更レコードに含まれません。
Output operation timestamp field: Kafka に書き込まれた変更レコード内のフィールドの名前。これには、この変更イベントの操作タイムスタンプが含まれます。これを空のままにすると、このフィールドは変更レコードに含まれません。
Output current timestamp field: Kafka に書き込まれた変更レコード内のフィールドの名前。これには、変更イベントがいつ処理されたかを示すコネクターのタイムスタンプが含まれます。これを空のままにすると、このフィールドは変更レコードに含まれません。
Output row ID field: Kafka に書き込まれた変更レコード内のフィールドの名前。これには、このイベントによって変更されたテーブルの行 ID が含まれます。これを空のままにすると、このフィールドは変更レコードに含まれません。
Output username field: Kafka に書き込まれた変更レコード内のフィールドの名前。これには、この変更の起因となったトランザクションを実行した Oracle ユーザーの名前が含まれます。これを空のままにすると、このフィールドは変更レコードに含まれません。
Output redo field: Kafka に書き込まれた変更レコード内のフィールドの名前。これには、この変更レコードが作成された元の redo DML ステートメントが含まれます。これを空のままにすると、このフィールドは変更レコードに含まれません。
Output undo field: Kafka に書き込まれた変更レコード内のフィールドの名前。これには、この変更を有効に取り消し、行の "以前の" ステートを表す元の undo DML ステートメントが含まれます。これを空のままにすると、このフィールドは変更レコードに含まれません。
Output operation type read value: 読み取り(スナップショット)変更イベントの操作タイプの値。デフォルトでは、このプロパティ値は
R
に設定されています。Output operation type insert value: 挿入変更イベントの操作タイプの値。デフォルトでは、このプロパティ値は
I
に設定されています。Output operation type update value: アップデート変更イベントの操作タイプの値。デフォルトでは、このプロパティ値は
U
に設定されています。Output operation type delete value: 削除変更イベントの操作タイプの値。デフォルトでは、このプロパティ値は
D
に設定されています。Output operation type truncate value: 切り捨て変更イベントの操作タイプの値。デフォルトでは、このプロパティ値は
T
に設定されています。LOB topic template: コネクターによって LOB オブジェクトが書き込まれる Kafka トピックの名前を定義するテンプレート。値は、すべてのキャプチャー対象テーブルからすべての LOB オブジェクトが 1 つのトピックに書き込まれる場合は定数にすることができます。または、サポートされているテンプレート変数(
${columnName}
、${databaseName}
、${schemaName}
、${tableName}
、${connectorName}
など)を値に含めることができます。デフォルトは空で、キャプチャー対象テーブルに LOB タイプ列がある場合に、すべての LOB タイプ列を無視します。特別な意味を持つ文字である\
、$
、{
、}
は、テンプレート変数の一部として意図されていない場合、\
でエスケープする必要があります。Snapshot by partitions: テーブルがパーティションを使用するよう定義されている場合に、コネクターが各テーブルパーティションでスナップショットを実行する必要があるかどうかを指定します。デフォルトでは
false
に設定されます。この場合、各テーブルの全体のスナップショットが 1 回実行されます。Snapshot threads per task: スナップショットを実行するために各タスクで使用できるスレッド数。タスクに割り当てられているテーブル数よりこの値が大きい場合にのみ、各タスクで使用されます。デフォルトは
4
です。Enable large LOB object support: true の場合、コネクターは、複数の redo ログレコードに分割される、サイズの大きな LOB オブジェクトをサポートします。コネクターは、redo ログトピックにコミットメッセージを送信し、これらのコミットメッセージを使用して、サイズの大きな LOB オブジェクトを LOB トピックに送信できるタイミングを追跡します。
Map numeric values...: NUMERIC 値を精度とスケール(省略可)に基づいて、プリミティブ型または小数型にマップします。
none
オプションがデフォルトですが、Connect の DECIMAL 型はそのバイナリ表現にマップされるため、シリアル化の問題につながる可能性があります。一般的には、best_fit_or...
オプションのいずれかが望ましい選択となります。以下のオプションを使用できます。- すべての NUMERIC 列を Connect の DECIMAL 論理型で表す場合は、
none
を使用します。 - NUMERIC 列を、列の精度とスケールに基づいて、Connect のプリミティブ型にキャストする必要がある場合は、
best_fit_or_decimal
を使用します。精度とスケールがいずれかのプリミティブ型の境界を超える場合は、代わりに Connect の DECIMAL 論理型が使用されます。 - NUMERIC 列を、列の精度とスケールに基づいて、Connect のプリミティブ型にキャストする必要がある場合は、
best_fit_or_double
を使用します。精度とスケールがいずれかのプリミティブ型の境界を超える場合は、代わりに Connect の FLOAT64 型が使用されます。 - NUMERIC 列を、列の精度とスケールに基づいて、Connect のプリミティブ型にキャストする必要がある場合は、
best_fit_or_string
を使用します。精度とスケールがいずれかのプリミティブ型の境界を超える場合は、代わりに Connect の STRING 型が使用されます。 - 列の精度にのみ基づいて(列のスケールは 0 と仮定して)NUMERIC 列をマップするには、
precision_only
を使用します。 - 後方互換性上の理由から、
best_fit
オプションも利用できます。その動作は、best_fit_or_decimal
と同じです。
- すべての NUMERIC 列を Connect の DECIMAL 論理型で表す場合は、
Numeric default scale: スケールを決定できない場合に、数値型に対して使用するデフォルトのスケール。
Map Oracle DATE type to Connect data type: Oracle DATE 値を Connect の型にマップします。
Output Kafka record key format: Kafka 出力レコードキーのフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、または JSON です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。
Output Kafka record value format: Kafka 出力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、または JSON です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。
このコネクターで使用する タスク の数を入力します。タスクが多いほどパフォーマンスが向上する可能性があります。デフォルトのタスク数は
2
です。Transforms and Predicates: 詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。SMT のいくつかの例については、「SMT の例」を参照してください。
すべてのプロパティと定義については、「構成プロパティ」を参照してください。
ステップ 5: コネクターを起動します。¶
実行中の構成をプレビューして、接続の詳細情報を確認します。プロパティの構成に問題がないことが確認できたら、Launch をクリックします。
ちなみに
コネクターの出力のプレビューについては、「コネクターのデータプレビュー」を参照してください。
ステップ 6: コネクターのステータスを確認します。¶
コネクターのステータスが Provisioning から Running に変わります。ステータスが変わるまで数分かかる場合があります。
ステップ 7: Kafka トピックを確認します。¶
コネクターが実行中になったら、レコードが Kafka トピックに取り込まれていることを確認します。
Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。
Confluent CLI の使用¶
以下の手順に従うと、Confluent CLI を使用してコネクターをセットアップし、実行できます。
注釈
- すべての 前提条件 を満たしていることを確認してください。
- コマンド例では Confluent CLI バージョン 2 を使用しています。詳細については、「Confluent CLI v2 への移行」を参照してください。
ステップ 2: コネクターの必須の構成プロパティを表示します。¶
以下のコマンドを実行して、コネクターの必須プロパティを表示します。
confluent connect plugin describe <connector-catalog-name>
例:
confluent connect plugin describe OracleCdcSource
出力例:
The following are required configs:
connector.class : OracleCdcSource
name
kafka.api.key : ["kafka.api.key" is required when "kafka.auth.mode==KAFKA_API_KEY"]
kafka.api.secret : ["kafka.api.secret" is required when "kafka.auth.mode==KAFKA_API_KEY"]
oracle.server
oracle.sid
oracle.username
table.inclusion.regex
ステップ 3: コネクターの構成ファイルを作成します。¶
コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。
{
"name": "OracleCdcSourceConnector_0",
"config": {
"connector.class": "OracleCdcSource",
"name": "OracleCdcSourceConnector_0",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "****************",
"kafka.api.secret": "**************************************************",
"oracle.server": "database-5.abcdefg12345.us-west-2.rds.amazonaws.com",
"oracle.port": "1521",
"oracle.sid": "ORCL",
"oracle.username": "admin",
"oracle.password": "**********",
"table.inclusion.regex": "ORCL[.]ADMIN[.]CUSTOMERS",
"start.from": "SNAPSHOT",
"query.timeout.ms": "60000",
"redo.log.row.fetch.size": "1",
"table.topic.name.template": "${databaseName}.${schemaName}.${tableName}",
"lob.topic.name.template":"${databaseName}.${schemaName}.${tableName}.${columnName}",
"numeric.mapping": "BEST_FIT_OR_DOUBLE",
"output.data.key.format": "JSON_SR",
"output.data.value.format": "JSON_SR",
"tasks.max": "2"
}
}
以下のプロパティ定義に注意してください。
"name"
: 新しいコネクターの名前を設定します。"connector.class"
: コネクターのプラグイン名を指定します。
"kafka.auth.mode"
: 使用するコネクターの認証モードを指定します。オプションはSERVICE_ACCOUNT
またはKAFKA_API_KEY
(デフォルト)です。API キーとシークレットを使用するには、構成プロパティkafka.api.key
とkafka.api.secret
を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティkafka.service.account.id=<service-account-resource-ID>
に リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。confluent iam service-account list
例:
confluent iam service-account list Id | Resource ID | Name | Description +---------+-------------+-------------------+------------------- 123456 | sa-l1r23m | sa-1 | Service account 1 789101 | sa-l4d56p | sa-2 | Service account 2
"table.inclusion.regex"
: SID または PDB、および schema が含まれているテーブルの完全修飾名に一致する正規表現。たとえば、上記の構成で示した例に加えて、非コンテナーデータベースを使用する場合は、ORCLCDB[.]C##MYUSER[.](table1|orders|customers)
またはORCLCDB.C##MYUSER.(table1|orders|customers)
のようになります。マルチテナントデータベースを使用する場合は、ORCLPDB1[.]C##MYUSER[.](table1|orders|customers)
またはORCLPDB1.C##MYUSER.(table1|orders|customers)
のようになります。"start.from"
: 初めて開始するときに、コネクターが実行すべきこと。値は、リテラルsnapshot
(デフォルト)、リテラルcurrent
、リテラルforce_current
、Oracle システム変更番号(SCN)、またはDD-MON-YYYY HH24:MI:SS
フォーマットのデータベースタイムスタンプです。snapshot
リテラルを指定すると、コネクターは初めて開始するときにキャプチャーしたテーブルのスナップショットを取得し、スナップショットが取得された時点から redo ログイベントの処理を続行します。current
リテラルを指定すると、コネクターはスナップショットを作成せずに現在の Oracle SCN から開始します。force_current
リテラルはcurrent
と同じですが、コネクターが再起動されると、以前に保管されたオフセットが無視されます。このオプションは、オフセットに保管されている SCN が Oracle アーカイブログで利用できなくなり、コネクターを回復する場合にのみ使用する必要があります。"query.timeout.ms"
: Oracle に送信される任意のクエリのタイムアウト(ミリ秒)。デフォルトは 5 分(300000
ミリ秒)です。負の値に設定した場合、コネクターはクエリにタイムアウトを適用しません。"redo.log.row.fetch.size"
: redo ログから行数をフェッチするときに JDBC ドライバにヒントとして提供される行数。デフォルトは5000
行です。このヒントを無効にするには0
を使用します。"table.topic.name.template"
: コネクターによって変更イベントが書き込まれる Kafka トピックの名前を定義するテンプレート。この値は、コネクターがキャプチャーしたすべてのテーブルからすべてのレコードを 1 つのトピックに書き込む場合は、定数にすることができます。または、サポートされているテンプレート変数(${databaseName}
、${schemaName}
、${tableName}
、${connectorName}
など)を値に含めることができます。デフォルトは空の文字列です。この場合、コネクターは変更イベントレコードを生成しません。特別な意味を持つ文字である\
、$
、{
、}
は、テンプレート変数の一部として意図されていない場合、\
でエスケープする必要があります。redo ログトピックが指定されている場合にのみ、このプロパティを空のままにすることができます。空のままにすると、コネクターは redo ログトピックにのみ書き込みます。"numeric.mapping"
: NUMERIC 値を精度とスケール(省略可)に基づいて、プリミティブ型または小数型にマッピングします。none
オプションがデフォルトですが、Connect の DECIMAL 型はそのバイナリ表現にマップされるため、シリアル化の問題につながる可能性があります。一般的には、best_fit_or...
オプションのいずれかが望ましい選択となります。以下のオプションを使用できます。- すべての NUMERIC 列を Connect の DECIMAL 論理型で表す場合は、
none
を使用します。 - NUMERIC 列を、列の精度とスケールに基づいて、Connect のプリミティブ型にキャストする必要がある場合は、
best_fit_or_decimal
を使用します。精度とスケールがいずれかのプリミティブ型の境界を超える場合は、代わりに Connect の DECIMAL 論理型が使用されます。 - NUMERIC 列を、列の精度とスケールに基づいて、Connect のプリミティブ型にキャストする必要がある場合は、
best_fit_or_double
を使用します。精度とスケールがいずれかのプリミティブ型の境界を超える場合は、代わりに Connect の FLOAT64 型が使用されます。 - NUMERIC 列を、列の精度とスケールに基づいて、Connect のプリミティブ型にキャストする必要がある場合は、
best_fit_or_string
を使用します。精度とスケールがいずれかのプリミティブ型の境界を超える場合は、代わりに Connect の STRING 型が使用されます。 - 列の精度にのみ基づいて(列のスケールは 0 と仮定して)NUMERIC 列をマップするには、
precision_only
を使用します。 - 後方互換性上の理由から、
best_fit
オプションも利用できます。その動作は、best_fit_or_decimal
と同じです。
- すべての NUMERIC 列を Connect の DECIMAL 論理型で表す場合は、
"output.data.key.format"
: Kafka 出力レコードキーのフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、または JSON です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。"output.data.value.format"
: Kafka 出力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、または JSON です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。"tasks.max"
: このコネクターで使用する タスク の最大数を設定します。タスクが多いほどパフォーマンスが向上する可能性があります。デフォルトのタスク数は2
です。
Single Message Transforms: SMT 追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。SMT のいくつかの例については、「SMT の例」を参照してください。
すべてのプロパティと定義については、「構成プロパティ」を参照してください。
ステップ 4: プロパティファイルを読み込み、コネクターを作成します。¶
以下のコマンドを入力して、構成を読み込み、コネクターを起動します。
confluent connect create --config <file-name>.json
例:
confluent connect create --config oracle-cdc-source.json
出力例:
Created connector OracleCdcSource_0 lcc-ix4dl
ステップ 5: コネクターのステータスを確認します。¶
以下のコマンドを入力して、コネクターのステータスを確認します。
confluent connect list
出力例:
ID | Name | Status | Type
+-----------+-----------------------+---------+-------+
lcc-ix4dl | OracleCdcSource_0 | RUNNING | source
ステップ 6: Kafka トピックを確認します。¶
コネクターが実行中になったら、メッセージが Kafka トピックに取り込まれていることを確認します。
Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。
次のステップ¶
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。
構成プロパティ¶
このコネクターでは、以下のコネクター構成プロパティが使用されます。
データへの接続方法(How should we connect to your data?)¶
name
コネクターの名前を設定します。
- 型: string
- 指定可能な値: 最大 64 文字の文字列
- 重要度: 高
Kafka クラスターの認証情報(Kafka Cluster credentials)¶
kafka.auth.mode
Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。
- 型: string
- デフォルト: KAFKA_API_KEY
- 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
- 重要度: 高
kafka.api.key
- 型: password
- 重要度: 高
kafka.service.account.id
Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。
- 型: string
- 重要度: 高
kafka.api.secret
- 型: password
- 重要度: 高
データベースへの接続方法(How should we connect to your database?)¶
oracle.server
Oracle サーバーのホスト名またはアドレス。
- 型: string
- 指定可能な値: 正規表現
^[^\?=%&\(\)]*$
に一致することが必要 - 重要度: 高
oracle.port
Oracle に接続するために使用されるポート番号。
- 型: int
- デフォルト: 1521
- 指定可能な値: [0,...,65535]
- 重要度: 高
oracle.sid
Oracle システム ID(SID)。
- 型: string
- 指定可能な値: 正規表現
^[a-zA-Z][a-zA-Z0-9$#_]*$
に一致することが必要 - 重要度: 高
oracle.pdb.name
Oracle PDB 名。マルチテナント CDB/PDB アーキテクチャを使用する場合にのみ設定します。デフォルトでは設定されていません。これは、キャプチャー対象のテーブルが CDB ルートに存在することを意味します。
- 型: string
- Valid Values: Must match the regex
^([a-zA-Z][a-zA-Z0-9$#_]*)*$
- 重要度: 高
oracle.service.name
Oracle サーバー名。設定されている場合、コネクターは常に提供されたサービス名を使用してデータベースに接続します。
- 型: string
- Valid Values: Must match the regex
^([a-zA-Z][a-zA-Z0-9$#_.]*)*$
- 重要度: 低
oracle.username
Oracle データベースユーザーの名前。
- 型: string
- 指定可能な値: 正規表現
^[^\?=%&\(\)]*$
に一致することが必要 - 重要度: 高
oracle.password
Oracle データベースユーザーのパスワード。
- 型: password
- 重要度: 高
ssl.truststorefile
サーバー CA 証明書情報が含まれるトラストストア。SSL を使用してデータベースに接続する場合にのみ必要になります。
- 型: password
- デフォルト: [hidden]
- 重要度: 低
ssl.truststorepassword
サーバー CA 証明書情報が含まれるトラストストアのパスワード。SSL を使用してデータベースに接続する場合にのみ必要になります。
- 型: password
- デフォルト: [hidden]
- 重要度: 低
oracle.fan.events.enable
接続で Oracle RAC Fast Application Notification(FAN)イベントの使用を許可するかどうかを指定します。これはデフォルトでは無効になっており、FAN イベントがデータベースでサポートされていても使用されません。これを有効にするのは、FAN イベントで Oracle RAC をセットアップして使用する場合のみです。この機能を有効にすると、FAN イベントを使用するようにデータベースがセットアップされていない場合に、接続の問題が発生する可能性があります。
- 型: boolean
- デフォルト: false
- 重要度: 低
データベースの詳細(Database details)¶
table.inclusion.regex
sid または pdb、および schema が含まれているテーブルの完全修飾名に一致する正規表現。たとえば、非コンテナーデータベースを使用する場合は、ORCLCDB[.]C##MYUSER[.](table1|orders|customers) または ORCLCDB.C##MYUSER.(table1|orders|customers) のようになります。マルチテナントデータベースを使用する場合は、ORCLPDB1[.]C##MYUSER[.](table1|orders|customers) または ORCLPDB1.C##MYUSER.(table1|orders|customers) のようになります。
- 型: string
- 重要度: 高
table.exclusion.regex
sid または pdb、および scheme が含まれているテーブルの完全修飾名に一致する正規表現。たとえば、非コンテナーデータベースを使用する場合は、ORCLCDB[.]C##MYUSER[.](table1|orders|customers) または ORCLCDB.C##MYUSER.(table1|orders|customers) のようになります。マルチテナントデータベースを使用する場合は、ORCLPDB1[.]C##MYUSER[.](table1|orders|customers) または ORCLPDB1.C##MYUSER.(table1|orders|customers) のようになります。空白の正規表現(デフォルト)は除外がないことを意味します。
- 型: string
- デフォルト: ""
- 重要度: 高
oracle.supplemental.log.level
Database supplemental logging level for connector operation. If set to
full
, the connector validates the supplemental logging level on the database is FULL and then captures Snapshots and CDC events for the specified tables whenevertable.topic.name.template
is not set to""
. When the level set tomsl
, then the connector only captures snapshots iftable.topic.name.template
is not set to""
. Note that, this setting is irrelevant if thetable.topic.name.template
is set to""
as we only capture redo logs then. This setting defaults tofull
supplemental logging level mode.- 型: string
- Default: full
- Valid Values: full, msl
- 重要度: 中
start.from
What the connector should do when it starts for the first time. The value is either the literal
snapshot
(default), the literalcurrent
, the literalforce_current
, an Oracle System Change Number (SCN), or a database timestamp in the formyyyy-MM-dd HH:mm:ss
. Thesnapshot
literal instructs the connector to snapshot captured tables the first time it is started, then continue processing redo log events from the point in time when the snapshot was taken. Thecurrent
literal instructs the connector to start from the current Oracle SCN without snapshotting. Theforce_current
literal is the same ascurrent
but it will ignore any previously stored offsets when the connector is restarted. This option should only be used to recover the connector when the SCN stored in offsets is no longer available in the Oracle archive logs.- 型: string
- デフォルト: snapshot
- 重要度: 中
コネクターの詳細(Connector details)¶
emit.tombstone.on.delete
true の場合、削除操作により null 値の tombstone レコードが出力されます。
- 型: boolean
- デフォルト: false
- 重要度: 低
behavior.on.dictionary.mismatch
DDL ステートメントに起因するディクショナリの不一致が原因で、コネクターが列の値を解析できない場合に必要となる動作を指定します。こうした状況は、
online
ディクショナリモードが指定されているものの、DDL の変更が発生する前に記録された履歴データをコネクターがストリーミングしている場合に発生する可能性があります。デフォルトオプションfail
では、コネクターのタスクは失敗します。log
オプションは解析不能なレコードを記録し、問題のあるレコードをスキップします。コネクターのタスクは失敗しません。- 型: string
- デフォルト: fail
- 指定可能な値: fail、log
- 重要度: 低
behavior.on.unparsable.statement
解析できなかった SQL ステートメントをコネクターが検出した場合に必要となる動作を指定します。デフォルトオプション
fail
では、コネクターのタスクは失敗します。log
オプションは解析不能なステートメントを記録し、問題のあるレコードをスキップします。コネクターのタスクは失敗しません。- 型: string
- デフォルト: fail
- 指定可能な値: fail、log
- 重要度: 低
db.timezone
タイムゾーン情報が利用できない場合に、Oracle の DATE 型や TIMESTAMP 型の解析で使用されるデフォルトタイムゾーンです。たとえば、db.timezone=UTC の場合、DATE および TIMESTAMP のデータはどちらも UTC タイムゾーンを想定して解析されます。値は、有効な java.util.TimeZone ID であることが必要です。
- 型: string
- デフォルト: UTC
- 重要度: 低
db.timezone.date
タイムゾーン情報が利用できない場合に、Oracle の DATE 型の解析で使用されるデフォルトタイムゾーンです。このプロパティが設定されている場合、その値は DATE 型の db.timezone の値を上書きします。たとえば、db.timezone=UTC、db.timezone.date=America/Los_Angeles と設定されている場合、TIMESTAMP 型のデータは UTC タイムゾーンを想定して解析され、DATE 型のデータは America/Los_Angeles タイムゾーンを想定して解析されます。値は、有効な java.util.TimeZone ID であることが必要です。
- 型: string
- 重要度: 低
redo.log.startup.polling.limit.ms
The amount of time to wait for the redo log to be present on connector startup. This is only relevant when connector is configured to capture change events. The default is 5 minutes (or 300000 milliseconds). The maximum is 1 hour (or 3600000 milliseconds)
- 型: long
- デフォルト: 300000(5 分)
- 指定可能な値: [0,…,3600000]
- 重要度: 低
接続の詳細(Connection details)¶
query.timeout.ms
Oracle に送信される任意のクエリのタイムアウト(ミリ秒)。デフォルトは 5 分(300000 ミリ秒)です。負の値に設定した場合、コネクターはクエリにタイムアウトを適用しません。
- 型: long
- デフォルト: 300000(5 分)
- 重要度: 中
max.batch.size
コネクターから Connect に返されるレコードの最大数。追加のレコードがない場合、コネクターはより少ないレコードを返す可能性もあります。
- 型: int
- デフォルト: 1000
- 指定可能な値: [100,...,10000]
- 重要度: 中
poll.linger.ms
レコードが空のバッチを返す前に待機する最大時間。デフォルトは 5 秒です。
- 型: long
- デフォルト: 5000(5 秒)
- 指定可能な値: [0,…,300000]
- 重要度: 中
max.buffer.size
すべてのスナップショットスレッドおよび redo ログからバッチへとバッファリングできる最大レコード数。デフォルトは 0 です。この場合、バッファサイズは最大バッチサイズとスレッド数から計算されます。
- 型: int
- デフォルト: 0
- 指定可能な値: [0,...,10000]
- 重要度: 低
redo.log.poll.interval.ms
データベース redo ログイベントを取得するためのポーリング間の間隔。連続マイニングが使用可能で有効になっている場合、この値は効果がありません。デフォルトは 1 秒(1000 ミリ秒)です。指定可能な最小値は 500 ミリ秒です。
- 型: long
- デフォルト: 500
- 指定可能な値: [500,…,60000]
- 重要度: 中
snapshot.row.fetch.size
スナップショットでテーブル行数をフェッチするときに JDBC ドライバにヒントとして提供される行数。値を 0 に設定すると、このヒントが無効になります。
- 型: int
- デフォルト: 2000
- 指定可能な値: [0,...,10000]
- 重要度: 中
redo.log.row.fetch.size
redo ログから行数をフェッチするときに JDBC ドライバにヒントとして提供される行数。値を 0 に設定すると、このヒントが無効になります。
- 型: int
- デフォルト: 5000
- 指定可能な値: [0,...,10000]
- 重要度: 中
redo.log.row.poll.fields.include
redo ログに含めるフィールドのコンマ区切りのリスト。
- 型: list
- デフォルト: ""
- 重要度: 低
redo.log.row.poll.fields.exclude
redo ログから除外するフィールドのコンマ区切りのリスト。
- 型: list
- デフォルト: ""
- 重要度: 低
出力メッセージ(Output messages)¶
table.topic.name.template
変更イベントが書き込まれる Kafka のトピックの名前を定義するテンプレート。値は、すべてのキャプチャー対象テーブルからすべてのレコードが 1 つのトピックに書き込まれる場合は定数にすることができます。または、サポートされているテンプレート変数(
${databaseName}
、${schemaName}
、${tableName}
、${connectorName}
など)を値に含めることができます。デフォルトは空の文字列です。この場合、コネクターは変更イベントレコードを生成しません。特別な意味を持つ文字である\
、$
、{
、}
は、テンプレート変数の一部として意図されていない場合、\
でエスケープする必要があります。redo ログトピックが指定されている場合にのみ空白にすることができます。その場合、コネクターは redo ログトピックにのみ書き込みます。- 型: string
- デフォルト: ""
- 重要度: 高
redo.log.corruption.topic
Oracle redo ログ内の破損に関する情報を記述し、欠落したデータを示すイベントをコネクターが記録する Kafka のトピックの名前。空白のトピック名(デフォルト)は、この情報が Kafka に書き込まれないことを指定します。
- 型: string
- デフォルト: ""
- 重要度: 高
redo.log.topic.name
コネクターがすべての未加工の redo ログイベントを記録する Kafka のトピックの名前。
- 型: string
- デフォルト: ${connectorName}-${databaseName}-redo-log
- 重要度: 高
key.template
各変更イベントの Kafka レコードキーを定義するテンプレート。デフォルトでは、キーにはプライマリキーフィールドを持つ STRUCT が含まれ、テーブルにプライマリキーまたは一意のキーがない場合は null になります。
- 型: string
- デフォルト: ${primaryKeyStructOrValue}
- 重要度: 中
oracle.dictionary.mode
コネクターで使用されるディクショナリの処理モード。auto、online、redo_log のいずれか。auto の場合、コネクターでは、テーブルスキーマを進化させる DDL ステートメントが検出されるまで、オンラインカタログのディクショナリを使用します。DDL ステートメントが検出された時点で、コネクターでは、アーカイブ済み redo ログのディクショナリの使用を開始します。DDL ステートメントが処理されると、コネクターではオンラインカタログの使用に戻ります。DDL ステートメントが想定される場合は、このモードを使用します。online の場合、コネクターでは、常にオンラインディクショナリカタログを使用します。DDL ステートメントが想定されない場合は、このモードを使用します。redo_log の場合、コネクターでは、常にアーカイブ済み redo ログのディクショナリカタログを使用します。オンライン redo ログにアクセスできない場合は、このモードを使用してください。CDC イベントは、オンラインログからアーカイブされるまで遅延され、アーカイブ後にコネクターで処理されます。
- 型: string
- デフォルト: auto
- 指定可能な値: auto、online、redo_log
- 重要度: 低
output.table.name.field
Kafka に書き込まれた変更レコード内のフィールドの名前。これには、影響を受ける Oracle テーブルのスキーマ修飾名が含まれます(
dbo.Customers
など)。空白値は、このフィールドが変更レコードに含まれないことを示します。- 型: string
- デフォルト: table
- 重要度: 低
output.scn.field
Kafka に書き込まれた変更レコード内のフィールドの名前。これには、この変更が行われたときの Oracle システム変更番号(SCN)が含まれます。空白値は、このフィールドが変更レコードに含まれないことを示します。
- 型: string
- デフォルト: scn
- 重要度: 低
output.op.type.field
Kafka に書き込まれた変更レコード内のフィールドの名前。これには、この変更イベントの操作タイプが含まれます。空白値は、このフィールドが変更レコードに含まれないことを示します。
- 型: string
- デフォルト: op_type
- 重要度: 低
output.op.ts.field
Kafka に書き込まれた変更レコード内のフィールドの名前。これには、この変更イベントの操作タイムスタンプが含まれます。空白値は、このフィールドが変更レコードに含まれないことを示します。
- 型: string
- デフォルト: op_ts
- 重要度: 低
output.current.ts.field
Kafka に書き込まれた変更レコード内のフィールドの名前。これには、この変更イベントが処理されたときのコネクターのタイムスタンプが含まれます。空白値は、このフィールドが変更レコードに含まれないことを示します。
- 型: string
- デフォルト: current_ts
- 重要度: 低
output.row.id.field
Kafka に書き込まれた変更レコード内のフィールドの名前。これには、このイベントによって変更されたテーブルの行 ID が含まれます。空白値は、このフィールドが変更レコードに含まれないことを示します。
- 型: string
- デフォルト: row_id
- 重要度: 低
output.username.field
Kafka に書き込まれた変更レコード内のフィールドの名前。これには、この変更の起因となったトランザクションを実行した Oracle ユーザーの名前が含まれます。空白値は、このフィールドが変更レコードに含まれないことを示します。
- 型: string
- デフォルト: username
- 重要度: 低
output.redo.field
Kafka に書き込まれた変更レコード内のフィールドの名前。これには、この変更レコードが作成された元の redo DML ステートメントが含まれます。空白値は、このフィールドが変更レコードに含まれないことを示します。
- 型: string
- デフォルト: ""
- 重要度: 低
output.undo.field
Kafka に書き込まれた変更レコード内のフィールドの名前。これには、この変更を有効に取り消し、行の "以前の" ステートを表す元の undo DML ステートメントが含まれます。空白値は、このフィールドが変更レコードに含まれないことを示します。
- 型: string
- デフォルト: ""
- 重要度: 低
output.op.type.read.value
読み取り(スナップショット)変更イベントの操作タイプの値。デフォルトでは、"R" に設定されます。
- 型: string
- デフォルト: R
- 重要度: 低
output.op.type.insert.value
挿入変更イベントの操作タイプの値。デフォルトでは、"I" に設定されます。
- 型: string
- デフォルト: I
- 重要度: 低
output.op.type.update.value
アップデート変更イベントの操作タイプの値。デフォルトでは、"U" に設定されます。
- 型: string
- デフォルト: U
- 重要度: 低
output.op.type.delete.value
削除変更イベントの操作タイプの値。デフォルトでは、"D" に設定されます。
- 型: string
- デフォルト: D
- 重要度: 低
output.op.type.truncate.value
切り捨て変更イベントの操作タイプの値。デフォルトでは、"T" に設定されます。
- 型: string
- デフォルト: T
- 重要度: 低
lob.topic.name.template
LOB オブジェクトが書き込まれる Kafka のトピックの名前を定義するテンプレート。値は、すべてのキャプチャー対象テーブルからすべての LOB オブジェクトが 1 つのトピックに書き込まれる場合は定数にすることができます。または、サポートされているテンプレート変数(
${columnName}
、${databaseName}
、${schemaName}
、${tableName}
、${connectorName}
など)を値に含めることができます。デフォルトは空で、キャプチャー対象テーブルに LOB タイプ列がある場合に、すべての LOB タイプ列を無視します。特別な意味を持つ文字である\
、$
、{
、}
は、テンプレート変数の一部として意図されていない場合、\
でエスケープする必要があります。- 型: string
- デフォルト: ""
- 重要度: 低
snapshot.by.table.partitions
テーブルがパーティションを使用するよう定義されている場合に、コネクターが各テーブルパーティションでスナップショットを実行する必要があるかどうかを指定します。これはデフォルトは
false
です。この場合、各テーブルの全体のスナップショットが 1 回実行されます。- 型: boolean
- デフォルト: true
- 重要度: 低
snapshot.threads.per.task
スナップショットを実行するために各タスクで使用できるスレッド数。これは、タスクに割り当てられているテーブル数より値が大きい場合のみ各タスクで使用されます。デフォルトは 4 です。
- 型: int
- デフォルト: 4
- 指定可能な値: [1,...,10]
- 重要度: 低
enable.large.lob.object.support
true の場合、コネクターは、複数の redo ログレコードに分割される、サイズの大きな LOB オブジェクトをサポートします。コネクターは、redo ログトピックにコミットメッセージを送信し、これらのコミットメッセージを使用して、サイズの大きな LOB オブジェクトを LOB トピックに送信できるタイミングを追跡します。
- 型: boolean
- デフォルト: false
- 重要度: 低
numeric.mapping
NUMERIC 値を精度とスケール(オプション)に基づいて、プリミティブ型または小数型にマッピングします。すべての NUMERIC 列を Connect の DECIMAL 論理型で表す場合は、
none
を使用します。NUMERIC 列を、列の精度とスケールに基づいて、Connect のプリミティブ型にキャストする必要がある場合は、best_fit_or_decimal
を使用します。精度とスケールがいずれかのプリミティブ型の境界を超える場合は、代わりに Connect の DECIMAL 論理型が使用されます。NUMERIC 列を、列の精度とスケールに基づいて、Connect のプリミティブ型にキャストする必要がある場合は、best_fit_or_double
を使用します。精度とスケールがいずれかのプリミティブ型の境界を超える場合は、代わりに Connect の FLOAT64 型が使用されます。NUMERIC 列を、列の精度とスケールに基づいて、Connect のプリミティブ型にキャストする必要がある場合は、best_fit_or_string
を使用します。精度とスケールがいずれかのプリミティブ型の境界を超える場合は、代わりに Connect の STRING 型が使用されます。列の精度のみに基づいて(列のスケールが 0 であることを前提として)NUMERIC 列をマップするには、precision_only
を使用します。none
オプションがデフォルトですが、Connect の DECIMAL 型はそのバイナリ表現にマップされるため、シリアル化の問題を招く可能性があります。多くの場合、best_fit_or
オプションのいずれかが望ましい選択となります。後方互換性上の理由から、best_fit
オプションも利用できます。その動作は、best_fit_or_decimal
と同じです。- 型: string
- デフォルト: none
- 指定可能な値: best_fit、best_fit_or_decimal、best_fit_or_double、best_fit_or_string、none、precision_only
- 重要度: 低
numeric.default.scale
スケールを決定できない場合に、数値型に対して使用するデフォルトのスケール。
- 型: int
- デフォルト: 127
- 指定可能な値: [-127,...,127]
- 重要度: 低
oracle.date.mapping
Oracle DATE 値を Connect の型にマッピングします。すべての DATE 列を Connect の Date 論理型で表す場合は、
date
を使用します。DATE 列を Connect のタイムスタンプにキャストする場合は、timestamp
を使用します。後方互換性のため、date
オプションがデフォルトとなっています。名前は似ていますが、Oracle DATE 型は Connect Date とはセマンティクスが異なります。セマンティクスの類似性から、多くの場合、timestamp
が望ましい選択となります。- 型: string
- デフォルト: timestamp
- 指定可能な値: date、timestamp
- 重要度: 低
output.data.key.format
Kafka 出力レコードキーのフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、または JSON です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。
- 型: string
- デフォルト: JSON
- 指定可能な値: AVRO、JSON、JSON_SR、PROTOBUF、STRING
- 重要度: 高
output.data.value.format
Kafka 出力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、または JSON です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。
- 型: string
- デフォルト: JSON
- 指定可能な値: AVRO、JSON、JSON_SR、PROTOBUF、STRING
- 重要度: 高
このコネクターのタスク数(Number of tasks for this connector)¶
tasks.max
このコネクターに使用するタスクの最大数。
- 型: int
- デフォルト: 2
- 指定可能な値: [1,...,1000]
- 重要度: 高
テンプレート変数¶
コネクターはテンプレート変数を使用して Kafka トピックの名前と各変更イベントのレコードキーを作成します。これらの変数は Oracle GoldenGate Kafka Connect テンプレート変数 に類似しており、Oracle GoldenGate からこのコネクターへのマイグレーションを簡素化します。変数はタスクレベルとテーブルレベルで解決されます。
コネクターとタスクの変数¶
変数のキーワード | 説明 |
---|---|
${connectorName} | コネクターの名前に解決します。 |
${databaseName} | データベース名に解決します。 |
${emptyString} | 空の文字列に解決します。 |
${staticMap[]} | 静的な値(キーは完全修飾テーブル名)に解決します。角かっこ内のキーと値は、${staticMap[dbo.table1=value1,dbo.table2=value2]} の形式で指定します。 |
${currentTimestamp} または ${currentTimestamp[]} | 現在のタイムスタンプに解決します。現在のタイムスタンプのフォーマットは Java ベースの書式設定を使用して制御できます(SimpleDateFormat クラスのドキュメント を参照)。例: ${currentDate} 、${currentDate[yyyy-mm-dd hh:MM:ss.SSS]} |
${connectorName} | コネクターの名前に解決します。 |
テーブルの変数¶
変数のキーワード | 説明 |
---|---|
${schemaName} | テーブルのスキーマ名に解決します。 |
${tableName} | 短いテーブル名に解決します。 |
${fullyQualifiedTableName} | 完全修飾テーブル名に解決します(スキーマ名とテーブル名の間のピリオド(.)を含む)。例: dbo.table1 。 |
列の変数¶
変数のキーワード | 説明 |
---|---|
${columnName} | 列名に解決します。 |
レコードの変数¶
変数のキーワード | 説明 |
---|---|
${opType} | 操作のタイプ(READ、INSERT、UPDATE、DELETE、TRUNCATE)に解決します。 |
${opTimestamp} | redo ログの操作のタイムスタンプに解決します。 |
${rowId} | 変更した行の ID に解決します。 |
${primaryKey} | アンダースコア(_ )文字で区切られた、連結されたプライマリキーの値に解決します。 |
${primaryKeyStruct} | プライマリキー列のそれぞれの値のフィールドを持つ STRUCT に解決します。 |
${primaryKeyStructOrValue} | 2 つ以上のプライマリキー列のそれぞれの値のフィールドを持つ STRUCT に解決されるか、または、プライマリキーが単一列を含む場合は列の値に解決されます。 |
${scn} | 変更が行われた場合はシステム変更番号(SCN)に解決されます。 |
${cscn} | 変更がコミットされた場合はシステム変更番号(SCN)に解決されます。 |
${rbaseq} | 変更に関連付けられた redo レコードの Redo ブロックアドレス(RBA)に関連付けられたシーケンス番号に解決されます。 |
${rbablk} | ログファイル内の RBA ブロック番号に解決されます。 |
${rbabyte} | ブロック内の RBA バイトオフセットに解決されます。 |
${currentTimestamp} または ${currentTimestamp[]} | 現在のタイムスタンプに解決します。現在のタイムスタンプのフォーマットは Java ベースの書式設定を使用して制御できます(SimpleDateFormat クラスのドキュメント を参照)。例: ${currentDate} 、${currentDate[yyyy-mm-dd hh:MM:ss.SSS]} |
サポートされるデータ型¶
以下の表は、データ型および関連する Connect マッピングの一覧です。
Oracle データ型 | SQL タイプのコード | 接続マッピング |
---|---|---|
CHAR または CHARACTER | 1 | STRING |
LONG | 1 | STRING |
VARCHAR | 12 | STRING |
VARCHAR | 12 | STRING |
NCHAR | -15 | STRING |
NVARCHAR2 | -9 | STRING |
RAW | -3 | BYTES |
LONG RAW | -1 | BYTES |
INT または INTEGER | 2 | DECIMAL |
SMALLINT | 2 | DECIMAL |
DEC または DECIMAL | 2 | DECIMAL |
NUMBER | 2 | DECIMAL |
NUMERIC | 2 | DECIMAL |
DOUBLE PRECISION | 6 | FLOAT64 |
FLOAT | 6 | FLOAT64 |
REAL | 6 | FLOAT64 |
TIMESTAMP WITH TIMEZONE | -101 | TIMESTAMP |
TIMESTAMP WITH LOCAL TIME ZONE | -102 | TIMESTAMP |
DATE | 91 | DATE |
BLOB | 2004 | BYTES |
CLOB | 2005 | BYTES |
NCLOB | 2011 | BYTES |
XMLTYPE | 2009 | BYTES |
注釈
-101
コードと -102
コード(TIMESTAMP WITH TIMEZONE
および TIMESTAMP WITH LOCAL TIMEZONE
の場合)は Oracle 固有です。BLOB、CLOB、NCLOB、XMLTYPE は、個別の LOB トピックを使用して別途処理されます。
トラブルシューティングとヒント¶
以下に情報を示します。
19c 以前のデータベース¶
レコードをキャプチャーするテーブルでのアクティビティが少ない場合は、query.timeout.ms
を小さくすることを検討してください。
Error while polling for records
による機能低下¶
このコネクターが作成される際に、以下のエラーイベントによってコネクターに機能低下が発生することがあります。
{
"datacontenttype": "application/json",
"data": {
"level": "ERROR",
"context": {
"connectorId": "lcc-12abc3"
},
"summary": {
"connectorErrorSummary": {
"message": "Error while polling for records",
"rootCause": "Failed to subscribe to the redo log topic 'lcc-22rwg2-ORCL-redo-log' even after waiting PT5M. Verify that this redo log topic exists in the brokers at SASL_SSL://<address>, and that the redo log reading task is able to produce to that topic."
}
}
},
"subject": "lcc-12abc3-llcc-12abc3-1",
"specversion": "1.0",
"id": "a12345bc-6d78-91e0-fg11-c3ac4f20220b",
"source": "crn://confluent.cloud/connector=lcc-12abc3",
"time": "2022-03-28T18:25:29.631Z",
"type": "io.confluent.logevents.connect.TaskFailed"
}
新たに挿入、アップデート、または削除が行われた場合に、このエラーメッセージによって自己修正することがあります。または、コネクターに対して redo.log.startup.polling.limit.ms
を大きな値に設定できます。これにより、コネクターは redo ログトピックが作成されるまで待機します。
別の方法として、1 つのパーティションを持つ redo-log-topic を作成し、コネクターを作成するときに "redo.log.topic.name" : "redo-log-topic"
で指定できます。