Configuration Reference for Debezium MongoDB Source Connector for Confluent Platform

The MongoDB Source Connector can be configured using a variety of configuration properties.

Note

These are properties for the self-managed connector. If you are using Confluent Cloud, see Debezium MongoDB Source Connector for Confluent Cloud.

name

A unique name for the connector. Trying to register again with the same name will fail. This property is required by all Kafka Connect connectors.

  • Type: string
  • Default: No default
connector.class

The name of the Java class for the connector. You should always use a value of io.debezium.connector.mongodb.MongoDbConnector for the MongoDB connector.

  • Type: string
  • Default: No default
mongodb.connection.string

This property replaces mongodb.hosts and specifies a connection string that the connector uses during the initial discovery of a MongoDB replica set. To use this option, you must set the value of mongodb.members.auto.discover to true. Note that the connector uses this connection string only during the initial replica set discovery process. For more details, see the Debezium documentation.

  • Type: string
  • Default: No default
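
The connection-related properties above can be combined into a minimal registration payload for the Kafka Connect REST API. The following sketch uses placeholder host names, replica set name, and topic prefix; adjust them for your deployment:

```json
{
  "name": "mongodb-source-example",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.connection.string": "mongodb://mongo1.example.com:27017,mongo2.example.com:27017/?replicaSet=rs0",
    "topic.prefix": "example"
  }
}
```

Posting this JSON to the Connect REST endpoint (for example, `POST /connectors`) registers the connector under the name `mongodb-source-example`.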
mongodb.connection.string.shard.params

Specifies the URL parameters of the MongoDB connection string, including read preferences, that the connector uses to connect to individual shards of a MongoDB sharded cluster.

  • Type: string
  • Default: No default
mongodb.connection.mode

Specifies the strategy that the connector uses when it connects to a sharded MongoDB cluster. You can set the property to a value of either replica_set or sharded. If set to replica_set, the connector establishes individual connections to the replica set for each shard. If set to sharded, the connector establishes a single connection to the database, based on the value of mongodb.connection.string. For more details, see the Debezium documentation.

  • Type: string
  • Default: replica_set
topic.prefix

A unique name that identifies the connector and/or MongoDB replica set or sharded cluster that this connector monitors. Each server should be monitored by at most one Debezium connector, since this server name prefixes all persisted Kafka topics emanating from the MongoDB replica set or cluster. For more details, see the Debezium documentation.

  • Type: string
  • Default: No default
mongodb.authentication.class

A full Java class name that is an implementation of the io.debezium.connector.mongodb.connection.MongoDbAuthProvider interface. This class handles setting the credentials on the MongoDB connection (called on each app boot). The default behavior uses the mongodb.user, mongodb.password, and mongodb.authsource properties according to their respective documentation, but other implementations may use them differently or ignore them altogether. Note that any setting in mongodb.connection.string overrides settings set by this class.

  • Type: string
  • Default: DefaultMongoDbAuthProvider
mongodb.user

Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.

  • Type: string
  • Importance: medium
mongodb.password

Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.

  • Type: password
  • Importance: medium
mongodb.authsource

Database (authentication source) containing MongoDB credentials. This is required only when MongoDB is configured to use authentication with another authentication database than admin.

  • Default: admin
mongodb.ssl.enabled

Whether the connector uses SSL to connect to MongoDB instances.

  • Type: boolean
  • Importance: low
  • Default: false
mongodb.ssl.invalid.hostname.allowed

When SSL is enabled, this setting controls whether strict hostname checking is disabled during the connection phase. If true, the connection does not prevent man-in-the-middle attacks.

  • Type: boolean
  • Importance: low
  • Default: false
filters.match.mode

The mode used to match events based on included/excluded database and collection names. You can set the property to one of the following values:

  • regex: Database and collection includes/excludes are evaluated as a comma-separated list of regular expressions.
  • literal: Database and collection includes/excludes are evaluated as a comma-separated list of string literals. Whitespace characters surrounding these literals are stripped.
  • Type: string
  • Default: regex
database.include.list

An optional comma-separated list of regular expressions that match database names to be monitored; any database name not included in database.include.list is excluded from monitoring. By default all databases are monitored. Must not be used with database.exclude.list.

  • Type: list of strings
  • Importance: medium
  • Default: empty string
database.exclude.list

An optional comma-separated list of regular expressions that match database names to be excluded from monitoring; any database name not included in database.exclude.list is monitored. Must not be used with database.include.list.

  • Type: list of strings
  • Importance: medium
  • Default: empty string
collection.include.list

An optional comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be monitored; any collection not included in collection.include.list is excluded from monitoring. Each identifier is of the form databaseName.collectionName. By default the connector will monitor all collections except those in the local and admin databases. Must not be used with collection.exclude.list.

  • Type: list of strings
  • Importance: medium
  • Default: empty string
collection.exclude.list

An optional comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be excluded from monitoring; any collection not included in collection.exclude.list is monitored. Each identifier is of the form databaseName.collectionName. Must not be used with collection.include.list.

  • Type: list of strings
  • Importance: medium
  • Default: empty string
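
As an illustration of the filtering properties above, the following fragment (with hypothetical database and collection names) captures only the products collection in the inventory database and all collections in databases matching customers. Note that in regex mode a literal dot should be escaped, and the backslash itself must be doubled inside a JSON string:

```json
{
  "filters.match.mode": "regex",
  "database.include.list": "inventory,customers",
  "collection.include.list": "inventory\\.products,customers\\..*"
}
```

Because the include and exclude variants of the same property are mutually exclusive, this fragment sets only the include lists.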
capture.mode

Specifies the method the connector uses to capture changes from a MongoDB server. The default, change_streams_update_full, specifies that the connector captures changes through the MongoDB change streams mechanism and that update events should contain the full document. The change_streams mode uses the same capturing method, but update events don't contain the full document. For a full list of valid values and their descriptions, see the Debezium documentation.

  • Type: string
  • Default: change_streams_update_full
capture.scope

Specifies the scope of the change streams that the connector opens. For a full list of valid values and their descriptions, see the Debezium documentation.

  • Type: string
  • Default: deployment
capture.target

Specifies the database that the connector monitors for changes. This property applies only if the capture.scope is set to database.

  • Type: string
  • Default: No default
field.exclude.list

An optional comma-separated list of the fully-qualified names of fields that should be excluded from change event message values. Fully-qualified names for fields have the following form: databaseName.collectionName.fieldName.nestedFieldName where databaseName and collectionName may contain the wildcard *, which matches any characters.

  • Type: string
  • Default: empty string
field.renames

An optional comma-separated list of the fully-qualified replacements of fields that should be used to rename fields in change event message values. Fully-qualified replacements for fields are in the form databaseName.collectionName.fieldName.nestedFieldName:newNestedFieldName, where databaseName and collectionName may contain the wildcard (*), which matches any characters. The colon character (:) is used to determine the rename mapping of the field. Note that the next field replacement is applied to the result of the previous field replacement in the list. Keep this in mind when renaming multiple fields that are in the same path.

  • Type: string
  • Importance: high
  • Default: empty string
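
A hypothetical fragment combining the two field-level properties above might look as follows (all database, collection, and field names are placeholders):

```json
{
  "field.exclude.list": "inventory.products.internal_notes",
  "field.renames": "inventory.products.qty:quantity,*.orders.addr:address"
}
```

The first entry drops internal_notes from product change events; the second renames qty to quantity in inventory.products and, in any database, addr to address in orders collections.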
tasks.max

The maximum number of tasks that should be created for this connector. The MongoDB connector attempts to use a separate task for each replica set, so the default is acceptable when using the connector with a single MongoDB replica set. When using the connector with a MongoDB sharded cluster, Confluent recommends specifying a value that is equal to or more than the number of shards in the cluster, so that the work for each replica set can be distributed by Kafka Connect.

  • Type: int
  • Importance: medium
  • Default: 1
tombstones.on.delete

Controls whether a tombstone event should be generated after a delete event. When set to true, the delete operations are represented by a delete event and a subsequent tombstone event. When set to false, only a delete event is sent. Emitting the tombstone event (the default behavior) allows Kafka to completely delete all events pertaining to the given key once the source record is deleted.

  • Type: boolean
  • Importance: low
  • Default: true
schema.name.adjustment.mode

Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. Possible settings are avro, which replaces the characters that cannot be used in the Avro type name with an underscore, and none, which does not apply any adjustment.

  • Type: string
  • Default: none
field.name.adjustment.mode

Specifies how field names should be adjusted for compatibility with the message converter used by the connector. The following are possible settings:

  • avro: Replaces the characters that cannot be used in the Avro type name with an underscore
  • none: Does not apply any adjustment
  • avro_unicode: Replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx. Note that _ is an escape sequence like backslash in Java.

For more details, see Avro naming.

  • Type: string
  • Default: none

Auto topic creation

For more information about Auto topic creation, see Configuring Auto Topic Creation for Source Connectors.

Configuration properties accept regular expressions (regex) that are defined as Java regex.

topic.creation.groups

A list of group aliases that are used to define per-group topic configurations for matching topics. A default group always exists and matches all topics.

  • Type: List of String types
  • Default: empty
  • Possible Values: The values of this property refer to any additional groups. A default group is always defined for topic configurations.
topic.creation.$alias.replication.factor

The replication factor for new topics created by the connector. This value must not be larger than the number of brokers in the Kafka cluster. If this value is larger than the number of Kafka brokers, an error occurs when the connector attempts to create a topic. This is a required property for the default group. This property is optional for any other group defined in topic.creation.groups. Other groups use the Kafka broker default value.

  • Type: int
  • Default: n/a
  • Possible Values: >= 1 for a specific valid value or -1 to use the Kafka broker’s default value.
topic.creation.$alias.partitions

The number of topic partitions created by this connector. This is a required property for the default group. This property is optional for any other group defined in topic.creation.groups. Other groups use the Kafka broker default value.

  • Type: int
  • Default: n/a
  • Possible Values: >= 1 for a specific valid value or -1 to use the Kafka broker’s default value.
topic.creation.$alias.include

A list of strings that represent regular expressions that match topic names. This list is used to include topics with matching values, and apply this group’s specific configuration to the matching topics. $alias applies to any group defined in topic.creation.groups. This property does not apply to the default group.

  • Type: List of String types
  • Default: empty
  • Possible Values: Comma-separated list of exact topic names or regular expressions.
topic.creation.$alias.exclude

A list of strings representing regular expressions that match topic names. This list is used to exclude topics with matching values from getting the group's specific configuration. $alias applies to any group defined in topic.creation.groups. This property does not apply to the default group. Note that exclusion rules override any inclusion rules for topics.

  • Type: List of String types
  • Default: empty
  • Possible Values: Comma-separated list of exact topic names or regular expressions.
topic.creation.$alias.${kafkaTopicSpecificConfigName}

Any of the topic-level configuration properties described in Changing Broker Configurations Dynamically for the version of the Kafka broker where the records will be written. The broker's topic-level configuration value is used if the configuration is not specified for the rule. $alias applies to the default group as well as any group defined in topic.creation.groups.

  • Type: property values
  • Default: Kafka broker value
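
Putting the auto topic creation properties together, the following sketch defines the required default group plus one custom group, compacted, that applies the Kafka topic-level cleanup.policy setting to matching topics. The topic regex, group name, and sizing values are illustrative only:

```json
{
  "topic.creation.default.replication.factor": "3",
  "topic.creation.default.partitions": "6",
  "topic.creation.groups": "compacted",
  "topic.creation.compacted.include": "example\\.inventory\\..*",
  "topic.creation.compacted.replication.factor": "-1",
  "topic.creation.compacted.partitions": "1",
  "topic.creation.compacted.cleanup.policy": "compact"
}
```

The -1 replication factor in the compacted group defers to the Kafka broker default, as described above.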

Advanced Configuration Properties

max.queue.size

Positive integer value that specifies the maximum size of the blocking queue into which change events read from the database log are placed before they are written to Kafka. This queue can provide backpressure to the oplog reader when, for example, writes to Kafka are slower or if Kafka is not available. Events that appear in the queue are not included in the offsets periodically recorded by this connector. Defaults to 8192, and should always be larger than the maximum batch size specified in the max.batch.size property.

  • Type: int
  • Importance: low
  • Default: 8192
max.batch.size

Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector. Defaults to 2048.

  • Type: int
  • Importance: low
  • Default: 2048
max.queue.size.in.bytes

A long value that specifies the maximum volume of the blocking queue in bytes. By default, volume limits are not specified for the blocking queue. To specify the number of bytes that the queue can consume, you must set this property to a positive long value. If max.queue.size is also set, writing to the queue is blocked when the size of the queue reaches the limit specified by either property. For example, if you set max.queue.size to 1000, and set max.queue.size.in.bytes to 5000, writing to the queue is blocked after the queue contains 1000 records, or after the volume of the records in the queue reaches 5000 bytes.

  • Type: long
  • Default: 0
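
As a tuning sketch based on the constraints above, the queue must stay larger than the batch size, and an optional byte cap can bound memory use. The specific numbers here are illustrative, not recommendations:

```json
{
  "max.batch.size": "4096",
  "max.queue.size": "16384",
  "max.queue.size.in.bytes": "52428800"
}
```

Here the queue holds up to four times the batch size in records, or roughly 50 MiB of event data, whichever limit is reached first.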
poll.interval.ms

Positive integer value that specifies the number of milliseconds the connector should wait during each iteration for new change events to appear. Defaults to 500 milliseconds (0.5 seconds).

  • Type: int
  • Importance: low
  • Default: 500
connect.backoff.initial.delay.ms

Positive integer value that specifies the initial delay when trying to reconnect to a primary after the first failed connection attempt or when no primary is available. Defaults to 1000 milliseconds (1 second).

  • Type: int
  • Importance: low
  • Default: 1000
connect.backoff.max.delay.ms

Positive integer value that specifies the maximum delay when trying to reconnect to a primary after repeated failed connection attempts or when no primary is available. Defaults to 120000 milliseconds (120 seconds).

  • Type: int
  • Importance: low
  • Default: 120000
connect.max.attempts

Positive integer value that specifies the maximum number of failed connection attempts to a replica set primary before an exception occurs and task is aborted. Defaults to 16 which results in a little over 20 minutes of attempts before failing, if using the defaults for connect.backoff.initial.delay.ms and connect.backoff.max.delay.ms.

  • Type: int
  • Importance: low
  • Default: 16
source.struct.version

Schema version for the source block in CDC events. Debezium 0.10 introduced a few breaking changes to the structure of the source block in order to unify the exposed structure across all the connectors. By setting this option to v1 the structure used in earlier versions can be produced. Note that this setting is not recommended and is planned for removal in a future Debezium version.

  • Type: string
  • Importance: low
  • Default: v2
heartbeat.interval.ms

Controls how frequently heartbeat messages are sent. This property contains an interval in milliseconds that defines how frequently the connector sends messages into a heartbeat topic. This can be used to monitor whether the connector is still receiving change events from the database. You should also leverage heartbeat messages in cases where only records in non-captured collections are changed for a longer period of time. In such a situation, the connector would continue to read the oplog from the database but never emit any change messages into Kafka, which in turn means that no offset updates are committed to Kafka. This can cause the oplog files to be rotated out without the connector noticing; on restart, some events are no longer available, which makes it necessary to re-execute the initial snapshot.

Set this parameter to 0 to not send heartbeat messages at all. Disabled by default.

  • Type: int
  • Default: 0
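
For example, to have the connector emit a heartbeat every 30 seconds (an illustrative interval, not a recommendation):

```json
{
  "heartbeat.interval.ms": "30000"
}
```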
skipped.operations

Comma-separated list of operation types that will be skipped during streaming. The operations include: c for inserts/create, u for updates, and d for deletes. By default, no operations are skipped.

snapshot.collection.filter.overrides

This property controls which collection items are included in a snapshot and affects snapshots only. You must define a comma-separated list of collection names in the form databaseName.collectionName. For each collection you define, also specify the snapshot.collection.filter.overrides.databaseName.collectionName configuration property.
snapshot.delay.ms

An interval in milliseconds that the connector should wait before taking a snapshot after starting up. Can be used to avoid snapshot interruptions (and potential rebalancing) when starting multiple connectors in a cluster.

  • Type: int
  • Importance: low
  • Default: No default
snapshot.fetch.size

Specifies the maximum number of documents that should be read in one go from each collection while taking a snapshot. The connector will read the collection contents in several batches of this size.

  • Type: int
  • Default: 0 (indicates the server chooses an appropriate fetch size.)
snapshot.include.collection.list

An optional, comma-separated list of regular expressions that match the names of collections specified in collection.include.list for which you want to take the snapshot.

  • Type: string
  • Default: All collections specified in collection.include.list
snapshot.max.threads

Positive integer value that specifies the maximum number of threads used to perform an initial sync of the collections in a replica set.

  • Type: int
  • Importance: medium
  • Default: 1
snapshot.mode

Specifies the criteria for running a snapshot (for example, the initial sync) upon startup of the connector. The default is initial, and specifies the connector reads a snapshot when either no offset is found or if the oplog no longer contains the previous offset. The never option specifies that the connector should never use snapshots; instead, the connector should tail the log.

  • Type: string
  • Importance: low
  • Default: initial
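
The snapshot-related properties above might be combined as in the following sketch, which delays the snapshot after startup, bounds fetch batches, and parallelizes the initial sync. All values are illustrative:

```json
{
  "snapshot.mode": "initial",
  "snapshot.delay.ms": "10000",
  "snapshot.fetch.size": "1000",
  "snapshot.max.threads": "4"
}
```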
provide.transaction.metadata

When set to true Debezium generates events with transaction boundaries and enriches data events envelope with transaction metadata. For more information, see Transaction Metadata.

  • Type: boolean
  • Default: false
retriable.restart.connector.wait.ms

The number of milliseconds to wait before restarting a connector after a retriable error occurs.

  • Default: 10000 ms (10 seconds)
mongodb.poll.interval.ms

The interval, in milliseconds, at which the connector polls for new, removed, or changed replica sets.

  • Default: 30000 ms (30 seconds)
mongodb.connect.timeout.ms

The number of milliseconds the driver waits before a new connection attempt is aborted.

  • Default: 10000 ms (10 seconds)
mongodb.heartbeat.frequency.ms

The frequency that the cluster monitor attempts to reach each server.

  • Default: 10000 ms (10 seconds)
mongodb.socket.timeout.ms

The number of milliseconds before a send/receive on the socket can take before a timeout occurs. A value of 0 disables this behavior.

  • Default: 0
mongodb.server.selection.timeout.ms

The number of milliseconds the driver will wait to select a server before it times out and throws an error.

  • Type: int
  • Importance: low
  • Default: 30000 ms (30 seconds)
cursor.pipeline

When streaming changes, this setting applies processing to change stream events as part of the standard MongoDB aggregation stream pipeline. The value of this property must be an array of permitted aggregation pipeline stages in JSON format.
cursor.pipeline.order

The order used to construct the effective MongoDB aggregation stream pipeline. You can set the property to one of the following values:

  • internal_first: Internal stages defined by the connector are applied first
  • user_first: Stages defined by the cursor.pipeline property are applied first

For more details about each of these values, see the Debezium documentation.

  • Default: internal_first
cursor.oversize.handling.mode

The strategy used to handle change events for documents exceeding the specified BSON size. You can set the property to one of the following values:

  • fail: The connector fails if the total size of a change event exceeds the maximum BSON size.
  • skip: Any change events for documents exceeding the maximum size (specified by the cursor.oversize.skip.threshold property) will be ignored.
  • Type: string
  • Default: fail
cursor.oversize.skip.threshold

The maximum allowed size, in bytes, of the stored document for which change events are processed. This applies to the document size both before and after the database operation; more specifically, it limits the size of the fullDocument and fullDocumentBeforeChange fields of MongoDB change events.

  • Type: int
  • Default: 0
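
For instance, to skip change events for documents at or above MongoDB's 16 MB maximum BSON document size rather than failing the connector (the threshold value here is simply 16 x 1024 x 1024 bytes):

```json
{
  "cursor.oversize.handling.mode": "skip",
  "cursor.oversize.skip.threshold": "16777216"
}
```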
cursor.max.await.time.ms

The maximum number of milliseconds the oplog/change stream cursor will wait for the server to produce a result before causing an execution timeout exception. A value of 0 indicates using the server/driver default wait timeout.

  • Default: 0
signal.data.collection

Fully-qualified name of the data collection that is used to send signals to the connector. Use the following format to specify the collection name: <databaseName>.<collectionName>.

  • Default: No default
signal.enabled.channels

A list of the signal channel names that are enabled for the connector. The following channels are available by default: source, kafka, file and jmx.

Note that you may also implement a custom signaling channel.

  • Type: string
  • Default: source
notification.enabled.channels

A list of the notification channel names that are enabled for the connector. The following channels are available by default: sink, log and jmx.

Note that you may also implement a custom notification channel.

  • Type: string
  • Default: No default
incremental.snapshot.chunk.size

The maximum number of documents that the connector fetches and reads into memory during an incremental snapshot chunk. Increasing the chunk size provides greater efficiency, because the snapshot runs fewer snapshot queries of a greater size. However, larger chunk sizes also require more memory to buffer the snapshot data. Adjust the chunk size to a value that provides the best performance in your environment.

  • Type: int
  • Default: 1024
topic.naming.strategy

The name of the TopicNamingStrategy class that should be used to determine the topic name for data change, schema change, transaction, and heartbeat events.

  • Type: string
  • Default: io.debezium.schema.DefaultTopicNamingStrategy
topic.delimiter

Specifies the delimiter for the topic name.

  • Type: string
  • Default: .
topic.cache.size

The size used for holding the topic names in a bounded concurrent hash map. This cache will help to determine the topic name corresponding to a given data collection.

  • Type: int
  • Default: 10000
topic.heartbeat.prefix

Sets the name of the topic to which heartbeat messages are sent. The topic is named according to the pattern <topic.heartbeat.prefix>.<topic.prefix>. For example, if the topic prefix is fulfillment, the default topic name is __debezium-heartbeat.fulfillment.

  • Type: string
  • Importance: low
  • Default: __debezium-heartbeat
topic.transaction

Controls the name of the topic to which the connector sends transaction metadata messages. The topic name has the following pattern: <topic.prefix>.<topic.transaction>. For example, if the topic prefix is fulfillment, the default topic name is fulfillment.transaction.

  • Type: string
  • Default: transaction
custom.metric.tags

Accepts key-value pairs to customize the MBean object name. For more details, see Debezium documentation

  • Type: string
  • Default: No default
error.max.retries

The maximum number of retries on retriable errors—for example, connection errors—before failing. Possible values are:

  • -1: No limit
  • 0: Disabled
  • > 0: Number of retries
  • Type: int
  • Default: -1 (no limit)

Signal parameters

signal.kafka.topic

The name of the Kafka topic that the connector monitors for ad hoc signals.

  • Type: string
  • Default: <topic.prefix>-signal
signal.kafka.groupId

The name of the group ID that is used by Kafka consumers.

  • Type: string
  • Default: kafka-signal
signal.kafka.bootstrap.servers

A list of host/port pairs that the connector uses for establishing an initial connection to the Kafka cluster. Each pair should point to the same Kafka cluster used by the Kafka Connect process.

  • Type: list
  • Default: No default
signal.kafka.poll.timeout.ms

An integer value that specifies the maximum number of milliseconds the connector should wait when polling signals. The default is 100ms.

  • Type: int
  • Default: 100
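
The signal properties above can be combined as in the following sketch, which enables both the source and kafka channels. The collection, topic, and broker names are placeholders:

```json
{
  "signal.enabled.channels": "source,kafka",
  "signal.data.collection": "inventory.debezium_signals",
  "signal.kafka.topic": "example-signal",
  "signal.kafka.bootstrap.servers": "broker1.example.com:9092",
  "signal.kafka.poll.timeout.ms": "250"
}
```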

More details can be found in the Debezium connector properties documentation.

Note

Portions of the information provided here derive from documentation originally produced by the Debezium Community. Work produced by Debezium is licensed under Creative Commons 3.0.