重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

PostgreSQL CDC Source Connector (Debezium) for Confluent Cloud

注釈

Confluent Platform 用にコネクターをローカルにインストールする場合は、「Debezium PostgreSQL Source Connector for Confluent Platform」を参照してください。

Kafka Connect PostgreSQL Change Data Capture (CDC) Source Connector (Debezium) for Confluent Cloud は、PostgreSQL データベースの既存データのスナップショットを取得し、そのデータに対して後続の行レベルの変更をすべてモニタリングして記録することができます。このコネクターは、Avro、JSON スキーマ、Protobuf、または JSON(スキーマレス)の出力データフォーマットをサポートします。テーブルごとにすべてのイベントが個別の Apache Kafka® トピックに記録されます。そのため、アプリケーションやサービスでイベントを簡単に消費することができます。

注釈

詳細については、Debezium のドキュメント を参照してください。

機能

PostgreSQL CDC Source Connector (Debezium) には、以下の機能が備わっています。

  • トピックの自動作成: このコネクターは、命名規則 <database.server.name>.<schemaName>.<tableName> を使用して自動的に Kafka トピックを作成します。テーブルの作成には topic.creation.default.partitions=1 プロパティおよび topic.creation.default.replication.factor=3 プロパティが使用されます。詳細については、「最大メッセージサイズ」を参照してください。
  • 論理デコードプラグインのサポート: wal2jsonwal2json_rdswal2json_streamingwal2json_rds_streamingpgoutputdecoderbufs。デフォルトでは pgoutput に設定されています。
  • データベースの認証: パスワード認証を使用します。
  • SSL のサポート: 一方向 SSL をサポートします。
  • 出力データフォーマット: このコネクターでは、Kafka 出力レコード値フォーマット として Avro、JSON スキーマ、Protobuf、JSON(スキーマレス)がサポートされます。また、出力レコードキーフォーマット として Avro、JSON スキーマ、Protobuf、JSON(スキーマレス)、String がサポートされます。スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
  • コネクターあたりのタスク数: 組織では複数のコネクターを実行できますが、コネクターあたり 1 つのタスク("tasks.max": "1")という制限があります。
  • 厳選された構成プロパティ:
    • 対象に含めるテーブル対象から除外するテーブル: テーブルの変更をモニタリングするかどうかを設定できます。デフォルトでは、コネクターはシステムテーブル以外のテーブルをすべてモニタリングします。
    • スナップショットモード: スナップショットを実行する条件を指定できます。
    • 削除時のトゥームストーン: delete イベントの後に tombstone イベントを生成するかどうかを構成できます。デフォルトは true です。
    • その他の構成プロパティ:
      • poll.interval.ms
      • max.batch.size
      • max.queue.size

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

制限

以下の情報を確認してください。

最大メッセージサイズ

このコネクターはトピックを自動的に作成します。トピックの作成時に、内部コネクター構成プロパティ max.message.size が次のように設定されます。

  • ベーシッククラスター: 2 MB
  • スタンダードクラスター: 2 MB
  • 専用クラスター: 20 MB

Confluent Cloud クラスターの詳細については、「クラスタータイプごとの Confluent Cloud の機能と制限」を参照してください。

クイックスタート

このクイックスタートを使用して、Confluent Cloud の PostgreSQL CDC ソース(Debezium)コネクターの利用を開始することができます。このクイックスタートでは、コネクターを選択してから、PostgreSQL データベースの既存データのスナップショットを取得して、それ以降に発生する行レベルの変更をすべてモニタリングして記録するようにコネクターを構成するための基本的な方法について説明します。

前提条件
  • アマゾンウェブサービス (AWS)、Microsoft Azure (Azure)、または Google Cloud Platform (GCP)上の Confluent Cloud クラスターへのアクセスを許可されていること。

  • Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。

  • スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。

  • Azure では基本データベースは使用できません。汎用の PostgreSQL データベースまたはメモリーが最適化された PostgreSQL データベースを使用する必要があります。

  • PostgreSQL データベースに CDC を構成しておく必要があります。詳細については、『PostgreSQL in the Cloud』を参照してください。

  • Azure Virtual Network のクライアントは、デフォルトではサーバーへのアクセスが許可されません。Azure Virtual Network が正しく構成され、Azure サービスへのアクセスを許可 が有効になっていることを確認してください。

  • データベースにパブリックアクセスが必要になる場合があります。詳細については、「ネットワークアクセス」を参照してください。以下の例は、PostgreSQL データベースをセットアップするときの AWS 管理コンソールを示しています。

    AWS での PostgreSQL へのパブリックアクセスの例

    パブリックアクセスを有効にする

  • プロパティ rds.logical_replication=1 を含むパラメーターグループが必要です。以下に例を示します。作成したら、データベースを再起動 する必要があります。

    パラメーターグループ

    パラメーターグループ

    RDS 論理レプリケーション

    RDS 論理レプリケーション

  • ネットワークに関する考慮事項については、「Networking and DNS Considerations」を参照してください。静的なエグレス IP を使用する方法については、「静的なエグレス IP アドレス」を参照してください。以下の例は、VPC にセキュリティグループルールをセットアップするときの AWS 管理コンソールを示しています。

    AWS でのセキュリティグループルールの例

    インバウンドトラフィックを開く

    注釈

    VPC にセキュリティルールを構成する方法については、ご使用のクラウドプラットフォームのドキュメントを参照してください。

  • Kafka クラスターの認証情報。次のいずれかの方法で認証情報を指定できます。
    • 既存の サービスアカウント のリソース ID を入力する。
    • コネクター用の Confluent Cloud サービスアカウント を作成します。「サービスアカウント」のドキュメントで、必要な ACL エントリを確認してください。一部のコネクターには固有の ACL 要件があります。
    • Confluent Cloud の API キーとシークレットを作成する。キーとシークレットを作成するには、confluent api-key create を使用するか、コネクターのセットアップ時に Cloud Console で直接 API キーとシークレットを自動生成します。

Confluent Cloud Console の使用

ステップ 1: Confluent Cloud クラスターを起動します。

インストール手順については、「Quick Start for Confluent Cloud」を参照してください。

ステップ 2: コネクターを追加します。

左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。

ステップ 3: コネクターを選択します。

Click the PostgreSQL CDC Source connector card.

PostgreSQL Source Connector Card

ステップ 4: 接続をセットアップします。

以下を実行して、Continue をクリックします。

注釈

  • すべての 前提条件 を満たしていることを確認してください。
  • アスタリスク ( * ) は必須項目であることを示しています。
  1. コネクターの 名前 を入力します。

  2. Kafka Cluster credentials で Kafka クラスターの認証情報の指定方法を選択します。サービスアカウントのリソース ID を選択するか、API キーとシークレットを入力できます(または、Cloud Console でこれらを生成します)。

  3. データベース接続の詳細情報を追加します。

    重要

    接続の hostname プロパティには jdbc:xxxx:// を含めないでください。以下にホストアドレスの例を示します。

    ../_images/ccloud-postgresql-source-connect-to-data.png

    SSL mode を選択しない場合は、disable がデフォルトオプションとして使用されます。require が選択された場合、コネクターはセキュアな(暗号化された)接続を使用します。セキュアな接続を確立できない場合、コネクターは失敗します。このモードでは、認証機関(CA)検証は行われません。

  4. Database details に情報を追加します(省略可)。

    • Tables included: このコネクターでモニタリングするテーブルの完全修飾識別子をコンマ区切りにしたリストを入力します。デフォルトでは、コネクターはシステムテーブル以外のテーブルを "すべて" モニタリングします。完全修飾テーブル名の形式は、schemaName.tableName です。このプロパティを Tables excluded プロパティとともに使用することはできません。
    • Tables excluded: このコネクターで "無視" するテーブルの完全修飾識別子をコンマ区切りにしたリストを入力します。完全修飾テーブル名のフォームは、schemaName.tableName です。このプロパティは、プロパティ Tables included とともに使用できません。
    • Snapshot mode: コネクターの起動時にデータベーススナップショットを実行する条件を指定します。
      • デフォルトの設定は、initial です。選択すると、コネクターは、キャプチャー対象テーブルの構造およびデータのスナップショットを取得します。これは、コネクターの起動時にキャプチャー対象テーブルの完全なデータ表現をトピックに取り込む必要がある場合に便利です。
      • never は、コネクターでスナップショットを一切実行しないこと、また、コネクターを最初に起動するときには前回終了した位置から読み取りを開始することを指定します。
      • exported は、データベーススナップショットが、レプリケーションスロットが作成された時点に基づくことを指定します。これは "lock-free" スナップショットを実行する場合に適しています(『Snapshot isolation』を参照)。
    • Propagate Source Types by Data Type: データベース固有の データ型 に一致する正規表現のコンマ区切りリストを入力します。このプロパティでは、出力された変更レコードの対応するフィールドスキーマに、データ型の元の型と元の長さが(パラメーターとして)追加されます。
    • Tombstones on delete: delete イベントの後に tombstone イベントを生成するかどうかを設定します。デフォルト値は true です。
    • Columns Excluded: 変更イベントレコードの値から除外する列の完全修飾名に一致する正規表現のコンマ区切りリストを入力します。列の完全修飾名は、databaseName.tableName.columnName という形式になります。
    • Plugin name: 使用するプラグインを選択します。オプションは、wal2jsonwal2json_rdswal2json_streamingwal2json_rds_streamingpgoutput、および decoderbufs です。デフォルトは pgoutput です。詳細については、PostgreSQL logical decoding plugin の説明を参照してください。
    • Slot name: プラグインからの変更およびデータベースの変更のストリーミング用に作成された PostgreSQL 論理デコードスロットの名前を入力します。スロット名に使用できる文字は、英小文字、数字、アンダースコアのみです。デフォルト値は debezium です。
  5. Connection details に情報を追加します(省略可)。

    • Poll interval (ms): 各反復でコネクターが新しい変更イベントの出現を待機するミリ秒数を入力します。デフォルト値は 1000 ミリ秒(1 秒)です。
    • Max batch size: 各反復の最後にコネクターがバッチで処理するイベントの最大数を入力します。デフォルト値は 1000 件です。
    • Event processing failure handling mode: binlog イベントを処理する際の例外へのコネクターによる対応方法を指定します。デフォルト値は fail です。イベントをスキップする場合や警告を発行する場合は、それぞれ skip または warn を選択します。
    • Heartbeat interval (ms): コネクターが Kafka トピックに送信するハートビートメッセージの間隔をミリ秒単位で設定します。デフォルト値は 0 で、コネクターがハートビートメッセージを送信しないことを示します。
    • Heartbeat action query: コネクターがハートビートメッセージを送信する際に、コネクターがソースデータベースで実行するクエリを追加します。詳細については、Debezium のドキュメント を参照してください。
  6. Connector details に情報を追加します(省略可)。

    • Provide transaction metadata: トランザクションメタデータ を有効にするかどうかを選択します。トランザクションメタデータは、専用の Kafka トピックに保存されます。デフォルトは false です。
    • Decimal handling mode: 変更イベントでの DECIMAL 列と NUMERIC 列の表現方法を指定します。precise (デフォルト値)を指定すると、値の表現に java.math.BigDecimal が使用されます。変更イベントでは、バイナリ表現と Connect の org.apache.kafka.connect.data.Decimal データ型を使用して値がエンコードされます。string 型を使用して値を表すには、string を選択します。double を選択すると、Java の double データ型を使用して値が表現されます。double では精度が低下する可能性がありますが、コンシューマーでの使いやすさは大幅に向上します。
    • Binary handling mode: 変更イベントでのバイナリ(blob、binary)列の表現方法を指定します。バイナリデータをバイト配列形式で表現するには、bytes (デフォルト値)を選択します。バイナリデータを base64 でエンコードされた文字列形式で表現するには、base64 を選択します。バイナリデータを 16 進(base16)でエンコードされた文字列形式で表現するには、hex を選択します。
    • Time precision mode: 時間、日付、タイムスタンプは、さまざまな種類の精度で表現できます。時刻、日付、およびタイムスタンプの値の精度をデータベース列の精度に基づいて決定するには、adaptive (デフォルト値)を選択します。adaptive_time_microseconds は、TIME フィールドで常にマイクロ秒の精度が使用される点を除き、基本的に adaptive モードと同じです。connect: 時刻、日付、タイムスタンプ値の表現には常に、Connect に組み込まれている Time、Date、および Timestamp の表現が使用されます。connect を選択した場合は、データベース列に使用されている精度に関係なく、精度としてミリ秒が使用されます。詳細については、「Temporal types」を参照してください。
    • Topic cleanup policy: トピック保持クリーンアップポリシー を設定します。古いトピックを破棄するには、delete (デフォルト値)を選択します。トピックでの ログ圧縮 を有効にするには、compact を選択します。
    • HStore handling mode: 変更イベントでの HSTORE 列の表現方法を指定します。これらの列を JSON として表すには、json (デフォルト)を選択します。これらの列を map データとして表すには、map を選択します。
    • Interval handling mode: 変更イベントでの INTERVAL 列の表現方法を指定します。これらの列を numeric データとして表すには、numeric (デフォルト)を選択します。これらの列を string データとして表すには、string を選択します。
  7. 以下の Output プロパティについて、値を選択します。

    • Output Kafka record value フォーマット: コネクターから送られるデータのフォーマットを AVRO、JSON(スキーマレス)、JSON_SR(JSON スキーマ)、または PROTOBUF から選択します。スキーマベースのレコードフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、有効なスキーマが Schema Registry に存在する必要があります。

    • After-state only:(省略可)デフォルトでは true となり、Kafka レコードは、適用された変更イベントからのレコードステートのみを持ちます。false を選択すると、変更イベントの適用後も以前のレコードステートが維持されます。詳細については、「After-state only の出力制限」を参照してください。

    • Output Kafka record key format: コネクターから送られるデータのフォーマットを AVRO、JSON(スキーマレス)、JSON_SR(JSON スキーマ)、PROTOBUF、または String から選択します。スキーマベースのレコードフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、有効なスキーマが Schema Registry に存在する必要があります。

    • JSON output decimal format:(省略可)デフォルトでは BASE64 です。

      JSON output decimal format プロパティ
  8. このコネクターで使用する タスク の数を入力します。組織では複数のコネクターを実行できますが、コネクターあたり 1 つのタスク("tasks.max": "1")という制限があります。

  9. Transforms and Predicates: 詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。

すべてのプロパティの値と定義については、「構成プロパティ」を参照してください。

ステップ 5: コネクターを起動します。

接続の詳細情報を確認し、Launch をクリックします。

ステップ 6: コネクターのステータスを確認します。

コネクターのステータスが Provisioning から Running に変わります。ステータスが変わるまで数分かかる場合があります。

ステップ 7: Kafka トピックを確認します。

コネクターが実行中になったら、メッセージが Kafka トピックに取り込まれていることを確認します。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png

Confluent CLI の使用

以下の手順に従うと、Confluent CLI を使用してコネクターをセットアップし、実行できます。

注釈

ステップ 1: 使用可能なコネクターをリスト表示します。

以下のコマンドを入力して、使用可能なコネクターをリスト表示します。

confluent connect plugin list

ステップ 2: コネクターの必須の構成プロパティを表示します。

以下のコマンドを実行して、コネクターの必須プロパティを表示します。

confluent connect plugin describe <connector-catalog-name>

例:

confluent connect plugin describe PostgresCdcSource

出力例:

Following are the required configs:
connector.class: PostgresCdcSource
name
kafka.auth.mode
kafka.api.key
kafka.api.secret
database.hostname
database.user
database.dbname
database.server.name
output.data.format
tasks.max

ステップ 3: コネクターの構成ファイルを作成します。

コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。

{
  "connector.class": "PostgresCdcSource",
  "name": "PostgresCdcSourceConnector_0",
  "kafka.auth.mode": "KAFKA_API_KEY",
  "kafka.api.key": "****************",
  "kafka.api.secret": "****************************************************************",
  "database.hostname": "debezium-1.<host-id>.us-east-2.rds.amazonaws.com",
  "database.port": "5432",
  "database.user": "postgres",
  "database.password": "**************",
  "database.dbname": "postgres",
  "database.server.name": "cdc",
  "table.include.list":"public.passengers",
  "plugin.name": "pgoutput",
  "output.data.format": "JSON",
  "tasks.max": "1"
}

以下のプロパティ定義にご注意ください。

  • "connector.class": コネクターのプラグイン名を指定します。
  • "name": 新しいコネクターの名前を設定します。
  • "kafka.auth.mode": 使用するコネクターの認証モードを指定します。オプションは SERVICE_ACCOUNT または KAFKA_API_KEY (デフォルト)です。API キーとシークレットを使用するには、構成プロパティ kafka.api.keykafka.api.secret を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティ kafka.service.account.id=<service-account-resource-ID>リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。

    confluent iam service-account list
    

    例:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "table.whitelist":(オプション)コネクターでモニタリングするテーブルの完全修飾識別子をコンマ区切りにしたリストを入力します。デフォルトでは、コネクターはシステムテーブル以外のテーブルを "すべて" モニタリングします。完全修飾テーブル名のフォームは、schemaName.tableName です。

  • "database.sslmode": 入力しない場合は、disable がデフォルトオプションとして使用されます。"database.sslmode" : "require" を入力した場合、コネクターではセキュアな(暗号化された)接続が使用されます。セキュアな接続を確立できない場合、コネクターは失敗します。このモードでは、認証機関(CA)検証は行われません。

  • "output.data.format": 出力レコードフォーマット(コネクターから送られるデータ)を設定します。指定可能なエントリは、AVROJSON_SRPROTOBUF、または JSON です。スキーマベースのレコードフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Confluent Cloud Schema Registry を構成しておく必要があります。

  • after.state.only:(省略可)デフォルトでは true となり、Kafka レコードは、適用された変更イベントからのレコードステートのみを持ちます。false と入力すると、変更イベントの適用後も以前のレコードステートが維持されます。詳細については、「After-state only の出力制限」を参照してください。

  • "json.output.decimal.format":(省略可)デフォルトでは BASE64 です。次の 2 つのリテラル値を取る Connect DECIMAL 論理型の値の JSON/JSON_SR シリアル化フォーマットを指定します。

    • BASE64 では、DECIMAL 論理型を base64 でエンコードされたバイナリデータとしてシリアル化します。
    • NUMERIC では、JSON または JSON_SR の Connect DECIMAL 論理型の値を、10 進数の値を表す数字としてシリアル化します。
  • "column.exclude.list":(省略可能)変更イベントレコードの値から除外する列の完全修飾名に一致する正規表現のリスト。コンマ区切りで指定します。列の完全修飾名は、databaseName.tableName.columnName という形式になります。

  • "plugin.name":(オプション)使用するプラグインを選択します。選択肢は、wal2jsonwal2json_rdswal2json_streamingwal2json_rds_streamingpgoutput、および decoderbufs です。デフォルトは pgoutput です。

  • "slot.name":(省略可能)プラグインからの変更、およびデータベースの変更のストリーミング用に作成された PostgreSQL 論理デコードスロットの名前。スロット名に使用できる文字は、英小文字、数字、アンダースコアのみです。デフォルト値は debezium です。

  • "snapshot.mode":(省略可能)コネクターの起動時にデータベーススナップショットを実行する条件を指定します。

    • デフォルトの設定は、initial です。選択すると、コネクターは、キャプチャー対象テーブルの構造およびデータのスナップショットを取得します。これは、コネクターの起動時にキャプチャー対象テーブルの完全なデータ表現をトピックに取り込む必要がある場合に便利です。
    • never は、コネクターでスナップショットを一切実行しないこと、また、コネクターを最初に起動するときには前回終了した位置から読み取りを開始することを指定します。
    • exported は、データベーススナップショットが、レプリケーションスロットが作成された時点に基づくことを指定します。これは "lock-free" スナップショットを実行する場合に適しています(『Snapshot isolation』を参照)。
  • "tasks.max": このコネクターで使用する タスク の数を入力します。組織では複数のコネクターを実行できますが、コネクターあたり 1 つのタスク("tasks.max": "1")という制限があります。

Single Message Transforms: CLI を使用する SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。

すべてのプロパティの値と定義については、「構成プロパティ」を参照してください。

ステップ 4: プロパティファイルを読み込み、コネクターを作成します。

以下のコマンドを入力して、構成を読み込み、コネクターを起動します。

confluent connect create --config <file-name>.json

例:

confluent connect create --config postgresql-cdc-source.json

出力例:

Created connector PostgresCdcSourceConnector_0 lcc-ix4dl

ステップ 5: コネクターのステータスを確認します。

以下のコマンドを入力して、コネクターのステータスを確認します。

confluent connect list

出力例:

ID          |            Name              | Status  |  Type
+-----------+------------------------------+---------+-------+
lcc-ix4dl   | PostgresCdcSourceConnector_0 | RUNNING | source

ステップ 6: Kafka トピックを確認します。

コネクターが実行中になったら、メッセージが Kafka トピックに取り込まれていることを確認します。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

構成プロパティ

このコネクターでは、以下のコネクター構成プロパティを使用します。

データへの接続方法(How should we connect to your data?)

name

コネクターの名前を設定します。

  • 型: string
  • 指定可能な値: 最大 64 文字の文字列
  • 重要度: 高

Kafka クラスターの認証情報(Kafka Cluster credentials)

kafka.auth.mode

Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。

  • 型: string
  • デフォルト: KAFKA_API_KEY
  • 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
  • 重要度: 高
kafka.api.key
  • 型: password
  • 重要度: 高
kafka.service.account.id

Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。

  • 型: string
  • 重要度: 高
kafka.api.secret
  • 型: password
  • 重要度: 高

データベースへの接続方法(How should we connect to your database?)

database.hostname

PostgreSQL サーバーのアドレス。

  • 型: string
  • 重要度: 高
database.port

PostgreSQL サーバーのポート番号。

  • 型: int
  • 指定可能な値: [0,...,65535]
  • 重要度: 高
database.user

必要な認可を持つ PostgreSQL ユーザーの名前。

  • 型: string
  • 重要度: 高
database.password

必要な認可を持つ PostgreSQL ユーザーのパスワード。

  • 型: password
  • 重要度: 高
database.dbname

接続先の PostgreSQL データベースの名前。

  • 型: string
  • 重要度: 高
database.server.name

PostgreSQL サーバー/クラスターの論理名。この論理名は 1 つの名前空間を形成し、すべての Kafka トピック名および Kafka Connect スキーマ名で使用されます。また、Avro データフォーマットが使用される場合、論理名は、対応する Avro スキーマの名前空間でも使用されます。Kafka トピックは、プレフィックス database.server.name を付加して作成する必要があります(そのように作成されます)。英数字、アンダースコア、ハイフン、ドットのみを使用できます。

  • 型: string
  • 重要度: 高
database.sslmode

データベースへの接続に使用する必要がある SSL モードを指定します。require の場合、暗号化された接続を確立します。なんらかの理由で暗号化された接続を確立できない場合はエラーになります。データベースサーバーで SSL が必須になっている場合は、require を使用してください。disable の場合、非暗号化接続を使用します。

  • 型: string
  • デフォルト: disable
  • 重要度: 低

データベースの詳細(Database details)

signal.data.collection

Fully-qualified name of the data collection that needs to be used to send signals to the connector. Use the following format to specify the fully-qualified collection name: schemaName.tableName

  • 型: string
  • 重要度: 中
publication.autocreate.mode

Determines how creation of a publication should work when using pgoutput. Possible options are: all_tables, disabled, and filtered. Check the documentation for more details on each option.

  • 型: string
  • Default: all_tables
  • Valid Values: all_tables, disabled, filtered
  • 重要度: 中
publication.name

The name of the PostgreSQL publication created for streaming changes when using pgoutput

  • 型: string
  • Default: dbz_publication
  • Valid Values: Must match the regex ^[^\s\"\'\`]+$
  • 重要度: 中
table.include.list

モニタリング対象のテーブルの完全修飾識別子と一致する文字列のコンマ区切りのリスト(省略可)。この構成プロパティに含まれていないテーブルはすべて、モニタリング対象から除外されます。各識別子は、schemaName.tableName という形式になります。デフォルトでは、コネクターはモニタリング対象の各スキーマのすべての非システムテーブルをモニタリングします。"Table excluded" と一緒に使用することはできません。

  • 型: list
  • 重要度: 中
table.exclude.list

モニタリング対象から除外するテーブルの完全修飾識別子と一致する文字列のコンマ区切りのリスト(省略可)。この構成プロパティに含まれていないテーブルはすべて、モニタリングの対象になります。各識別子は、schemaName.tableName という形式になります。"Table included" と一緒に使用することはできません。

  • 型: list
  • 重要度: 中
snapshot.mode

コネクターの起動時にスナップショットを実行する条件を指定します。デフォルトの設定は initial であり、この設定では、論理サーバー名で記録されたオフセットがない場合にのみコネクターがスナップショットを実行できます。never オプションを指定すると、コネクターでスナップショットが一切使用されません。また、論理サーバー名を指定してコネクターが起動された場合、コネクターは、論理レプリケーションスロットの仕組みに基づいて、前回終了した位置(前回の LSN の位置)から読み取るか、または先頭から読み取りを開始します。exported オプションを指定すると、レプリケーションスロットが作成された時点に基づいてデータベースのスナップショットが実行されます。これは "lock-free" でスナップショットを実行するのに適しています。

  • 型: string
  • デフォルト: initial
  • 重要度: 低
datatype.propagate.source.type

データベース固有のデータ型名と一致する正規表現のコンマ区切りのリスト。出力された変更レコードの対応するフィールドスキーマに、データ型の元の型と元の長さがパラメーターとして追加されます。

  • 型: list
  • 重要度: 低
tombstones.on.delete

delete イベントの後に tombstone イベントを生成するかどうかを制御します。true に設定すると、削除操作は delete イベント、およびそれに続く tombstone イベントで表されます。false に設定すると、delete イベントだけが送信されます。tombstone イベントの実行(デフォルトの動作)により、Kafka はソースレコードが削除されてから特定のキーに関連したすべてのイベントを完全に削除できるようになります。

  • 型: boolean
  • デフォルト: true
  • 重要度: 高
column.exclude.list

変更イベントから除外する列と一致する正規表現。

  • 型: list
  • 重要度: 中
plugin.name

サーバーにインストールされている Postgres 論理デコードプラグインの名前。注: pgoutput は PostgreSQL 10 以降でのみ使用できます。

  • 型: string
  • デフォルト: pgoutput
  • 重要度: 高
slot.name

特定のプラグインからの変更のストリーミング、および特定のデータベース/スキーマ用に作成された PostgreSQL 論理デコードスロットの名前。サーバーはコネクターへのイベントのストリーミングにこのスロットを使用します。

  • 型: string
  • デフォルト: debezium
  • 指定可能な値: 正規表現 ^[a-z0-9_]+$ に一致することが必要
  • 重要度: 低

接続の詳細(Connection details)

poll.interval.ms

各反復処理中にコネクターが新しい変更イベントの出現を待機するミリ秒数を指定する正の整数値。デフォルトは 1000 ミリ秒(1 秒)です。

  • 型: int
  • デフォルト: 1000(1 秒)
  • 指定可能な値: [1,...]
  • 重要度: 低
max.batch.size

このコネクターの各反復処理中に処理されるイベントの各バッチの最大サイズを指定する正の整数値。

  • 型: int
  • デフォルト: 1000
  • 指定可能な値: [1,...,5000]
  • 重要度: 低
event.processing.failure.handling.mode

binlog イベントの処理中にコネクターが例外に対応する方法を指定します。

  • 型: string
  • デフォルト: fail
  • 指定可能な値: fail、skip、warn
  • 重要度: 低
heartbeat.interval.ms

コネクターがハートビートメッセージを Kafka トピックに送信する頻度を制御します。デフォルト値 0 の動作では、コネクターはハートビートメッセージを送信しません。

  • 型: int
  • デフォルト: 0
  • 指定可能な値: [0,...]
  • 重要度: 低
heartbeat.action.query

コネクターがハートビートメッセージを送信する際に、コネクターがソースデータベースに対して実行するクエリを指定します。

  • 型: string
  • 重要度: 低

コネクターの詳細(Connector details)

provide.transaction.metadata

トランザクションメタデータの情報を専用トピックに保存し、イベントのカウントとともにトランザクションメタデータを抽出できるようにします。

  • 型: boolean
  • デフォルト: false
  • 重要度: 低
decimal.handling.mode

変更イベントでの DECIMAL 列と NUMERIC 列の表現方法を指定します。「precise」(デフォルト)の場合、java.math.BigDecimal を使用して値が表現されます。この値は変更イベントでバイナリ表現と Kafka Connect の「org.apache.kafka.connect.data.Decimal」型を使用してエンコードされます。「string」の場合、文字列を使用して値が表現されます。「double」の場合、Java の「double」を使用して値が表現されます。この場合、精度が低下する可能性がありますが、コンシューマーでの使いやすさは大幅に向上します。

  • 型: string
  • デフォルト: precise
  • 指定可能な値: double、precise、string
  • 重要度: 中
binary.handling.mode

変更イベントでのバイナリ列(blob、binary など)の表現方法を指定します。「bytes」(デフォルト)の場合、バイナリデータがバイト配列で表現されます。「base64」の場合、バイナリデータが base64 でエンコードされた文字列で表現されます。「hex」の場合、バイナリデータが 16 進エンコードされた(base16)文字列で表現されます。

  • 型: string
  • デフォルト: bytes
  • 指定可能な値: base64、bytes、hex
  • 重要度: 低
time.precision.mode

時間、日付、タイムスタンプは、さまざまな種類の精度で表現することができます。「adaptive」(デフォルト値)の場合、時刻、日付、およびタイムスタンプの値の精度が、データベース列の精度に基づいて決定されます。「adaptive_time_microseconds」は、「adaptive」モードに似ていますが、TIME フィールドで常にマイクロ秒の精度が使用されます。「connect」の場合、時刻、日付、タイムスタンプ値の表現には常に、Kafka Connect に組み込まれている Time、Date、および Timestamp の表現が使用されます。つまり、データベースの列の精度に関係なく、ミリ秒の精度が使用されます。

  • 型: string
  • デフォルト: adaptive
  • 指定可能な値: adaptive、adaptive_time_microseconds、connect
  • 重要度: 中
cleanup.policy

トピッククリーンアップポリシーを設定します。

  • 型: string
  • デフォルト: delete
  • 指定可能な値: compact、delete
  • 重要度: 中
hstore.handling.mode

変更イベントでの HSTORE 列の表現方法を指定します。

  • 型: string
  • デフォルト: json
  • 指定可能な値: json、map
  • 重要度: 中
interval.handling.mode

変更イベントでの INTERVAL 列の表現方法を指定します。

  • 型: string
  • デフォルト: numeric
  • 指定可能な値: numeric、string
  • 重要度: 中
schema.refresh.mode

テーブルのインメモリースキーマの更新をトリガーする条件を指定します。columns_diff`(デフォルト)が最も安全なモードであり、インメモリースキーマとデータベーステーブルのスキーマの同期が常に確保されます。`columns_diff_exclude_unchanged_toast を指定すると、インメモリースキーマのキャッシュと受信メッセージから取得したスキーマに不一致がある場合に、変更されていない TOAST 可能なデータで不一致の原因が明確になる場合を除き、コネクターによってインメモリースキーマのキャッシュが更新されます。

  • 型: string
  • デフォルト: columns_diff
  • 指定可能な値: columns_diff、columns_diff_exclude_unchanged_toast
  • 重要度: 中

出力メッセージ(Output messages)

output.data.format

Kafka 出力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、または JSON です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。

  • 型: string
  • 重要度: 高
output.key.format

Kafka 出力レコードキーのフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、STRING、または JSON です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください

  • 型: string
  • デフォルト: JSON
  • 指定可能な値: AVRO、JSON、JSON_SR、PROTOBUF、STRING
  • 重要度: 高
after.state.only

生成された Kafka レコードに変更イベント適用後のステートのみを含めるかどうかを制御します。

  • 型: boolean
  • デフォルト: true
  • 重要度: 低
json.output.decimal.format

次の 2 つのリテラル値を取る Connect DECIMAL 論理型の値の JSON/JSON_SR シリアル化フォーマットを指定します。

BASE64 では、DECIMAL 論理型を base64 でエンコードされたバイナリデータとしてシリアル化します。

NUMERIC では、JSON/JSON_SR の Connect DECIMAL 論理型の値を、10 進数の値を表す数字としてシリアル化します。

  • 型: string
  • デフォルト: BASE64
  • 重要度: 低

このコネクターのタスク数(Number of tasks for this connector)

tasks.max
  • 型: int
  • 指定可能な値: [1,…,1]
  • 重要度: 高

次のステップ

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png

After-state only の出力制限

コネクターに After-state only=false プロパティが構成されている場合は、レコード内の before の下にすべての列の以前の値が表示されます。ただし、特定の条件下では、beforenull が含まれるか、一部の列のみ表示されます。Protobuf が使用されている場合、レコードに before フィールドがまったく含まれない場合があります。以下の例では、この問題と、それを修正するためのアクションを示します。

例では、コネクターが JSON で構成され、After-state onlyfalse に設定されています。PostgreSQL データベースでレコードがアップデートされた場合、レコードの表示は以下のようになります。ここでは "before"null になっています。

{
  "before": null,
  "after": {
    "id": 5,
    "name": "Allen William Henry",
    "sex": "male",
    "age": 25,
    "sibsp": 0,
    "parch": 0,
    "created_at": "2018-01-02T15:22:14.831461Z"
   },
   "source": {
     "version": "1.3.1.Final",
     "connector": "postgresql",
     "name": "test",
     "ts_ms": 1621389097781,
     "snapshot": "false",
     "db": "postgres",
     "schema": "public",
     "table": "passengers",
     "txId": 572,
     "lsn": 872429856,
     "xmin": null
   },
   "op": "u",
   "ts_ms": 1621389098688,
   "transaction": null
}

アップデートされたレコードに、行のすべての列の以前の(before)値を含めるには、ALTER TABLE passengers REPLICA IDENTITY FULL を実行して passengers テーブルを変更する必要があります。PostgreSQL データベースでこの変更を行ってレコードがアップデートされると、レコードの表示は以下のようになります。

{
  "before": {
    "id": 8,
    "name": "Gosta Leonard",
    "sex": "male",
    "age": 2,
    "sibsp": 3,
    "parch": 1,
    "created_at": "2018-01-03T20:53:55.955056Z"
  },
  "after": {
    "id": 8,
    "name": "Gosta Leonard",
    "sex": "male",
    "age": 25,
    "sibsp": 3,
    "parch": 1,
    "created_at": "2018-01-03T20:53:55.955056Z"
  },
  "source": {
    "version": "1.3.1.Final",
    "connector": "postgresql",
    "name": "test",
    "ts_ms": 1621390542864,
    "snapshot": "false",
    "db": "postgres",
    "schema": "public",
    "table": "passengers",
    "txId": 581,
    "lsn": 1207967968,
    "xmin": null
  },
  "op": "u",
  "ts_ms": 1621390544032,
  "transaction": null
}