SQL Server Source Connector (Debezium) Configuration Properties

The SQL Server Source connector can be configured using a variety of configuration properties.

Note

These are properties for the self-managed connector. If you are using Confluent Cloud, see Debezium SQL Server Source Connector for Confluent Cloud.

Required properties

name
A unique name for the connector. Trying to register again with the same name will fail.
connector.class
The name of the Java class for the connector. You must use io.debezium.connector.sqlserver.SqlServerConnector for the SQL Server connector.
tasks.max

The maximum number of tasks that should be created for this connector. The SQL Server connector always uses a single task.

  • Type: int
  • Default: 1
database.hostname

IP address or hostname of the SQL Server database server.

  • Type: string
  • Importance: high
database.port

Integer port number of the SQL Server database server.

  • Type: int
  • Importance: low
  • Default: 1433
database.user

Username to use when connecting to the SQL Server database server.

  • Type: string
  • Importance: high
database.password

Password to use when connecting to the SQL Server database server.

  • Type: password
  • Importance: high
database.dbname

The name of the SQL Server database from which to stream the changes.

  • Type: string
  • Importance: high
database.names

A comma-separated list of SQL Server database names from which to stream the changes. Currently, only one database name is supported. Must not be used with database.dbname. This option is experimental and must not be used in production. Using it makes the behavior of the connector incompatible with the default configuration, with no upgrade or downgrade path:

  • The connector will use different keys for its committed offset messages.
  • The SQL statements used in snapshot.select.statement.overrides will have to use the database name as part of the fully-qualified table name.
  • The structure of the exposed connector metrics will be different.

  • Type: string
  • Importance: high
database.server.name

Logical name that identifies and provides a namespace for the particular SQL Server database server being monitored. The logical name should be unique across all other connectors, since it is used as a prefix for all Kafka topic names coming from this connector.

  • Type: string
  • Importance: high
table.include.list

A comma-separated list of regular expressions that match fully-qualified table identifiers for tables you want Debezium to capture; any table that is not included in table.include.list is excluded from capture. Each identifier is of the form schemaName.tableName. By default, the connector captures all non-system tables for the designated schemas. Do not use with table.exclude.list.

  • Type: list of strings
  • Importance: low
table.exclude.list

A comma-separated list of regular expressions that match fully-qualified table identifiers for the tables you want to exclude from being captured. Each identifier is of the form schemaName.tableName. Do not use with table.include.list.

  • Type: list of strings
  • Importance: low
column.include.list

An optional, comma-separated list of regular expressions that match the fully-qualified names of columns that should be included in change event record values. Fully-qualified names for columns are of the form schemaName.tableName.columnName. Do not also set the column.exclude.list property.

  • Type: list of strings
  • Importance: low
  • Default: “” (Empty string)
column.exclude.list

A comma-separated list of regular expressions that match the fully-qualified names of columns that should be excluded from change event message values. Fully-qualified names for columns are of the form schemaName.tableName.columnName. Do not also set the column.include.list property.

  • Type: list of strings
  • Importance: low
  • Default: Empty string
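
For example, the following configuration fragment (the schema, table, and column names are illustrative) captures only two tables and excludes a sensitive column from the change events emitted for one of them:

"table.include.list": "dbo.customers,dbo.orders",
"column.exclude.list": "dbo.customers.ssn"
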
column.propagate.source.type

An optional, comma-separated list of regular expressions that match the fully-qualified names of columns whose original type and length should be added as a parameter to the corresponding field schemas in the emitted change messages. The schema parameters __debezium.source.column.type, __debezium.source.column.length, and __debezium.source.column.scale are used to propagate the original type name, length, and scale (for variable-width types). This is useful for properly sizing corresponding columns in sink databases. Fully-qualified names for columns are of the form schemaName.tableName.columnName.

  • Type: list of strings
  • Importance: low
  • Default: n/a
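
Taken together, a minimal registration payload that supplies the required properties might look like the following sketch. The hostname, credentials, database name, logical server name, and table list are placeholders, and the database history properties covered later on this page are included because the connector also needs them in practice:

{
  "name": "sqlserver-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "database.hostname": "sqlserver.example.com",
    "database.port": "1433",
    "database.user": "debezium_user",
    "database.password": "<password>",
    "database.dbname": "inventory",
    "database.server.name": "server1",
    "table.include.list": "dbo.customers",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}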

Advanced properties

snapshot.mode

A mode for taking an initial snapshot of the structure and optionally data of captured tables. Once the snapshot is complete, the connector continues reading change events from the database’s transaction log.

  • Type: string
  • Default: initial
  • Valid values: [always, initial, initial_only, never, custom]
snapshot.include.collection.list

An optional comma-separated list of regular expressions that match the fully-qualified names (<schemaName>.<tableName>) of the tables to include in a snapshot. The specified items must be named in the connector’s table.include.list property. This property takes effect only if the connector’s snapshot.mode property is set to a value other than never.

  • Default: All tables specified in table.include.list
snapshot.isolation.mode

Mode to control which transaction isolation level is used and how long the connector locks tables that are designated for capture.

  • Default: repeatable_read
  • Valid values: [read_uncommitted, read_committed, repeatable_read, snapshot, exclusive]
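
As an illustration, the following fragment (values chosen only for the example) takes an initial snapshot of a single captured table while using snapshot isolation to avoid holding long table locks:

"snapshot.mode": "initial",
"snapshot.include.collection.list": "dbo.customers",
"snapshot.isolation.mode": "snapshot"
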
snapshot.select.statement.overrides

Controls which rows from tables will be included in a snapshot. This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME). Select statements for the individual tables are specified in additional configuration properties, one for each table, identified by the id snapshot.select.statement.overrides.[DB_NAME].[TABLE_NAME]. The value of these properties is the SELECT statement to use when retrieving data from the specific table during the snapshot process. A possible use case for large append-only tables is setting a specific point where to start (resume) the snapshot process, in case a previous snapshot process was interrupted.

Note: This setting affects snapshots only. Events captured during streaming are not affected by it.

  • Type: list of strings
  • Importance: low
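
For example, to resume a snapshot of a hypothetical large, append-only table orders in the testDB database from a known key value, the configuration could contain the following (using the DB_NAME.TABLE_NAME format described above; the table, column, and value are illustrative):

"snapshot.select.statement.overrides": "testDB.orders",
"snapshot.select.statement.overrides.testDB.orders": "SELECT * FROM orders WHERE order_id > 1000000"
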
event.processing.failure.handling.mode

Specifies how the connector should react to exceptions during processing of events.

  • Default: fail
  • Valid values: [fail, warn, skip]
poll.interval.ms

Specifies the number of milliseconds the connector should wait during each iteration for new change events to appear.

  • Type: int
  • Importance: low
  • Default: 1000
max.queue.size

Specifies the maximum size of the blocking queue into which change events read from the database are placed before they are written to Kafka. This queue can provide backpressure when, for example, writes to Kafka are slow or Kafka is not available.

  • Type: int
  • Importance: low
  • Default: 8192
max.batch.size

Specifies the maximum size of each batch of events that should be processed during each iteration of this connector.

  • Type: int
  • Importance: low
  • Default: 2048
heartbeat.interval.ms

Controls how frequently heartbeat messages are sent. This property (which is disabled by default) contains an interval in milliseconds that defines how frequently the connector sends messages to a heartbeat topic. Heartbeat messages can be used to monitor whether the connector is still receiving change events from the database. They are also useful when, for a longer period of time, only records in non-captured tables change. In this case, the connector keeps reading the log from the database but never emits any change messages into Kafka, which means that no offset updates are committed to Kafka.

This causes log and change data to be retained by the database longer than needed, because the connector has already processed the changes but did not flush the latest retrieved Log Sequence Number (LSN). Using heartbeat messages may also result in more re-sent change events after a connector restart. Set this parameter to 0 to not send heartbeat messages.

  • Type: int
  • Importance: low
  • Default: 0
heartbeat.topics.prefix

Sets the name of the topic to which heartbeat messages are sent. The topic is named according to the pattern <heartbeat.topics.prefix>.<server.name>.

  • Type: string
  • Importance: low
  • Default: __debezium-heartbeat
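
For example, to send a heartbeat message every ten seconds to topics named with a custom prefix (the interval and prefix are illustrative):

"heartbeat.interval.ms": "10000",
"heartbeat.topics.prefix": "__my-heartbeat"
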
snapshot.delay.ms

An interval in milliseconds that the connector should wait before taking a snapshot after starting up. This setting can be used to avoid snapshot interruptions when starting multiple connectors in a cluster, which can cause connector re-balancing.

  • Type: int
  • Importance: low
snapshot.fetch.size

During a snapshot, the connector reads table content in batches of rows. This property specifies the maximum number of rows in a batch.

  • Type: int
  • Default: 2000
snapshot.lock.timeout.ms

Positive integer value that specifies the maximum amount of time (in milliseconds) to wait to obtain table locks when performing a snapshot. If table locks cannot be acquired in this time interval, the snapshot will fail.

  • Type: string
  • Importance: low
  • Default: 10000
source.struct.version

Schema version for the source block in CDC events.

  • Default: v2
sanitize.field.names

Indicates whether field names are sanitized to adhere to Avro naming requirements. The value defaults to true if the connector configuration sets the key.converter or value.converter property to the Avro converter.

  • Type: boolean
  • Default: false
provide.transaction.metadata

Determines whether the connector generates events with transaction boundaries and enriches change event envelopes with transaction metadata. Specify true if you want the connector to do this.

  • Type: boolean
  • Default: false
transaction.topic

Controls the name of the topic to which the connector sends transaction metadata messages. The placeholder ${database.server.name} can be used for referring to the connector’s logical name.

  • Default: ${database.server.name}.transaction
retriable.restart.connector.wait.ms

The number of milliseconds to wait before restarting a connector after a retriable error occurs.

  • Type: int
  • Default: 10000
skipped.operations

A comma-separated list of operation types that will be skipped during streaming.

  • Type: list of strings
  • Valid values: [c, d, u]
  • Default: By default, no operations are skipped
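
As a sketch, the following fragment enables transaction metadata events and skips delete events during streaming; the values are illustrative, and the transaction.topic value shows how the ${database.server.name} placeholder can be used:

"provide.transaction.metadata": "true",
"transaction.topic": "${database.server.name}.transaction",
"skipped.operations": "d"
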
signal.data.collection
Fully-qualified name of the data collection that is used to send signals to the connector. Specify the collection name using the following format: <schemaName>.<tableName>.
incremental.snapshot.allow.schema.changes

Specifies whether to allow schema changes during an incremental snapshot.

  • Type: boolean
  • Default: false
incremental.snapshot.chunk.size

The maximum number of rows that the connector fetches and reads into memory during an incremental snapshot chunk.

  • Type: int
  • Default: 1024
max.iteration.transactions

Specifies the maximum number of transactions per iteration to be used to reduce the memory footprint when streaming changes from multiple tables in a database.

  • Type: int
  • Default: 0
incremental.snapshot.option.recompile

Applies the OPTION(RECOMPILE) query hint to all SELECT statements used during an incremental snapshot. This can help solve parameter sniffing issues that may occur, but can cause increased CPU load on the source database.

  • Type: boolean
  • Default: false
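
For example, incremental snapshots driven through a hypothetical signaling table dbo.debezium_signal could be configured as follows (the table name and chunk size are illustrative):

"signal.data.collection": "dbo.debezium_signal",
"incremental.snapshot.chunk.size": "1024"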

Database history properties

database.history.kafka.topic

The full name of the Kafka topic where the connector will store the database schema history.

Important

  • Because Debezium uses multiple topics for storing data, and certain limitations may apply to those topics, Confluent recommends you review Configuring Debezium Topics before you create a database schema history topic.
  • No two connectors can have the same topic configured for database.history.kafka.topic. The history topic tracks the connector activity and must be unique for each connector instance.
  • Type: string
  • Importance: high
database.history.kafka.bootstrap.servers

A list of host/port pairs that the connector will use for establishing an initial connection to the Kafka cluster. This connection will be used for retrieving database schema history previously stored by the connector and for writing each DDL statement read from the source database. This should point to the same Kafka cluster used by the Kafka Connect process.

  • Type: list of strings
  • Importance: high

Note

If the Kafka cluster is secured, you must add the security properties prefixed with database.history.consumer.* and database.history.producer.* to the connector configuration, as shown below:

"database.history.consumer.security.protocol": "SASL_SSL",
"database.history.consumer.sasl.mechanism": "PLAIN",
"database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"key\" password=\"secret\";",

"database.history.producer.security.protocol": "SASL_SSL",
"database.history.producer.sasl.mechanism": "PLAIN",
"database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"key\" password=\"secret\";",

"database.history.producer.ssl.keystore.location=/kafka/kafka.server.keystore.jks",
"database.history.producer.ssl.keystore.password=changeit",
"database.history.producer.ssl.truststore.location=/kafka/kafka.truststore.keystore.jks",
"database.history.producer.ssl.truststore.password=changeit",
"database.history.producer.ssl.key.password=changeit",

"database.history.consumer.ssl.keystore.location=/kafka/kafka.server.keystore.jks",
"database.history.consumer.ssl.keystore.password=changeit",
"database.history.consumer.ssl.truststore.location=/kafka/kafka.truststore.keystore.jks",
"database.history.consumer.ssl.truststore.password=changeit",
"database.history.consumer.ssl.key.password=changeit"
database.history.kafka.recovery.poll.interval.ms

Specifies the maximum number of milliseconds the connector should wait during startup or recovery while polling for persisted data.

  • Type: int
  • Default: 100
database.history.kafka.query.timeout.ms

Specifies the maximum number of milliseconds the connector should wait while fetching cluster information using the Kafka administration client.

  • Type: int
  • Default: 3000
database.history.kafka.recovery.attempts

The maximum number of times that the connector should try to read persisted history data before the connector recovery fails with an error.

  • Type: int
  • Default: 4
database.history.skip.unparseable.ddl

Specifies whether the connector should ignore malformed or unknown database statements or stop processing so a human can fix the issue.

  • Type: boolean
  • Default: false
database.history.store.only.captured.tables.ddl

Specifies whether the connector records all DDL statements or only those relevant to the tables whose changes Debezium captures. When set to true, the connector records only the DDL statements that are relevant to captured tables.

  • Type: boolean
  • Default: false

Auto topic creation

For more information about Auto topic creation, see Configuring Auto Topic Creation for Source Connectors.

Note

Configuration properties accept regular expressions (regex) that are defined as Java regex.

topic.creation.groups

A list of group aliases that are used to define per-group topic configurations for matching topics. A default group always exists and matches all topics.

  • Type: List of String types
  • Default: empty
  • Possible Values: The values of this property refer to any additional groups. A default group is always defined for topic configurations.
topic.creation.$alias.replication.factor

The replication factor for new topics created by the connector. This value must not be larger than the number of brokers in the Kafka cluster. If this value is larger than the number of Kafka brokers, an error occurs when the connector attempts to create a topic. This is a required property for the default group. This property is optional for any other group defined in topic.creation.groups. Other groups use the Kafka broker default value.

  • Type: int
  • Default: n/a
  • Possible Values: >= 1 for a specific valid value or -1 to use the Kafka broker’s default value.
topic.creation.$alias.partitions

The number of topic partitions created by this connector. This is a required property for the default group. This property is optional for any other group defined in topic.creation.groups. Other groups use the Kafka broker default value.

  • Type: int
  • Default: n/a
  • Possible Values: >= 1 for a specific valid value or -1 to use the Kafka broker’s default value.
topic.creation.$alias.include

A list of strings that represent regular expressions that match topic names. This list is used to include topics with matching values, and apply this group’s specific configuration to the matching topics. $alias applies to any group defined in topic.creation.groups. This property does not apply to the default group.

  • Type: List of String types
  • Default: empty
  • Possible Values: Comma-separated list of exact topic names or regular expressions.
topic.creation.$alias.exclude

A list of strings representing regular expressions that match topic names. This list is used to exclude topics with matching values from getting the group’s specific configuration. $alias applies to any group defined in topic.creation.groups. This property does not apply to the default group. Note that exclusion rules override any inclusion rules for topics.

  • Type: List of String types
  • Default: empty
  • Possible Values: Comma-separated list of exact topic names or regular expressions.
topic.creation.$alias.${kafkaTopicSpecificConfigName}

Any of the topic-level configurations listed in Changing Broker Configurations Dynamically for the version of the Kafka broker where the records will be written. The broker’s topic-level configuration value is used if the configuration is not specified for the rule. $alias applies to the default group as well as any group defined in topic.creation.groups.

  • Type: property values
  • Default: Kafka broker value
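
As an example (the group alias, topic pattern, and settings are illustrative), the following fragment defines the required default group and an additional group that creates compacted topics for heartbeat topics:

"topic.creation.default.replication.factor": "3",
"topic.creation.default.partitions": "1",
"topic.creation.groups": "compacted",
"topic.creation.compacted.include": "__debezium-heartbeat.*",
"topic.creation.compacted.replication.factor": "3",
"topic.creation.compacted.partitions": "1",
"topic.creation.compacted.cleanup.policy": "compact"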

More details can be found in the Debezium connector properties documentation.

Note

Portions of the information provided here derive from documentation originally produced by the Debezium Community. Work produced by Debezium is licensed under Creative Commons 3.0.