MongoDB Atlas Sink Connector for Confluent Cloud

Note

If you are installing the connector locally for Confluent Platform, see the MongoDB Kafka Connector documentation.

The Kafka Connect MongoDB Atlas Sink connector for Confluent Cloud maps and persists events from Apache Kafka® topics directly to a MongoDB Atlas database collection. The connector supports Avro, JSON Schema, Protobuf, JSON (schemaless), String, or BSON data. Once persisted, the data is available to services for querying, enrichment, and analytics.

Important

If you are still on Confluent Cloud Enterprise, please contact your Confluent Account Executive for more information about using this connector.

Features

Note

This connector supports MongoDB Atlas only and will not work with a self-managed MongoDB database.

The MongoDB Atlas sink connector provides the following features:

  • Collections: Collections can be auto-created based on topic names.
  • Database authentication: Uses password authentication.
  • Input data formats: The connector supports Avro, JSON Schema, Protobuf, JSON (schemaless), String, or BSON input data formats. Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).
  • Select configuration properties:
    • "max.num.retries": The maximum number of retries to attempt on write errors.
    • "max.batch.size": The maximum number of sink records to batch together for processing.
    • "delete.on.null.values": Whether the connector should delete documents with matching key values when the value is null.
    • "doc.id.strategy": The strategy to generate a unique document ID (_id).
    • "write.strategy": Defines the behavior of bulk write operations made on a MongoDB collection.

Configuration properties that are not shown in the Confluent Cloud UI use the default values. For more information, see the MongoDB Sink Connector Configuration Properties.

For more information, see the Confluent Cloud connector limitations.

Quick Start

Use this quick start to get up and running with the Confluent Cloud MongoDB Atlas sink connector. The quick start provides the basics of selecting the connector and configuring it to consume data from Kafka and persist the data to a MongoDB database.

Note

This connector supports MongoDB Atlas only and will not work with a self-managed MongoDB database.

Prerequisites
  • Kafka cluster credentials. You can use one of the following ways to get credentials:
    • Create a Confluent Cloud API key and secret. To create a key and secret, go to Kafka API keys in your cluster, or autogenerate the API key and secret directly in the UI when setting up the connector.
    • Create a Confluent Cloud service account for the connector.

Adding an IP Whitelist Entry

Important

By default, MongoDB Atlas does not allow external network connections from the Internet. To allow external connections, you can add a specific IP or a CIDR IP range using the IP Whitelist entry dialog box under the Network Access menu in MongoDB.

For Confluent Cloud to connect to MongoDB Atlas, MongoDB Atlas must allow the public egress IP addresses of your Confluent Cloud cluster. Add all of the Confluent Cloud egress IP addresses to the IP whitelist of your MongoDB Atlas cluster.
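
Before launching the connector, it can help to confirm that every egress IP address is covered by a whitelist entry, whether that entry is a single IP or a CIDR range. The following is a minimal sketch using Python's standard `ipaddress` module. The function name `uncovered_addresses` is hypothetical, and the addresses are placeholders from the reserved documentation ranges, not real Confluent Cloud egress IPs.

```python
import ipaddress


def uncovered_addresses(egress_ips, whitelist_entries):
    """Return the egress IPs that no whitelist entry (single IP or CIDR) covers."""
    # A bare IP such as "198.51.100.4" is parsed as a one-address network.
    networks = [ipaddress.ip_network(entry, strict=False) for entry in whitelist_entries]
    missing = []
    for ip in egress_ips:
        address = ipaddress.ip_address(ip)
        if not any(address in network for network in networks):
            missing.append(ip)
    return missing


# Placeholder values only (assumption): replace with your cluster's egress IPs
# and the entries you added under Network Access in MongoDB Atlas.
egress_ips = ["203.0.113.10", "203.0.113.77", "198.51.100.5"]
whitelist = ["203.0.113.0/24", "198.51.100.4"]

print(uncovered_addresses(egress_ips, whitelist))
```

An empty list means every egress address is covered. Any address that is returned still needs a whitelist entry.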

MongoDB IP Whitelist Entry

Using the Confluent Cloud GUI

Step 1: Launch your Confluent Cloud cluster.

See the Quick Start for Apache Kafka using Confluent Cloud for installation instructions.

Step 2: Add a connector.

Click Connectors. If you already have connectors in your cluster, click Add connector.

Step 3: Select your connector.

Click the MongoDB Atlas Sink connector icon.

MongoDB Atlas Sink Connector Icon

Step 4: Set up the connection.

Complete the following and click Continue.

Note

  • Make sure you have all your prerequisites completed.
  • An asterisk ( * ) designates a required entry.

  1. Select one or more topics.

  2. Enter a connector name.

  3. Select an Input message format (data coming from the Kafka topic): AVRO, JSON_SR (JSON Schema), PROTOBUF, JSON (schemaless), String, or BSON. A valid schema must be available in Schema Registry to use a schema-based message format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).

  4. Enter your Kafka Cluster credentials. The credentials are either the API key and secret or the service account API key and secret.

  5. Enter the MongoDB database details. For the Connection host, use only the hostname and not a full URL. For example: cluster4-r5q3r7.gcp.mongodb.net.

  6. Enter your MongoDB collection name. For multiple topics, this is the default collection the topics are mapped to.

    The following are optional (with the exception of the number of tasks).

  7. Enter the maximum number of retries on a write error. The default value is 3 retries.

  8. Enter the time in milliseconds (ms) to defer a retry. The default is 5000 ms (5 seconds).

  9. Enter the maximum number of records to batch together for processing. The default is 0.

  10. Select whether or not the connector deletes documents with matching key values when the value is null. The default is false.

  11. Select the strategy to generate a unique document ID (_id). To delete documents when the value is null, this must be set to FullKeyStrategy, PartialKeyStrategy, or ProvidedInKeyStrategy. The default is BsonOidStrategy. For more information, see DocumentIdAdder.

  12. Depending on the selected strategy, complete the appropriate Document ID strategy projection list fields:

    • If you selected PartialKeyStrategy, allow or block the custom key fields to be projected for ID strategy.
    • If you selected PartialValueStrategy, allow or block the custom value fields to be projected for ID strategy.
  13. Select the write model strategy for bulk write operations. The default is ReplaceOneDefaultStrategy.

  14. Enter the number of tasks for the connector. Refer to Confluent Cloud connector limitations for additional information.
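
The Connection host field in step 5 expects a bare hostname, while MongoDB Atlas typically hands out a full connection string. The following sketch shows one way to extract the hostname with Python's standard `urllib.parse`. The function name `connection_host` is hypothetical, and the user name and password in the example string are made up.

```python
from urllib.parse import urlsplit


def connection_host(value):
    """Return only the hostname from a full connection URL or a bare host."""
    value = value.strip()
    # Without a scheme, prefix "//" so urlsplit treats the text as a network location.
    candidate = value if "://" in value else "//" + value
    host = urlsplit(candidate).hostname
    if not host:
        raise ValueError(f"no hostname found in {value!r}")
    return host


# The credentials below are placeholders (assumption), not real values.
full_url = "mongodb+srv://user:secret@cluster4-r5q3r7.gcp.mongodb.net/test?retryWrites=true"
print(connection_host(full_url))
print(connection_host("cluster4-r5q3r7.gcp.mongodb.net"))
```

Both calls print `cluster4-r5q3r7.gcp.mongodb.net`, which is the form the Connection host field expects.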

Note

Configuration properties that are not listed use the default values. For default values and property definitions, see the MongoDB Sink Connector Configuration Properties.
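
Steps 10 and 11 depend on each other: deleting documents on null values only works with a key-based document ID strategy. The following sketch expresses that rule as a check, using the property names "delete.on.null.values" and "doc.id.strategy" from the feature list. The function name `delete_setting_problems` is hypothetical, and the check reflects only the rule stated on this page, not the connector's own validation.

```python
# Strategies that this page lists as compatible with deleting on null values.
DELETE_CAPABLE_STRATEGIES = {
    "FullKeyStrategy",
    "PartialKeyStrategy",
    "ProvidedInKeyStrategy",
}
DEFAULT_STRATEGY = "BsonOidStrategy"  # default named in step 11


def delete_setting_problems(config):
    """Return a list of messages describing conflicts between the two settings."""
    raw = config.get("delete.on.null.values", "false")
    delete_on_null = str(raw).strip().lower() == "true"
    strategy = config.get("doc.id.strategy", DEFAULT_STRATEGY)
    if delete_on_null and strategy not in DELETE_CAPABLE_STRATEGIES:
        allowed = ", ".join(sorted(DELETE_CAPABLE_STRATEGIES))
        return [f"doc.id.strategy is {strategy}; deleting on null values needs one of: {allowed}"]
    return []


# Deletion enabled while the strategy is left at its default: reported as a conflict.
print(delete_setting_problems({"delete.on.null.values": "true"}))
# Deletion enabled with a key-based strategy: no conflict.
print(delete_setting_problems({"delete.on.null.values": "true",
                               "doc.id.strategy": "FullKeyStrategy"}))
```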

Step 5: Launch the connector.

Verify the connection details and click Launch.

Launch the connector

Step 6: Check the connector status.

The status for the connector should go from Provisioning to Running. It may take a few minutes.

Check the connector status

Step 7: Check MongoDB.

After the connector is running, verify that messages are populating your MongoDB database.

Tip

When you launch a connector, a Dead Letter Queue topic is automatically created. See Dead Letter Queue for details.

For additional information about this connector, see the MongoDB Kafka Connector documentation. Note that not all connector features are provided in the Confluent Cloud connector.

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent Cloud CLI to manage your resources in Confluent Cloud.


Using the Confluent Cloud CLI

Complete the following steps to set up and run the connector using the Confluent Cloud CLI.

Note

Make sure you have all your prerequisites completed.

Step 1: List the available connectors.

Enter the following command to list available connectors:

ccloud connector-catalog list

Step 2: Show the required connector configuration properties.

Enter the following command to show the required connector properties:

ccloud connector-catalog describe <connector-catalog-name>

For example:

ccloud connector-catalog describe MongoDbAtlasSink

Example output:

Following are the required configs:
connector.class: MongoDbAtlasSink
name
kafka.api.key
kafka.api.secret
input.data.format
connection.host
connection.user
connection.password
database
tasks.max
topics
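
The required properties listed in the example output can be used to check a configuration before submitting it. The following is a minimal sketch, assuming the configuration is held in a Python dictionary. The function name `missing_required` is hypothetical, and the list is copied from the example output above, so it may not match what your CLI version reports.

```python
# Copied from the example `describe` output on this page.
REQUIRED_PROPERTIES = [
    "connector.class",
    "name",
    "kafka.api.key",
    "kafka.api.secret",
    "input.data.format",
    "connection.host",
    "connection.user",
    "connection.password",
    "database",
    "tasks.max",
    "topics",
]


def missing_required(config):
    """Return the required property names that are absent or blank in config."""
    return [
        name
        for name in REQUIRED_PROPERTIES
        if not str(config.get(name, "")).strip()
    ]


partial = {"connector.class": "MongoDbAtlasSink", "name": "confluent-mongodb-sink"}
print(missing_required(partial))
```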

Step 3: Create the connector configuration file.

Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.

{
    "connector.class": "MongoDbAtlasSink",
    "name": "confluent-mongodb-sink",
    "kafka.api.key": "<my-kafka-api-key>",
    "kafka.api.secret": "<my-kafka-api-secret>",
    "input.data.format": "JSON",
    "connection.host": "<database-host-address>",
    "connection.user": "<my-username>",
    "connection.password": "<my-password>",
    "topics": "<kafka-topic-name>",
    "max.num.retries": "3",
    "retries.defer.timeout": "5000",
    "max.batch.size": "0",
    "database": "<database-name>",
    "collection": "<collection-name>",
    "tasks.max": "1"
}

Note the following property definitions:

  • "connector.class": Identifies the connector plugin name.

  • "name": Sets a name for your new connector.

  • "input.data.format": Sets the input message format (data coming from the Kafka topic). Valid entries are AVRO, JSON_SR, PROTOBUF, JSON, STRING, or BSON. You must have Confluent Cloud Schema Registry configured if using a schema-based message format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).

  • "connection.host": The MongoDB host. Use only the hostname and not a full URL. For example: cluster4-r5q3r7.gcp.mongodb.net.

  • "collection": The MongoDB collection name. For multiple topics, this is the default collection the topics are mapped to.

    The following are optional (with the exception of the number of tasks).

  • "max.num.retries": The maximum number of retries to attempt on write errors. If not used, this property defaults to 3.

  • "retries.defer.timeout": How long (in milliseconds) a retry should get deferred. If not used, the default is 5000 ms.

  • "max.batch.size": The maximum number of sink records to batch together for processing. If not used, this property defaults to 0.

  • "delete.on.null.values": Whether the connector should delete documents with matching key values when the value is null. If not used, this property defaults to false.

  • "doc.id.strategy": Sets the strategy to generate a unique document ID (_id). To delete documents when the value is null, this must be set to FullKeyStrategy, PartialKeyStrategy, or ProvidedInKeyStrategy. The default is BsonOidStrategy. For more information, see DocumentIdAdder.

  • Depending on the selected strategy, add the appropriate Document ID strategy projection list:

    • "key.projection.type": For use with PartialKeyStrategy. Use either allowlist or blocklist to allow or block the custom key fields to be projected for ID strategy. If not used, this property defaults to none.
    • "key.projection.list": For use with PartialKeyStrategy. A comma-separated list of key fields to be projected for ID strategy.
    • "value.projection.type": For use with PartialValueStrategy. Use either allowlist or blocklist to allow or block the custom value fields to be projected for ID strategy. If not used, this property defaults to none.
    • "value.projection.list": For use with PartialValueStrategy. A comma-separated list of value fields to be projected for ID strategy.
  • "write.strategy": Sets the write model for bulk write operations. If not used, this property defaults to ReplaceOneDefaultStrategy.

  • "tasks.max": The number of tasks for the connector. Refer to Confluent Cloud connector limitations for additional information.

Note

Configuration properties that are not listed use the default values. For default values and property definitions, see the MongoDB Sink Connector Configuration Properties.
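
The projection properties described above work together with the document ID strategy. The following sketch adds a PartialKeyStrategy projection to an existing configuration and prints the result as JSON. The function name `partial_key_config` and the key fields `customer_id` and `region` are hypothetical, chosen only for illustration.

```python
import json


def partial_key_config(base_config, key_fields, projection_type="allowlist"):
    """Return a copy of base_config with PartialKeyStrategy projection settings."""
    if projection_type not in ("allowlist", "blocklist"):
        raise ValueError("projection_type must be 'allowlist' or 'blocklist'")
    if not key_fields:
        raise ValueError("at least one key field is required")
    config = dict(base_config)  # leave the caller's dictionary untouched
    config["doc.id.strategy"] = "PartialKeyStrategy"
    config["key.projection.type"] = projection_type
    # The connector expects the fields as one comma-separated string.
    config["key.projection.list"] = ",".join(key_fields)
    return config


base = {"connector.class": "MongoDbAtlasSink", "name": "confluent-mongodb-sink"}
config = partial_key_config(base, ["customer_id", "region"])  # hypothetical fields
print(json.dumps(config, indent=4))
```

The printed JSON holds only the properties set here. A complete configuration file still needs the remaining required properties.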

Step 4: Load the configuration file and create the connector.

Enter the following command to load the configuration and start the connector:

ccloud connector create --config <file-name>.json

For example:

ccloud connector create --config mongo-db-sink.json

Example output:

Created connector confluent-mongodb-sink lcc-ix4dl

Step 5: Check the connector status.

Enter the following command to check the connector status:

ccloud connector list

Example output:

ID          |            Name         | Status  | Type
+-----------+-------------------------+---------+------+
lcc-ix4dl   | confluent-mongodb-sink  | RUNNING | sink
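
When the status check is scripted, the table can be turned into structured data. The following sketch parses text shaped like the example output above. The function name `parse_connector_list` is hypothetical, and the parser assumes the exact four-column layout shown here, which other CLI versions may not produce.

```python
def parse_connector_list(output):
    """Parse the pipe-separated table into a list of dictionaries."""
    connectors = []
    for line in output.splitlines():
        cells = [cell.strip() for cell in line.split("|")]
        # Skip the separator row (no pipes), the header row, and blank lines.
        if len(cells) != 4 or cells[0] in ("", "ID"):
            continue
        connector_id, name, status, kind = cells
        connectors.append(
            {"id": connector_id, "name": name, "status": status, "type": kind}
        )
    return connectors


# Sample text copied from the example output on this page.
sample = """\
ID          |            Name         | Status  | Type
+-----------+-------------------------+---------+------+
lcc-ix4dl   | confluent-mongodb-sink  | RUNNING | sink
"""

for connector in parse_connector_list(sample):
    print(connector["name"], connector["status"])
```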

Step 6: Check MongoDB.

After the connector is running, verify that records are populating your MongoDB database.

Tip

When you launch a connector, a Dead Letter Queue topic is automatically created. See Dead Letter Queue for details.

For additional information about this connector, see the MongoDB Kafka Connector documentation. Note that not all connector features are provided in the Confluent Cloud connector.

Next Steps

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent Cloud CLI to manage your resources in Confluent Cloud.
