Debezium SQL Server Source Connector for Confluent Platform

The Debezium SQL Server Source connector takes a snapshot of the existing data in a SQL Server database, then monitors and records all subsequent row-level changes to that data. The events for each table are recorded in a separate Apache Kafka® topic, where applications and services can consume them.

Tip

  • Confluent supports version 0.9.3 and later versions of the Debezium SQL Server Source connector.
  • Azure SQL Managed Instances support CDC and are supported by the SQL Server Source connector. For more information, see Features comparison: Azure SQL Database and Azure SQL Managed Instance.
  • The Debezium SQL Server Source connector requires the CDC feature to function. The CDC feature is provided by SQL Server Standard edition (2016 SP1 and later) or SQL Server Enterprise edition.

Features

The Debezium SQL Server Source connector includes the following features:

At least once delivery

This connector guarantees that records are delivered at least once to the Kafka topic. If the connector restarts, there may be some duplicate records in the Kafka topic.
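
Because duplicates are possible after a restart, downstream consumers are usually easier to reason about when they apply events idempotently. The following is a minimal sketch of that idea, not part of the connector. It assumes a hypothetical, simplified event shape (`op`, `key`, `after`) that a consumer has already decoded from the Kafka record, and it keeps state in a plain dictionary in place of a real target store.

```python
from typing import Any, Dict, Iterable, Optional

Row = Dict[str, Any]


def apply_events(events: Iterable[Dict[str, Any]],
                 state: Optional[Dict[Any, Row]] = None) -> Dict[Any, Row]:
    """Apply change events as upserts/deletes keyed by primary key.

    Each event is a hypothetical, simplified dict:
    {"op": "c" | "u" | "d" | "r", "key": <primary key>, "after": <row or None>}
    """
    table: Dict[Any, Row] = {} if state is None else state
    for event in events:
        if event["op"] == "d":
            # Deleting a missing key is a no-op, so a repeated delete is harmless.
            table.pop(event["key"], None)
        else:
            # Insert, update, and snapshot read all overwrite the row by key.
            table[event["key"]] = event["after"]
    return table


insert = {"op": "c", "key": 1001, "after": {"id": 1001, "first_name": "Sally"}}
update = {"op": "u", "key": 1001, "after": {"id": 1001, "first_name": "Sal"}}

once = apply_events([insert, update])
# Simulate a restart that redelivers both events after they were already applied.
replayed = apply_events([insert, update, insert, update])
```

In this sketch the final state is the same with or without the redelivered events, because duplicates arrive in their original order and the last write per key wins. The row may briefly show an older value while the replay is in progress.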

Supports one task

The Debezium SQL Server Source connector supports running only one task.

Automatic topic creation

The connector automatically creates the internal database history Kafka topic if it doesn’t exist.

Install the SQL Server Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent connect plugin install debezium/debezium-connector-sqlserver:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent connect plugin install debezium/debezium-connector-sqlserver:<version-number>
    
  • The Debezium SQL Server Source connector has specific ACL requirements. See the ACL requirements for Debezium Source connectors to ensure you meet the specified requirements.

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

The Debezium SQL Server Source connector is an open source connector and does not require a Confluent Enterprise License.

Configuration Properties

For a complete list of configuration properties for this connector, see Configuration Reference for Debezium SQL Server Source Connector for Confluent Platform.

Configure Change Data Capture on SQL Server

The SQL Server database must be configured to enable the Change Data Capture (CDC) feature, and CDC must also be enabled for each table that should be captured. The connector's functionality is based on this CDC feature, which is included in the SQL Server Standard and SQL Server Enterprise editions.

To enable CDC on the monitored database, use the following SQL command:

USE MyDB
GO
EXEC sys.sp_cdc_enable_db
GO

Enable CDC for each table that you plan to monitor:

USE MyDB
GO
EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'MyTable', @role_name = N'MyRole', @filegroup_name = N'MyDB_CT', @supports_net_changes = 1
GO
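
When many tables need CDC, writing one `sp_cdc_enable_table` call per table by hand gets repetitive. The sketch below generates the statements shown above for a list of tables. The helper names (`sql_literal`, `sql_identifier`, `enable_cdc_script`) and the table `OtherTable` are hypothetical, and the sketch passes only `@source_schema`, `@source_name`, and `@role_name`, leaving the other parameters from the example at their defaults.

```python
from typing import Iterable, Optional, Tuple


def sql_literal(value: Optional[str]) -> str:
    """Render a T-SQL Unicode string literal, doubling embedded single quotes."""
    if value is None:
        return "NULL"
    return "N'" + value.replace("'", "''") + "'"


def sql_identifier(name: str) -> str:
    """Bracket-quote an identifier, doubling any closing bracket."""
    return "[" + name.replace("]", "]]") + "]"


def enable_cdc_script(database: str,
                      tables: Iterable[Tuple[str, str]],
                      role_name: Optional[str] = None) -> str:
    """Build a sqlcmd script that enables CDC on a database and its tables."""
    lines = [f"USE {sql_identifier(database)}", "GO",
             "EXEC sys.sp_cdc_enable_db", "GO"]
    for schema, table in tables:
        lines.append(
            "EXEC sys.sp_cdc_enable_table "
            f"@source_schema = {sql_literal(schema)}, "
            f"@source_name = {sql_literal(table)}, "
            f"@role_name = {sql_literal(role_name)}"
        )
        lines.append("GO")
    return "\n".join(lines)


script = enable_cdc_script("MyDB",
                           [("dbo", "MyTable"), ("dbo", "OtherTable")],
                           role_name="MyRole")
print(script)
```

The generated text can be saved to a file and run with sqlcmd. Review it before running, since the sketch does not check whether CDC is already enabled.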

Quick Start

This quick start shows how the Debezium SQL Server Source connector records the events for each table in a separate Kafka topic, where applications and services can consume them.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Install the Connector

If you want to use Docker images to set up Kafka, ZooKeeper, and Connect, refer to the Debezium tutorial. The following tutorial requires a local installation of Confluent Platform. Note that as of Confluent Platform 7.5, ZooKeeper is deprecated for new deployments. Confluent recommends KRaft mode for new deployments.

Navigate to your Confluent Platform installation directory and run the following command to install the connector:

Tip

The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

confluent connect plugin install debezium/debezium-connector-sqlserver:latest

Adding a new connector plugin requires restarting Connect. Use the Confluent CLI to restart Connect.

confluent local services connect stop && confluent local services connect start
Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]

Check if the SQL Server plugin has been installed correctly and picked up by the plugin loader.

curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep SqlServer
"io.debezium.connector.sqlserver.SqlServerConnector"

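The same check can be scripted without jq and grep. The sketch below assumes the Connect REST API is reachable at `http://localhost:8083`, as in the curl command above, and that `/connector-plugins` returns a JSON array of objects with a `class` field. The function names and the sample entry are illustrative only.

```python
import json
import urllib.request
from typing import Any, Dict, List, Optional

SQLSERVER_CLASS = "io.debezium.connector.sqlserver.SqlServerConnector"


def fetch_plugins(base_url: str = "http://localhost:8083") -> List[Dict[str, Any]]:
    """Fetch the plugin list from a running Connect worker (needs network access)."""
    with urllib.request.urlopen(f"{base_url}/connector-plugins") as response:
        return json.load(response)


def find_plugin(plugins: List[Dict[str, Any]],
                class_name: str = SQLSERVER_CLASS) -> Optional[Dict[str, Any]]:
    """Return the plugin entry whose class matches, or None if it is not loaded."""
    for plugin in plugins:
        if plugin.get("class") == class_name:
            return plugin
    return None


# Illustrative response; a real worker lists every installed plugin.
sample = [
    {"class": SQLSERVER_CLASS, "type": "source", "version": "<version>"},
]
found = find_plugin(sample)
print("installed" if found else "missing")
```

Against a live worker, `find_plugin(fetch_plugins())` returns `None` when the plugin loader did not pick up the connector.
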
Set up SQL Server using Docker (Optional)

If you do not have a native installation of SQL Server, you may use the following command to bring up SQL Server with a Docker image.

#Pull docker image
docker pull mcr.microsoft.com/mssql/server:2017-latest

#Run docker container
docker run -e 'ACCEPT_EULA=Y' -e 'MSSQL_AGENT_ENABLED=true' \
-e 'MSSQL_PID=Standard' -e 'SA_PASSWORD=Password!' \
-p 1433:1433 --name sqlserver_1 \
-d mcr.microsoft.com/mssql/server:2017-latest

#Log into container to get your SQL Server command prompt
docker exec -it sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P Password!'

Create Test Data and Enable Change Data Capture

The database operator must enable Change Data Capture (CDC) for the table(s) that should be captured by the Debezium connector. The functionality of the connector is based upon this CDC feature included in the SQL Server Standard (beginning with SQL Server 2016 SP1) and SQL Server Enterprise editions.

To enable CDC on the monitored database, use the following SQL command:

USE MyDB
GO
EXEC sys.sp_cdc_enable_db
GO

Enable CDC for each table that you plan to monitor:

USE MyDB
GO
EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'MyTable', @role_name = N'MyRole', @filegroup_name = N'MyDB_CT', @supports_net_changes = 1
GO

In this example, the database testDB is populated with a set of customer records.

Create inventory.sql with the following list of commands.


-- Create the test database
CREATE DATABASE testDB;
GO
USE testDB;
EXEC sys.sp_cdc_enable_db;

-- Create some customers ...
CREATE TABLE customers (
  id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE
);
INSERT INTO customers(first_name,last_name,email)
  VALUES ('Sally','Thomas','sally.thomas@acme.com');
INSERT INTO customers(first_name,last_name,email)
  VALUES ('George','Bailey','gbailey@foobar.com');
INSERT INTO customers(first_name,last_name,email)
  VALUES ('Edward','Walker','ed@walker.com');
INSERT INTO customers(first_name,last_name,email)
  VALUES ('Anne','Kretchmar','annek@noanswer.org');
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0;
GO

This script enables Change Data Capture on both the testDB database and the customers table.

To execute inventory.sql in the Docker container’s sqlcmd prompt, use the following command:

#Load inventory.sql through your container's sqlcmd prompt
cat inventory.sql | docker exec -i sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P Password!'

To execute inventory.sql on your native installation, use the following command:

sqlcmd -S myServer\instanceName -i C:\inventory.sql

Start the Debezium SQL Server connector

Standalone cluster

  1. Create the file register-sqlserver.properties to store the following connector configuration:

    name=inventory-connector
    connector.class=io.debezium.connector.sqlserver.SqlServerConnector
    tasks.max=1
    database.server.name=server1
    database.hostname=localhost
    database.port=1433
    database.user=sa
    database.password=Password!
    database.dbname=testDB
    database.history.kafka.bootstrap.servers=localhost:9092
    database.history.kafka.topic=schema-changes.inventory
    
  2. Start the connector:

    ./bin/connect-standalone  ./etc/.properties ./etc/register-sqlserver.properties
    

Distributed cluster

  1. Create the file register-sqlserver.json to store the following connector configuration:

    {
     "name": "inventory-connector",
     "config": {
         "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
         "tasks.max" : "1",
         "database.server.name" : "server1",
         "database.hostname" : "localhost",
         "database.port" : "1433",
         "database.user" : "sa",
         "database.password" : "Password!",
         "database.dbname" : "testDB",
         "database.history.kafka.bootstrap.servers" : "localhost:9092",
         "database.history.kafka.topic": "schema-changes.inventory"
         }
     }
    
  2. Start the connector.

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json
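
The standalone properties file and the distributed JSON payload carry the same settings in two shapes: the REST payload keeps `name` at the top level and puts the remaining keys under `config`. The sketch below converts one into the other. It is a simplified illustration under stated assumptions: the parser handles only `key=value` lines and `#` comments, not the full Java properties syntax, and the function name is hypothetical.

```python
import json
from typing import Any, Dict


def properties_to_payload(text: str) -> Dict[str, Any]:
    """Convert simple key=value properties text into a Connect REST payload."""
    config: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"not a key=value line: {raw!r}")
        config[key.strip()] = value.strip()
    # The REST API expects the connector name outside the config map.
    name = config.pop("name")
    return {"name": name, "config": config}


PROPERTIES = """\
name=inventory-connector
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
tasks.max=1
database.server.name=server1
database.port=1433
"""

payload = properties_to_payload(PROPERTIES)
print(json.dumps(payload, indent=2))
```

All values stay strings, which matches the quoted values in the JSON example above.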
    

Start your Kafka consumer

Start the consumer in a new terminal session.

confluent local services kafka consume server1.dbo.customers --from-beginning
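
The topic name in this command combines the `database.server.name` value from the connector configuration (`server1`) with the schema and table names (`dbo.customers`). As a small sketch of that convention, inferred from the example in this quick start (other connector versions may name topics differently, and the function name is hypothetical):

```python
def topic_name(server_name: str, schema: str, table: str) -> str:
    """Build the per-table topic name used in this quick start."""
    return ".".join((server_name, schema, table))


print(topic_name("server1", "dbo", "customers"))
```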

When you run SQL statements at your sqlcmd prompt to add or modify records in the database, the corresponding change events appear in your consumer terminal.

USE testDB;
INSERT INTO customers(first_name,last_name,email) VALUES ('Pam','Thomas','pam@office.com');
GO
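
Each message the consumer prints is a Debezium change event. The sketch below shows one way an application might summarize such an event. It assumes the value is JSON with the standard Debezium envelope fields `op`, `before`, and `after`, optionally wrapped in a `payload` object when the JSON converter includes schemas. The sample message is hand-written for illustration (including the `id` value) and is not captured connector output.

```python
import json
from typing import Any, Dict, Optional

OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def summarize_change(message_value: Optional[str]) -> Optional[Dict[str, Any]]:
    """Extract the operation and row images from a change event value."""
    if message_value is None:
        return None  # a null value (tombstone) carries no envelope
    doc = json.loads(message_value)
    # With schemas enabled, the envelope sits under "payload"; otherwise it is top level.
    envelope = doc.get("payload", doc)
    op = envelope["op"]
    return {
        "operation": OPERATIONS.get(op, op),
        "before": envelope.get("before"),
        "after": envelope.get("after"),
    }


# Hand-written example of what the INSERT above might produce (values illustrative).
sample_value = json.dumps({
    "payload": {
        "before": None,
        "after": {"id": 1005, "first_name": "Pam", "last_name": "Thomas",
                  "email": "pam@office.com"},
        "op": "c",
    }
})

summary = summarize_change(sample_value)
print(summary["operation"], summary["after"]["email"])
```

For an insert, `before` is empty and `after` holds the new row. Updates populate both, and deletes populate only `before`.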

Clean up resources

Delete the connector and stop Confluent services.

curl -X DELETE localhost:8083/connectors/inventory-connector
confluent local services stop

Stop SQL Server container:

docker stop sqlserver_1

Note

Portions of the information provided here derive from documentation originally produced by the Debezium Community. Work produced by Debezium is licensed under Creative Commons 3.0.