Debezium SQL Server Source Connector for Confluent Platform¶
The Debezium SQL Server Source connector takes a snapshot of the existing data in a SQL Server database, then monitors and records all subsequent row-level changes to that data. All of the events for each table are recorded in a separate Apache Kafka® topic, where applications and services can consume them.
Tip
- Confluent supports version 0.9.3 and later versions of the Debezium SQL Server Source connector.
- Azure SQL Managed Instances support CDC and are supported by the SQL Server Source connector. For more information, see Features comparison: Azure SQL Database and Azure SQL Managed Instance.
- The Debezium SQL Server Source connector requires the CDC feature to function. The CDC feature is provided by SQL Server Standard edition (2016 SP1 and later) or SQL Server Enterprise edition.
Features¶
The Debezium SQL Server Source connector includes the following features:
At least once delivery¶
This connector guarantees that records are delivered at least once to the Kafka topic. If the connector restarts, there may be some duplicate records in the Kafka topic.
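Because delivery is at least once, a downstream consumer may see the same change event twice after a connector restart. One common way to tolerate this is to make the consumer idempotent. The following Python sketch drops repeats by remembering an identity for each event. It assumes the event key is available as a string and that the event's source block carries commit_lsn and change_lsn fields, as described in the Debezium documentation for SQL Server. The names make_deduplicator and is_new are hypothetical, and the in-memory set is for illustration only.

```python
def make_deduplicator():
    """Return a function that reports whether a change event is new."""
    seen = set()  # illustrative only: this set grows without bound

    def is_new(event_key, source):
        # Assumption: (key, commit_lsn, change_lsn) identifies one change.
        identity = (event_key, source.get("commit_lsn"), source.get("change_lsn"))
        if identity in seen:
            return False
        seen.add(identity)
        return True

    return is_new


is_new = make_deduplicator()
source = {"commit_lsn": "00000027:00000758:0005", "change_lsn": "00000027:00000758:0003"}
print(is_new('{"id":1001}', source))  # first delivery of this change
print(is_new('{"id":1001}', source))  # redelivery of the same change
```

A production consumer would more likely persist this identity alongside the data it writes, or rely on upserts keyed by the primary key, rather than hold every identity in memory.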
Supports one task¶
The Debezium SQL Server Source connector supports running only one task.
Automatic topic creation¶
The connector automatically creates the internal database history Kafka topic if it doesn’t exist.
Install the SQL Server Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
You must install the connector on every machine where Connect will run.
An installation of the latest (latest) connector version.
To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:
confluent connect plugin install debezium/debezium-connector-sqlserver:latest
You can install a specific version by replacing latest with a version number, as shown in the following example:
confluent connect plugin install debezium/debezium-connector-sqlserver:<version-number>
The Debezium SQL Server Source connector has specific ACL requirements. See the ACL requirements for Debezium Source connectors to ensure you meet the specified requirements.
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
The Debezium SQL Server Source connector is an open source connector and does not require a Confluent Enterprise License.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for Debezium SQL Server Source Connector for Confluent Platform.
Configure Change Data Capture on SQL Server¶
The SQL Server database must have the Change Data Capture (CDC) feature enabled, and CDC must also be enabled for each table that should be captured. The connector relies on this feature, which is included in the SQL Server Standard (2016 SP1 and later) and SQL Server Enterprise editions.
To enable CDC on the monitored database, use the following SQL command:
USE MyDB
GO
EXEC sys.sp_cdc_enable_db
GO
Enable CDC for each table that you plan to monitor:
USE MyDB
GO
EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'MyTable', @role_name = N'MyRole', @filegroup_name = N'MyDB_CT', @supports_net_changes = 1
GO
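When many tables need to be captured, the per-table statement above can be generated rather than typed by hand. The following Python sketch renders one sys.sp_cdc_enable_table call per table, using only parameters that appear on this page. The helper names sql_literal and cdc_enable_table_sql and the second table name are hypothetical. Review the generated SQL before running it.

```python
def sql_literal(value):
    """Render a Python string as a T-SQL Unicode literal, or NULL for None."""
    if value is None:
        return "NULL"
    # Single quotes inside a T-SQL string literal are escaped by doubling them.
    return "N'" + value.replace("'", "''") + "'"


def cdc_enable_table_sql(schema, table, role_name=None, supports_net_changes=0):
    """Build the EXEC statement that enables CDC for one table."""
    return (
        "EXEC sys.sp_cdc_enable_table "
        f"@source_schema = {sql_literal(schema)}, "
        f"@source_name = {sql_literal(table)}, "
        f"@role_name = {sql_literal(role_name)}, "
        f"@supports_net_changes = {int(supports_net_changes)}"
    )


# Hypothetical table list, for illustration only.
for table in ["MyTable", "MyOtherTable"]:
    print(cdc_enable_table_sql("dbo", table, role_name="MyRole", supports_net_changes=1))
    print("GO")
```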
Quick Start¶
Debezium’s SQL Server Source connector is a source connector that can record events for each table in a separate Kafka topic, where they can be easily consumed by applications and services.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Install the Connector¶
If you want to use Docker images for setting up Kafka, ZooKeeper and Connect, refer to the Debezium tutorial. The following tutorial requires a local installation of Confluent Platform. Note that as of Confluent Platform 7.5, ZooKeeper is deprecated for new deployments, and Confluent recommends KRaft mode instead.
Navigate to your Confluent Platform installation directory and run the following command to install the connector:
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.
confluent connect plugin install debezium/debezium-connector-sqlserver:latest
Adding a new connector plugin requires restarting Connect. Use the Confluent CLI to restart Connect.
confluent local services connect stop && confluent local services connect start
Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Check if the SQL Server plugin has been installed correctly and picked up by the plugin loader.
curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep SqlServer
"io.debezium.connector.sqlserver.SqlServerConnector"
Set up SQL Server using Docker (Optional)¶
If you do not have a native installation of SQL Server, you may use the following command to bring up SQL Server with a Docker image.
#Pull docker image
docker pull mcr.microsoft.com/mssql/server:2017-latest
#Run docker container
docker run -e 'ACCEPT_EULA=Y' -e 'MSSQL_AGENT_ENABLED=true' \
-e 'MSSQL_PID=Standard' -e 'SA_PASSWORD=Password!' \
-p 1433:1433 --name sqlserver_1 \
-d mcr.microsoft.com/mssql/server:2017-latest
#Log into container to get your SQL Server command prompt
docker exec -it sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P Password!'
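The container can take a little while to start accepting connections. As a rough readiness check before running sqlcmd or starting the connector, the following Python sketch polls the published TCP port. It assumes the host and port from the docker run command above (localhost:1433). The helper wait_for_port is hypothetical, and an open port does not by itself prove that SQL Server is ready to accept logins.

```python
import socket
import time


def wait_for_port(host, port, timeout=60.0, interval=1.0):
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError while nothing is listening yet.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


# Example (port 1433 is the one published by the docker run command):
#   wait_for_port("localhost", 1433, timeout=30.0)
```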
Create Test Data and Enable Change Data Capture¶
The database operator must enable Change Data Capture (CDC) for the table(s) that should be captured by the Debezium connector. The functionality of the connector is based upon this CDC feature included in the SQL Server Standard (beginning with SQL Server 2016 SP1) and SQL Server Enterprise editions.
To enable CDC on the monitored database, use the following SQL command:
USE MyDB
GO
EXEC sys.sp_cdc_enable_db
GO
Enable CDC for each table that you plan to monitor.
USE MyDB
GO
EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'MyTable', @role_name = N'MyRole', @filegroup_name = N'MyDB_CT', @supports_net_changes = 1
GO
In this example, the database testDB is populated with a set of customer records.
Create inventory.sql with the following list of commands.
-- Create the test database
CREATE DATABASE testDB;
GO
USE testDB;
EXEC sys.sp_cdc_enable_db;
-- Create some customers ...
CREATE TABLE customers (
id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE
);
INSERT INTO customers(first_name,last_name,email)
VALUES ('Sally','Thomas','sally.thomas@acme.com');
INSERT INTO customers(first_name,last_name,email)
VALUES ('George','Bailey','gbailey@foobar.com');
INSERT INTO customers(first_name,last_name,email)
VALUES ('Edward','Walker','ed@walker.com');
INSERT INTO customers(first_name,last_name,email)
VALUES ('Anne','Kretchmar','annek@noanswer.org');
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0;
GO
This script enables Change Data Capture on both the testDB database and the customers table.
To execute inventory.sql in the Docker container's sqlcmd prompt, use the following command:
#Load inventory.sql through your container's sqlcmd prompt
cat inventory.sql | docker exec -i sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P Password!'
To execute inventory.sql on your native installation, use the following command:
sqlcmd -S myServer\instanceName -i C:\inventory.sql
Start the Debezium SQL Server connector¶
Standalone cluster¶
Create the file register-sqlserver.properties to store the following connector configuration:
name=inventory-connector
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
tasks.max=1
database.server.name=server1
database.hostname=localhost
database.port=1433
database.user=sa
database.password=Password!
database.dbname=testDB
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=schema-changes.inventory
Start the connector:
./bin/connect-standalone ./etc/.properties ./etc/register-sqlserver.properties
Distributed cluster¶
Create the file register-sqlserver.json to store the following connector configuration:
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "database.server.name": "server1",
    "database.hostname": "localhost",
    "database.port": "1433",
    "database.user": "sa",
    "database.password": "Password!",
    "database.dbname": "testDB",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}
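The standalone properties file and the distributed JSON payload carry the same settings. The JSON form moves name to the top level and nests everything else under config. The following Python sketch converts one into the other and rejects a tasks.max other than 1, since the connector supports only one task. The helper names parse_properties and to_rest_payload are hypothetical, and the parser handles only simple key=value lines.

```python
import json


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    return config


def to_rest_payload(properties):
    """Wrap standalone-style properties in the {"name", "config"} shape."""
    config = dict(properties)
    name = config.pop("name")
    if config.get("tasks.max", "1") != "1":
        raise ValueError("this connector supports only one task")
    return {"name": name, "config": config}


# Shortened sample; a real file would list every property shown above.
sample = """
name=inventory-connector
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
tasks.max=1
database.server.name=server1
"""
print(json.dumps(to_rest_payload(parse_properties(sample)), indent=2))
```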
Start the connector.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json
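After the POST returns, it can be useful to confirm that the connector and its single task are actually running. Kafka Connect exposes this through GET /connectors/<name>/status. The following Python sketch fetches and evaluates that response. It assumes the Connect worker at localhost:8083 and the connector name inventory-connector used above. The helper names fetch_status and is_running are hypothetical.

```python
import json
from urllib.request import urlopen


def fetch_status(name, base_url="http://localhost:8083"):
    """Fetch the status document for one connector from the Connect REST API."""
    with urlopen(f"{base_url}/connectors/{name}/status") as response:
        return json.load(response)


def is_running(status):
    """True when the connector and every one of its tasks report RUNNING."""
    tasks = status.get("tasks", [])
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    return connector_ok and bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)


# Example (requires a running Connect worker):
#   print(is_running(fetch_status("inventory-connector")))
```

A task in the FAILED state usually includes a trace field in the same response, which is a reasonable first place to look when no messages arrive.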
Start your Kafka consumer¶
Start the consumer in a new terminal session.
confluent local services kafka consume server1.dbo.customers --from-beginning
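The topic consumed above, server1.dbo.customers, appears to combine the database.server.name value from the connector configuration with the schema and table names. Assuming that serverName.schemaName.tableName pattern holds for your connector version, the following Python sketch derives the topic for a captured table. The helper topic_for is hypothetical.

```python
def topic_for(server_name, schema, table):
    """Join the logical server name, schema, and table into a topic name."""
    return ".".join([server_name, schema, table])


print(topic_for("server1", "dbo", "customers"))
```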
When you run SQL statements at the sqlcmd prompt to add or modify records in the database, messages that reflect those changes appear in your consumer terminal.
USE testDB;
INSERT INTO customers(first_name,last_name,email) VALUES ('Pam','Thomas','pam@office.com');
GO
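Each message shown by the consumer is a change event. The following Python sketch pulls the operation and row images out of one event. It assumes the value is JSON that follows the Debezium envelope (op, before, after), optionally wrapped in a payload field when schemas are embedded. If your workers use the Avro converter, decode the message first. The helper describe_event and the sample event are hypothetical.

```python
import json

# Operation codes used in the Debezium event envelope.
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def describe_event(raw_value):
    """Return (operation, before, after) for one JSON-encoded change event."""
    event = json.loads(raw_value)
    # With schemas embedded, the envelope sits under "payload".
    envelope = event.get("payload", event)
    operation = OPERATIONS.get(envelope.get("op"), "unknown")
    return operation, envelope.get("before"), envelope.get("after")


# Hand-written sample shaped like an insert event, for illustration only.
sample = json.dumps({
    "payload": {
        "op": "c",
        "before": None,
        "after": {"id": 1005, "first_name": "Pam", "last_name": "Thomas", "email": "pam@office.com"},
    }
})
print(describe_event(sample))
```

This sketch does not handle messages with a null value (tombstones), which a consumer would need to skip before parsing.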
Clean up resources¶
Delete the connector and stop Confluent services.
curl -X DELETE localhost:8083/connectors/inventory-connector
confluent local services stop
Stop SQL Server container:
docker stop sqlserver_1
Note
Portions of the information provided here derive from documentation originally produced by the Debezium Community. Work produced by Debezium is licensed under Creative Commons 3.0.