Debezium SQL Server Source Connector for Confluent Platform¶
Debezium SQL Server Source Connector は、SQL Server データベースの既存データのスナップショットを取り込み、そのデータに対する後続の行レベルの変更をすべてモニタリングおよび記録できるコネクターです。テーブルごとにすべてのイベントが個別の Apache Kafka® トピックに記録されるので、アプリケーションやサービスで簡単に取り込むことができます。
ちなみに
- Confluent は、Debezium SQL Server Source Connector のバージョン 0.9.3 以降をサポートしています。
- Azure SQL Database はこのコネクターではサポートされていません。これは、このサービスが 変更データキャプチャー (CDC)をサポートしていないためです。Azure SQL Managed Instance は CDC をサポートしないため、このコネクターのサポート対象です。詳細については、「機能の比較: Azure SQL Database と Azure SQL Managed Instance」を参照してください。
- Debezium SQL Server Source Connector が動作するには、CDC 機能が必要です。CDC 機能は SQL Server Standard Edition(2016 SP1 以降)または SQL Server Enterprise Edition で提供されています。
- Confluent Cloud を使用する場合は、クラウドのクイックスタートとして「Microsoft SQL Server CDC Source (Debezium) Connector for Confluent Cloud」を参照してください。
機能¶
Debezium SQL Server Source Connector には、以下の機能があります。
少なくとも 1 回のデリバリー¶
このコネクターによって、レコードが Kafka のトピックに少なくとも 1 回は配信されることが保証されます。コネクターを再起動した場合、Kafka のトピックに重複レコードが存在している可能性があります。
1 つのタスクをサポート¶
Debezium SQL Server Source Connector は、1 つのタスクのみの実行をサポートしています。
SQL Server コネクターのインストール¶
このコネクターは、Confluent Hub クライアントのインストール手順 に従うか、手作業で ZIP ファイルをダウンロードしてインストールします。
前提条件¶
重要
このコネクターは、Connect が実行されるすべてのマシンにインストールする必要があります。
Confluent Hub クライアント のインストール。これは、Confluent Enterprise とともにデフォルトでインストールされます。
コネクターの最新(
latest
)バージョンのインストール。コネクターの
latest
バージョンをインストールするには、Confluent Platform のインストールディレクトリに移動し、次のコマンドを実行します。confluent-hub install debezium/debezium-connector-sqlserver:latest
特定のバージョンをインストールするには、次の例に示すように
latest
をバージョン番号に置き換えます。confluent-hub install debezium/debezium-connector-sqlserver:0.9.4
コネクターの手動インストール¶
コネクターの ZIP ファイル をダウンロードして展開し、コネクターの手動インストール 手順 に従ってください。
ライセンス¶
Debezium SQL Server Source Connector はオープンソースコネクターであるため、Confluent エンタープライズライセンスは不要です。
構成プロパティ¶
このコネクターの構成プロパティの網羅的なリストについては、「 SQL Server Source Connector (Debezium) 構成プロパティ 」を参照してください。
SQL Server での変更データキャプチャーの構成¶
変更データキャプチャー (CDC)機能を有効にするために、SQL Server データベースを構成する必要があります。このコネクターを使用するには、キャプチャーするテーブルに対して、この機能を有効にしておく必要があります。コネクターの機能は、SQL Server Standard Edition または SQL Server Enterprise Edition に含まれているこの CDC 機能に基づいています。
モニタリングするデータベースで CDC を有効にするには、以下の SQL コマンドを使用します。
USE MyDB
GO
EXEC sys.sp_cdc_enable_db
GO
モニタリングする各テーブルに対して CDC を有効にします。
USE MyDB
GO
EXEC sys.sp_cdc_enable_table @source_schema = N’dbo’, @source_name = N’MyTable’, @role_name = N’MyRole’, @filegroup_name = N’MyDB_CT’, @supports_net_changes = 1
GO
クイックスタート¶
Debezium の SQL Server Source Connector は、各テーブルのイベントをそれぞれ別の Kafka トピックに記録することができるソースコネクターです。これにより、アプリケーションやサービスでイベントを簡単に消費できます。
注釈
Kafka Connect を Confluent Cloud に接続する方法の例については、「分散クラスター」を参照してください。
コネクターのインストール¶
Docker イメージを使用して Kafka、ZooKeeper、Connect をセットアップする場合は、Debezium のチュートリアル を参照してください。以下のチュートリアルでは、Confluent Platform をローカルにセットアップしておく必要があります。
Confluent Platform のインストールディレクトリに移動し、次のコマンドを実行してコネクターをインストールします。
ちなみに
Confluent CLI 開発用コマンドのコマンド構文が、5.3.0 で変更されています。該当するコマンドは confluent local
に移行されています。たとえば、confluent start
の構文は、confluent local services start
に変わりました。詳しくは、「confluent local」を参照してください。
confluent-hub install debezium/debezium-connector-sqlserver:latest
新しいコネクタープラグインを追加した場合は、Connect の再起動が必要です。Connect を再起動するには、Confluent CLI を使用します。
confluent local services connect stop && confluent local services connect start
Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
SQL Server プラグインが正しくインストールされ、プラグインローダーに反映されていることを確認します。
curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep SqlServer
"io.debezium.connector.sqlserver.SqlServerConnector"
Docker を使用した SQL Server のセットアップ(オプション)¶
SQL Server のネイティブインストール環境がない場合は、次のコマンドを使用して、Docker イメージを使用して SQL Server を起動することができます。
#Pull docker image
docker pull mcr.microsoft.com/mssql/server:2017-latest
#Run docker container
docker run -e 'ACCEPT_EULA=Y' -e 'MSSQL_AGENT_ENABLED=true' \
-e 'MSSQL_PID=Standard' -e 'SA_PASSWORD=Password!' \
-p 1433:1433 --name sqlserver_1 \
-d mcr.microsoft.com/mssql/server:2017-latest
#Log into container to get your SQL Server command prompt
docker exec -it sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P Password!'
テストデータの作成と変更データキャプチャーの有効化¶
データベース運用者が、Debezium コネクターでキャプチャーするテーブルに対して、変更データキャプチャー (CDC)機能を有効にする必要があります。コネクターの機能は、SQL Server Standard Edition(SQL Server 2016 SP1 以降)または SQL Server Enterprise Edition に含まれているこの CDC 機能に基づいています。
モニタリングするデータベースで CDC を有効にするには、以下の SQL コマンドを使用します。
USE MyDB
GO
EXEC sys.sp_cdc_enable_db
GO
モニタリングする各テーブルに対して CDC を有効にします。
USE MyDB
GO
EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'MyTable', @role_name = N'MyRole', @filegroup_name = N'MyDB_CT', @supports_net_changes = 1
GO
この例では、データベース testDB に顧客レコードのセットが入っています。
下記のコマンドを含む inventory.sql
を作成します。
-- Create the test database
CREATE DATABASE testDB;
GO
USE testDB;
EXEC sys.sp_cdc_enable_db;
-- Create some customers ...
CREATE TABLE customers (
id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE
);
INSERT INTO customers(first_name,last_name,email)
VALUES ('Sally','Thomas','sally.thomas@acme.com');
INSERT INTO customers(first_name,last_name,email)
VALUES ('George','Bailey','gbailey@foobar.com');
INSERT INTO customers(first_name,last_name,email)
VALUES ('Edward','Walker','ed@walker.com');
INSERT INTO customers(first_name,last_name,email)
VALUES ('Anne','Kretchmar','annek@noanswer.org');
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0;
GO
ここでは、customers テーブルと testDB データベースに対して変更データキャプチャーを有効にしています。
Docker コンテナーの sqlcmd
プロンプトで inventory.sql
を実行するには、以下のコマンドを使用します。
#Load inventory.sql through your container's sqlcmd prompt
cat inventory.sql | docker exec -i sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P Password!'
ネイティブインストール環境で inventory.sql
を実行するには、以下のコマンドを使用します。
sqlcmd -S myServer\instanceName -i C:\inventory.sql
Debezium SQL Server コネクターの起動¶
ファイル register-sqlserver.json
を作成して、次のコネクター構成を保存します。
{
"name": "inventory-connector",
"config": {
"connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max" : "1",
"database.server.name" : "server1",
"database.hostname" : "localhost",
"database.port" : "1433",
"database.user" : "sa",
"database.password" : "Password!",
"database.dbname" : "testDB",
"database.history.kafka.bootstrap.servers" : "localhost:9092",
"database.history.kafka.topic": "schema-changes.inventory"
}
}
コネクターを起動します。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json
Kafka コンシューマーの起動¶
新しいターミナルセッションでコンシューマーを起動します。
confluent local services kafka consume server1.dbo.customers --from-beginning
SQL Server の bash で SQL クエリを入力してデータベースのレコードを追加または変更すると、そのレコードを反映したメッセージが生成され、コンシューマーのターミナルに表示されます。
USE testDB;
INSERT INTO customers(first_name,last_name,email) VALUES ('Pam','Thomas','pam@office.com');
GO
リソースのクリーンアップ¶
コネクターを削除し、Confluent サービスを停止します。
curl -X DELETE localhost:8083/connectors/inventory-connector
confluent local stop
SQL Server コンテナーを停止します。
docker stop sqlserver_1
注釈
ここで提供されている情報の一部は、Debezium Community で最初に生成されたドキュメントから派生したものです。Debezium によって生成された作品は、Creative Commons 3.0 でライセンスされています。