Kafka Connect Microsoft SQL Server Connector

You can use the Microsoft SQL Server Connector to monitor source databases for changes and write them in realtime to Kafka.

The Microsoft SQL Server connector utilizes Change Tracking to identify changes. There are two ways to read the changes from the source system as they are generated. Change Data Capture is a feature that is only available on SQL Server Enterprise and Developer editions. Change Tracking is a lightweight solution that will efficiently find rows that have changed. If the rows are modified in quick succession all of the changes might not be found. The latest version of the change will be returned.

Install the Microsoft SQL Server Connector

You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.

Install Connector Using Confluent Hub

Prerequisite
Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run this command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will be run.

confluent-hub install confluentinc/kafka-connect-cdc-mssql:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-cdc-mssql:1.0.1-preview

Install Connector Manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

Usage Notes

Tasks are used to spread the work across the Connect cluster. The connector will partition all of the tables with Change Tracking enabled across the number of tasks allocated to the connector. Each task will have at least one connection to the source database server.

This connector utilizes the Change Tracking feature available in most versions of SQL Server. For this connector to detect changes you must enable Change Tracking on the source database server.

Examples

This example will connect to a database server and use any table configured with change tracking.

Note

Increasing tasks.max will help increase the speed at which changes are imported. This comes with the side affect of increased connections/load on the source SQL Server.

Select one of the following configuration methods based on how you have deployed Kafka Connect. Distributed Mode will the the JSON / REST examples. Standalone mode will use the properties based example.

Distributed Mode JSON

{
  "name" : "msSqlSourceConnector1",
  "config" : {
    "connector.class" : "io.confluent.connect.cdc.mssql.MsSqlSourceConnector",
    "tasks.max" : "1",
    "initial.database" : "testing",
    "username" : "cdc",
    "password" : "secret",
    "server.name" : "db-01.example.com",
    "server.port" : "1433"
  }
}

Standalone Mode Properties

connector.class=io.confluent.connect.cdc.mssql.MsSqlSourceConnector
tasks.max=1
initial.database=testing
username=cdc
password=secret
server.name=db-01.example.com
server.port=1433

This example will connect to a SQL Server and will only check for changes from specific tables.

Select one of the following configuration methods based on how you have deployed Kafka Connect. Distributed Mode will the the JSON / REST examples. Standalone mode will use the properties based example.

Distributed Mode JSON

{
  "name" : "msSqlSourceConnector1",
  "config" : {
    "connector.class" : "io.confluent.connect.cdc.mssql.MsSqlSourceConnector",
    "tasks.max" : "1",
    "initial.database" : "testing",
    "username" : "cdc",
    "password" : "secret",
    "server.name" : "db-01.example.com",
    "server.port" : "1433",
    "change.tracking.tables" : "dbo.Table1,dbo.Table2"
  }
}

Standalone Mode Properties

connector.class=io.confluent.connect.cdc.mssql.MsSqlSourceConnector
tasks.max=1
initial.database=testing
username=cdc
password=secret
server.name=db-01.example.com
server.port=1433
change.tracking.tables=dbo.Table1,dbo.Table2