Configuration Reference for Debezium MongoDB Source Connector for Confluent Platform¶
The MongoDB Source Connector can be configured using a variety of configuration properties.
Note
These are properties for the self-managed connector. If you are using Confluent Cloud, see Debezium MongoDB Source Connector for Confluent Cloud.
name
A unique name for the connector. Trying to register again with the same name will fail. This property is required by all Kafka Connect connectors.
- Type: string
- Default: No default
connector.class
The name of the Java class for the connector. Always use a value of io.debezium.connector.mongodb.MongoDbConnector for the MongoDB connector.
- Type: string
- Default: No default
mongodb.connection.string
This property replaces mongodb.hosts and specifies a connection string that the connector uses during the initial discovery of a MongoDB replica set. To use this option, you must set the value of mongodb.members.auto.discover to true. Note that the connector uses this connection string only during the initial replica set discovery process. For more details, see the Debezium documentation.
- Type: string
- Default: No default
mongodb.connection.string.shard.params
Specifies the URL parameters of the MongoDB connection string, including read preferences, that the connector uses to connect to individual shards of a MongoDB sharded cluster.
- Type: string
- Default: No default
mongodb.connection.mode
Specifies the strategy that the connector uses when it connects to a sharded MongoDB cluster. You can set the property to either replica_set or sharded. If set to replica_set, the connector establishes individual connections to the replica set for each shard. If set to sharded, the connector establishes a single connection to the database, based on the value of the mongodb.connection.string property. For more details, see the Debezium documentation.
- Type: string
- Default: replica_set
topic.prefix
A unique name that identifies the connector and the MongoDB replica set or sharded cluster that this connector monitors. Each server should be monitored by at most one Debezium connector, since this server name prefixes all persisted Kafka topics emanating from the MongoDB replica set or cluster. For more details, see the Debezium documentation.
- Type: string
- Default: No default
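Taken together, the properties above form the minimal skeleton of a connector registration. The following is a sketch of such a configuration; the connector name, hosts in the connection string, and topic prefix are placeholders, not values prescribed by this reference:

```json
{
  "name": "mongodb-source-example",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.connection.string": "mongodb://mongo1:27017,mongo2:27017/?replicaSet=rs0",
    "mongodb.connection.mode": "replica_set",
    "topic.prefix": "fulfillment"
  }
}
```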
mongodb.authentication.class
The full name of a Java class that implements the io.debezium.connector.mongodb.connection.MongoDbAuthProvider interface. This class handles setting the credentials on the MongoDB connection (called on each app boot). The default behavior uses the mongodb.user, mongodb.password, and mongodb.authsource properties according to each of their documentation, but other implementations may use them differently or ignore them altogether. Note that any setting in mongodb.connection.string will override settings set by this class.
- Type: string
- Default: DefaultMongoDbAuthProvider
mongodb.user
Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.
- Type: string
- Importance: medium
mongodb.password
Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.
- Type: password
- Importance: medium
mongodb.authsource
Database (authentication source) containing MongoDB credentials. This is required only when MongoDB is configured to use authentication with an authentication database other than admin.
- Default: admin
mongodb.ssl.enabled
Specifies whether the connector uses SSL to connect to MongoDB instances.
- Type: string
- Importance: low
- Default: false
mongodb.ssl.invalid.hostname.allowed
When SSL is enabled, this setting controls whether strict hostname checking is disabled during the connection phase. If set to true, the connection does not prevent man-in-the-middle attacks.
- Type: string
- Importance: low
- Default: false
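When authentication and SSL are both enabled on the MongoDB deployment, the corresponding properties are typically combined as in the following sketch; the user, password placeholder, and authentication database are illustrative only:

```json
{
  "mongodb.user": "debezium",
  "mongodb.password": "<secret>",
  "mongodb.authsource": "admin",
  "mongodb.ssl.enabled": "true",
  "mongodb.ssl.invalid.hostname.allowed": "false"
}
```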
filters.match.mode
The mode used to match events based on included/excluded database and collection names. You can set the property to one of the following values:
- regex: Database and collection includes/excludes are evaluated as a comma-separated list of regular expressions.
- literal: Database and collection includes/excludes are evaluated as a comma-separated list of string literals. Whitespace characters surrounding these literals are stripped.
- Type: string
- Default: regex
database.include.list
An optional comma-separated list of regular expressions that match database names to be monitored; any database name not included in database.include.list is excluded from monitoring. By default, all databases are monitored. Must not be used with database.exclude.list.
- Type: list of strings
- Importance: medium
- Default: empty string
database.exclude.list
An optional comma-separated list of regular expressions that match database names to be excluded from monitoring; any database name not included in database.exclude.list is monitored. Must not be used with database.include.list.
- Type: list of strings
- Importance: medium
- Default: empty string
collection.include.list
An optional comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be monitored; any collection not included in collection.include.list is excluded from monitoring. Each identifier is of the form databaseName.collectionName. By default, the connector monitors all collections except those in the local and admin databases. Must not be used with collection.exclude.list.
- Type: list of strings
- Importance: medium
- Default: empty string
collection.exclude.list
An optional comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be excluded from monitoring; any collection not included in collection.exclude.list is monitored. Each identifier is of the form databaseName.collectionName. Must not be used with collection.include.list.
- Type: list of strings
- Importance: medium
- Default: empty string
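As an illustration of how the include/exclude properties compose, the following fragment monitors only a hypothetical inventory database and two of its collections, using the default regex match mode. Note the escaped dots, since each entry is evaluated as a regular expression:

```json
{
  "filters.match.mode": "regex",
  "database.include.list": "inventory",
  "collection.include.list": "inventory\\.customers,inventory\\.orders"
}
```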
capture.mode
Specifies the method the connector uses to capture changes from a MongoDB server. The default, change_streams_update_full, specifies that the connector captures changes through the MongoDB Change Streams mechanism and that update events contain the full document. The change_streams mode uses the same capture method, but update events don't contain the full document. For a full list of valid values and their descriptions, see the Debezium documentation.
- Type: string
- Default: change_streams_update_full
capture.scope
Specifies the scope of the change streams that the connector opens. For a full list of valid values and their descriptions, see the Debezium documentation.
- Type: string
- Default: deployment
capture.target
Specifies the database that the connector monitors for changes. This property applies only if capture.scope is set to database.
- Type: string
- Default: No default
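For example, to restrict change streams to a single database rather than the whole deployment, capture.scope and capture.target are set together, as in this sketch; the database name is a placeholder:

```json
{
  "capture.mode": "change_streams_update_full",
  "capture.scope": "database",
  "capture.target": "inventory"
}
```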
field.exclude.list
An optional comma-separated list of the fully-qualified names of fields that should be excluded from change event message values. Fully-qualified names for fields have the form databaseName.collectionName.fieldName.nestedFieldName, where databaseName and collectionName may contain the wildcard *, which matches any characters.
- Type: string
- Default: empty string
field.renames
An optional comma-separated list of the fully-qualified replacements of fields that should be used to rename fields in change event message values. Fully-qualified replacements for fields are in the form databaseName.collectionName.fieldName.nestedFieldName:newNestedFieldName, where databaseName and collectionName may contain the wildcard *, which matches any characters. The colon character (:) determines the rename mapping of a field. Note that each field replacement is applied to the result of the previous field replacement in the list; keep this in mind when renaming multiple fields that are in the same path.
- Type: string
- Importance: high
- Default: empty string
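The following fragment sketches both properties: it drops one field and renames a nested field using the wildcard and colon syntax described above. All database, collection, and field names are hypothetical:

```json
{
  "field.exclude.list": "inventory.customers.ssn",
  "field.renames": "inventory.*.address.zip:postal_code"
}
```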
tasks.max
The maximum number of tasks that should be created for this connector. The MongoDB connector attempts to use a separate task for each replica set, so the default is acceptable when using the connector with a single MongoDB replica set. When using the connector with a MongoDB sharded cluster, Confluent recommends specifying a value that is equal to or more than the number of shards in the cluster, so that the work for each replica set can be distributed by Kafka Connect.
- Type: int
- Importance: medium
- Default: 1
tombstones.on.delete
Controls whether a tombstone event should be generated after a delete event. When set to true, the delete operations are represented by a delete event and a subsequent tombstone event. When set to false, only a delete event is sent. Emitting the tombstone event (the default behavior) allows Kafka to completely delete all events pertaining to the given key once the source record is deleted.
- Type: string
- Importance: low
- Default: true
schema.name.adjustment.mode
Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. Possible settings are avro, which replaces the characters that cannot be used in the Avro type name with an underscore, and none, which does not apply any adjustment.
- Type: string
- Default: none
field.name.adjustment.mode
Specifies how field names should be adjusted for compatibility with the message converter used by the connector. The following are possible settings:
- avro: Replaces the characters that cannot be used in the Avro type name with an underscore.
- none: Does not apply any adjustment.
- avro_unicode: Replaces the underscore or characters that cannot be used in the Avro type name with corresponding Unicode escapes, like _uxxxx. Note that _ is an escape sequence, similar to backslash in Java.
For more details, see Avro naming.
- Type: string
- Default: none
Auto topic creation¶
For more information about Auto topic creation, see Configuring Auto Topic Creation for Source Connectors.
Configuration properties accept regular expressions (regex) that are defined as Java regex.
topic.creation.groups
A list of group aliases that are used to define per-group topic configurations for matching topics. A default group always exists and matches all topics.
- Type: List of String types
- Default: empty
- Possible Values: The values of this property refer to any additional groups. A default group is always defined for topic configurations.
topic.creation.$alias.replication.factor
The replication factor for new topics created by the connector. This value must not be larger than the number of brokers in the Kafka cluster. If this value is larger than the number of Kafka brokers, an error occurs when the connector attempts to create a topic. This is a required property for the default group. This property is optional for any other group defined in topic.creation.groups. Other groups use the Kafka broker default value.
- Type: int
- Default: n/a
- Possible Values: >= 1 for a specific valid value, or -1 to use the Kafka broker's default value.
topic.creation.$alias.partitions
The number of topic partitions created by this connector. This is a required property for the default group. This property is optional for any other group defined in topic.creation.groups. Other groups use the Kafka broker default value.
- Type: int
- Default: n/a
- Possible Values: >= 1 for a specific valid value, or -1 to use the Kafka broker's default value.
topic.creation.$alias.include
A list of strings that represent regular expressions that match topic names. This list is used to include topics with matching values and apply this group's specific configuration to the matching topics. $alias applies to any group defined in topic.creation.groups. This property does not apply to the default group.
- Type: List of String types
- Default: empty
- Possible Values: Comma-separated list of exact topic names or regular expressions.
topic.creation.$alias.exclude
A list of strings representing regular expressions that match topic names. This list is used to exclude topics with matching values from getting the group's specific configuration. $alias applies to any group defined in topic.creation.groups. This property does not apply to the default group. Note that exclusion rules override any inclusion rules for topics.
- Type: List of String types
- Default: empty
- Possible Values: Comma-separated list of exact topic names or regular expressions.
topic.creation.$alias.${kafkaTopicSpecificConfigName}
Any of the topic-level configurations (see Changing Broker Configurations Dynamically) for the version of the Kafka broker where the records will be written. The broker's topic-level configuration value is used if the configuration is not specified for the rule. $alias applies to the default group as well as any group defined in topic.creation.groups.
- Type: property values
- Default: Kafka broker value
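A typical use of these properties defines one extra group alongside the required default group. The sketch below routes topics matching a hypothetical prefix into a compacted group; the alias, patterns, and values are illustrative only:

```json
{
  "topic.creation.groups": "compacted",
  "topic.creation.default.replication.factor": 3,
  "topic.creation.default.partitions": 6,
  "topic.creation.compacted.include": "fulfillment\\.inventory\\..*",
  "topic.creation.compacted.partitions": 1,
  "topic.creation.compacted.cleanup.policy": "compact"
}
```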
Advanced Configuration Properties¶
max.queue.size
Positive integer value that specifies the maximum size of the blocking queue into which change events read from the database log are placed before they are written to Kafka. This queue can provide backpressure to the oplog reader when, for example, writes to Kafka are slower or Kafka is not available. Events that appear in the queue are not included in the offsets periodically recorded by this connector. The value should always be larger than the maximum batch size specified in the max.batch.size property.
- Type: int
- Importance: low
- Default: 8192
max.batch.size
Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector.
- Type: int
- Importance: low
- Default: 2048
max.queue.size.in.bytes
A long value that specifies the maximum volume of the blocking queue in bytes. By default, volume limits are not specified for the blocking queue. To specify the number of bytes that the queue can consume, set this property to a positive long value. If max.queue.size is also set, writing to the queue is blocked when the size of the queue reaches the limit specified by either property. For example, if you set max.queue.size to 1000 and max.queue.size.in.bytes to 5000, writing to the queue is blocked after the queue contains 1000 records or after the volume of the records in the queue reaches 5000 bytes.
- Type: long
- Default: 0
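Mirroring the worked example above, the following fragment caps the queue at 1000 records or 5000 bytes, whichever limit is reached first, with a batch size kept below the queue size as required; the specific numbers are illustrative:

```json
{
  "max.queue.size": 1000,
  "max.batch.size": 512,
  "max.queue.size.in.bytes": 5000
}
```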
poll.interval.ms
Positive integer value that specifies the number of milliseconds the connector should wait during each iteration for new change events to appear.
- Type: int
- Importance: low
- Default: 500
connect.backoff.initial.delay.ms
Positive integer value that specifies the initial delay when trying to reconnect to a primary after the first failed connection attempt or when no primary is available.
- Type: int
- Importance: low
- Default: 1000
connect.backoff.max.delay.ms
Positive integer value that specifies the maximum delay when trying to reconnect to a primary after repeated failed connection attempts or when no primary is available.
- Type: int
- Importance: low
- Default: 120000
connect.max.attempts
Positive integer value that specifies the maximum number of failed connection attempts to a replica set primary before an exception occurs and the task is aborted. The default of 16 results in a little over 20 minutes of attempts before failing, when using the defaults for connect.backoff.initial.delay.ms and connect.backoff.max.delay.ms.
- Type: int
- Importance: low
- Default: 16
source.struct.version
Schema version for the source block in CDC events. Debezium 0.10 introduced a few breaking changes to the structure of the source block in order to unify the exposed structure across all the connectors. By setting this option to v1, the structure used in earlier versions can be produced. Note that this setting is not recommended and is planned for removal in a future Debezium version.
- Type: string
- Importance: low
- Default: v2
heartbeat.interval.ms
Controls how frequently heartbeat messages are sent. This property contains an interval in milliseconds that defines how frequently the connector sends messages to a heartbeat topic, which can be used to monitor whether the connector is still receiving change events from the database. Heartbeat messages are also useful when only records in non-captured collections change for a longer period of time. In that situation, the connector continues to read the oplog from the database but never emits any change messages to Kafka, which means that no offset updates are committed to Kafka. The oplog files may then be rotated out without the connector noticing, so on restart some events are no longer available, which forces re-execution of the initial snapshot.
Set this parameter to 0 to not send heartbeat messages at all. Disabled by default.
- Type: int
- Default: 0
skipped.operations
Comma-separated list of operation types that will be skipped during streaming. The operations include: c for inserts/create, u for updates, and d for deletes. By default, no operations are skipped.
snapshot.collection.filter.overrides
This property controls which collection items are included in a snapshot and affects snapshots only. You must define a comma-separated list of collection names in the form databaseName.collectionName. For each collection you define, also specify the snapshot.collection.filter.overrides.databaseName.collectionName configuration property.
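As a sketch of the override mechanism, the fragment below snapshots only a filtered subset of a hypothetical collection. The collection name and filter are placeholders; see the Debezium documentation for the exact filter expression syntax expected by the per-collection property:

```json
{
  "snapshot.collection.filter.overrides": "inventory.customers",
  "snapshot.collection.filter.overrides.inventory.customers": "{ \"region\": \"EMEA\" }"
}
```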
snapshot.delay.ms
An interval in milliseconds that the connector should wait before taking a snapshot after starting up. This can be used to avoid snapshot interruptions (and potential rebalancing) when starting multiple connectors in a cluster.
- Type: int
- Importance: low
- Default: No default
snapshot.fetch.size
Specifies the maximum number of documents that should be read in one go from each collection while taking a snapshot. The connector reads the collection contents in multiple batches of this size.
- Type: int
- Default: 0 (the server chooses an appropriate fetch size)
snapshot.include.collection.list
An optional, comma-separated list of regular expressions that match names of collections specified in collection.include.list for which you want to take the snapshot.
- Type: string
- Default: All collections specified in collection.include.list
snapshot.max.threads
Positive integer value that specifies the maximum number of threads used to perform an initial sync of the collections in a replica set.
- Type: int
- Importance: medium
- Default: 1
snapshot.mode
Specifies the criteria for running a snapshot (for example, the initial sync) upon startup of the connector. The default, initial, specifies that the connector reads a snapshot when either no offset is found or the oplog no longer contains the previous offset. The never option specifies that the connector should never use snapshots; instead, the connector tails the log.
- Type: string
- Importance: low
- Default: initial
provide.transaction.metadata
When set to true, Debezium generates events with transaction boundaries and enriches data event envelopes with transaction metadata. For more information, see Transaction Metadata.
- Type: boolean
- Default: false
retriable.restart.connector.wait.ms
The number of milliseconds to wait before restarting a connector after a retriable error occurs.
- Default: 10000 ms (10 seconds)
mongodb.poll.interval.ms
The interval, in milliseconds, at which the connector polls for new, removed, or changed replica sets.
- Default: 30000 ms (30 seconds)
mongodb.connect.timeout.ms
The number of milliseconds the driver waits before a new connection attempt is aborted. A value of 0 disables this behavior.
- Default: 10000 ms (10 seconds)
mongodb.heartbeat.frequency.ms
The frequency at which the cluster monitor attempts to reach each server.
- Default: 10000 ms (10 seconds)
mongodb.socket.timeout.ms
The number of milliseconds a send/receive on the socket can take before a timeout occurs. A value of 0 disables this behavior.
- Default: 0
mongodb.server.selection.timeout.ms
The number of milliseconds the driver will wait to select a server before it times out and throws an error.
- Type: string
- Importance: low
- Default: 30000 ms (30 seconds)
cursor.pipeline
When streaming changes, this setting applies processing to change stream events as part of the standard MongoDB aggregation stream pipeline. The value of this property must be an array of permitted aggregation pipeline stages in JSON format.
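For example, a pipeline that passes through only insert events might look like the following; the $match stage shown is a hypothetical illustration of the required JSON array format:

```json
{
  "cursor.pipeline": "[{ \"$match\": { \"operationType\": \"insert\" } }]"
}
```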
cursor.pipeline.order
The order used to construct the effective MongoDB aggregation stream pipeline. You can set the property to one of the following values:
- internal_first: Internal stages defined by the connector are applied first.
- user_first: Stages defined by the cursor.pipeline property are applied first.
For more details about each of these values, see the Debezium documentation.
- Default: internal_first
cursor.oversize.handling.mode
The strategy used to handle change events for documents exceeding the specified BSON size. You can set the property to one of the following values:
- fail: The connector fails if the total size of the change event exceeds the maximum BSON size.
- skip: Any change events for documents exceeding the maximum size (specified by the cursor.oversize.skip.threshold property) are ignored.
- Type: string
- Default: fail
cursor.oversize.skip.threshold
The maximum allowed size, in bytes, of the stored document for which change events are processed. This includes both the size before and the size after the database operation; more specifically, it limits the size of the fullDocument and fullDocumentBeforeChange fields of MongoDB change events.
- Type: int
- Default: 0
cursor.max.await.time.ms
The maximum number of milliseconds the oplog/change stream cursor will wait for the server to produce a result before causing an execution timeout exception. A value of 0 indicates using the server/driver default wait timeout.
- Default: 0
signal.data.collection
Fully-qualified name of the data collection that is used to send signals to the connector. Use the following format to specify the collection name: <databaseName>.<collectionName>.
- Default: No default
signal.enabled.channels
A list of the signal channel names that are enabled for the connector. The following channels are available by default: source, kafka, file, and jmx. Note that you may also implement a custom signaling channel.
- Type: string
- Default: source
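A minimal signaling setup enables the source channel and points it at a signal collection, as in this sketch; the database and collection names are placeholders. Ad hoc signals, such as incremental snapshot requests, can then be sent by inserting signal documents into that collection (see the Debezium signaling documentation for the document format):

```json
{
  "signal.data.collection": "inventory.debezium_signal",
  "signal.enabled.channels": "source"
}
```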
notification.enabled.channels
A list of the notification channel names that are enabled for the connector. The following channels are available by default: sink, log, and jmx. Note that you may also implement a custom notification channel.
- Type: string
- Default: No default
incremental.snapshot.chunk.size
The maximum number of documents that the connector fetches and reads into memory during an incremental snapshot chunk. Increasing the chunk size provides greater efficiency, because the snapshot runs fewer snapshot queries of a greater size. However, larger chunk sizes also require more memory to buffer the snapshot data. Adjust the chunk size to a value that provides the best performance in your environment.
- Type: int
- Default: 1024
topic.naming.strategy
The name of the TopicNamingStrategy class that should be used to determine the topic name for data change, schema change, transaction, and heartbeat events.
- Type: string
- Default: io.debezium.schema.DefaultTopicNamingStrategy
topic.delimiter
Specifies the delimiter for topic names.
- Type: string
- Default: .
topic.cache.size
The size of the bounded concurrent hash map that is used to hold topic names. This cache helps determine the topic name that corresponds to a given data collection.
- Type: int
- Default: 10000
topic.heartbeat.prefix
Sets the prefix of the topic to which heartbeat messages are sent. The topic is named according to the pattern <topic.heartbeat.prefix>.<topic.prefix>. For example, if the topic prefix is fulfillment, the default topic name is __debezium-heartbeat.fulfillment.
- Type: string
- Importance: low
- Default: __debezium-heartbeat
topic.transaction
Controls the name of the topic to which the connector sends transaction metadata messages. The topic name has the pattern <topic.prefix>.<topic.transaction>. For example, if the topic prefix is fulfillment, the default topic name is fulfillment.transaction.
- Type: string
- Default: transaction
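Putting the topic-related properties together, the following sketch customizes the delimiter, heartbeat prefix, and transaction topic suffix while keeping the default naming strategy; all values are hypothetical:

```json
{
  "topic.naming.strategy": "io.debezium.schema.DefaultTopicNamingStrategy",
  "topic.delimiter": "_",
  "topic.heartbeat.prefix": "heartbeat",
  "topic.transaction": "tx"
}
```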
custom.metric.tags
Accepts key-value pairs to customize the MBean object name. For more details, see the Debezium documentation.
- Type: string
- Default: No default
error.max.retries
The maximum number of retries on retriable errors (for example, connection errors) before failing. You can set the property to one of the following values:
- -1: No limit
- 0: Disabled
- > 0: Number of retries
- Type: int
- Default: -1 (no limit)
Signal parameters¶
signal.kafka.topic
The name of the Kafka topic that the connector monitors for ad hoc signals.
- Type: string
- Default: <topic.prefix>-signal
signal.kafka.groupId
The name of the group ID that is used by Kafka consumers.
- Type: string
- Default: kafka-signal
signal.kafka.bootstrap.servers
A list of host/port pairs that the connector uses for establishing an initial connection to the Kafka cluster. Each pair should point to the same Kafka cluster used by the Kafka Connect process.
- Type: list
- Default: No default
signal.kafka.poll.timeout.ms
An integer value that specifies the maximum number of milliseconds the connector should wait when polling signals.
- Type: int
- Default: 100
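When the kafka channel is enabled through signal.enabled.channels, the signal parameters above are configured along these lines; the topic name and bootstrap servers are placeholders and must point to the same Kafka cluster used by the Kafka Connect process:

```json
{
  "signal.enabled.channels": "source,kafka",
  "signal.kafka.topic": "fulfillment-signal",
  "signal.kafka.bootstrap.servers": "broker1:9092,broker2:9092",
  "signal.kafka.poll.timeout.ms": 100
}
```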
More details can be found in the Debezium connector properties documentation.
Note
Portions of the information provided here derive from documentation originally produced by the Debezium Community. Work produced by Debezium is licensed under Creative Commons 3.0.