MongoDB Atlas Source Connector for Confluent Cloud¶
Note
This is a Quick Start for the managed cloud connector. If you are installing the connector locally for Confluent Platform, see MongoDB Source Connector for Confluent Platform.
The Kafka Connect MongoDB Atlas Source connector for Confluent Cloud moves data from a MongoDB replica set into an Apache Kafka® cluster. The connector configures and consumes change stream event documents and publishes them to a Kafka topic.
Features¶
Note
This connector supports MongoDB Atlas only and will not work with a self-managed MongoDB database.
The MongoDB Atlas Source connector provides the following features:
- Topics created automatically: The connector automatically creates Kafka topics using the naming convention: <prefix>.<database-name>.<collection-name>. The topics are created with the properties topic.creation.default.partitions=1 and topic.creation.default.replication.factor=3. You add the prefix when setting up the connection in the Quick Start steps. For more information, see Maximum message size. Note that if you want to create topics with specific settings, create the topics before running this connector. An example of the resulting topic name follows this list.
- Database authentication: Uses password authentication.
- Output data formats: Supports Avro, Byte, JSON (schemaless), JSON Schema, Protobuf, or String output data. Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf). See Schema Registry Enabled Environments for additional information.
- Large size records: Supports MongoDB documents up to 8 MB in size on dedicated Kafka clusters and 2 MB on other clusters.
- Select configuration properties:
  - poll.await.time.ms: The amount of time to wait before checking for new results in the change stream.
  - poll.max.batch.size: The maximum number of change stream documents to include in a single batch when polling for new data. This setting can be used to limit the amount of data buffered internally in the connector.
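For example, a hypothetical sketch of the naming convention, assuming a topic prefix of mongo, a database named inventory, and a collection named orders (all placeholder names):

"topic.prefix": "mongo",
"database": "inventory",
"collection": "orders"

With these values, the connector publishes change events to the topic mongo.inventory.orders.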
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.
Limitations¶
Be sure to review the following information.
- For connector limitations, see MongoDB Atlas Source Connector limitations.
- If you plan to use one or more Single Message Transforms (SMTs), see SMT Limitations.
- If you plan to use Confluent Cloud Schema Registry, see Schema Registry Enabled Environments.
Maximum message size¶
This connector creates topics automatically. When it creates topics, the internal connector configuration property max.message.bytes is set to the following:
- Basic cluster: 2 MB
- Standard cluster: 2 MB
- Dedicated cluster: 20 MB
For more information about Confluent Cloud clusters, see Confluent Cloud Features and Limits by Cluster Type.
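If you need different topic settings (for example, to handle documents larger than 2 MB on a dedicated cluster), create the topic before launching the connector and set max.message.bytes explicitly. A minimal sketch using the Confluent CLI, assuming the placeholder topic name mongo.inventory.orders and a CLI session already targeting the correct cluster:

confluent kafka topic create mongo.inventory.orders \
  --partitions 1 \
  --config max.message.bytes=8388608

The value 8388608 (8 MB) matches the largest document size the connector supports on dedicated clusters.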
Quick Start¶
Use this quick start to get up and running with the Confluent Cloud MongoDB Atlas Source connector. The quick start provides the basics of selecting the connector and configuring it to consume change stream data from a MongoDB database and publish that data to a Kafka topic.
Note
This connector supports MongoDB Atlas only and will not work with a self-managed MongoDB database.
- Prerequisites
- Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud Platform (GCP).
- The Confluent CLI installed and configured for the cluster. See Install the Confluent CLI.
- Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf). See Schema Registry Enabled Environments for additional information.
- Access to a MongoDB database. Note that the connection user must have privileged action “find” to query the MongoDB database. For more information, see Query and Write Actions.
- The connector automatically creates Kafka topics using the naming convention: <prefix>.<database-name>.<collection-name>. The topics are created with the properties topic.creation.default.partitions=1 and topic.creation.default.replication.factor=3. If you want to create topics with specific settings, create the topics before running this connector.
- If you have a VPC-peered cluster in Confluent Cloud, consider configuring a PrivateLink Connection between MongoDB Atlas and the VPC. For additional networking considerations, see Networking, DNS, and service endpoints. To use static egress IPs, see Static Egress IP Addresses for Confluent Cloud Connectors.
- Kafka cluster credentials. The following lists the different ways you can provide credentials.
- Enter an existing service account resource ID.
- Create a Confluent Cloud service account for the connector. Make sure to review the ACL entries required in the service account documentation. Some connectors have specific ACL requirements.
- Create a Confluent Cloud API key and secret. To create a key and secret, you can use confluent api-key create (see the example after this list) or you can autogenerate the API key and secret directly in the Cloud Console when setting up the connector.
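For example, a minimal sketch of creating an API key scoped to your Kafka cluster with the Confluent CLI, using a placeholder cluster ID:

confluent api-key create --resource lkc-123456

The command prints the API key and secret; save the secret, because it cannot be retrieved later.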
Using the Confluent Cloud Console¶
Step 1: Launch your Confluent Cloud cluster.¶
See the Quick Start for Confluent Cloud for installation instructions.
Step 2: Add a connector.¶
In the left navigation menu, click Connectors. If you already have connectors in your cluster, click + Add connector.
Step 3: Select your connector.¶
Click the MongoDB Atlas Source connector card.
Step 4: Enter the connector details.¶
Note
- Make sure you have all your prerequisites completed.
- An asterisk ( * ) designates a required entry.
At the MongoDB Atlas Source Connector screen, complete the following:
- Enter a Topic prefix. The connector automatically creates Kafka topics using the naming convention <topic.prefix>.<database-name>.<collection-name>.
- Select the way you want to provide Kafka Cluster credentials. You can choose one of the following options:
  - Global Access: Allows your connector to access everything you have access to. With global access, connector access will be linked to your account. This option is not recommended for production.
  - Granular access: Limits the access for your connector. You will be able to manage connector access through a service account. This option is recommended for production.
  - Use an existing API key: Allows you to enter an API key and secret you have stored. You can enter an API key and secret (or generate these in the Cloud Console).
- Click Continue.
- Add the following database connection details:
  - Connection host: MongoDB Atlas connection host. For example: cluster4-r5q3r7.gcp.mongodb.net.
  - Connection user: MongoDB Atlas connection user.
  - Connection password: MongoDB Atlas connection password.
  - Database name: MongoDB Atlas database name. If not set, all databases in the cluster are watched.
  - Collection name: Single MongoDB collection to watch. If not set, all collections in the specified database are watched.
- Click Continue.
Configure the following:
- Select the output record value format (data going to the Kafka topic): AVRO, BSON, JSON, JSON_SR (JSON Schema), PROTOBUF, or STRING. Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON Schema, or Protobuf). For additional information, see Schema Registry Enabled Environments.
Show advanced configurations
- JSON output decimal format: Specify the JSON/JSON_SR serialization format for Connect DECIMAL logical type values with two allowed literals: BASE64 to serialize DECIMAL logical types as base64-encoded binary data, and NUMERIC to serialize Connect DECIMAL logical type values in JSON/JSON_SR as a number representing the decimal value.
- Topic separator: A separator to use when the connector joins prefix, database, collection, and suffix values. These joined values create the Kafka topic name where data is published. Defaults to . (a period).
- Topic suffix: A suffix to append to database and collection names to generate the name of the Kafka topic the connector creates.
- Topic namespace map: JSON object that maps change stream document namespaces to topics.
- Publish full document only: Set whether to publish only the changed document instead of the full change stream document. Sets the change.stream.full.document=updateLookup setting so updated documents are included.
- Change stream full document: Select what to return for update operations when using a change stream. When set to updateLookup, the change stream for partial updates includes both a delta describing the changes to the document and a copy of the entire document that was changed from some point in time after the change occurred.
- Output JSON formatter: Sets the output format of JSON strings. The format can be DefaultJson, ExtendedJson, or SimplifiedJson.
- Poll wait time (ms): The amount of time to wait before checking for new results on the change stream.
- Maximum documents to include in a batch: The maximum number of change stream documents to include in a single batch when polling for new data.
- Pipeline: An array of JSON objects describing the pipeline operations to filter or modify the change events output. (A sketch follows this list.)
- Copy existing data: Whether to copy existing data from source collections. Setting this to true may cause record duplication.
- Copy existing namespace regex: Regular expression that matches the namespaces (databaseName.collectionName) from which to copy data.
- Copy existing pipeline: An array of JSON objects describing the pipeline operations to run when copying existing data. It is applied only to existing documents that are being copied.
- Cursor batch size: The number of documents to return in a batch. The value defaults to 0. The maximum cursor batch size is 50.
- Heartbeat interval: The number of milliseconds the connector waits between sending heartbeat messages.
- Heartbeat topic name: The name of the topic on which the connector should publish heartbeat messages. You must provide a positive value in the heartbeat.interval.ms setting to enable this feature.
- Error tolerance: Allows you to customize how the connector handles errors. By default, this is set to NONE and the connector handles errors using the error handling tolerance configured for the Connect framework.
- Output errors: Whether the connector sends output conversion errors to the dead letter queue (DLQ). When using a schema, this prevents unprocessable (poison) messages from causing the connector task to fail. The connector outputs messages to the DLQ as extended JSON for the specified topic. Enabling this property requires that the Error tolerance property be set to all. By default, the connector does not output messages to the DLQ.
- Server API version: The MongoDB server API version to use. This property is disabled by default.
- Deprecation errors: Whether to require the connector to report the use of deprecated server APIs as errors. This property is disabled by default.
- Server API: Whether to require strict server API version enforcement. This property is disabled by default.
- Transforms and Predicates: For details, see the Single Message Transforms (SMT) documentation.
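For example, a minimal sketch of a Pipeline value that limits the change stream to two collections (the collection names are placeholders, and the regular expression is written as a JSON string rather than a regex literal):

[
  {"$match": {"ns.coll": {"$regex": "^(collection1|collection2)$"}}}
]

With this pipeline, the connector emits change events only for collection1 and collection2.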
For all property values and definitions, see Configuration Properties.
Click Continue.
The connector supports running a single task.
Click Continue.
Verify the connection details by previewing the running configuration.
Tip
For information about previewing your connector output, see Confluent Cloud Connector Data Previews.
After you’ve validated that the properties are configured to your satisfaction, click Launch.
The status for the connector should go from Provisioning to Running.
Step 5: Check the Kafka topic.¶
After the connector is running, verify that MongoDB documents are populating the Kafka topic. If copy.existing is set to true and the connector restarts for any reason, you may see duplicate records in the topic.
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.
Using the Confluent CLI¶
Complete the following steps to set up and run the connector using the Confluent CLI.
Note
Make sure you have all your prerequisites completed.
Step 1: List the available connectors.¶
Enter the following command to list available connectors:
confluent connect plugin list
Step 2: List the connector configuration properties.¶
Enter the following command to show the connector configuration properties:
confluent connect plugin describe <connector-catalog-name>
The command output shows the required and optional configuration properties.
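For example, assuming the plugin name matches the connector.class value shown in the configuration example below (MongoDbAtlasSource):

confluent connect plugin describe MongoDbAtlasSource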
Step 3: Create the connector configuration file.¶
Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.
{
"connector.class": "MongoDbAtlasSource",
"name": "<my-connector-name>",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "<my-kafka-api-key>",
"kafka.api.secret": "<my-kafka-api-secret>",
"topic.prefix": "<topic-prefix>",
"connection.host": "<database-host-address>",
"connection.user": "<database-username>",
"connection.password": "<database-password>",
"database": "<database-name>",
"collection": "<database-collection-name>",
"poll.await.time.ms": "5000",
"poll.max.batch.size": "1000",
"copy.existing": "true",
"output.data.format": "JSON"
"tasks.max": "1"
}
Note the following property definitions:
"connector.class"
: Identifies the connector plugin name."name"
: Sets a name for your new connector.
"kafka.auth.mode"
: Identifies the connector authentication mode you want to use. There are two options:SERVICE_ACCOUNT
orKAFKA_API_KEY
(the default). To use an API key and secret, specify the configuration propertieskafka.api.key
andkafka.api.secret
, as shown in the example configuration (above). To use a service account, specify the Resource ID in the propertykafka.service.account.id=<service-account-resource-ID>
. To list the available service account resource IDs, use the following command:confluent iam service-account list
For example:
confluent iam service-account list Id | Resource ID | Name | Description +---------+-------------+-------------------+------------------- 123456 | sa-l1r23m | sa-1 | Service account 1 789101 | sa-l4d56p | sa-2 | Service account 2
(Optional)
"topic.prefix"
: Enter a topic prefix. The connector automatically creates Kafka topics using the naming convention:<prefix>.<database-name>.<collection-name>
. The tables are created with the properties:topic.creation.default.partitions=1
andtopic.creation.default.replication.factor=3
. If you want to create topics with specific settings, create the topics before running this connector. If you are using a dedicated cluster and have a MongoDb document of size more than 2MB, create the topic beforehand with topic configmax.message.bytes
set equals to more than the largest document size (max is 8388608 bytes).(Optional)
"topic.namespace.map"
: A JSON map that maps change stream document namespaces to topics. For example:{\"db\": \"dbTopic\", \"db.coll\": \"dbCollTopic\"}
will map all change stream documents from thedb
database todbTopic.<collectionName>
apart from any documents from thedb.coll
namespace which map to thedbCollTopic
topic. If you want to map all messages to a single topic use*
. For example:{\"*\": \"everyThingTopic\", \"db.coll\": \"exceptionToTheRuleTopic\"}
will map all change stream documents to theeveryThingTopic
apart from thedb.coll
messages. Note that any prefix configuration will still apply. If multiple collections with records having varying schema are mapped to a single topic with AVRO, JSON_SR, and PROTOBUF, then multiple schemas will be registered under a single subject name. If these schemas are not backward compatible to each other, the connector will fail until you change the schema compatibility in Confluent Cloud Schema Registry."connection.host"
: The MongoDB host. Use a hostname address and not a full URL. For example:cluster4-r5q3r7.gcp.mongodb.net
.(Optional)
"collection"
: The collection name. If the property is not used, all collections are watched in the supplied database.(Optional)
"poll.await.time.ms"
: The amount of time to wait before checking for new results in the change stream. If not used, this property defaults to 5000 ms (5 seconds).(Optional)
"poll.max.batch.size"
: The maximum number of change stream documents to include in a single batch when polling for new data. This setting can be used to limit the amount of data buffered internally in the connector. If not used, this property defaults to 1000 records.(Optional)
"pipeline"
: An array of JSON objects that represents the pipeline operations to filter or modify the change stream output. For example:[{"$match": {"ns.coll": {"$regex": /^(collection1|collection2)$/}}}]
sets the connector to listen to thecollection1
andcollection2
collections only. If not used, this property defaults to an empty array.(Optional)
"copy.existing"
: Select whether to copy existing data from source collections and convert them to change stream events on the respective topics. Any change to the data that occurs during the copy process is applied once the copy is completed. Note that settingcopy.existing
totrue
can lead to duplicate records if the connector restarts. For example: when using Schema Registry-based output format, if the schemas are not backward compatible to each other, the connector will fail and restart, which creates duplicate records. If not used, this property defaults tofalse
.(Optional)
"copy.existing.namespace.regex"
: Regex that matches the namespaces from which the existing documents are copied. A namespace is represented asdatabaseName.collectionName
. For example,stats\.page.*
matches all collections that start withpage
in thestats
database.(Optional)
"copy.existing.pipeline"
: An array of JSON objects that describes the pipeline operations to run when copying existing data. It is applied to existing documents that are being copied. If not used, this property defaults to an empty array."output.data.format"
: Sets the output Kafka record value format (data coming from the connector). Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. You must have Confluent Cloud Schema Registry configured if using a schema-based message format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).(Optional)
"heartbeat.interval.ms"
: The number of milliseconds the connector waits between sending heartbeat messages. If not used, this property defaults to 0. Thus, no heartbeat message is sent by default. If set to a positive number, the connector sends heartbeat messages when source records are not published in the specified interval. This mechanism improves resumability of the connector for low volume namespaces. See the Invalid Resume Token page in MongoDb documentation for more information on this feature. When using SMTs, use predicates to prevent SMTs from processing the heartbeat messages. For example, if the heartbeat topic name is__mongodb_heartbeats
and the connector is writing the actual database records into topics that do not share common prefix with the heartbeat topic; use the following configuration to prevent heartbeat messages from being processed by the transform with an alias say,mongoTransform
:"predicates": "isHeartbeatTopicPrefix"
,"predicates.isHeartbeatTopicPrefix.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches"
,"predicates.isHeartbeatTopicPrefix.pattern": "__mongodb.*"
,"transforms.mongoTransform.predicate": "isHeartbeatTopicPrefix"
,"transforms.mongoTransform.negate": "true"
.(Optional)
"heartbeat.topic.name"
: The name of the topic on which the connector should publish heartbeat messages. You must provide a positive value in theheartbeat.interval.ms
setting to enable this feature. If setting the heartbeat messages for multiple connectors, you must ensure that the heartbeat topic names for the connectors are unique. If not set, this defaults to__mongodb_heartbeats
."tasks.max"
: The connector supports running a single task.
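For example, a minimal sketch of the heartbeat predicate keys above as a JSON fragment, assuming a hypothetical SMT registered under the alias mongoTransform (replace <your-SMT-class> and its configuration with the transform you actually use):

{
  "transforms": "mongoTransform",
  "transforms.mongoTransform.type": "<your-SMT-class>",
  "transforms.mongoTransform.predicate": "isHeartbeatTopicPrefix",
  "transforms.mongoTransform.negate": "true",
  "predicates": "isHeartbeatTopicPrefix",
  "predicates.isHeartbeatTopicPrefix.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.isHeartbeatTopicPrefix.pattern": "__mongodb.*"
}

With negate set to true, the transform runs only on records whose topic does not match the heartbeat topic pattern.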
Single Message Transforms: See the Single Message Transforms (SMT) documentation for details about adding SMTs using the CLI.
See Configuration Properties for all property values and definitions.
Step 4: Load the properties file and create the connector.¶
Enter the following command to load the configuration and start the connector:
confluent connect create --config <file-name>.json
For example:
confluent connect create --config mongo-db-source.json
Example output:
Created connector confluent-mongodb-source lcc-ix4dl
Step 5: Check the connector status.¶
Enter the following command to check the connector status:
confluent connect list
Example output:
ID | Name | Status | Type
+-----------+---------------------------+---------+-------+
lcc-ix4dl | confluent-mongodb-source | RUNNING | source
Step 6: Check the Kafka topic.¶
After the connector is running, verify that MongoDB documents are populating the Kafka topic. If copy.existing is set to true and the connector restarts for any reason, you may see duplicate records in the topic.
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.
Configuration Properties¶
Use the following configuration properties with this connector.
Note
These are properties for the managed cloud connector. If you are installing the connector locally for Confluent Platform, see MongoDB Source Connector for Confluent Platform.
How should we connect to your data?¶
name
Sets a name for your connector.
- Type: string
- Valid Values: A string at most 64 characters long
- Importance: high
Kafka Cluster credentials¶
kafka.auth.mode
Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.
- Type: string
- Default: KAFKA_API_KEY
- Valid Values: KAFKA_API_KEY, SERVICE_ACCOUNT
- Importance: high
kafka.api.key
- Type: password
- Importance: high
kafka.service.account.id
The Service Account that will be used to generate the API keys to communicate with the Kafka cluster.
- Type: string
- Importance: high
kafka.api.secret
- Type: password
- Importance: high
How do you want to name your topic(s)?¶
topic.prefix
Prefix to prepend to table names to generate the name of the Apache Kafka® topic to publish data to.
- Type: string
- Importance: high
topic.namespace.map
JSON object that maps change stream document namespaces to topics. Any prefix configuration will still apply. If multiple collections with records having varying schemas are mapped to a single topic with AVRO, JSON_SR, or PROTOBUF, multiple schemas are registered under a single subject name. If these schemas are not backward compatible with each other, the connector fails until you change the schema compatibility in Confluent Cloud Schema Registry.
- Type: string
- Default: “”
- Importance: low
How should we connect to your MongoDB Atlas database?¶
connection.host
MongoDB Atlas connection host (e.g. confluent-test.mycluster.mongodb.net).
- Type: string
- Default: “”
- Importance: high
connection.user
MongoDB Atlas connection user.
- Type: string
- Importance: high
connection.password
MongoDB Atlas connection password.
- Type: password
- Importance: high
database
MongoDB Atlas database name. If not set, all databases in the cluster are watched.
- Type: string
- Importance: high
Database details¶
collection
Single MongoDB Atlas collection to watch. If not set, all collections in the specified database are watched.
- Type: string
- Importance: medium
Connection details¶
poll.await.time.ms
The amount of time to wait before checking for new results on the change stream.
- Type: int
- Default: 5000 (5 seconds)
- Valid Values: [1,…]
- Importance: low
poll.max.batch.size
Maximum number of change stream documents to include in a single batch when polling for new data. This setting can be used to limit the amount of data buffered internally in the connector.
- Type: int
- Default: 100
- Valid Values: [1,…,1000]
- Importance: low
pipeline
An array of JSON objects describing the pipeline operations to filter or modify the change events output. For example, [{"$match": {"ns.coll": {"$regex": /^(collection1|collection2)$/}}}] will set your source connector to listen to the "collection1" and "collection2" collections only.
- Type: string
- Default: []
- Importance: medium
copy.existing
Whether to copy existing data from source collections. Setting this to true may cause record duplications.
- Type: boolean
- Default: false
- Importance: high
copy.existing.namespace.regex
Regular expression that matches the namespaces (databaseName.collectionName) from which to copy data. For example, stats.page.* matches all collections that start with "page" in the "stats" database.
- Type: string
- Default: “”
- Importance: medium
copy.existing.pipeline
An array of JSON objects describing the pipeline operations to run when copying existing data. It will only be applied for existing documents which are being copied.
- Type: string
- Default: []
- Importance: medium
batch.size
The number of documents to return in a batch.
- Type: int
- Default: 0
- Valid Values: […,50]
- Importance: low
Output messages¶
output.data.format
Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, PROTOBUF, JSON, STRING, or BSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format such as AVRO, JSON_SR, or PROTOBUF.
- Type: string
- Default: STRING
- Importance: high
publish.full.document.only
Only publish the changed document instead of the full change stream document. Sets the change.stream.full.document=updateLookup automatically so updated documents will be included.
- Type: boolean
- Default: false
- Importance: high
change.stream.full.document
Determines what to return for update operations when using a Change Stream. When set to ‘updateLookup’, the change stream for partial updates will include both a delta describing the changes to the document as well as a copy of the entire document that was changed from some point in time after the change occurred.
- Type: string
- Default: default
- Importance: high
json.output.decimal.format
Specify the JSON/JSON_SR serialization format for Connect DECIMAL logical type values with two allowed literals:
BASE64 to serialize DECIMAL logical types as base64 encoded binary data and
NUMERIC to serialize Connect DECIMAL logical type values in JSON/JSON_SR as a number representing the decimal value.
- Type: string
- Default: BASE64
- Importance: low
output.json.format
The output format of JSON strings can be configured to be one of the following: DefaultJson, the legacy strict JSON formatter; ExtendedJson, the fully type-safe extended JSON formatter; or SimplifiedJson, simplified JSON with ObjectId, Decimal, Date, and Binary values represented as strings. Users can provide their own implementation of com.mongodb.kafka.connect.source.json.formatter.
- Type: string
- Default: DefaultJson
- Importance: high
topic.separator
Separator to use when joining prefix, database, collection, and suffix values. This generates the name of the Kafka topic to publish data to. Used by the ‘DefaultTopicMapper’.
- Type: string
- Default: .
- Importance: low
topic.suffix
Suffix to append to database and collection names to generate the name of the Kafka topic to publish data to.
- Type: string
- Importance: low
Error handling¶
heartbeat.interval.ms
The number of milliseconds the connector waits between sending heartbeat messages. The connector sends heartbeat messages when source records are not published in the specified interval. This mechanism improves resumability of the connector for low volume namespaces. When using SMTs, use predicates to prevent SMTs from processing the heartbeat messages. See connector documentation for more details.
- Type: int
- Default: 0
- Importance: medium
heartbeat.topic.name
The name of the topic on which the connector should publish heartbeat messages. You must provide a positive value in the “heartbeat.interval.ms” setting to enable this feature.
- Type: string
- Default: __mongodb_heartbeats
- Importance: medium
mongo.errors.tolerance
Use this property if you would like to configure the connector’s error handling behavior differently from the Connect framework’s.
- Type: string
- Default: NONE
- Importance: medium
mongo.errors.deadletterqueue.topic.name
Whether to output conversion errors to the dead letter queue. Stops poison messages when using schemas; any such message is output as extended JSON on the specified topic. By default, messages are not sent to the dead letter queue. Also requires errors.tolerance=all.
- Type: string
- Importance: medium
Server API¶
server.api.version
The server API version to use. Disabled by default.
- Type: string
- Importance: low
server.api.deprecation.errors
Sets whether the connector requires use of deprecated server APIs to be reported as errors.
- Type: boolean
- Default: false
- Importance: low
server.api.strict
Sets whether the application requires strict server API version enforcement.
- Type: boolean
- Default: false
- Importance: low
Number of tasks for this connector¶
tasks.max
- Type: int
- Valid Values: [1,…,1]
- Importance: high
Suggested Reading¶
Blog post: Using the Fully Managed MongoDB Atlas Connector in a Secure Environment
Blog post: Announcing the MongoDB Atlas Sink and Source connectors in Confluent Cloud
Next Steps¶
For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.