MongoDB Source Connector (Debezium) Configuration Properties

The MongoDB Source Connector can be configured using a variety of configuration properties.

Note

These are properties for the self-managed connector. If you are using Confluent Cloud, see Debezium MongoDB Source Connector for Confluent Cloud.

name
A unique name for the connector. Trying to register again with the same name will fail. This property is required by all Kafka Connect connectors.
connector.class
The name of the Java class for the connector. You should always use a value of io.debezium.connector.mongodb.MongoDbConnector for the MongoDB connector.
mongodb.hosts

The comma-separated list of hostname and port pairs (in the form host or host:port) of the MongoDB servers in the replica set. The list can contain a single hostname and port pair. If mongodb.members.auto.discover is set to false, then the host and port pair should be prefixed with the replica set name (e.g., rs0/localhost:27017)

  • Type: list of strings
  • Importance: high
mongodb.name

A unique name that identifies the connector and/or MongoDB replica set or sharded cluster that this connector monitors. Each server should be monitored by at most one Debezium connector, since this server name prefixes all persisted Kafka topics emanating from the MongoDB replica set or cluster.

  • Type: int
  • Importance: high
mongodb.user

Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.

  • Type: string
  • Importance: medium
mongodb.password

Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.

  • Type: password
  • Importance: medium
mongodb.authsource

Database (authentication source) containing MongoDB credentials. This is required only when MongoDB is configured to use authentication with another authentication database than admin.

  • Default: admin
mongodb.ssl.enabled

Connector uses SSL to connect to MongoDB instances.

  • Type: string
  • Importance: low
  • Default: false
mongodb.ssl.invalid.hostname.allowed

When SSL is enabled this setting controls whether strict hostname checking is disabled during connection phase. If true the connection does not prevent man-in-the-middle attacks.

  • Type: string
  • Importance: low
  • Default: false
database.include.list

An optional comma-separated list of regular expressions that match database names to be monitored; any database name not included in database.include.list is excluded from monitoring. By default all databases are monitored. Must not be used with database.exclude.list.

  • Type: list of strings
  • Importance: medium
  • Default: empty string
database.exclude.list

An optional comma-separated list of regular expressions that match database names to be excluded from monitoring; any database name not included in database.exclude.list is monitored. Must not be used with database.include.list.

  • Type: list of strings
  • Importance: medium
  • Default: empty string
collection.include.list

An optional comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be monitored; any collection not included in collection.include.list is excluded from monitoring. Each identifier is of the form databaseName.collectionName. By default the connector will monitor all collections except those in the local and admin databases. Must not be used with collection.exclude.list.

  • Type: list of strings
  • Importance: medium
  • Default: empty string
collection.exclude.list

An optional comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be excluded from monitoring; any collection not included in collection.exclude.list is monitored. Each identifier is of the form databaseName.collectionName. Must not be used with collection.include.list.

  • Type: list of strings
  • Importance: medium
  • Default: empty string
snapshot.mode

Specifies the criteria for running a snapshot (for example, the initial sync) upon startup of the connector. The default is initial, and specifies the connector reads a snapshot when either no offset is found or if the oplog no longer contains the previous offset. The never option specifies that the connector should never use snapshots, instead the connector should tail the log.

  • Type: string
  • Importance: low
  • Default: intial
snapshot.include.collection.list

An optional, comma-separated list of regular expressions that match names of schemas specified in collection.include.list for which you want to take the snapshot.

  • Default: All collections specified in collection.include.list
field.renames

An optional comma-separated list of the fully-qualified replacements of fields that should be used to rename fields in change event message values. Fully-qualified replacements for fields are in the form databaseName.collectionName.fieldName.nestedFieldName:newNestedFieldName, where databaseName and collectionName may contain the wildcard (*) which matches any characters. The colon character (:) is used to determine rename mapping of field. Note that the next field replacement is applied to the result of the previous field replacement in the list. Keep this in mind when renaming multiple fields that are in the same path.

  • Type: string
  • Importance: high
  • Default: empty string
tasks.max

The maximum number of tasks that should be created for this connector. The MongoDB connector attempts to use a separate task for each replica set, so the default is acceptable when using the connector with a single MongoDB replica set. When using the connector with a MongoDB sharded cluster, Confluent recommends specifying a value that is equal to or more than the number of shards in the cluster, so that the work for each replica set can be distributed by Kafka Connect.

  • Type: int
  • Importance: medium
  • Default: 1
snapshot.max.threads

Positive integer value that specifies the maximum number of threads used to perform an intial sync of the collections in a replica set.

  • Type: int
  • Importance: medium
  • Default: 1
tombstones.on.delete

Controls whether a tombstone event should be generated after a delete event. When set to true, the delete operations are represented by a delete event and a subsequent tombstone event. When set to false, only a delete event is sent. Emitting the tombstone event (the default behavior) allows Kafka to completely delete all events pertaining to the given key once the source record is deleted.

  • Type: string
  • Importance: low
  • Default: true
snapshot.delay.ms

An interval in milliseconds that the connector should wait before taking a snapshot after starting up. Can be used to avoid snapshot interruptions (and potential rebalancing) when starting multiple connectors in a cluster.

  • Type: list of strings
  • Importance: low
  • Default: empty string
snapshot.fetch.size

Specifies the maximum number of documents that should be read in one go from each collection while taking a snapshot. The connector will read the collection contents in several batches of this size. Defaults to 0, which indicates that the server chooses an appropriate fetch size.

  • Default: 0

Auto topic creation

For more information about Auto topic creation, see Configuring Auto Topic Creation for Source Connectors.

Note

Configuration properties accept regular expressions (regex) that are defined as Java regex.

topic.creation.groups

A list of group aliases that are used to define per-group topic configurations for matching topics. A default group always exists and matches all topics.

  • Type: List of String types
  • Default: empty
  • Possible Values: The values of this property refer to any additional groups. A default group is always defined for topic configurations.
topic.creation.$alias.replication.factor

The replication factor for new topics created by the connector. This value must not be larger than the number of brokers in the Kafka cluster. If this value is larger than the number of Kafka brokers, an error occurs when the connector attempts to create a topic. This is a required property for the default group. This property is optional for any other group defined in topic.creation.groups. Other groups use the Kafka broker default value.

  • Type: int
  • Default: n/a
  • Possible Values: >= 1 for a specific valid value or -1 to use the Kafka broker’s default value.
topic.creation.$alias.partitions

The number of topic partitions created by this connector. This is a required property for the default group. This property is optional for any other group defined in topic.creation.groups. Other groups use the Kafka broker default value.

  • Type: int
  • Default: n/a
  • Possible Values: >= 1 for a specific valid value or -1 to use the Kafka broker’s default value.
topic.creation.$alias.include

A list of strings that represent regular expressions that match topic names. This list is used to include topics with matching values, and apply this group’s specific configuration to the matching topics. $alias applies to any group defined in topic.creation.groups. This property does not apply to the default group.

  • Type: List of String types
  • Default: empty
  • Possible Values: Comma-separated list of exact topic names or regular expressions.
topic.creation.$alias.exclude

A list of strings representing regular expressions that match topic names. This list is used to exclude topics with matching values from getting the group’s specfic configuration. $alias applies to any group defined in topic.creation.groups. This property does not apply to the default group. Note that exclusion rules override any inclusion rules for topics.

  • Type: List of String types
  • Default: empty
  • Possible Values: Comma-separated list of exact topic names or regular expressions.
topic.creation.$alias.${kafkaTopicSpecificConfigName}

Any of the Changing Broker Configurations Dynamically for the version of the Kafka broker where the records will be written. The broker’s topic-level configuration value is used if the configuration is not specified for the rule. $alias applies to the default group as well as any group defined in topic.creation.groups.

  • Type: property values
  • Default: Kafka broker value

Advanced Configuration Properties

max.queue.size

Positive integer value that specifies the maximum size of the blocking queue into which change events read from the database log are placed before they are written to Kafka. This queue can provide backpressure to the oplog reader when, for example, writes to Kafka are slower or if Kafka is not available. Events that appear in the queue are not included in the offsets periodically recorded by this connector. Defaults to 8192, and should always be larger than the maximum batch size specified in the max.batch.size property.

  • Type: int
  • Importance: low
  • Default: 8192
max.batch.size

Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector. Defaults to 2048.

  • Type: int
  • Importance: low
  • Default: 2048
max.queue.size.in.bytes

Long value for the maximum size in bytes of the blocking queue. The feature is disabled by default, it will be active if it’s set with a positive long value.

  • Type: int
  • Importance: low
  • Default: 0
poll.interval.ms

Positive integer value that specifies the number of milliseconds the connector should wait during each iteration for new change events to appear. Defaults to 500 milliseconds (.5 seconds).

  • Type: int
  • Importance: low
  • Default: 500
connect.backoff.initial.delay.ms

Positive integer value that specifies the initial delay when trying to reconnect to a primary after the first failed connection attempt or when no primary is available. Defaults to 1000 milliseconds (1 second).

  • Type: int
  • Importance: low
  • Default: 1000
connect.backoff.max.delay.ms

Positive integer value that specifies the maximum delay when trying to reconnect to a primary after repeated failed connection attempts or when no primary is available. Defaults to 120000 milliseconds (120 seconds).

  • Type: int
  • Importance: low
  • Default: 120000
connect.max.attempts

Positive integer value that specifies the maximum number of failed connection attempts to a replica set primary before an exception occurs and task is aborted. Defaults to 16 which results in a little over 20 minutes of attempts before failing, if using the defaults for connect.backoff.initial.delay.ms and connect.backoff.max.delay.ms.

  • Type: int
  • Importance: low
  • Default: 16
mongodb.members.auto.discover

Boolean value that specifies whether the addresses in mongodb.hosts are seeds that should be used to discover all members of the cluster or replica set (true), or whether the address(es) in mongodb.hosts should be used as is (false). The default is true and should be used in all cases except where MongoDB is fronted by a proxy.

  • Type: string
  • Importance: low
  • Default: true
source.struct.version

Schema version for the source block in CDC events. Debezium 0.10 introduced a few breaking changes to the structure of the source block in order to unify the exposed structure across all the connectors. By setting this option to v1 the structure used in earlier versions can be produced. Note that this setting is not recommended and is planned for removal in a future Debezium version.

  • Type: string
  • Importance: low
  • Default: v2
heartbeat.interval.ms

Controls how frequently heartbeat messages are sent. This property contains an interval in milliseconds that defines how frequently the connector sends messages into a heartbeat topic. This can be used to monitor whether the connector is still receiving change events from the database. You also should leverage heartbeat messages in cases where only records in non-captured collections are changed for a longer period of time. In such situation the connector would proceed to read the oplog from the database but never emit any change messages into Kafka, which in turn means that no offset updates are committed to Kafka. This will cause the oplog files to be rotated out but connector will not notice it so on restart some events are no longer available which leads to the need of re-execution of the initial snapshot.

Set this parameter to 0 to not send heartbeat messages at all. Disabled by default.

  • Default: 0
heartbeat.topics.prefix

Controls the naming of the topic to which heartbeat messages are sent. The topic is named according to the pattern <heartbeat.topics.prefix>.<server.name>.

  • Default: __debezium-heartbeat
sanitize.field.names

Whether field names are sanitized to adhere to Avro naming requirements. See Avro naming for more details.

  • Default: true when connector configuration explicitly specifies the key.converter or value.converter parameters to use Avro, otherwise defaults to false.
skipped.operations
Comma-separated list of operation types that will be skipped during streaming. The operations include: c for inserts/create, u for updates, and d for deletes. By default, no operations are skipped.
snapshot.collection.filter.overrides

Controls which collection items are included in snapshot. This property affects snapshots only. Specify a comma-separated list of collection names in the form databaseName.collectionName.

For each collection that you specify, also specify another configuration property: snapshot.collection.filter.overrides.databaseName.collectionName. For example, the name of the other configuration property might be: snapshot.collection.filter.overrides.customers.orders. Set this property to a valid filter expression that retrieves only the items that you want in the snapshot. When the connector performs a snapshot, it retrieves only the items that matches the filter expression.

provide.transaction.metadata

When set to true Debezium generates events with transaction boundaries and enriches data events envelope with transaction metadata. For more information, see Transaction Metadata.

  • Default: false
transaction.topic

Controls the name of the topic to which the connector sends transaction metadata messages. The placeholder ${database.server.name} can be used for referring to the connector’s logical name (see Logical connector name).

  • Default: ${database.server.name}.transaction (for example, dbserver1.transaction)
retriable.restart.connector.wait.ms

The number of milliseconds to wait before restarting a connector after a retriable error occurs.

  • Default: 10000 ms (10 seconds)
mongodb.poll.interval.ms

The interval in which the connector polls for new, removed, or changed replica sets.

  • Default: 30000 ms (30 seconds)
mongodb.connect.timeout.ms

The number of milliseconds before a send/receive on the socket can take before a timeout occurs. A value of 0 disables this behavior.

  • Default: 10000 ms (10 seconds)
mongodb.socket.timeout.ms

The number of milliseconds before a send/receive on the socket can take before a timeout occurs. A value of 0 disables this behavior.

  • Default: 0
mongodb.server.selection.timeout.ms

The number of milliseconds the driver will wait to select a server before it times out and throws an error.

  • Type: string
  • Importance: low
  • Default: 30000 ms (30 seconds)
cursor.max.await.time.ms

Specifies the maximum number of milliseconds the oplog cursor will wait for the server to produce a result before causing an execution timeout exception. A value of 0 indicates using the server/driver default wait timeout.

  • Default: 0

More details can be found in the Debezium connector properties documentation.

Note

Portions of the information provided here derives from documentation originally produced by the Debezium Community. Work produced by Debezium is licensed under Creative Commons 3.0.