Debezium PostgreSQL Source Connector for Confluent Platform¶
注釈
Confluent Cloud を使用している場合は、「PostgreSQL Source Connector for Confluent Cloud」または「Salesforce CDC Source Connector for Confluent Cloud」を参照してください。
Debezium PostgreSQL Connector は、PostgreSQL データベース上の既存のデータのスナップショットを取得し、それらのデータに対する後続の行レベルの変更すべてをモニタリングおよび記録することができるソースコネクターです。テーブルごとにすべてのイベントが個別の Apache Kafka® トピックに記録されるので、アプリケーションやサービスで簡単に取り込むことができます。
- Confluent では Debezium PostgreSQL コネクターのバージョン 0.9.3 以降がサポートされています。
- Confluent では PostgreSQL 9.6、10、11 でのこのコネクターの使用がサポートされています。
- 論理デコード プラグインをインストールできない場合もあり、Heroku Postgres のようなサービスでホストされるデータベースを Debezium でモニタリングできない可能性があります。
機能¶
Debezium PostgreSQL Source Connector には、以下の機能があります。
少なくとも 1 回のデリバリー¶
このコネクターによって、レコードが Kafka のトピックに少なくとも 1 回は配信されることが保証されます。コネクターを再起動した場合、Kafka のトピックに重複レコードが存在している可能性があります。
1 つのタスクをサポート¶
Debezium PostgreSQL Source Connector は、1 つのタスクのみの実行をサポートしています。
Postgres コネクターのインストール¶
このコネクターは、こちらの手順 でインストールできます。ZIP ファイルを手動でダウンロードすることもできます。
confluent-hub install debezium/debezium-connector-postgresql:latest
特定のバージョンをインストールするには、latest
をバージョン番号に置き換えます。以下に例を示します。
confluent-hub install debezium/debezium-connector-postgresql:0.9.4
コネクターの手動インストール¶
コネクターの ZIP ファイル をダウンロードして展開し、コネクターの手動インストール 手順 に従ってください。
ライセンス¶
Debezium PostgreSQL コネクターは、オープンソースコネクターであり、Confluent エンタープライズライセンスは不要です。
構成プロパティ¶
このコネクターの構成プロパティの網羅的なリストについては、「 PostgreSQL Source Connector (Debezium) 構成プロパティ 」を参照してください。
PostgreSQL のセットアップ¶
Debezium PostgreSQL Connector を使用して PostgreSQL サーバー上でコミットされた変更をモニタリングするには、最初に 論理デコードプラグイン を PostgreSQL サーバーにインストールしておく必要があります。レプリケーションスロットを有効にし、レプリケーションを実行するために十分な権限が付与されるようにユーザーを構成します。
Amazon RDS で実行される PostgreSQL データベースをモニタリングするには、Debezium ドキュメントで『PostgreSQL on Amazon RDS』のセクションを参照してください。
PostgreSQL サーバーでの論理デコードとレプリケーションの有効化¶
Postgres リレーショナルデータベース管理システムには、論理デコードと呼ばれる機能があります。クライアントはこの機能を使用して、データベーステーブルに対するすべての永続的な変更を一貫したフォーマットで抽出することができます。フォーマットされたこのデータは、データベースの内部ステートが詳しく分からなくても解釈することができます。出力プラグインは、先書きログの内部表現のデータをレプリケーションスロットのコンシューマーで必要なフォーマットに変換します。
Debezium PostgreSQL コネクターは、Debezium でサポートされている以下の論理デコードプラグインのいずれかを使用します。
wal2json
プラグインのインストール¶
コマンドを実行するには、最初に PostgreSQL lib ディレクトリの wal2json
ライブラリへの書き込み権限をユーザーに付与する必要があります。テスト環境では、このディレクトリは /usr/pgsql-9.6/lib/
です。テスト環境では、export パスを次のようにセットアップします。
export PATH="$PATH:/usr/pgsql-9.6/bin"
wal2json インストールコマンドを入力します。
git clone https://github.com/eulerto/wal2json -b master --single-branch \
&& cd wal2json \
&& git checkout 92b33c7d7c2fccbeb9f79455dafbc92e87e00ddd \
&& make && make install \
&& cd .. \
&& rm -rf wal2json
PostgreSQL サーバーでのレプリケーションの有効化¶
/usr/share/postgresql/postgresql.conf
PostgreSQL 構成ファイルの末尾に以下の行を追加します。これらの行には、共有ライブラリにあるプラグインが含まれており、いくつかの先書きログ(WAL)とストリーミングレプリケーションの設定を調整します。
# LOGGING
log_min_error_statement = fatal
# CONNECTION
listen_addresses = '*'
# MODULES
shared_preload_libraries = 'decoderbufs'
# REPLICATION
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
max_wal_senders = 1 # max number of walsender processes (change requires restart)
#wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables
#wal_sender_timeout = 60s # in milliseconds; 0 disables
max_replication_slots = 1 # max number of replication slots (change requires restart)
レプリケーションアクセス許可の初期化¶
pg_hba.conf
PostgreSQL 構成ファイルの末尾に以下の行を追加します。これらの行は、データベースレプリケーションのクライアント認証を構成します。
############ REPLICATION ##############
local replication postgres trust
host replication postgres 127.0.0.1/32 trust
host replication postgres ::1/128 trust
クイックスタート¶
Debezium PostgreSQL Connector は、各テーブルのイベントをそれぞれ別の Kafka トピックに記録することができるソースコネクターです。これにより、アプリケーションやサービスでイベントを簡単に消費できます。
注釈
Kafka Connect を Confluent Cloud に接続する方法の例については、「分散クラスター」を参照してください。
コネクターのインストール¶
Docker イメージを使用して Kafka、ZooKeeper、Connect をセットアップする場合は、Debezium のチュートリアル を参照してください。以下のチュートリアルでは、Confluent Platform をローカルにインストールしておく必要があります。
Confluent Platform のインストールディレクトリに移動し、次のコマンドを実行してコネクターをインストールします。
confluent-hub install debezium/debezium-connector-postgresql:0.9.4
新しいコネクタープラグインを追加した場合は、Connect の再起動が必要です。Confluent CLI を使用して Connect を再起動します。
ちなみに
Confluent CLI 開発用コマンドのコマンド構文が、5.3.0 で変更されています。該当するコマンドは confluent local
に移行されています。たとえば、confluent start
の構文は、confluent local services start
に変わりました。詳しくは、「confluent local」を参照してください。
confluent local services connect stop && confluent local services connect start
Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
PostgreSQL プラグインが正しくインストールされ、プラグインローダーによって選択されていることを確認します。
curl -sS localhost:8083/connector-plugins | jq .[].class | grep postgres
"io.debezium.connector.postgresql.PostgresConnector"
Docker を使用した PostgreSQL のセットアップ(オプション)¶
PostgreSQL がネイティブインストールされていない場合は、次のコマンドを使用して、新しいコンテナーを起動し、論理デコード プラグイン、レプリケーションスロット、inventory
テストデータベースを事前構成した PostgreSQL データベースサーバーを実行します。
# Pull docker image
docker pull debezium/example-postgres
# Run docker container
docker run -it --rm --name postgres -p 5432:5432 \
-e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres \
debezium/example-postgres
# In a separate terminal, launch psql to run SQL queries:
docker run -it --rm --name psql_client \
-e PGOPTIONS="--search_path=inventory" \
-e PGPASSWORD=postgres --link postgres:postgres debezium/example-postgres \
psql -h postgres -U postgres
# To see the list of relations in the inventory database, type \d at the postgres prompt. To exit, type \q
PostgreSQL サーバーでの論理デコードの有効化¶
(前のセクションで)Docker イメージを使用して PostgreSQL をセットアップした場合、論理エンコーディングは既に有効になっています。ネイティブインストールで、以下のステップに従って PostgreSQL サーバーでの論理デコードとレプリケーションの有効化 を行います。
Debezium PostgreSQL コネクターの起動¶
ファイル register-postgres.json
を作成して、次のコネクター構成を格納します。
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "0.0.0.0",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.whitelist": "inventory"
}
}
コネクターを起動します。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json
Kafka コンシューマーの起動¶
新しいターミナルセッションでコンシューマーを起動します。
confluent local services kafka consume dbserver1.inventory.customers --from-beginning
bash で SQL クエリを入力(してデータベースにレコードを追加、またはデータベースのレコードを変更)すると、メッセージが取り込まれて、そのレコードを反映したメッセージがコンシューマーのターミナルに表示されます。
以下は、顧客テーブルのレコードを更新する psql クエリの例です。
update customers set first_name = 'Sarah' where id = 1001;
リソースのクリーンアップ¶
コネクターを削除し、Confluent サービスを停止します。
curl -X DELETE localhost:8083/connectors/inventory-connector
confluent local stop
PostgreSQL コンテナーを停止します。
docker stop psql_client # Alternatively type \q at the psql prompt
docker stop postgres
注釈
ここで提供している情報の一部は、Debezium Community を作成元とするドキュメントからの引用です。Debezium によって生成された作品は、Creative Commons 3.0 でライセンスされています。