Configuration Reference for Debezium SQL Server Source Connector for Confluent Platform

The SQL Server Source connector can be configured using a variety of configuration properties.

Note

These are properties for the self-managed connector. If you are using Confluent Cloud, see Debezium SQL Server Source Connector for Confluent Cloud.

Required properties

name
A unique name for the connector. Trying to register again with the same name will fail.
connector.class
The name of the Java class for the connector. You must use io.debezium.connector.sqlserver.SqlServerConnector for the SQL Server connector.
tasks.max

The maximum number of tasks that should be created for this connector. The SQL Server connector always uses a single task.

  • Type: int
  • Default: 1
database.hostname

IP address or hostname of the SQL Server database server.

  • Type: string
  • Importance: high
database.port

Integer port number of the SQL Server database server.

  • Type: int
  • Importance: low
  • Default: 1433
database.user

Username to use when connecting to the SQL Server database server.

  • Type: string
  • Importance: high
database.password

Password to use when connecting to the SQL Server database server.

  • Type: password
  • Importance: high
database.instance

Specifies the instance name of the SQL Server named instance.

  • Type: string
  • Default: No default
database.names

The comma-separated list of the SQL Server database names from which to stream the changes. Currently, only one database name is supported. This property must not be used with database.dbname.

  • Type: list of string
  • Default: No default
topic.prefix

Topic prefix that provides a namespace for the particular SQL Server database server or cluster in which Debezium is capturing changes. The prefix should be unique across all other connectors, since it is used as a topic name prefix for all Kafka topics that receive records from this connector. Only alphanumeric characters, hyphens, dots, and underscores should be used in the database server logical name. Do not change the value of this property. If you change the value, after a restart, instead of continuing to emit events to the original topics, the connector emits subsequent events to topics whose names are based on the new value.

  • Type: string
  • Default: No default
schema.include.list

An optional comma-separated list of regular expressions that match schema names to be monitored. Any schema name not included in the include list is excluded from monitoring. By default, all non-system schemas are monitored. May not be used with schema.exclude.list.

  • Type: list of strings
  • Importance: low
schema.exclude.list

An optional comma-separated list of regular expressions that match schema names to be excluded from monitoring. Any schema name not included in the exclude list is monitored, with the exception of system schemas. May not be used with schema.include.list.

  • Type: list of strings
  • Importance: low
table.include.list

A comma-separated list of regular expressions that match fully-qualified table identifiers for tables you want Debezium to capture; any table that is not included in table.include.list is excluded from capture. Each identifier is of the form schemaName.tableName. By default, the connector captures all non-system tables for the designated schemas. Do not use with table.exclude.list.

  • Type: list of strings
  • Importance: low
table.exclude.list

A comma-separated list of regular expressions that match fully-qualified table identifiers for the tables you want to exclude from being captured. Each identifier is of the form schemaName.tableName. Do not use with table.include.list.

  • Type: list of strings
  • Importance: low
column.include.list

An optional, comma-separated list of regular expressions that match the fully-qualified names of columns that should be included in change event record values. Fully-qualified names for columns use the form schemaName.tableName.columnName. Do not also set the column.exclude.list property.

  • Type: list of strings
  • Importance: low
  • Default: “” (Empty string)
column.exclude.list

A comma-separated list of regular expressions that match the fully-qualified names of columns that should be excluded from change event message values. Fully-qualified names for columns use the form schemaName.tableName.columnName. Do not also set the column.include.list property.

  • Type: list of strings
  • Importance: low
  • Default: Empty string
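As an illustrative sketch only, the following properties show how the include and exclude lists can be combined to capture two tables from the dbo schema while dropping one sensitive column. The schema, table, and column names (dbo, customers, orders, ssn) are placeholders, not values taken from this reference.

    schema.include.list=dbo
    table.include.list=dbo.customers,dbo.orders
    column.exclude.list=dbo.customers.ssn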
skip.messages.without.change

Specifies whether to skip publishing messages when there is no change in included columns. In effect, a message is filtered out when none of the columns included according to the column.include.list or column.exclude.list properties have changed.

  • Type: boolean
  • Default: false
column.mask.hash.hashAlgorithm.with.salt.salt; column.mask.hash.v2.hashAlgorithm.with.salt.salt

An optional comma-separated list of regular expressions that match the fully-qualified names of character-based columns. In the resulting change event messages, the values of these columns are replaced with pseudonyms, where each pseudonym consists of the hashed value produced by applying the algorithm hashAlgorithm and the salt. For more details, see the Debezium documentation.

  • Type: list of strings
  • Default: No default
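For illustration, a single masking property of this form might look like the following; the salt (CzQMA0cB5K) and the column name are placeholders, and the hash algorithm name must be one supported by the JVM:

    column.mask.hash.SHA-256.with.salt.CzQMA0cB5K=dbo.customers.email

With this setting, the value of dbo.customers.email in change event messages is replaced by the salted SHA-256 hash of the original value.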
decimal.handling.mode

Specifies how the connector should handle values for DECIMAL and NUMERIC columns:

  • precise: Represents them precisely using java.math.BigDecimal values represented in change events in a binary form.
  • double: Represents them using double values, which may result in a loss of precision but is easier to use.
  • string: Encodes values as formatted strings, which is easy to consume but semantic information about the real type is lost.
  • Type: string
  • Default: precise
time.precision.mode

Time, date, and timestamps can be represented with different kinds of precision, including:

  • adaptive: (the default) captures the time and timestamp values exactly as in the database using either millisecond, microsecond, or nanosecond precision values based on the database column’s type.
  • connect: always represents time and timestamp values using Kafka Connect’s built-in representations for Time, Date, and Timestamp, which uses millisecond precision regardless of the database columns’ precision. For more information, see temporal values.
  • Type: string
  • Default: adaptive
include.schema.changes

Use this property to specify whether the connector should publish changes in the database schema to a Kafka topic with the same name as the database server ID. Each schema change is recorded with a key that contains the database name and a value that is a JSON structure that describes the schema update. This is independent of how the connector internally records database history.

  • Type: boolean
  • Default: true
tombstones.on.delete

This property controls whether a tombstone event should be generated after a delete event. When set to true, the delete operations are represented by a delete event and a subsequent tombstone event. When set to false, only a delete event is sent. Emitting the tombstone event (the default behavior) allows Kafka to completely delete all events pertaining to the given key once the source record is deleted.

  • Type: string
  • Importance: low
  • Default: true
column.truncate.to.length.chars

An optional comma-separated list of regular expressions that match the fully-qualified names of character-based columns; the character-based column values should be truncated in the change event message values if the field values are longer than the specified number of characters. Fully-qualified names for columns use the form schemaName.tableName.columnName.

  • Type: list of strings
  • Default: No default
column.mask.with.length.chars

An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns; the character-based columns should be replaced in the change event message values with a field value consisting of the specified number of asterisk (*) characters. Fully-qualified names for columns use the form schemaName.tableName.columnName.

  • Type: list of strings
  • Default: No default
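The desired length is embedded in the property name itself. As a sketch with placeholder column names, the following truncates one column to 20 characters and masks another with 10 asterisks:

    column.truncate.to.20.chars=dbo.products.description
    column.mask.with.10.chars=dbo.customers.phone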
column.propagate.source.type

An optional comma-separated list of regular expressions that match the fully-qualified names of columns whose original type and length should be added as a parameter to the corresponding field schemas in the emitted change messages. The schema parameters __debezium.source.column.type, __debezium.source.column.length and __debezium.source.column.scale will be used to propagate the original type name and length (for variable-width types), respectively. Useful to properly size corresponding columns in sink databases. Fully-qualified names for columns use the form schemaName.tableName.columnName.

  • Type: list of strings
  • Importance: low
  • Default: No default
datatype.propagate.source.type

An optional comma-separated list of regular expressions. These expressions match the database-specific data type name of columns whose original type and length should be added as a parameter to the corresponding field schemas in the emitted change messages. The schema parameters __debezium.source.column.type, __debezium.source.column.length and __debezium.source.column.scale will be used to propagate the original type name and length (for variable-width types), respectively. Fully-qualified data type names use the form: schemaName.tableName.typeName. For a list of SQL Server-specific data type names, see SQL Server data types.

  • Type: list of strings
  • Default: No default
message.key.columns

A semicolon-separated list of regular expressions that match fully-qualified tables and columns used to form a custom message key. Each item (regular expression) must be of the form <fully-qualified table>:<a comma-separated list of columns>, representing the custom key, as shown in the sketch after this entry. Fully-qualified tables could be defined as schemaName.tableName.

  • Type: list of strings
  • Default: No default
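For example (a sketch with placeholder table and column names), the following uses customer_id as the key for records from dbo.customers and a composite key for records from dbo.orders:

    message.key.columns=dbo.customers:customer_id;dbo.orders:order_id,order_date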
binary.handling.mode

This property specifies how binary (binary, varbinary) columns should be represented in change events. bytes represents binary data as a byte array, base64 represents binary data as a base64-encoded string, and hex represents binary data as a base16-encoded (hex-encoded) string.

  • Type: string
  • Default: bytes
schema.name.adjustment.mode

Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. Possible settings are: avro, which replaces the characters that cannot be used in the Avro type name with an underscore, and none which does not apply any adjustment.

  • Type: string
  • Default: avro
field.name.adjustment.mode

Specifies how field names should be adjusted for compatibility with the message converter used by the connector. The following are possible settings:

  • avro: Replaces the characters that cannot be used in the Avro type name with an underscore
  • none: Does not apply any adjustment
  • avro_unicode: Replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx. Note that _ is an escape sequence like backslash in Java.

For more details, see Avro naming.

  • Type: string
  • Default: none
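Putting the required properties together, a minimal self-managed configuration might look like the following properties-file sketch. The hostname, credentials, database name, and topic prefix are placeholders, and the two schema history properties (described later on this page) are included because they have no defaults:

    name=sqlserver-source
    connector.class=io.debezium.connector.sqlserver.SqlServerConnector
    tasks.max=1
    database.hostname=sqlserver.example.com
    database.port=1433
    database.user=debezium-user
    database.password=debezium-password
    database.names=testDB
    topic.prefix=server1
    schema.history.internal.kafka.bootstrap.servers=kafka:9092
    schema.history.internal.kafka.topic=schemahistory.server1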

Advanced properties

converters

Enumerates a comma-separated list of the symbolic names of the custom converter instances that the connector can use. For more details about this property, see the Debezium documentation.

  • Default: No default
snapshot.mode

A mode for taking an initial snapshot of the structure and, optionally, the data of captured tables. Once the snapshot is complete, the connector continues reading change events from the database’s transaction log.

  • Type: string
  • Default: initial
  • Valid values: [always, initial, initial_only, never, custom]
snapshot.include.collection.list

An optional comma-separated list of regular expressions that match the fully-qualified names (<schemaName>.<tableName>) of the tables to include in a snapshot. The specified items must be named in the connector’s table.include.list property. This property takes effect only if the connector’s snapshot.mode property is set to a value other than never.

  • Default: All tables specified in table.include.list
snapshot.isolation.mode

Mode to control which transaction isolation level is used and how long the connector locks tables that are designated for capture.

  • Type: string
  • Default: repeatable_read
  • Valid values: [read_uncommitted, read_committed, repeatable_read, snapshot, exclusive]
event.processing.failure.handling.mode

Specifies how the connector should react to exceptions during processing of events.

  • Type: string
  • Default: fail
  • Valid values: [fail, warn, skip]
poll.interval.ms

Specifies the number of milliseconds the connector should wait during each iteration for new change events to appear.

  • Type: int
  • Importance: low
  • Default: 500
max.queue.size

Specifies the maximum size of the blocking queue into which change events read from the database log are placed before they are written to Kafka. This queue can provide backpressure when, for example, writes to Kafka are slower or if Kafka is not available.

  • Type: int
  • Importance: low
  • Default: 8192
max.queue.size.in.bytes

A long value that specifies the maximum volume of the blocking queue in bytes. By default, volume limits are not specified for the blocking queue. To specify the number of bytes that the queue can consume, you must set this property to a positive long value. If max.queue.size is also set, writing to the queue is blocked when the size of the queue reaches the limit specified by either property. For example, if you set max.queue.size to 1000, and set max.queue.size.in.bytes to 5000, writing to the queue is blocked after the queue contains 1000 records, or after the volume of the records in the queue reaches 5000 bytes.

  • Type: long
  • Default: 0
max.batch.size

Specifies the maximum size of each batch of events that should be processed during each iteration of this connector.

  • Type: int
  • Importance: low
  • Default: 2048
heartbeat.interval.ms

Controls how frequently heartbeat messages are sent. This property (which is disabled by default) contains an interval in milliseconds that defines how frequently the connector sends messages to a heartbeat topic. This can be used to monitor whether the connector is still receiving change events from the database. Heartbeat messages are also useful when only records in non-captured tables change for an extended period of time. In that case, the connector keeps reading the log from the database but never emits any change messages into Kafka, which means that no offset updates are committed to Kafka.

This causes the database to retain transaction log entries longer than needed, because the connector has already processed them but has not flushed the latest retrieved Log Sequence Number (LSN) to the database. Using heartbeat messages may also result in more re-sent change events after a connector restart. Set this parameter to 0 to not send heartbeat messages. A configuration sketch follows the bullets below.

  • Type: int
  • Importance: low
  • Default: 0
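As a minimal sketch, the following enables a heartbeat every ten seconds; with the default topic.heartbeat.prefix and a placeholder topic prefix of server1, heartbeat messages would be written to __debezium-heartbeat.server1:

    heartbeat.interval.ms=10000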
snapshot.delay.ms

An interval in milliseconds that the connector should wait before taking a snapshot after starting up. This setting can be used to avoid snapshot interruptions when starting multiple connectors in a cluster, which can cause connector re-balancing.

  • Type: int
  • Importance: low
  • Default: No default
snapshot.fetch.size

During a snapshot, the connector reads table content in batches of rows. This property specifies the maximum number of rows in a batch.

  • Type: int
  • Default: 2000
query.fetch.size

Specifies the number of rows that will be fetched for each database round-trip of a given query.

  • Type: int
  • Default: JDBC driver’s default fetch size
snapshot.lock.timeout.ms

Positive integer value that specifies the maximum amount of time (in milliseconds) to wait to obtain table locks when performing a snapshot. If table locks cannot be acquired in this time interval, the snapshot will fail.

  • Type: string
  • Importance: low
  • Default: 10000
snapshot.select.statement.overrides
Controls which rows from tables are included in the snapshot. This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME). Select statements for the individual tables are specified in additional configuration properties, one for each table, identified by the id snapshot.select.statement.overrides.DB_NAME.TABLE_NAME, as shown in the sketch below.
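Using the DB_NAME.TABLE_NAME convention above with placeholder names, the base property lists the table to override and a companion property supplies the SELECT statement used during the snapshot:

    snapshot.select.statement.overrides=testDB.orders
    snapshot.select.statement.overrides.testDB.orders=SELECT * FROM orders WHERE delete_flag = 0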
source.struct.version

Schema version for the source block in CDC events.

  • Type: string
  • Default: v2
provide.transaction.metadata

Determines whether the connector generates events with transaction boundaries and enriches change event envelopes with transaction metadata. Specify true if you want the connector to do this.

  • Type: boolean
  • Default: false
retriable.restart.connector.wait.ms

The number of milliseconds to wait before restarting a connector after a retriable error occurs.

  • Type: int
  • Default: 10000
skipped.operations

Comma-separated list of operation types that will be skipped during streaming. The operations include: c for inserts/create, u for updates, and d for deletes.

  • Type: list of strings
  • Default: By default, no operations are skipped
signal.data.collection

The fully-qualified name of the data collection that is used to send signals to the connector. Use the following format to specify the collection name: <databaseName>.<tableName>.

  • Type: string
  • Default: No default
signal.enabled.channels

A list of the signal channel names that are enabled for the connector. The following channels are available by default: source, kafka, file and jmx.

Note that you may also implement a custom signaling channel.

  • Type: string
  • Default: source
notification.enabled.channels

A list of the notification channel names that are enabled for the connector. The following channels are available by default: sink, log and jmx.

Note that you may also implement a custom notification channel.

  • Type: string
  • Default: No default
incremental.snapshot.allow.schema.changes

When set to true this property allows schema changes during an incremental snapshot.

  • Type: boolean
  • Default: false
incremental.snapshot.chunk.size

The number of rows fetched from the database for each incremental snapshot iteration.

  • Type: int
  • Default: 1024
max.iteration.transactions

Specifies the maximum number of transactions per iteration to be used to reduce the memory footprint when streaming changes from multiple tables in a database.

  • Type: int
  • Default: 0
incremental.snapshot.option.recompile

Uses the OPTION(RECOMPILE) query option for all SELECT statements used during an incremental snapshot. This can help to solve parameter sniffing issues, but it can also increase CPU load on the source database, depending on how frequently queries are executed.

  • Type: boolean
  • Default: false
topic.naming.strategy

The name of the TopicNamingStrategy class that should be used to determine the topic name for data change, schema change, transaction, and heartbeat events, and so forth.

  • Type: string
  • Default: io.debezium.schema.DefaultTopicNamingStrategy
topic.delimiter

Specifies the delimiter for topic names.

  • Type: string
  • Default: .
topic.cache.size

The size used for holding the topic names in a bounded concurrent hash map. This cache will help to determine the topic name corresponding to a given data collection.

  • Type: int
  • Default: 10000
topic.heartbeat.prefix

Sets the name of the topic to which heartbeat messages are sent. The topic is named according to the pattern <topic.heartbeat.prefix>.<topic.prefix>. For example, if the topic prefix is fulfillment, the default topic name is __debezium-heartbeat.fulfillment.

  • Type: string
  • Importance: low
  • Default: __debezium-heartbeat
topic.transaction

Controls the name of the topic to which the connector sends transaction metadata messages. The topic name has the following pattern: <topic.prefix>.<topic.transaction>. For example, if the topic prefix is fulfillment, the default topic name is fulfillment.transaction.

  • Type: string
  • Default: transaction
snapshot.max.threads

Specifies the number of threads that the connector uses when performing an initial snapshot. To enable parallel initial snapshots, set the property to a value greater than 1. In a parallel initial snapshot, the connector processes multiple tables concurrently.

  • Type: int
  • Importance: medium
  • Default: 1
custom.metric.tags

Accepts key-value pairs to customize the MBean object name; see the sketch after this entry. For more details, see the Debezium documentation.

  • Type: string
  • Default: No default
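As a minimal sketch, the value is a comma-separated list of key-value pairs; the tag names and values below are placeholders:

    custom.metric.tags=environment=test,dc=dc1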
errors.max.retries

The maximum number of retries on retriable errors (for example, connection errors) before failing.

  • Type: int
  • Default: -1

Database schema history properties

schema.history.internal.kafka.topic

The full name of the Kafka topic where the connector will store the database schema history.

Important

Because Debezium uses multiple topics for storing data, and certain limitations may apply to those topics, Confluent recommends that you review Configuring Debezium Topics before you create a database schema history topic.

  • Type: string
  • Importance: high
  • Default: No default
schema.history.internal.kafka.bootstrap.servers

A list of host/port pairs that the connector will use for establishing an initial connection to the Kafka cluster. This connection will be used for retrieving database schema history previously stored by the connector, and for writing each DDL statement read from the source database. This should point to the same Kafka cluster used by the Kafka Connect process.

  • Type: list of strings
  • Importance: high
  • Default: No default
schema.history.internal.kafka.recovery.poll.interval.ms

The maximum time duration, in milliseconds, the connector waits during startup or recovery while polling for persisted data.

  • Type: int
  • Default: 100
schema.history.internal.kafka.query.timeout.ms

The maximum time duration, in milliseconds, the connector waits for cluster information to be fetched using the Kafka admin client.

  • Type: int
  • Default: 3000
schema.history.internal.kafka.create.timeout.ms

An integer value that specifies the maximum number of milliseconds the connector should wait while creating the Kafka history topic using the Kafka admin client.

  • Type: int
  • Default: 30000
schema.history.internal.kafka.recovery.attempts

The maximum number of times that the connector should try to read persisted history data before the connector recovery fails with an error. The maximum amount of time to wait after receiving no data is recovery.attempts x recovery.poll.interval.ms.

  • Type: int
  • Default: 100
schema.history.skip.unparseable.ddl

A Boolean value that specifies whether the connector should ignore malformed or unknown database statements or stop processing so a human can fix the issue. The safe default is false. Skipping should be used only with care, as it can lead to data loss or mangling when the database schema history is being processed.

  • Type: boolean
  • Default: false
schema.history.internal.store.only.captured.tables.ddl

A Boolean value that specifies whether the connector records schema structures from all tables in a schema or database, or only from tables that are designated for capture.

  • Type: boolean
  • Default: false
schema.history.internal.store.only.captured.databases.ddl

A Boolean value that specifies whether the connector records schema structures from all logical databases in the database instance. When set to true, the connector records schema structures only for tables in the logical database and schema from which Debezium captures change events. When set to false, the connector records schema structures for all logical databases.

  • Type: boolean
  • Default: false

Auto topic creation

For more information about Auto topic creation, see Configuring Auto Topic Creation for Source Connectors.

Configuration properties accept regular expressions (regex) that are defined as Java regex.

topic.creation.groups

A list of group aliases that are used to define per-group topic configurations for matching topics. A default group always exists and matches all topics.

  • Type: List of String types
  • Default: empty
  • Possible Values: The values of this property refer to any additional groups. A default group is always defined for topic configurations.
topic.creation.$alias.replication.factor

The replication factor for new topics created by the connector. This value must not be larger than the number of brokers in the Kafka cluster. If this value is larger than the number of Kafka brokers, an error occurs when the connector attempts to create a topic. This is a required property for the default group. This property is optional for any other group defined in topic.creation.groups. Other groups use the Kafka broker default value.

  • Type: int
  • Default: n/a
  • Possible Values: >= 1 for a specific valid value or -1 to use the Kafka broker’s default value.
topic.creation.$alias.partitions

The number of topic partitions created by this connector. This is a required property for the default group. This property is optional for any other group defined in topic.creation.groups. Other groups use the Kafka broker default value.

  • Type: int
  • Default: n/a
  • Possible Values: >= 1 for a specific valid value or -1 to use the Kafka broker’s default value.
topic.creation.$alias.include

A list of strings that represent regular expressions that match topic names. This list is used to include topics with matching values, and apply this group’s specific configuration to the matching topics. $alias applies to any group defined in topic.creation.groups. This property does not apply to the default group.

  • Type: List of String types
  • Default: empty
  • Possible Values: Comma-separated list of exact topic names or regular expressions.
topic.creation.$alias.exclude

A list of strings representing regular expressions that match topic names. This list is used to exclude topics with matching values from getting the group’s specific configuration. $alias applies to any group defined in topic.creation.groups. This property does not apply to the default group. Note that exclusion rules override any inclusion rules for topics.

  • Type: List of String types
  • Default: empty
  • Possible Values: Comma-separated list of exact topic names or regular expressions.
topic.creation.$alias.${kafkaTopicSpecificConfigName}

Any of the topic-level configuration properties (see Changing Broker Configurations Dynamically) for the version of the Kafka broker where the records will be written. The broker’s topic-level configuration value is used if the configuration is not specified for the rule. $alias applies to the default group as well as any group defined in topic.creation.groups.

  • Type: property values
  • Default: Kafka broker value
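For illustration, the following sketch configures the required default group and one additional group that creates compacted topics; the group alias (compacted) and the topic pattern are placeholders:

    topic.creation.default.replication.factor=3
    topic.creation.default.partitions=6
    topic.creation.groups=compacted
    topic.creation.compacted.include=server1.dbo.customers
    topic.creation.compacted.replication.factor=3
    topic.creation.compacted.partitions=1
    topic.creation.compacted.cleanup.policy=compact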

More details can be found in the Debezium connector properties documentation.

Note

Portions of the information provided here are derived from documentation originally produced by the Debezium Community. Work produced by Debezium is licensed under Creative Commons 3.0.