Configuration Reference for Debezium SQL Server Source Connector for Confluent Platform
The SQL Server Source connector can be configured using a variety of configuration properties.
Note
These are properties for the self-managed connector. If you are using Confluent Cloud, see Debezium SQL Server Source Connector for Confluent Cloud.
Required properties
name
A unique name for the connector. Attempting to register another connector with the same name fails.
connector.class
The name of the Java class for the connector. You must use io.debezium.connector.sqlserver.SqlServerConnector for the SQL Server connector.
tasks.max
The maximum number of tasks that should be created for this connector. The SQL Server connector always uses a single task.
- Type: int
- Default: 1
database.hostname
IP address or hostname of the SQL Server database server.
- Type: string
- Importance: high
database.port
Integer port number of the SQL Server database server.
- Type: int
- Importance: low
- Default: 1433
database.user
Username to use when connecting to the SQL Server database server.
- Type: string
- Importance: high
database.password
Password to use when connecting to the SQL Server database server.
- Type: password
- Importance: high
database.instance
Specifies the instance name of the SQL Server named instance.
- Type: string
- Default: No default
database.names
A comma-separated list of the SQL Server database names from which to stream changes. Currently, only one database name is supported. This property must not be used with database.dbname.
- Type: list of strings
- Default: No default
topic.prefix
Topic prefix that provides a namespace for the particular SQL Server database server or cluster in which Debezium is capturing changes. The prefix should be unique across all other connectors, since it is used as a topic name prefix for all Kafka topics that receive records from this connector. Use only alphanumeric characters, hyphens, dots, and underscores in the prefix. Do not change the value of this property after the connector has started. If you change the value, then after a restart, instead of continuing to emit events to the original topics, the connector emits subsequent events to topics whose names are based on the new value.
- Type: string
- Default: No default
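As a rough illustration of how the required properties fit together, the following Python sketch assembles a registration payload in the name/config shape accepted by the Kafka Connect REST API. The connector name, hostname, credentials, database name, and topic prefix are hypothetical placeholders, not values from this reference. A working deployment would also need the database schema history properties described later on this page.

```python
import json

# Minimal sketch of a connector registration payload.
# Every value except connector.class, tasks.max, and the default port
# is a hypothetical placeholder chosen for illustration.
connector = {
    "name": "sqlserver-inventory",  # hypothetical; must be unique
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max": "1",  # the SQL Server connector always uses a single task
        "database.hostname": "sqlserver.example.internal",  # hypothetical
        "database.port": "1433",
        "database.user": "debezium",  # hypothetical
        "database.password": "change-me",  # hypothetical
        "database.names": "inventory",  # hypothetical; only one name is supported
        "topic.prefix": "fulfillment",  # hypothetical; keep stable once chosen
    },
}

# Serialize to JSON, for example to save as a file or send to Kafka Connect.
payload = json.dumps(connector, indent=2)
print(payload)
```

Property values are written as strings here because Kafka Connect accepts string values and converts them to the declared type.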
schema.include.list
An optional comma-separated list of regular expressions that match schema names to be monitored. Any schema name not included in the include list is excluded from monitoring. By default, all non-system schemas are monitored. May not be used with schema.exclude.list.
- Type: list of strings
- Importance: low
schema.exclude.list
An optional comma-separated list of regular expressions that match schema names to be excluded from monitoring. Any schema name not included in the exclude list is monitored, with the exception of system schemas. May not be used with schema.include.list.
- Type: list of strings
- Importance: low
table.include.list
A comma-separated list of regular expressions that match fully-qualified table identifiers for tables you want Debezium to capture; any table that is not included in table.include.list is excluded from capture. Each identifier is of the form schemaName.tableName. By default, the connector captures all non-system tables for the designated schemas. Do not use with table.exclude.list.
- Type: list of strings
- Importance: low
table.exclude.list
A comma-separated list of regular expressions that match fully-qualified table identifiers for the tables you want to exclude from being captured. Each identifier is of the form schemaName.tableName. Do not use with table.include.list.
- Type: list of strings
- Importance: low
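The include and exclude semantics described above can be sketched in Python as follows. This is an approximation under stated assumptions, not the connector's implementation: it uses Python's re module rather than Java regex, it assumes each expression must match the whole schemaName.tableName identifier, and it splits the list on commas, which would break an expression that itself contains a comma. The table names are hypothetical.

```python
import re

def build_table_filter(include=None, exclude=None):
    """Return a predicate that decides whether 'schemaName.tableName' is captured."""
    if include and exclude:
        # The two properties must not be set together.
        raise ValueError("set table.include.list or table.exclude.list, not both")
    raw = include or exclude or ""
    # Simplification: split on commas; assumes no expression contains a comma.
    patterns = [re.compile(p.strip()) for p in raw.split(",") if p.strip()]

    def captured(identifier):
        # Assumption: an expression has to match the entire identifier.
        matched = any(p.fullmatch(identifier) for p in patterns)
        if include:
            return matched
        if exclude:
            return not matched
        return True  # no list configured: nothing is filtered here

    return captured

# Hypothetical tables, for illustration only.
only_sales = build_table_filter(include=r"dbo\.orders,sales\..*")
print(only_sales("sales.invoices"), only_sales("dbo.customers"))
```

System tables are excluded by the connector regardless of these lists; the sketch does not model that.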
column.include.list
An optional, comma-separated list of regular expressions that match the fully-qualified names of columns that should be included in change event record values. Fully-qualified names for columns use the form schemaName.tableName.columnName. Do not also set the column.exclude.list property.
- Type: list of strings
- Importance: low
- Default: “” (Empty string)
column.exclude.list
A comma-separated list of regular expressions that match the fully-qualified names of columns that should be excluded from change event message values. Fully-qualified names for columns use the form schemaName.tableName.columnName. Do not also set the column.include.list property.
- Type: list of strings
- Importance: low
- Default: Empty string
skip.messages.without.change
Specifies whether to skip publishing messages when there is no change in the included columns. In effect, this filters out messages when none of the columns selected by the column.include.list or column.exclude.list properties has changed.
- Type: boolean
- Default: false
column.mask.hash.hashAlgorithm.with.salt.salt; column.mask.hash.v2.hashAlgorithm.with.salt.salt
An optional comma-separated list of regular expressions that match the fully-qualified names of character-based columns. In change event message values, the values of matching columns are replaced with pseudonyms: a field value consisting of the hashed value, computed using the algorithm hashAlgorithm and the salt salt. For more details, see the Debezium documentation.
- Type: list of strings
- Default: No default
decimal.handling.mode
Specifies how the connector should handle values for DECIMAL and NUMERIC columns:
- precise: Represents them precisely using java.math.BigDecimal values, which are represented in change events in a binary form.
- double: Represents them using double values, which may result in a loss of precision but is easier to use.
- string: Encodes values as formatted strings, which is easy to consume, but semantic information about the real type is lost.
- Type: string
- Default: precise
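To make the binary form used by precise mode more concrete, here is a minimal Python sketch of decoding such a value on the consumer side. It assumes the value follows Kafka Connect's Decimal logical type, where the bytes hold the unscaled integer in big-endian two's-complement form and the scale travels in the field schema, and that a JSON-based converter delivers those bytes base64-encoded. The function name and sample values are hypothetical.

```python
import base64
from decimal import Decimal

def decode_connect_decimal(raw: bytes, scale: int) -> Decimal:
    """Rebuild a decimal from unscaled big-endian two's-complement bytes."""
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    sign = 1 if unscaled < 0 else 0
    digits = tuple(int(d) for d in str(abs(unscaled)))
    # Build the Decimal from its parts so no context rounding is applied.
    return Decimal((sign, digits, -scale))

# Hypothetical sample: "MDk=" is base64 for bytes 0x30 0x39 (unscaled 12345).
# The scale of 2 would come from the field's schema parameters.
price = decode_connect_decimal(base64.b64decode("MDk="), scale=2)
print(price)
```

With the double or string modes no such decoding is needed, at the cost described in the option list above.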
time.precision.mode
Time, date, and timestamps can be represented with different kinds of precision, including:
- adaptive: (the default) captures the time and timestamp values exactly as in the database using either millisecond, microsecond, or nanosecond precision values based on the database column’s type.
- connect: always represents time and timestamp values using Kafka Connect’s built-in representations for Time, Date, and Timestamp, which use millisecond precision regardless of the database columns’ precision. For more information, see temporal values.
- Type: string
- Default: adaptive
include.schema.changes
Use this property to specify whether the connector should publish changes in the database schema to a Kafka topic with the same name as the topic prefix (topic.prefix). Each schema change is recorded with a key that contains the database name and a value that is a JSON structure that describes the schema update. This is independent of how the connector internally records database schema history.
- Type: boolean
- Default: true
tombstones.on.delete
This property controls whether a tombstone event should be generated after a delete event. When set to true, delete operations are represented by a delete event and a subsequent tombstone event. When set to false, only a delete event is sent. Emitting the tombstone event (the default behavior) allows Kafka to completely delete all events pertaining to the given key once the source record is deleted.
- Type: boolean
- Importance: low
- Default: true
column.truncate.to.length.chars
An optional comma-separated list of regular expressions that match the fully-qualified names of character-based columns. In change event message values, the values of matching columns are truncated if they are longer than the specified number of characters. Fully-qualified names for columns use the form schemaName.tableName.columnName.
- Type: list of strings
- Default: No default
column.mask.with.length.chars
An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns. In change event message values, the values of matching columns are replaced with a field value consisting of the specified number of asterisk (*) characters. Fully-qualified names for columns use the form schemaName.tableName.columnName.
- Type: list of strings
- Default: No default
column.propagate.source.type
An optional comma-separated list of regular expressions that match the fully-qualified names of columns whose original type and length should be added as a parameter to the corresponding field schemas in the emitted change messages. The schema parameters __debezium.source.column.type, __debezium.source.column.length, and __debezium.source.column.scale are used to propagate the original type name, length (for variable-width types), and scale, respectively. This is useful for properly sizing corresponding columns in sink databases. Fully-qualified names for columns use the form schemaName.tableName.columnName.
- Type: list of strings
- Importance: low
- Default: No default
datatype.propagate.source.type
An optional comma-separated list of regular expressions. These expressions match the database-specific data type name of columns whose original type and length should be added as a parameter to the corresponding field schemas in the emitted change messages. The schema parameters __debezium.source.column.type, __debezium.source.column.length, and __debezium.source.column.scale are used to propagate the original type name, length (for variable-width types), and scale, respectively. Fully-qualified data type names use the form schemaName.tableName.typeName. For a list of SQL Server-specific data type names, see SQL Server data types.
- Type: list of strings
- Default: No default
message.key.columns
A semicolon-separated list of regular expressions that match fully-qualified tables and columns to map a primary key. Each item (regular expression) must be of the form <fully-qualified table>:<a comma-separated list of columns>, representing the custom key. Fully-qualified tables can be defined as schemaName.tableName.
- Type: list of strings
- Default: No default
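The item format described above can be illustrated with a small Python sketch that splits a message.key.columns value into a table-to-columns mapping. It only demonstrates the syntax; the function name, table names, and column names are hypothetical, and the connector's own parsing may differ in details such as how it treats a colon inside a table expression.

```python
def parse_message_key_columns(value: str) -> dict[str, list[str]]:
    """Split '<table>:<col>,<col>;<table>:<col>' into {table: [columns]}."""
    mapping = {}
    for item in value.split(";"):
        item = item.strip()
        if not item:
            continue
        # Assumption: the first colon separates the table from its columns.
        table, sep, columns = item.partition(":")
        names = [c.strip() for c in columns.split(",") if c.strip()]
        if not sep or not table.strip() or not names:
            raise ValueError(f"expected '<table>:<columns>', got {item!r}")
        mapping[table.strip()] = names
    return mapping

# Hypothetical value, for illustration only.
keys = parse_message_key_columns("dbo.orders:order_id,region;dbo.customers:email")
print(keys)
```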
binary.handling.mode
This property specifies how binary (binary, varbinary) columns should be represented in change events:
- bytes: represents binary data as a byte array.
- base64: represents binary data as a base64-encoded string.
- hex: represents binary data as a base16-encoded (hex-encoded) string.
- Type: string
- Default: bytes
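The three representations can be compared side by side with a short Python sketch. It mirrors the option names above but is not the connector's code; in particular, the letter case of the hex output is an assumption (Python emits lowercase), and the sample bytes are hypothetical.

```python
import base64

def represent_binary(value: bytes, mode: str = "bytes"):
    """Show how one binary column value looks under each handling mode."""
    if mode == "bytes":
        return value  # raw byte array
    if mode == "base64":
        return base64.b64encode(value).decode("ascii")
    if mode == "hex":
        return value.hex()  # lowercase here; the connector's case is not confirmed
    raise ValueError(f"unknown binary.handling.mode: {mode!r}")

sample = b"\x01\xab"  # hypothetical varbinary value
for mode in ("bytes", "base64", "hex"):
    print(mode, represent_binary(sample, mode))
```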
schema.name.adjustment.mode
Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. Possible settings are:
- avro: replaces the characters that cannot be used in the Avro type name with an underscore.
- none: does not apply any adjustment.
- Type: string
- Default: avro
field.name.adjustment.mode
Specifies how field names should be adjusted for compatibility with the message converter used by the connector. The following are possible settings:
- avro: Replaces the characters that cannot be used in the Avro type name with an underscore.
- none: Does not apply any adjustment.
- avro_unicode: Replaces the underscore or characters that cannot be used in the Avro type name with the corresponding Unicode escape, like _uxxxx. Note that _ is an escape sequence, like the backslash in Java.
For more details, see Avro naming.
- Type: string
- Default: none
Advanced properties
converters
A comma-separated list of the symbolic names of the custom converter instances that the connector can use. For more details about this property, see the Debezium documentation.
- Default: No default
snapshot.mode
A mode for taking an initial snapshot of the structure and optionally data of captured tables. Once the snapshot is complete, the connector will continue reading change events from the database’s redo logs.
- Type: string
- Default: initial
- Valid values: [always, initial, initial_only, never, custom]
snapshot.include.collection.list
An optional comma-separated list of regular expressions that match the fully-qualified names (<schemaName>.<tableName>) of the tables to include in a snapshot. The specified items must be named in the connector’s table.include.list property. This property takes effect only if the connector’s snapshot.mode property is set to a value other than never.
- Default: All tables specified in table.include.list
snapshot.isolation.mode
Mode to control which transaction isolation level is used and how long the connector locks tables that are designated for capture.
- Type: string
- Default: repeatable_read
- Valid values: [read_uncommitted, read_committed, repeatable_read, snapshot, exclusive]
event.processing.failure.handling.mode
Specifies how the connector should react to exceptions during processing of events.
- Type: string
- Default: fail
- Valid values: [fail, warn, skip]
poll.interval.ms
Specifies the number of milliseconds the connector should wait during each iteration for new change events to appear.
- Type: int
- Importance: low
- Default: 500
max.queue.size
Specifies the maximum size of the blocking queue into which change events received via streaming replication are placed before they are written to Kafka. This queue can provide backpressure when, for example, writes to Kafka are slower or if Kafka is not available.
- Type: int
- Importance: low
- Default: 8192
max.queue.size.in.bytes
A long value that specifies the maximum volume of the blocking queue in bytes. By default, volume limits are not specified for the blocking queue. To specify the number of bytes that the queue can consume, you must set this property to a positive long value. If max.queue.size is also set, writing to the queue is blocked when the size of the queue reaches the limit specified by either property. For example, if you set max.queue.size to 1000, and set max.queue.size.in.bytes to 5000, writing to the queue is blocked after the queue contains 1000 records, or after the volume of the records in the queue reaches 5000 bytes.
- Type: long
- Default: 0
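The interaction between max.queue.size and max.queue.size.in.bytes can be expressed as a small predicate. The Python sketch below only restates the rule from the description (writing is blocked as soon as either limit is reached, and a byte limit of 0 means no byte limit); the function name is hypothetical and this is not how the connector itself is implemented.

```python
def queue_blocks_writes(record_count: int, byte_volume: int,
                        max_queue_size: int = 8192,
                        max_queue_size_in_bytes: int = 0) -> bool:
    """Return True when a new write to the queue would be blocked."""
    if record_count >= max_queue_size:
        return True  # record-count limit reached
    if max_queue_size_in_bytes > 0 and byte_volume >= max_queue_size_in_bytes:
        return True  # byte-volume limit reached (only when enabled)
    return False

# The example from the description: 1000 records or 5000 bytes.
print(queue_blocks_writes(1000, 1200, 1000, 5000))  # blocked by record count
print(queue_blocks_writes(40, 5000, 1000, 5000))    # blocked by byte volume
print(queue_blocks_writes(40, 1200, 1000, 5000))    # not blocked
```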
max.batch.size
Specifies the maximum size of each batch of events that should be processed during each iteration of this connector.
- Type: int
- Importance: low
- Default: 2048
heartbeat.interval.ms
Controls how frequently heartbeat messages are sent. This property (which is disabled by default) contains an interval in milliseconds that defines how frequently the connector sends messages to a heartbeat topic. This can be used to monitor whether the connector is still receiving change events from the database. You should also use heartbeat messages when only records in non-captured tables change for a long period of time. In this case, the connector continues to read the log from the database but never emits any change messages into Kafka, which means that no offset updates are committed to Kafka.
This causes WAL files to be retained by the database longer than needed because the connector processed the files already but did not flush the latest retrieved Log Sequence Number (LSN) to the database. Using heartbeat messages may also result in more re-sent change events after a connector restart. Set this parameter to 0 to not send heartbeat messages.
- Type: int
- Importance: low
- Default: 0
snapshot.delay.ms
An interval in milliseconds that the connector should wait before taking a snapshot after starting up. This setting can be used to avoid snapshot interruptions when starting multiple connectors in a cluster, which can cause connector re-balancing.
- Type: int
- Importance: low
- Default: No default
snapshot.fetch.size
During a snapshot, the connector reads table content in batches of rows. This property specifies the maximum number of rows in a batch.
- Type: int
- Default: 2000
query.fetch.size
Specifies the number of rows that will be fetched for each database round trip of a given query.
- Type: int
- Default: JDBC driver’s default fetch size
snapshot.lock.timeout.ms
Positive integer value that specifies the maximum amount of time (in milliseconds) to wait to obtain table locks when performing a snapshot. If table locks cannot be acquired in this time interval, the snapshot will fail.
- Type: int
- Importance: low
- Default: 10000
snapshot.select.statement.overrides
Controls which rows from tables will be included in the snapshot. This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME). Select statements for the individual tables are specified in additional configuration properties, one for each table, identified by the id
source.struct.version
Schema version for the source block in CDC events.
- Type: string
- Default: v2
provide.transaction.metadata
Determines whether the connector generates events with transaction boundaries and enriches change event envelopes with transaction metadata. Specify true if you want the connector to do this.
- Type: boolean
- Default: false
retriable.restart.connector.wait.ms
The number of milliseconds to wait before restarting a connector after a retriable error occurs.
- Type: int
- Default: 10000
skipped.operations
Comma-separated list of operation types that will be skipped during streaming. The operations include: c for inserts/create, u for updates, and d for deletes.
- Type: list of strings
- Default: By default, no operations are skipped
signal.data.collection
The fully-qualified name of the data collection that is used to send signals to the connector. Use the following format to specify the collection name: <databaseName>.<tableName>.
- Type: string
- Default: No default
signal.enabled.channels
A list of the signal channel names that are enabled for the connector. The following channels are available by default: source, kafka, file, and jmx. Note that you may also implement a custom signaling channel.
- Type: string
- Default: source
notification.enabled.channels
A list of the notification channel names that are enabled for the connector. The following channels are available by default: sink, log, and jmx. Note that you may also implement a custom notification channel.
- Type: string
- Default: No default
incremental.snapshot.allow.schema.changes
When set to true, this property allows schema changes during an incremental snapshot.
- Type: boolean
- Default: false
incremental.snapshot.chunk.size
The number of rows fetched from the database for each incremental snapshot iteration.
- Type: int
- Default: 1024
max.iteration.transactions
Specifies the maximum number of transactions per iteration to be used to reduce the memory footprint when streaming changes from multiple tables in a database.
- Type: int
- Default: 0
incremental.snapshot.option.recompile
Uses the OPTION(RECOMPILE) query option for all SELECT statements used during an incremental snapshot. This can help to solve parameter sniffing issues that may occur but can cause increased CPU load on the source database, depending on the frequency of query execution.
- Type: boolean
- Default: false
topic.naming.strategy
The name of the TopicNamingStrategy class that should be used to determine the topic name for data change, schema change, transaction, heartbeat event, and so forth.
- Type: string
- Default: io.debezium.schema.DefaultTopicNamingStrategy
topic.delimiter
Specifies the delimiter for topic name.
- Type: string
- Default: .
topic.cache.size
The size used for holding the topic names in a bounded concurrent hash map. This cache will help to determine the topic name corresponding to a given data collection.
- Type: int
- Default: 10000
topic.heartbeat.prefix
Sets the name of the topic to which heartbeat messages are sent. The topic is named according to the pattern <topic.heartbeat.prefix>.<topic.prefix>. For example, if the topic prefix is fulfillment, the default topic name is __debezium-heartbeat.fulfillment.
- Type: string
- Importance: low
- Default: __debezium-heartbeat
topic.transaction
Controls the name of the topic to which the connector sends transaction metadata messages. The topic name has the following pattern: <topic.prefix>.<topic.transaction>. For example, if the topic prefix is fulfillment, the default topic name is fulfillment.transaction.
- Type: string
- Default: transaction
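The two naming patterns documented for topic.heartbeat.prefix and topic.transaction can be checked with a few lines of Python. This sketch simply joins the parts with a dot, as in the documented patterns, and the function names are hypothetical; it does not model a custom topic.naming.strategy.

```python
def heartbeat_topic(topic_prefix: str,
                    heartbeat_prefix: str = "__debezium-heartbeat") -> str:
    """<topic.heartbeat.prefix>.<topic.prefix>"""
    return f"{heartbeat_prefix}.{topic_prefix}"

def transaction_topic(topic_prefix: str,
                      transaction: str = "transaction") -> str:
    """<topic.prefix>.<topic.transaction>"""
    return f"{topic_prefix}.{transaction}"

# Using the topic prefix from the examples in this reference.
print(heartbeat_topic("fulfillment"))
print(transaction_topic("fulfillment"))
```

Note that the prefix comes first in the transaction topic name but last in the heartbeat topic name.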
snapshot.max.threads
Specifies the number of threads that the connector uses when performing an initial snapshot. To enable parallel initial snapshots, set the property to a value greater than 1. In a parallel initial snapshot, the connector processes multiple tables concurrently.
- Type: int
- Importance: medium
- Default: 1
custom.metric.tags
Accepts key-value pairs to customize the MBean object name. For more details, see the Debezium documentation.
- Type: string
- Default: No default
errors.max.retries
The maximum number of retries on retriable errors (for example, connection errors) before failing.
- Type: int
- Default: -1
Database schema history properties
schema.history.internal.kafka.topic
The full name of the Kafka topic where the connector will store the database schema history.
Important
Because Debezium uses multiple topics for storing data, and certain limitations may apply to them, Confluent recommends you view Configuring Debezium Topics before you create a database schema history topic.
- Type: string
- Importance: high
- Default: No default
schema.history.internal.kafka.bootstrap.servers
A list of host/port pairs that the connector will use for establishing an initial connection to the Kafka cluster. This connection will be used for retrieving database schema history previously stored by the connector, and for writing each DDL statement read from the source database. This should point to the same Kafka cluster used by the Kafka Connect process.
- Type: list of strings
- Importance: high
- Default: No default
schema.history.internal.kafka.recovery.poll.interval.ms
The maximum time duration, in milliseconds, the connector waits during startup or recovery while polling for persisted data.
- Type: int
- Default: 100
schema.history.internal.kafka.query.timeout.ms
The maximum time duration, in milliseconds, the connector waits for cluster information to be fetched using the Kafka admin client.
- Type: int
- Default: 3000
schema.history.internal.kafka.create.timeout.ms
An integer value that specifies the maximum number of milliseconds the connector should wait while creating the Kafka history topic using the Kafka admin client.
- Type: int
- Default: 30000
schema.history.internal.kafka.recovery.attempts
The maximum number of times that the connector should try to read persisted history data before the connector recovery fails with an error. The maximum amount of time to wait after receiving no data is recovery.attempts x recovery.poll.interval.ms.
- Type: int
- Default: 100
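As a quick worked example of the product described above, the Python sketch below computes the maximum wait from the two documented defaults (100 attempts and a 100 ms poll interval). The function name is hypothetical.

```python
def max_recovery_wait_ms(recovery_attempts: int = 100,
                         recovery_poll_interval_ms: int = 100) -> int:
    """recovery.attempts x recovery.poll.interval.ms, in milliseconds."""
    return recovery_attempts * recovery_poll_interval_ms

# With the defaults: 100 attempts x 100 ms = 10000 ms (10 seconds).
print(max_recovery_wait_ms())
```

Raising either value lengthens the time the connector waits for history data before recovery fails.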
schema.history.internal.skip.unparseable.ddl
A Boolean value that specifies whether the connector should ignore malformed or unknown database statements or stop processing so a human can fix the issue. The safe default is false. Use skipping only with care, as it can lead to data loss or mangling when the binlog is being processed.
- Type: boolean
- Default: false
schema.history.internal.store.only.captured.tables.ddl
A Boolean value that specifies whether the connector records schema structures from all tables in a schema or database, or only from tables that are designated for capture.
- Type: boolean
- Default: false
schema.history.internal.store.only.captured.databases.ddl
A Boolean value that specifies whether the connector should record schema structures from all logical databases in the database instance. When set to true, the connector records schema structures only for tables in the logical database and schema from which Debezium captures change events. When set to false, the connector records schema structures for all logical databases.
- Type: boolean
- Default: false
Auto topic creation
For more information about Auto topic creation, see Configuring Auto Topic Creation for Source Connectors.
Configuration properties accept regular expressions (regex) that are defined as Java regex.
topic.creation.groups
A list of group aliases that are used to define per-group topic configurations for matching topics. A default group always exists and matches all topics.
- Type: List of String types
- Default: empty
- Possible Values: The values of this property refer to any additional groups. A default group is always defined for topic configurations.
topic.creation.$alias.replication.factor
The replication factor for new topics created by the connector. This value must not be larger than the number of brokers in the Kafka cluster. If this value is larger than the number of Kafka brokers, an error occurs when the connector attempts to create a topic. This is a required property for the default group. This property is optional for any other group defined in topic.creation.groups. Other groups use the Kafka broker default value.
- Type: int
- Default: n/a
- Possible Values: >= 1 for a specific valid value or -1 to use the Kafka broker’s default value.
topic.creation.$alias.partitions
The number of topic partitions created by this connector. This is a required property for the default group. This property is optional for any other group defined in topic.creation.groups. Other groups use the Kafka broker default value.
- Type: int
- Default: n/a
- Possible Values: >= 1 for a specific valid value or -1 to use the Kafka broker’s default value.
topic.creation.$alias.include
A list of strings that represent regular expressions that match topic names. This list is used to include topics with matching values, and apply this group’s specific configuration to the matching topics. $alias applies to any group defined in topic.creation.groups. This property does not apply to the default group.
- Type: List of String types
- Default: empty
- Possible Values: Comma-separated list of exact topic names or regular expressions.
topic.creation.$alias.exclude
A list of strings representing regular expressions that match topic names. This list is used to exclude topics with matching values from getting the group’s specific configuration. $alias applies to any group defined in topic.creation.groups. This property does not apply to the default group. Note that exclusion rules override any inclusion rules for topics.
- Type: List of String types
- Default: empty
- Possible Values: Comma-separated list of exact topic names or regular expressions.
topic.creation.$alias.${kafkaTopicSpecificConfigName}
Any of the topic-level configurations (see Changing Broker Configurations Dynamically) for the version of the Kafka broker where the records will be written. The broker’s topic-level configuration value is used if the configuration is not specified for the rule. $alias applies to the default group as well as any group defined in topic.creation.groups.
- Type: property values
- Default: Kafka broker value
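To show how the topic creation properties combine, the following Python sketch defines one extra group and resolves which group's settings would apply to a topic. It is an illustration under stated assumptions, not Kafka Connect's implementation: the group alias, topic names, and values are hypothetical, expressions are evaluated with Python's re module against the full topic name, and lists are split on commas.

```python
import re

# Hypothetical configuration fragment with one extra group named "compacted".
topic_creation = {
    "topic.creation.groups": "compacted",
    "topic.creation.default.replication.factor": "3",  # required for default
    "topic.creation.default.partitions": "1",          # required for default
    "topic.creation.compacted.include": r"fulfillment\..*\.customers",
    "topic.creation.compacted.exclude": r"fulfillment\.archive\..*",
    "topic.creation.compacted.cleanup.policy": "compact",
}

def _patterns(config, key):
    # Simplification: split on commas; assumes no expression contains a comma.
    return [re.compile(p.strip()) for p in config.get(key, "").split(",") if p.strip()]

def missing_default_settings(config):
    """List the required default-group properties that are absent."""
    required = ("topic.creation.default.replication.factor",
                "topic.creation.default.partitions")
    return [key for key in required if key not in config]

def group_for_topic(topic, config):
    """Return the first group whose rules select the topic, else 'default'."""
    groups = [g.strip() for g in config.get("topic.creation.groups", "").split(",") if g.strip()]
    for group in groups:
        include = _patterns(config, f"topic.creation.{group}.include")
        exclude = _patterns(config, f"topic.creation.{group}.exclude")
        if any(p.fullmatch(topic) for p in exclude):
            continue  # exclusion rules override inclusion rules
        if any(p.fullmatch(topic) for p in include):
            return group
    return "default"

print(group_for_topic("fulfillment.inventory.customers", topic_creation))
print(group_for_topic("fulfillment.archive.customers", topic_creation))
```

Topics that no additional group selects fall back to the default group, which is why its replication factor and partition count are required.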
More details can be found in the Debezium connector properties documentation.
Note
Portions of the information provided here derive from documentation originally produced by the Debezium Community. Work produced by Debezium is licensed under Creative Commons 3.0.