Important

You are viewing documentation for an older version of Confluent Platform.

Debezium SQL Server Source Connector

The Debezium SQL Server Connector is a source connector that takes a snapshot of the existing data in a SQL Server database and then monitors and records all subsequent row-level changes to that data. All of the events for each table are recorded in a separate Apache Kafka® topic, where applications and services can consume them.

  • Confluent supports Debezium SQL Server connector version 0.9.3 and later.
  • Confluent supports using this connector with SQL Server 2016 SP1 or later.
  • SQL Server on Microsoft Azure is currently not supported.
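The per-table topics follow a predictable naming pattern. The quick start later on this page consumes from server1.dbo.customers, which combines the database.server.name value, the schema name, and the table name. The following is a minimal sketch of that pattern as a shell helper. The function name topic_name is hypothetical and is not part of any Confluent or Debezium tool.

```shell
# Hypothetical helper: derive the Kafka topic name for a captured table,
# following the <server name>.<schema>.<table> pattern seen in the quick start.
topic_name() {
  server_name="$1"   # value of database.server.name in the connector config
  schema="$2"        # SQL Server schema, for example dbo
  table="$3"         # table name, for example customers
  printf '%s.%s.%s\n' "$server_name" "$schema" "$table"
}
```

For example, topic_name server1 dbo customers prints server1.dbo.customers.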

Install the SQL Server Connector

You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.

confluent-hub install debezium/debezium-connector-sqlserver:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install debezium/debezium-connector-sqlserver:0.9.4

License

The Debezium SQL Server connector is an open source connector and does not require a Confluent Enterprise License.

Configure Change Data Capture on SQL Server

The Change Data Capture (CDC) feature must be enabled on the SQL Server database (SQL Server 2016 SP1 or later). The connector requires CDC to be enabled for each table that it should capture. The connector's functionality is based on the CDC feature included in the SQL Server Standard and SQL Server Enterprise editions.

To enable CDC on the monitored database, use the following SQL command:

USE MyDB
GO
EXEC sys.sp_cdc_enable_db
GO

Enable CDC for each table that you plan to monitor:

USE MyDB
GO
EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'MyTable', @role_name = N'MyRole', @filegroup_name = N'MyDB_CT', @supports_net_changes = 1
GO
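After running the commands above, it can help to confirm that CDC is actually enabled before starting the connector. The sketch below prints a T-SQL script that reads the is_cdc_enabled column of sys.databases and the is_tracked_by_cdc column of sys.tables. The function name cdc_status_sql is hypothetical, and the script is only one possible way to check.

```shell
# Hypothetical helper: print T-SQL that reports whether CDC is enabled
# for a database and which of its tables are tracked by CDC.
cdc_status_sql() {
  db="$1"   # database name, for example MyDB
  cat <<EOF
USE ${db};
GO
-- is_cdc_enabled = 1 means CDC is enabled for the database
SELECT name, is_cdc_enabled FROM sys.databases WHERE name = '${db}';
-- is_tracked_by_cdc = 1 means CDC is enabled for the table
SELECT s.name AS schema_name, t.name AS table_name, t.is_tracked_by_cdc
FROM sys.tables AS t
JOIN sys.schemas AS s ON s.schema_id = t.schema_id;
GO
EOF
}
```

Pipe the output of cdc_status_sql MyDB to sqlcmd with your own connection options to run the check.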

Quick Start

Debezium’s SQL Server Connector is a source connector that can record events for each table in a separate Kafka topic, where they can be easily consumed by applications and services.

Install the Connector

If you want to use Docker images to set up Kafka, ZooKeeper, and Connect, refer to the Debezium tutorial. The following tutorial requires a local installation of Confluent Platform.

Navigate to your Confluent Platform installation directory and run the following command to install the connector:

confluent-hub install debezium/debezium-connector-sqlserver:latest

Adding a new connector plugin requires restarting Connect. Use the Confluent CLI to restart Connect.

confluent stop connect && confluent start connect
Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]

Check if the SQL Server plugin has been installed correctly and picked up by the plugin loader.

curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep SqlServer
"io.debezium.connector.sqlserver.SqlServerConnector"

Set up SQL Server using Docker (Optional)

If you do not have a native installation of SQL Server, you may use the following command to bring up SQL Server with a Docker image.

#Pull docker image
docker pull mcr.microsoft.com/mssql/server:2017-latest

#Run docker container
docker run -e 'ACCEPT_EULA=Y' -e 'MSSQL_AGENT_ENABLED=true' \
-e 'MSSQL_PID=Standard' -e 'SA_PASSWORD=Password!' \
-p 1433:1433 --name sqlserver_1 \
-d mcr.microsoft.com/mssql/server:2017-latest

#Log into container to get your SQL Server command prompt
docker exec -it sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P Password!'

Create Test Data and Enable Change Data Capture

The database operator must enable Change Data Capture (CDC) for the table(s) that should be captured by the Debezium connector. The functionality of the connector is based upon this CDC feature included in the SQL Server Standard (beginning with SQL Server 2016 SP1) and SQL Server Enterprise editions.

To enable CDC on the monitored database, use the following SQL command:

USE MyDB
GO
EXEC sys.sp_cdc_enable_db
GO

Enable CDC for each table that you plan to monitor.

USE MyDB
GO
EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'MyTable', @role_name = N'MyRole', @filegroup_name = N'MyDB_CT', @supports_net_changes = 1
GO

In this example, the database testDB is populated with a set of customer records.

Create inventory.sql with the following list of commands.


-- Create the test database
CREATE DATABASE testDB;
GO
USE testDB;
EXEC sys.sp_cdc_enable_db;

-- Create some customers ...
CREATE TABLE customers (
  id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE
);
INSERT INTO customers(first_name,last_name,email)
  VALUES ('Sally','Thomas','sally.thomas@acme.com');
INSERT INTO customers(first_name,last_name,email)
  VALUES ('George','Bailey','gbailey@foobar.com');
INSERT INTO customers(first_name,last_name,email)
  VALUES ('Edward','Walker','ed@walker.com');
INSERT INTO customers(first_name,last_name,email)
  VALUES ('Anne','Kretchmar','annek@noanswer.org');
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0;
GO

This script enables Change Data Capture on both the testDB database and the customers table.

To execute inventory.sql in the Docker container’s sqlcmd prompt, use the following command:

#Load inventory.sql through your container's sqlcmd prompt
cat inventory.sql | docker exec -i sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P Password!'

To execute inventory.sql on your native installation, use the following command:

sqlcmd -S myServer\instanceName -i C:\inventory.sql

Start the Debezium SQL Server connector

Create the file register-sqlserver.json to store the following connector configuration:

{
 "name": "inventory-connector",
 "config": {
     "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
     "tasks.max" : "1",
     "database.server.name" : "server1",
     "database.hostname" : "localhost",
     "database.port" : "1433",
     "database.user" : "sa",
     "database.password" : "Password!",
     "database.dbname" : "testDB",
     "database.history.kafka.bootstrap.servers" : "localhost:9092",
     "database.history.kafka.topic": "schema-changes.inventory"
     }
 }
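Before registering the connector, a quick check that the file contains every property shown above can catch typos early. The following is a rough sketch that only looks for the property names as quoted strings; it does not validate JSON syntax or the property values. The function name check_connector_config is hypothetical.

```shell
# Hypothetical helper: report any property from the example configuration
# that does not appear in the given file. Returns 0 when all are present.
check_connector_config() {
  file="$1"
  missing=0
  for key in connector.class tasks.max database.server.name \
             database.hostname database.port database.user \
             database.password database.dbname \
             database.history.kafka.bootstrap.servers \
             database.history.kafka.topic; do
    # -F treats the key as a fixed string, so dots are matched literally
    if ! grep -qF "\"$key\"" "$file"; then
      echo "missing property: $key" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

For example, check_connector_config register-sqlserver.json prints nothing and returns 0 when all of the listed properties are present.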

Start the connector.

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json
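Once the connector is registered, the Kafka Connect REST API exposes its state at /connectors/<name>/status. The sketch below, which assumes Connect is listening on localhost:8083 as in the rest of this quick start, extracts the state values from that response using only grep and sed. The function names connector_states and connector_status are hypothetical.

```shell
# Hypothetical helper: read a Connect status response on stdin and print
# each "state" value (connector first, then tasks) on its own line.
connector_states() {
  grep -o '"state" *: *"[A-Z_]*"' | sed 's/.*"\([A-Z_]*\)"$/\1/'
}

# Hypothetical helper: fetch the status of a named connector from the
# local Connect worker and print its states.
connector_status() {
  curl -sS "http://localhost:8083/connectors/$1/status" | connector_states
}
```

Running connector_status inventory-connector is expected to print RUNNING for the connector and for each task when the connector is healthy. A FAILED state usually means the full status response and the Connect log are worth inspecting.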

Start your Kafka consumer

Start the consumer in a new terminal session.

confluent consume server1.dbo.customers --from-beginning

When you run SQL statements at the sqlcmd prompt to add or modify records in the database, the corresponding change events appear in your consumer terminal. For example:

USE testDB;
INSERT INTO customers(first_name,last_name,email) VALUES ('Pam','Thomas','pam@office.com');
GO
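Each change event that Debezium writes is an envelope with before and after fields for the row state and an op field that identifies the kind of change. The insert above should therefore appear as an event with op set to c and the new row in after. The following sketch maps the op codes to readable names; the function name op_name is hypothetical.

```shell
# Hypothetical helper: translate the "op" code of a Debezium change event
# into a readable description.
op_name() {
  case "$1" in
    c) echo "create" ;;            # row inserted
    u) echo "update" ;;            # row updated
    d) echo "delete" ;;            # row deleted
    r) echo "read (snapshot)" ;;   # row read during the initial snapshot
    *) echo "unknown" ;;
  esac
}
```

For example, op_name c prints create.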

Clean up resources

Delete the connector and stop Confluent services.

curl -X DELETE localhost:8083/connectors/inventory-connector
confluent stop

Stop SQL Server container:

docker stop sqlserver_1

Note

Portions of the information provided here derive from documentation originally produced by the Debezium Community. Work produced by Debezium is licensed under Creative Commons 3.0.
