MongoDB Source Connector (Debezium) Configuration Properties¶
The MongoDB Source Connector can be configured using a variety of configuration properties.
Note
These are properties for the self-managed connector. If you are using Confluent Cloud, see Debezium MongoDB Source Connector for Confluent Cloud.
name
- A unique name for the connector. Trying to register again with the same name will fail. This property is required by all Kafka Connect connectors.
connector.class
- The name of the Java class for the connector. You should always use a value of io.debezium.connector.mongodb.MongoDbConnector for the MongoDB connector.
mongodb.hosts
The comma-separated list of hostname and port pairs (in the form host or host:port) of the MongoDB servers in the replica set. The list can contain a single hostname and port pair. If mongodb.members.auto.discover is set to false, the host and port pair should be prefixed with the replica set name (for example, rs0/localhost:27017).
- Type: list of strings
- Importance: high
mongodb.name
A unique name that identifies the connector and/or MongoDB replica set or sharded cluster that this connector monitors. Each server should be monitored by at most one Debezium connector, since this server name prefixes all persisted Kafka topics emanating from the MongoDB replica set or cluster.
- Type: string
- Importance: high
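As a point of reference, a minimal self-managed connector configuration using the properties above might look like the following sketch. The connector name, host, port, and logical server name are placeholder values:

```json
{
  "name": "mongodb-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "rs0/localhost:27017",
    "mongodb.name": "dbserver1"
  }
}
```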
mongodb.user
Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.
- Type: string
- Importance: medium
mongodb.password
Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.
- Type: password
- Importance: medium
mongodb.authsource
Database (authentication source) containing MongoDB credentials. This is required only when MongoDB is configured to use authentication with an authentication database other than admin.
- Default: admin
mongodb.ssl.enabled
When enabled, the connector uses SSL to connect to MongoDB instances.
- Type: string
- Importance: low
- Default: false
mongodb.ssl.invalid.hostname.allowed
When SSL is enabled, this setting controls whether strict hostname checking is disabled during the connection phase. If set to true, the connection does not prevent man-in-the-middle attacks.
- Type: string
- Importance: low
- Default: false
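Taken together, a connector that authenticates against a non-default authentication database and connects over SSL might add a fragment like the following to its configuration. The user, password, and authentication database are placeholder values:

```json
{
  "mongodb.user": "debezium",
  "mongodb.password": "secret",
  "mongodb.authsource": "users",
  "mongodb.ssl.enabled": "true",
  "mongodb.ssl.invalid.hostname.allowed": "false"
}
```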
database.include.list
An optional comma-separated list of regular expressions that match database names to be monitored; any database name not included in database.include.list is excluded from monitoring. By default, all databases are monitored. Must not be used with database.exclude.list.
- Type: list of strings
- Importance: medium
- Default: empty string
database.exclude.list
An optional comma-separated list of regular expressions that match database names to be excluded from monitoring; any database name not included in database.exclude.list is monitored. Must not be used with database.include.list.
- Type: list of strings
- Importance: medium
- Default: empty string
collection.include.list
An optional comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be monitored; any collection not included in collection.include.list is excluded from monitoring. Each identifier is of the form databaseName.collectionName. By default, the connector monitors all collections except those in the local and admin databases. Must not be used with collection.exclude.list.
- Type: list of strings
- Importance: medium
- Default: empty string
collection.exclude.list
An optional comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be excluded from monitoring; any collection not included in collection.exclude.list is monitored. Each identifier is of the form databaseName.collectionName. Must not be used with collection.include.list.
- Type: list of strings
- Importance: medium
- Default: empty string
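For example, to capture only the inventory database while skipping its audit collections, a configuration fragment might combine a database-level include list with a collection-level exclude list. The database and collection names here are placeholders, and note that the values are Java regular expressions matched against names and fully-qualified namespaces (the restriction noted above only forbids combining the include and exclude variants of the same list):

```json
{
  "database.include.list": "inventory",
  "collection.exclude.list": "inventory\\.audit.*"
}
```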
snapshot.mode
Specifies the criteria for running a snapshot (for example, the initial sync) upon startup of the connector. The default is initial, which specifies that the connector reads a snapshot when either no offset is found or the oplog no longer contains the previous offset. The never option specifies that the connector should never use snapshots; instead, the connector tails the log.
- Type: string
- Importance: low
- Default: initial
snapshot.include.collection.list
An optional, comma-separated list of regular expressions that match names of collections specified in collection.include.list for which you want to take the snapshot.
- Default: All collections specified in collection.include.list
field.renames
An optional comma-separated list of the fully-qualified replacements of fields that should be used to rename fields in change event message values. Fully-qualified replacements for fields are in the form databaseName.collectionName.fieldName.nestedFieldName:newNestedFieldName, where databaseName and collectionName may contain the wildcard (*), which matches any characters. The colon character (:) is used to determine the rename mapping of a field. Note that the next field replacement is applied to the result of the previous field replacement in the list. Keep this in mind when renaming multiple fields that are in the same path.
- Type: string
- Importance: high
- Default: empty string
tasks.max
The maximum number of tasks that should be created for this connector. The MongoDB connector attempts to use a separate task for each replica set, so the default is acceptable when using the connector with a single MongoDB replica set. When using the connector with a MongoDB sharded cluster, Confluent recommends specifying a value that is equal to or more than the number of shards in the cluster, so that the work for each replica set can be distributed by Kafka Connect.
- Type: int
- Importance: medium
- Default: 1
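Following the recommendation above, when capturing from a hypothetical sharded cluster with three shards, a reasonable setting would be:

```json
{
  "tasks.max": "3"
}
```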
snapshot.max.threads
Positive integer value that specifies the maximum number of threads used to perform an initial sync of the collections in a replica set.
- Type: int
- Importance: medium
- Default: 1
tombstones.on.delete
Controls whether a tombstone event should be generated after a delete event. When set to true, the delete operations are represented by a delete event and a subsequent tombstone event. When set to false, only a delete event is sent. Emitting the tombstone event (the default behavior) allows Kafka to completely delete all events pertaining to the given key once the source record is deleted.
- Type: string
- Importance: low
- Default: true
snapshot.delay.ms
An interval in milliseconds that the connector should wait before taking a snapshot after starting up. Can be used to avoid snapshot interruptions (and potential rebalancing) when starting multiple connectors in a cluster.
- Type: int
- Importance: low
- Default: no default
snapshot.fetch.size
Specifies the maximum number of documents that should be read in one go from each collection while taking a snapshot. The connector will read the collection contents in several batches of this size. Defaults to 0, which indicates that the server chooses an appropriate fetch size.
- Default: 0
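The snapshot-related properties are often tuned together. Below is a sketch of a configuration that parallelizes the initial sync, reads in larger batches, and delays the snapshot briefly so a rebalance can settle when several connectors start at once. All values are illustrative, not recommendations:

```json
{
  "snapshot.mode": "initial",
  "snapshot.max.threads": "4",
  "snapshot.fetch.size": "1000",
  "snapshot.delay.ms": "10000"
}
```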
Auto topic creation¶
For more information about Auto topic creation, see Configuring Auto Topic Creation for Source Connectors.
Note
Configuration properties accept regular expressions (regex) that are defined as Java regex.
topic.creation.groups
A list of group aliases that are used to define per-group topic configurations for matching topics. A default group always exists and matches all topics.
- Type: List of String types
- Default: empty
- Possible Values: The values of this property refer to any additional groups. A default group is always defined for topic configurations.
topic.creation.$alias.replication.factor
The replication factor for new topics created by the connector. This value must not be larger than the number of brokers in the Kafka cluster. If this value is larger than the number of Kafka brokers, an error occurs when the connector attempts to create a topic. This is a required property for the default group. This property is optional for any other group defined in topic.creation.groups. Other groups use the Kafka broker default value.
- Type: int
- Default: n/a
- Possible Values: >= 1 for a specific valid value or -1 to use the Kafka broker's default value.
topic.creation.$alias.partitions
The number of topic partitions created by this connector. This is a required property for the default group. This property is optional for any other group defined in topic.creation.groups. Other groups use the Kafka broker default value.
- Type: int
- Default: n/a
- Possible Values: >= 1 for a specific valid value or -1 to use the Kafka broker's default value.
topic.creation.$alias.include
A list of strings that represent regular expressions that match topic names. This list is used to include topics with matching values and apply this group's specific configuration to the matching topics. $alias applies to any group defined in topic.creation.groups. This property does not apply to the default group.
- Type: List of String types
- Default: empty
- Possible Values: Comma-separated list of exact topic names or regular expressions.
topic.creation.$alias.exclude
A list of strings representing regular expressions that match topic names. This list is used to exclude topics with matching values from getting the group's specific configuration. $alias applies to any group defined in topic.creation.groups. This property does not apply to the default group. Note that exclusion rules override any inclusion rules for topics.
- Type: List of String types
- Default: empty
- Possible Values: Comma-separated list of exact topic names or regular expressions.
topic.creation.$alias.${kafkaTopicSpecificConfigName}
Any of the Changing Broker Configurations Dynamically for the version of the Kafka broker where the records will be written. The broker's topic-level configuration value is used if the configuration is not specified for the rule. $alias applies to the default group as well as any group defined in topic.creation.groups.
- Type: property values
- Default: Kafka broker value
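Putting the topic creation properties together, the fragment below sketches a default group plus a hypothetical compacted group whose matching topics are created with the broker topic-level setting cleanup.policy=compact. The alias, regular expression, and sizing values are placeholders:

```json
{
  "topic.creation.groups": "compacted",
  "topic.creation.default.replication.factor": "3",
  "topic.creation.default.partitions": "6",
  "topic.creation.compacted.include": "dbserver1\\.inventory\\..*",
  "topic.creation.compacted.replication.factor": "3",
  "topic.creation.compacted.partitions": "1",
  "topic.creation.compacted.cleanup.policy": "compact"
}
```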
Advanced Configuration Properties¶
max.queue.size
Positive integer value that specifies the maximum size of the blocking queue into which change events read from the database log are placed before they are written to Kafka. This queue can provide backpressure to the oplog reader when, for example, writes to Kafka are slower or Kafka is not available. Events that appear in the queue are not included in the offsets periodically recorded by this connector. Defaults to 8192, and should always be larger than the maximum batch size specified in the max.batch.size property.
- Type: int
- Importance: low
- Default: 8192
max.batch.size
Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector. Defaults to 2048.
- Type: int
- Importance: low
- Default: 2048
max.queue.size.in.bytes
Long value for the maximum size in bytes of the blocking queue. The feature is disabled by default; it is activated when set to a positive long value.
- Type: long
- Importance: low
- Default: 0
poll.interval.ms
Positive integer value that specifies the number of milliseconds the connector should wait during each iteration for new change events to appear. Defaults to 500 milliseconds (0.5 seconds).
- Type: int
- Importance: low
- Default: 500
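These queueing properties interact: max.queue.size should stay larger than max.batch.size, and poll.interval.ms bounds how long each iteration waits for events. A sketch that doubles the defaults while preserving that relationship (values are illustrative):

```json
{
  "max.queue.size": "16384",
  "max.batch.size": "4096",
  "poll.interval.ms": "1000"
}
```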
connect.backoff.initial.delay.ms
Positive integer value that specifies the initial delay when trying to reconnect to a primary after the first failed connection attempt or when no primary is available. Defaults to 1000 milliseconds (1 second).
- Type: int
- Importance: low
- Default: 1000
connect.backoff.max.delay.ms
Positive integer value that specifies the maximum delay when trying to reconnect to a primary after repeated failed connection attempts or when no primary is available. Defaults to 120000 milliseconds (120 seconds).
- Type: int
- Importance: low
- Default: 120000
connect.max.attempts
Positive integer value that specifies the maximum number of failed connection attempts to a replica set primary before an exception occurs and the task is aborted. Defaults to 16, which results in a little over 20 minutes of attempts before failing when using the defaults for connect.backoff.initial.delay.ms and connect.backoff.max.delay.ms.
- Type: int
- Importance: low
- Default: 16
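To see where the "a little over 20 minutes" figure comes from, assuming the delay doubles after each failed attempt up to the cap: the delays run roughly 1 + 2 + 4 + ... + 64 seconds for the first 7 attempts (127 seconds), then 120 seconds for each of the remaining 9 attempts (1080 seconds), about 20 minutes in total. A fragment that makes the connector give up faster might look like this (illustrative values):

```json
{
  "connect.backoff.initial.delay.ms": "500",
  "connect.backoff.max.delay.ms": "30000",
  "connect.max.attempts": "8"
}
```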
mongodb.members.auto.discover
Boolean value that specifies whether the addresses in mongodb.hosts are seeds that should be used to discover all members of the cluster or replica set (true), or whether the address(es) in mongodb.hosts should be used as is (false). The default is true and should be used in all cases except where MongoDB is fronted by a proxy.
- Type: string
- Importance: low
- Default: true
source.struct.version
Schema version for the source block in CDC events. Debezium 0.10 introduced a few breaking changes to the structure of the source block in order to unify the exposed structure across all connectors. Setting this option to v1 produces the structure used in earlier versions. Note that this setting is not recommended and is planned for removal in a future Debezium version.
- Type: string
- Importance: low
- Default: v2
heartbeat.interval.ms
Controls how frequently heartbeat messages are sent. This property contains an interval in milliseconds that defines how frequently the connector sends messages into a heartbeat topic. It can be used to monitor whether the connector is still receiving change events from the database. You should also leverage heartbeat messages in cases where only records in non-captured collections change for a longer period of time. In such a situation the connector continues to read the oplog from the database but never emits any change messages into Kafka, which means that no offset updates are committed to Kafka. This causes the oplog files to be rotated out, but the connector does not notice, so on restart some events are no longer available, which makes re-execution of the initial snapshot necessary.
Set this parameter to 0 to not send heartbeat messages at all. Disabled by default.
- Default: 0
heartbeat.topics.prefix
Controls the naming of the topic to which heartbeat messages are sent. The topic is named according to the pattern <heartbeat.topics.prefix>.<server.name>.
- Default: __debezium-heartbeat
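For example, to emit a heartbeat every ten seconds into a topic named __debezium-heartbeat.dbserver1 (assuming a mongodb.name of dbserver1), you might set:

```json
{
  "heartbeat.interval.ms": "10000",
  "heartbeat.topics.prefix": "__debezium-heartbeat"
}
```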
sanitize.field.names
Whether field names are sanitized to adhere to Avro naming requirements. See Avro naming for more details.
- Default: true when the connector configuration explicitly specifies the key.converter or value.converter parameters to use Avro; otherwise defaults to false.
skipped.operations
- Comma-separated list of operation types that will be skipped during streaming. The operations include: c for inserts/creates, u for updates, and d for deletes. By default, no operations are skipped.
snapshot.collection.filter.overrides
Controls which collection items are included in a snapshot. This property affects snapshots only. Specify a comma-separated list of collection names in the form databaseName.collectionName.
For each collection that you specify, also specify another configuration property: snapshot.collection.filter.overrides.databaseName.collectionName. For example, the name of the other configuration property might be snapshot.collection.filter.overrides.customers.orders. Set this property to a valid filter expression that retrieves only the items that you want in the snapshot. When the connector performs a snapshot, it retrieves only the items that match the filter expression, as shown in the sketch below.
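A sketch of how the paired properties fit together, assuming a customers.orders collection; both the collection name and the filter expression are hypothetical, and the exact filter syntax is not specified here, so treat the value as illustrative only:

```json
{
  "snapshot.collection.filter.overrides": "customers.orders",
  "snapshot.collection.filter.overrides.customers.orders": "{ \"status\": \"active\" }"
}
```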
provide.transaction.metadata
When set to true, Debezium generates events with transaction boundaries and enriches the data event envelope with transaction metadata. For more information, see Transaction Metadata.
- Default: false
transaction.topic
Controls the name of the topic to which the connector sends transaction metadata messages. The placeholder ${database.server.name} can be used to refer to the connector's logical name (see Logical connector name).
- Default: ${database.server.name}.transaction (for example, dbserver1.transaction)
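Enabling transaction metadata and spelling out the metadata topic using the placeholder might look like this sketch:

```json
{
  "provide.transaction.metadata": "true",
  "transaction.topic": "${database.server.name}.transaction"
}
```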
retriable.restart.connector.wait.ms
The number of milliseconds to wait before restarting a connector after a retriable error occurs.
- Default: 10000 ms (10 seconds)
mongodb.poll.interval.ms
The interval at which the connector polls for new, removed, or changed replica sets.
- Default: 30000 ms (30 seconds)
mongodb.connect.timeout.ms
The number of milliseconds the driver will wait before a new connection attempt is aborted.
- Default: 10000 ms (10 seconds)
mongodb.socket.timeout.ms
The number of milliseconds a send/receive on the socket can take before a timeout occurs. A value of 0 disables this behavior.
- Default: 0
mongodb.server.selection.timeout.ms
The number of milliseconds the driver will wait to select a server before it times out and throws an error.
- Type: int
- Importance: low
- Default: 30000 ms (30 seconds)
cursor.max.await.time.ms
Specifies the maximum number of milliseconds the oplog cursor will wait for the server to produce a result before causing an execution timeout exception. A value of 0 indicates using the server/driver default wait timeout.
- Default: 0
More details can be found in the Debezium connector properties documentation.
Note
Portions of the information provided here derive from documentation originally produced by the Debezium Community. Work produced by Debezium is licensed under Creative Commons 3.0.