重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

Confluent Cloud の Microsoft SQL Server CDC ソース(Debezium)コネクター

注釈

Confluent Platform 用にコネクターをローカルにインストールする場合は、「Debezium SQL Server CDC Source Connector for Confluent Platform」を参照してください。

Kafka Connect Microsoft SQL Server Change Data Capture (CDC) Source (Debezium) Connector for Confluent Cloud は、SQL Server データベースの既存データのスナップショットを取得し、そのデータに対してそれ以降に発生する行レベルの変更をすべてモニタリングして記録することができます。このコネクターは、Avro、JSON スキーマ、Protobuf、または JSON(スキーマレス)の出力データフォーマットをサポートします。テーブルごとにすべてのイベントが個別の Apache Kafka® トピックに記録されます。そのため、アプリケーションやサービスでイベントを簡単に消費することができます。

機能

SQL Server CDC Source(Debezium)Connector には、以下の機能があります。

  • トピックの自動作成: このコネクターは、命名規則 <database.server.name>.<schemaName>.<tableName> を使用して自動的に Kafka トピックを作成します。テーブルの作成には topic.creation.default.partitions=1 プロパティおよび topic.creation.default.replication.factor=3 プロパティが使用されます。詳細については、「最大メッセージサイズ」を参照してください。
  • 対象に含めるテーブル対象から除外するテーブル: テーブルの変更をモニタリングするかどうかを設定できます。デフォルトでは、コネクターはシステムテーブル以外のテーブルをすべてモニタリングします。
  • コネクターあたりのタスク数: 組織では複数のコネクターを実行できますが、コネクターあたり 1 つのタスク("tasks.max": "1")という制限があります。
  • スナップショットモード: スナップショットを実行する条件を指定できます。
  • 削除時のトゥームストーン: delete イベントの後に tombstone イベントを生成するかどうかを構成できます。デフォルトは true です。
  • データベースの認証: パスワード認証。
  • 出力フォーマット: このコネクターでは、"Kafka 出力レコード値フォーマット" として Avro、JSON スキーマ、Protobuf、JSON(スキーマレス)がサポートされます。また、"Kafka 出力レコードキーフォーマット" として Avro、JSON スキーマ、Protobuf、JSON(スキーマレス)、String がサポートされます。スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

制限

以下の情報を確認してください。

最大メッセージサイズ

このコネクターはトピックを自動的に作成します。トピックの作成時に、内部コネクター構成プロパティ max.message.size が次のように設定されます。

  • ベーシッククラスター: 2 MB
  • スタンダードクラスター: 2 MB
  • 専用クラスター: 20 MB

Confluent Cloud クラスターの詳細については、「クラスタータイプごとの Confluent Cloud の機能と制限」を参照してください。

クイックスタート

このクイックスタートを使用して、Confluent Cloud の Microsoft SQL Server CDC ソース(Debezium)コネクターの利用を開始することができます。このクイックスタートでは、コネクターを選択してから、Microsoft SQL Server データベースの既存データのスナップショットを取得して、それ以降に発生する行レベルの変更をすべてモニタリングして記録するようにコネクターを構成するための基本的な方法について説明します。

前提条件
  • アマゾンウェブサービス (AWS)、Microsoft Azure (Azure)、または Google Cloud Platform (GCP)上の Confluent Cloud クラスターへのアクセスを許可されていること。

  • Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。

  • スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。

  • 変更データキャプチャー(CDC)を構成した SQL Server。

  • データベースにパブリックアクセスが必要になる場合があります。詳細については、「ネットワークアクセス」を参照してください。以下の例は、Microsoft SQL Server データベースをセットアップするときの AWS 管理コンソールを示しています。

    AWS での Microsoft SQL Server へのパブリックアクセスの例

    パブリックアクセスを有効にする

  • ネットワークに関する考慮事項については、「Networking and DNS Considerations」を参照してください。静的なエグレス IP を使用する方法については、「静的なエグレス IP アドレス」を参照してください。以下の例は、VPC にセキュリティグループルールをセットアップするときの AWS 管理コンソールを示しています。

    AWS でのセキュリティグループルールの例

    インバウンドトラフィックを開く

    注釈

    VPC にセキュリティルールを構成する方法については、ご使用のクラウドプラットフォームのドキュメントを参照してください。

  • Kafka クラスターの認証情報。次のいずれかの方法で認証情報を指定できます。
    • 既存の サービスアカウント のリソース ID を入力する。
    • コネクター用の Confluent Cloud サービスアカウント を作成します。「サービスアカウント」のドキュメントで、必要な ACL エントリを確認してください。一部のコネクターには固有の ACL 要件があります。
    • Confluent Cloud の API キーとシークレットを作成する。キーとシークレットを作成するには、confluent api-key create を使用するか、コネクターのセットアップ時に Cloud Console で直接 API キーとシークレットを自動生成します。

Confluent Cloud Console の使用

ステップ 1: Confluent Cloud クラスターを起動します。

インストール手順については、「Quick Start for Confluent Cloud」を参照してください。

ステップ 2: コネクターを追加します。

左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。

ステップ 3: コネクターを選択します。

Click the Microsoft SQL Server CDC Source connector card.

Microsoft SQL Server CDC Source Connector Card

ステップ 4: 接続をセットアップします。

以下を実行して、Continue をクリックします。

注釈

  • すべての 前提条件 を満たしていることを確認してください。
  • アスタリスク( * )は必須項目であることを示しています。
  1. コネクターの 名前 を入力します。

  2. Kafka Cluster credentials で Kafka クラスターの認証情報の指定方法を選択します。サービスアカウントのリソース ID を選択するか、API キーとシークレットを入力できます(または、Cloud Console でこれらを生成します)。

  3. データベース接続の詳細情報を追加します。

    重要

    接続の hostname プロパティには jdbc:xxxx:// を含めないでください。以下にホストアドレスの例を示します。

    ../_images/ccloud-postgresql-source-connect-to-data.png
  4. データベース接続の詳細情報を追加します。

    Database application intent は省略可能なプロパティです。これにより、SQL サーバーの接続文字列キーワード ApplicationIntent のプロパティとして ReadOnly または ReadWrite (デフォルト値)が設定されます。サーバー接続時にアプリケーションインテントの宣言に使用されます。詳細については、「Specify application intent」を参照してください。

  5. Database details に情報を追加します(省略可)。

    • Snapshot isolation mode: 使用されるトランザクション分離レベルと、キャプチャー用に指定されたテーブルをコネクターがロックする期間の長さを制御するモード。read_committed モードおよび read_uncommitted モードでは、初期スナップショット中に他のトランザクションによってテーブル行が更新される状況が回避されません。exclusive モードおよび repeatable_read (デフォルト値)モードでは、同時更新が 回避 されます。モードの選択は、データの一貫性にも影響します。完全な一貫性が保証されるのは、exclusive モードおよび snapshot モードのみです。線形履歴は初期スナップショットとストリーミングログで構成される点に注意してください。つまり、repeatable_read または read_committed モードを使用すると、追加されたレコードが 2 回出現する場合があります(初期スナップショットで 1 回、ストリーミング中に 1 回)。データのミラーリングでは、この一貫性レベルで十分です。read_uncommitted の場合、データの整合性はまったく保証されません(一部のデータについて損失や破損の可能性があります)。
    • Tables included: このコネクターでモニタリングするテーブルの完全修飾識別子をコンマ区切りにしたリストを入力します。デフォルトでは、コネクターはシステムテーブル以外のテーブルを "すべて" モニタリングします。完全修飾テーブル名の形式は、schemaName.tableName です。このプロパティを Tables excluded プロパティとともに使用することはできません。
    • Tables excluded: このコネクターで "無視" するテーブルの完全修飾識別子をコンマ区切りにしたリストを入力します。完全修飾テーブル名のフォームは、schemaName.tableName です。このプロパティは、プロパティ Tables included と同時には使用できません。
    • Snapshot mode: コネクターの起動時にデータベーススナップショットを実行する条件を指定します。
      • デフォルトの設定は、initial です。これを選択すると、コネクターは、キャプチャー対象テーブルの構造およびデータのスナップショットを取得します。これは、コネクターの起動時にキャプチャー対象テーブルの完全なデータ表現をトピックに取り込む必要がある場合に便利です。
      • schema_only は、キャプチャー対象テーブルの構造のスナップショットのみを取得します。これは、コネクターの起動時以降に発生するデータ変更のみをキャプチャーする場合に便利です。
    • Propagate Source Types by Data Type: データベース固有の データ型 に一致する正規表現のコンマ区切りリストを入力します。このプロパティでは、出力された変更レコードの対応するフィールドスキーマに、データ型の元の型と元の長さが(パラメーターとして)追加されます。
    • Tombstones on delete: delete イベントの後に tombstone イベントを生成するかどうかを構成します。デフォルトは true です。
    • Columns Excluded: 変更イベントレコードの値から除外する列の完全修飾名に一致する正規表現のリスト(省略可能)。コンマ区切りで指定します。列の完全修飾名は、databaseName.tableName.columnName という形式になります。
  6. Connection details に情報を追加します(省略可)。

    • Poll interval (ms): 各反復でコネクターが新しい変更イベントの出現を待機するミリ秒数を入力します。デフォルト値は 1000 ミリ秒(1 秒)です。
    • Max batch size: 各反復でコネクターがバッチで処理するイベントの最大数を入力します。デフォルト値は 1000 件です。
    • Event processing failure handling mode: binlog イベントを処理する際の例外へのコネクターによる対応方法を指定します。デフォルト値は fail です。イベントをスキップする場合や警告を発行する場合は、それぞれ skip または warn を選択します。
    • Heartbeat interval (ms): コネクターが Kafka トピックに送信するハートビートメッセージの間隔をミリ秒単位で設定します。デフォルト値は 0 で、コネクターがハートビートメッセージを送信しないことを示します。
    • Skip unparseable DDL: これを true に設定すると、正しくない形式または不明なデータベースステートメントが無視されます。これらの問題を修正できるように処理を停止するには、false に設定します。デフォルトは false です。解析不能なステートメントを無視するには、これを true に設定することを検討してください。
  7. Connector details に情報を追加します(省略可)。

    • Provide transaction metadata: トランザクションメタデータ を有効にするかどうかを選択します。トランザクションメタデータは、専用の Kafka トピックに保存されます。デフォルトは false です。
    • Decimal handling mode: 変更イベントでの DECIMAL 列と NUMERIC 列の表現方法を指定します。precise (デフォルト値)を指定すると、値の表現に java.math.BigDecimal が使用されます。変更イベントでは、バイナリ表現と Connect の org.apache.kafka.connect.data.Decimal データ型を使用して値がエンコードされます。string 型を使用して値を表すには、string を選択します。double を選択すると、Java の double データ型を使用して値が表現されます。double では精度が低下する可能性がありますが、コンシューマーでの使いやすさは大幅に向上します。
    • Binary handling mode: 変更イベントでのバイナリ(blob、binary)列の表現方法を指定します。バイナリデータをバイト配列形式で表現するには、bytes (デフォルト値)を選択します。バイナリデータを base64 でエンコードされた文字列形式で表現するには、base64 を選択します。バイナリデータを 16 進(base16)でエンコードされた文字列形式で表現するには、hex を選択します。
    • Time precision mode: 時間、日付、タイムスタンプは、さまざまな種類の精度で表現できます。時刻、日付、およびタイムスタンプの値の精度をデータベース列の精度に基づいて決定するには、adaptive (デフォルト値)を選択します。adaptive_time_microseconds は、TIME フィールドで常にマイクロ秒の精度が使用される点を除き、基本的に adaptive モードと同じです。connect: 時刻、日付、タイムスタンプ値の表現には常に、Connect に組み込まれている Time、Date、および Timestamp の表現が使用されます。connect を選択した場合は、データベース列に使用されている精度に関係なく、精度としてミリ秒が使用されます。詳細については、「Temporal types」を参照してください。
    • Topic cleanup policy: トピック保持クリーンアップポリシー を設定します。古いトピックを破棄するには、delete (デフォルト値)を選択します。トピックでの ログ圧縮 を有効にするには、compact を選択します。
  8. 以下の Output プロパティについて、値を選択します。

    • Output Kafka record value フォーマット: コネクターから送られるデータのフォーマットを AVRO、JSON(スキーマレス)、JSON_SR(JSON スキーマ)、または PROTOBUF から選択します。スキーマベースのレコードフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、有効なスキーマが Schema Registry に存在する必要があります。

    • After-state only:(省略可)デフォルトでは true となり、Kafka レコードは、適用された変更イベントからのレコードステートのみを持ちます。false を選択すると、変更イベントの適用後も以前のレコードステートが維持されます。

    • Output Kafka record key format: コネクターから送られるデータのフォーマットを AVRO、JSON(スキーマレス)、JSON_SR(JSON スキーマ)、PROTOBUF、または String から選択します。スキーマベースのレコードフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、有効なスキーマが Schema Registry に存在する必要があります。

    • JSON output decimal format:(省略可)デフォルトでは BASE64 です。

      JSON output decimal format プロパティ
  9. このコネクターで使用する タスク の数を入力します。組織では複数のコネクターを実行できますが、コネクターあたり 1 つのタスク("tasks.max": "1")という制限があります。

  10. Transforms and Predicates: 詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。

すべてのプロパティの値と定義については、「構成プロパティ」を参照してください。

ステップ 5: コネクターを起動します。

接続の詳細情報を確認し、Launch をクリックします。

ステップ 6: コネクターのステータスを確認します。

コネクターのステータスが Provisioning から Running に変わります。ステータスが変わるまで数分かかる場合があります。

ステップ 7: Kafka トピックを確認します。

コネクターが実行中になったら、メッセージが Kafka トピックに取り込まれていることを確認します。

注釈

A topic named dbhistory.<database.server.name>.<connect-id> is automatically created for database.history.kafka.topic with one partition.

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png

Confluent CLI の使用

以下の手順に従うと、Confluent CLI を使用してコネクターをセットアップし、実行できます。

注釈

ステップ 1: 使用可能なコネクターをリスト表示します。

以下のコマンドを入力して、使用可能なコネクターをリスト表示します。

confluent connect plugin list

ステップ 2: コネクターの必須の構成プロパティを表示します。

以下のコマンドを実行して、コネクターの必須プロパティを表示します。

confluent connect plugin describe <connector-catalog-name>

例:

confluent connect plugin describe MicrosoftSqlServerSource

出力例:

Following are the required configs:
connector.class: SqlServerCdcSource
name
kafka.auth.mode
kafka.api.key
kafka.api.secret
database.hostname
database.port
database.user
database.password
database.dbname
database.server.name
output.data.format
tasks.max

ステップ 3: コネクターの構成ファイルを作成します。

コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。

{
  "connector.class": "SqlServerCdcSource",
  "name": "SqlServerCdcSourceConnector_0",
  "kafka.auth.mode": "KAFKA_API_KEY",
  "kafka.api.key": "****************",
  "kafka.api.secret": "****************************************************************",
  "database.hostname": "connect-sqlserver-cdc.<host-id>.us-west-2.rds.amazonaws.com",
  "database.port": "1433",
  "database.user": "admin",
  "database.password": "************",
  "database.dbname": "database-name",
  "database.server.name": "sql",
  "table.include.list":"public.passengers",
  "snapshot.mode": "initial",
  "output.data.format": "JSON",
  "tasks.max": "1"
}

以下のプロパティ定義に注意してください。

  • "connector.class": コネクターのプラグイン名を指定します。
  • "name": 新しいコネクターの名前を設定します。
  • "kafka.auth.mode": 使用するコネクターの認証モードを指定します。オプションは SERVICE_ACCOUNT または KAFKA_API_KEY (デフォルト)です。API キーとシークレットを使用するには、構成プロパティ kafka.api.keykafka.api.secret を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティ kafka.service.account.id=<service-account-resource-ID>リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。

    confluent iam service-account list
    

    例:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "table.whitelist":(オプション)コネクターでモニタリングするテーブルの完全修飾識別子をコンマ区切りにしたリストを入力します。デフォルトでは、コネクターはシステムテーブル以外のテーブルを "すべて" モニタリングします。完全修飾テーブル名のフォームは、schemaName.tableName です。

  • "snapshot.mode": コネクターの起動時にデータベーススナップショットを実行する条件を指定します。

    • デフォルトの設定は、initial です。これを選択すると、コネクターは、キャプチャー対象テーブルの構造およびデータのスナップショットを取得します。これは、コネクターの起動時にキャプチャー対象テーブルの完全なデータ表現をトピックに取り込む必要がある場合に便利です。
    • これを schema.only に設定すると、キャプチャー対象テーブルの構造のスナップショットのみが取得されます。これは、コネクターの起動時以降に発生するデータ変更のみをキャプチャーする場合に便利です。
  • "output.data.format": Kafka 出力レコード値のフォーマット(コネクターから送られるデータ)を設定します。指定可能なエントリは、AVROJSON_SRPROTOBUF、または JSON です。スキーマベースのレコードフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Confluent Cloud Schema Registry を構成しておく必要があります。

  • after.state.only:(省略可)デフォルトでは true となり、Kafka レコードは、適用された変更イベントからのレコードステートのみを持ちます。false と入力すると、変更イベントの適用後も以前のレコードステートが維持されます。

  • "json.output.decimal.format":(省略可)デフォルトでは BASE64 です。次の 2 つのリテラル値を取る Connect DECIMAL 論理型の値の JSON/JSON_SR シリアル化フォーマットを指定します。

    • BASE64 では、DECIMAL 論理型を base64 でエンコードされたバイナリデータとしてシリアル化します。
    • NUMERIC では、JSON または JSON_SR の Connect DECIMAL 論理型の値を、10 進数の値を表す数字としてシリアル化します。
  • "column.exclude.list":(省略可能)変更イベントレコードの値から除外する列の完全修飾名に一致する正規表現のリスト。コンマ区切りで指定します。列の完全修飾名は、databaseName.tableName.columnName という形式になります。

  • "tasks.max": このコネクターで使用する タスク の数を入力します。組織では複数のコネクターを実行できますが、コネクターあたり 1 つのタスク("tasks.max": "1")という制限があります。

Single Message Transforms: CLI を使用する SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。

すべてのプロパティの値と定義については、「構成プロパティ」を参照してください。

ステップ 4: プロパティファイルを読み込み、コネクターを作成します。

以下のコマンドを入力して、構成を読み込み、コネクターを起動します。

confluent connect create --config <file-name>.json

例:

confluent connect create --config microsoft-sql-cdc-source.json

出力例:

Created connector SqlServerCdcSourceConnector_0 lcc-ix4dl

ステップ 5: コネクターのステータスを確認します。

以下のコマンドを入力して、コネクターのステータスを確認します。

confluent connect list

出力例:

ID          |            Name                | Status  |  Type
+-----------+--------------------------------+---------+-------+
lcc-ix4dl   | SqlServerCdcSourceConnector_0  | RUNNING | source

ステップ 6: Kafka トピックを確認します。

コネクターが実行中になったら、メッセージが Kafka トピックに取り込まれていることを確認します。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

注釈

A topic named dbhistory.<database.server.name>.<connect-id> is automatically created for database.history.kafka.topic with one partition.

構成プロパティ

このコネクターでは、以下のコネクター構成プロパティを使用します。

データへの接続方法(How should we connect to your data?)

name

コネクターの名前を設定します。

  • 型: string
  • 指定可能な値: 最大 64 文字の文字列
  • 重要度: 高

Kafka クラスターの認証情報(Kafka Cluster credentials)

kafka.auth.mode

Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。

  • 型: string
  • デフォルト: KAFKA_API_KEY
  • 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
  • 重要度: 高
kafka.api.key
  • 型: password
  • 重要度: 高
kafka.service.account.id

Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。

  • 型: string
  • 重要度: 高
kafka.api.secret
  • 型: password
  • 重要度: 高

データベースへの接続方法(How should we connect to your database?)

database.hostname

Microsoft SQL Server のアドレス。

  • 型: string
  • 重要度: 高
database.port

Microsoft SQL Server のポート番号。

  • 型: int
  • 指定可能な値: [0,...,65535]
  • 重要度: 高
database.user

必要な認可を持つ Microsoft SQL Server ユーザーの名前。

  • 型: string
  • 重要度: 高
database.password

必要な認可を持つ Microsoft SQL Server ユーザーのパスワード。

  • 型: password
  • 重要度: 高
database.dbname

接続先の Microsoft SQL Server データベースの名前。

  • 型: string
  • 重要度: 高
database.server.name

Microsoft SQL Server クラスターの論理名。この論理名は 1 つの名前空間を形成し、すべての Kafka トピック名および Kafka Connect スキーマ名で使用されます。また、Avro データフォーマットが使用される場合、論理名は、対応する Avro スキーマの名前空間でも使用されます。Kafka トピックは、プレフィックス database.server.name を付加して作成する必要があります(そのように作成されます)。英数字、アンダースコア、ハイフン、ドットのみを使用できます。

  • 型: string
  • 重要度: 高
database.instance

Microsoft SQL Server インスタンスの名前

  • 型: string
  • 重要度: 中
database.applicationIntent

接続文字列でキーワード ApplicationIntent を指定できます。指定可能な値は、ReadWrite(デフォルト)または ReadOnly です

  • 型: string
  • デフォルト: ReadWrite
  • 指定可能な値: ReadOnly、ReadWrite
  • 重要度: 中

データベースの詳細(Database details)

signal.data.collection

Fully-qualified name of the data collection that needs to be used to send signals to the connector. Use the following format to specify the fully-qualified collection name: databaseName.schemaName.tableName

  • 型: string
  • 重要度: 中
snapshot.isolation.mode

使用されるトランザクション分離レベルと、キャプチャー用に指定されたテーブルをコネクターがロックする期間の長さを制御するモード。snapshot モード、read_committed モードおよび read_uncommitted モードでは、初期スナップショット中に他のトランザクションによってテーブル行が更新される状況が回避されません。exclusive モードおよび repeatable_read モードでは、同時更新が回避されます。モードの選択は、データの一貫性にも影響します。exclusive モードおよび snapshot モードでのみ完全な一貫性が保証されます。つまり、線形履歴は初期スナップショットとストリーミングログで構成されます。repeatable_read モードおよび read_committed モードの場合、たとえば、追加されたレコードが 2 回出現する可能性があります(初期スナップショットで 1 回、ストリーミング段階で 1 回)。そうであったとしても、データのミラーリングでは、この一貫性レベルで十分です。read_uncommitted の場合、データの整合性はまったく保証されません(一部のデータについて損失や破損の可能性があります)。

  • 型: string
  • デフォルト: repeatable_read
  • 指定可能な値: exclusive、read_committed、read_uncommitted、repeatable_read、snapshot
  • 重要度: 低
table.include.list

モニタリング対象のテーブルの完全修飾識別子と一致する文字列のコンマ区切りのリスト(省略可)。この構成プロパティに含まれていないテーブルはすべて、モニタリング対象から除外されます。各識別子は、schemaName.tableName という形式になります。デフォルトでは、コネクターはモニタリング対象の各スキーマのすべての非システムテーブルをモニタリングします。"Table excluded" と一緒に使用することはできません。

  • 型: list
  • 重要度: 中
table.exclude.list

モニタリング対象から除外するテーブルの完全修飾識別子と一致する文字列のコンマ区切りのリスト(省略可)。この構成プロパティに含まれていないテーブルはすべて、モニタリングの対象になります。各識別子は、schemaName.tableName という形式になります。"Table included" と一緒に使用することはできません。

  • 型: list
  • 重要度: 中
snapshot.mode

キャプチャー対象のテーブルの構造とデータ(省略可)の初期スナップショットを取得するためのモード。スナップショットの完了後、コネクターはデータベースの redo ログからの変更イベントの読み取りを続行します。デフォルトの設定は initial で、キャプチャー対象のテーブルの構造とデータのスナップショットを取得します。キャプチャー対象のテーブルの完全なデータ表現をトピックに取り込む必要がある場合に便利です。schema_only オプションでは、キャプチャー対象のテーブルの構造についてのみスナップショットを取得します。現時点以降に行われる変更のみがトピックに伝搬される必要がある場合に便利です。

  • 型: string
  • デフォルト: initial
  • 重要度: 低
datatype.propagate.source.type

データベース固有のデータ型名と一致する正規表現のコンマ区切りのリスト。出力された変更レコードの対応するフィールドスキーマに、データ型の元の型と元の長さがパラメーターとして追加されます。

  • 型: list
  • 重要度: 低
tombstones.on.delete

delete イベントの後に tombstone イベントを生成するかどうかを制御します。true に設定すると、削除操作は delete イベント、およびそれに続く tombstone イベントで表されます。false に設定すると、delete イベントだけが送信されます。tombstone イベントの実行(デフォルトの動作)により、Kafka はソースレコードが削除されてから特定のキーに関連したすべてのイベントを完全に削除できるようになります。

  • 型: boolean
  • デフォルト: true
  • 重要度: 高
column.exclude.list

変更イベントから除外する列と一致する正規表現。

  • 型: list
  • 重要度: 中

接続の詳細(Connection details)

poll.interval.ms

各反復処理中にコネクターが新しい変更イベントの出現を待機するミリ秒数を指定する正の整数値。デフォルトは 1000 ミリ秒(1 秒)です。

  • 型: int
  • デフォルト: 1000(1 秒)
  • 指定可能な値: [1,...]
  • 重要度: 低
max.batch.size

このコネクターの各反復処理中に処理されるイベントの各バッチの最大サイズを指定する正の整数値。

  • 型: int
  • デフォルト: 1000
  • 指定可能な値: [1,...,5000]
  • 重要度: 低
event.processing.failure.handling.mode

binlog イベントの処理中にコネクターが例外に対応する方法を指定します。

  • 型: string
  • デフォルト: fail
  • 指定可能な値: fail、skip、warn
  • 重要度: 低
heartbeat.interval.ms

コネクターがハートビートメッセージを Kafka トピックに送信する頻度を制御します。デフォルト値 0 の動作では、コネクターはハートビートメッセージを送信しません。

  • 型: int
  • デフォルト: 0
  • 指定可能な値: [0,...]
  • 重要度: 低
database.history.skip.unparseable.ddl

形式に誤りがある、または不明なデータベースステートメントが見つかった場合の対応を指定するブール値。コネクターで、それらを無視する(true)か、人間が問題を修正できるように処理を停止する(false)かを指定します。デフォルトは false です。解析不能なステートメントを無視するには、これを true に設定することを検討してください。

  • 型: boolean
  • デフォルト: false
  • 重要度: 低

コネクターの詳細(Connector details)

provide.transaction.metadata

トランザクションメタデータの情報を専用トピックに保存し、イベントのカウントとともにトランザクションメタデータを抽出できるようにします。

  • 型: boolean
  • デフォルト: false
  • 重要度: 低
decimal.handling.mode

変更イベントでの DECIMAL 列と NUMERIC 列の表現方法を指定します。「precise」(デフォルト)の場合、java.math.BigDecimal を使用して値が表現されます。この値は変更イベントでバイナリ表現と Kafka Connect の「org.apache.kafka.connect.data.Decimal」型を使用してエンコードされます。「string」の場合、文字列を使用して値が表現されます。「double」の場合、Java の「double」を使用して値が表現されます。この場合、精度が低下する可能性がありますが、コンシューマーでの使いやすさは大幅に向上します。

  • 型: string
  • デフォルト: precise
  • 指定可能な値: double、precise、string
  • 重要度: 中
binary.handling.mode

変更イベントでのバイナリ列(blob、binary など)の表現方法を指定します。「bytes」(デフォルト)の場合、バイナリデータがバイト配列で表現されます。「base64」の場合、バイナリデータが base64 でエンコードされた文字列で表現されます。「hex」の場合、バイナリデータが 16 進エンコードされた(base16)文字列で表現されます。

  • 型: string
  • デフォルト: bytes
  • 指定可能な値: base64、bytes、hex
  • 重要度: 低
time.precision.mode

時間、日付、タイムスタンプは、さまざまな種類の精度で表現することができます。「adaptive」(デフォルト値)の場合、時刻、日付、およびタイムスタンプの値の精度が、データベース列の精度に基づいて決定されます。「adaptive_time_microseconds」は、「adaptive」モードに似ていますが、TIME フィールドで常にマイクロ秒の精度が使用されます。「connect」の場合、時刻、日付、タイムスタンプ値の表現には常に、Kafka Connect に組み込まれている Time、Date、および Timestamp の表現が使用されます。つまり、データベースの列の精度に関係なく、ミリ秒の精度が使用されます。

  • 型: string
  • デフォルト: adaptive
  • 指定可能な値: adaptive、adaptive_time_microseconds、connect
  • 重要度: 中
cleanup.policy

トピッククリーンアップポリシーを設定します。

  • 型: string
  • デフォルト: delete
  • 指定可能な値: compact、delete
  • 重要度: 中

出力メッセージ(Output messages)

output.data.format

Kafka 出力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、または JSON です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。

  • 型: string
  • 重要度: 高
output.key.format

Kafka 出力レコードキーのフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、STRING、または JSON です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください

  • 型: string
  • デフォルト: JSON
  • 指定可能な値: AVRO、JSON、JSON_SR、PROTOBUF、STRING
  • 重要度: 高
after.state.only

生成された Kafka レコードに変更イベント適用後のステートのみを含めるかどうかを制御します。

  • 型: boolean
  • デフォルト: true
  • 重要度: 低
json.output.decimal.format

次の 2 つのリテラル値を取る Connect DECIMAL 論理型の値の JSON/JSON_SR シリアル化フォーマットを指定します。

BASE64 では、DECIMAL 論理型を base64 でエンコードされたバイナリデータとしてシリアル化します。

NUMERIC では、JSON/JSON_SR の Connect DECIMAL 論理型の値を、10 進数の値を表す数字としてシリアル化します。

  • 型: string
  • デフォルト: BASE64
  • 重要度: 低

このコネクターのタスク数(Number of tasks for this connector)

tasks.max
  • 型: int
  • 指定可能な値: [1,...,1]
  • 重要度: 高

次のステップ

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png