MongoDB Atlas Sink Connector for Confluent Cloud

Note

This is a Quick Start for the managed cloud connector. If you are installing the connector locally for Confluent Platform, see the MongoDB Kafka Connector documentation.

The Kafka Connect MongoDB Atlas Sink connector for Confluent Cloud maps and persists events from Apache Kafka® topics directly to a MongoDB Atlas database collection. The connector supports Avro, JSON Schema, Protobuf, JSON (schemaless), String, or BSON data from Kafka topics. Once persisted, the data is available to services for querying, enrichment, and analytics.

Features

Note

This connector supports MongoDB Atlas only and will not work with a self-managed MongoDB database.

The MongoDB Atlas Sink connector provides the following features:

  • Collections: Collections can be auto-created based on topic names.

  • Database authentication: Uses password authentication.

  • Input data formats: The connector supports Avro, JSON Schema, Protobuf, JSON (schemaless), String, or BSON input data formats. Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf). See Schema Registry Enabled Environments for additional information.

  • Select configuration properties:

    • "max.num.retries": How many retries should be attempted on write errors.
    • "max.batch.size": The maximum number of sink records to batch together for processing.
    • "delete.on.null.values": Whether the connector should delete documents with matching key values when the value is null.
    • "doc.id.strategy": The strategy to generate a unique document ID (_id).
    • "write.strategy": Defines the behavior of bulk write operations made on a MongoDB collection.

    See Configuration Properties for all property values and definitions.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.

Limitations

Be sure to review the following information.

Quick Start

Use this quick start to get up and running with the Confluent Cloud MongoDB Atlas sink connector. The quick start provides the basics of selecting the connector and configuring it to consume data from Kafka and persist the data to a MongoDB database.

Note

This connector supports MongoDB Atlas only and will not work with a self-managed MongoDB database.

Prerequisites
  • Kafka cluster credentials. The following lists the different ways you can provide credentials.
    • Enter an existing service account resource ID.
    • Create a Confluent Cloud service account for the connector. Make sure to review the ACL entries required in the service account documentation. Some connectors have specific ACL requirements.
    • Create a Confluent Cloud API key and secret. To create a key and secret, you can use confluent api-key create or you can autogenerate the API key and secret directly in the Cloud Console when setting up the connector.

Adding an IP Whitelist Entry

Important

By default, MongoDB Atlas does not allow external network connections from the Internet. To allow external connections, you can add a specific IP or a CIDR IP range using the IP Whitelist entry dialog box under the Network Access menu in MongoDB.

For Confluent Cloud to connect to MongoDB Atlas, you must allow the public IP addresses of your Confluent Cloud cluster. Add all of the Confluent Cloud egress IP addresses to the IP whitelist for your MongoDB Atlas cluster.

Using the Confluent Cloud Console

Step 1: Launch your Confluent Cloud cluster.

See the Quick Start for Confluent Cloud for installation instructions.

Step 2: Add a connector.

In the left navigation menu, click Data integration, and then click Connectors. If you already have connectors in your cluster, click + Add connector.

Step 3: Select your connector.

Click the MongoDB Atlas Sink connector card.

Step 4: Enter the connector details.

Note

  • Ensure you have all your prerequisites completed.
  • An asterisk ( * ) designates a required entry.

At the Add MongoDB Atlas Sink Connector screen, complete the following:

If you’ve already populated your Kafka topics, select the topic(s) you want to connect from the Topics list.

To create a new topic, click +Add new topic.

Step 5: Check MongoDB

After the connector is running, verify that messages are populating your MongoDB database.

Tip

When you launch a connector, a Dead Letter Queue topic is automatically created. See Dead Letter Queue for details.

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.

Using the Confluent CLI

Complete the following steps to set up and run the connector using the Confluent CLI.

Note

  • Make sure you have all your prerequisites completed.
  • The example commands use Confluent CLI version 2. For more information, see Confluent CLI v2.

Step 1: List the available connectors.

Enter the following command to list available connectors:

confluent connect plugin list

Step 2: Show the required connector configuration properties.

Enter the following command to show the required connector properties:

confluent connect plugin describe <connector-catalog-name>

For example:

confluent connect plugin describe MongoDbAtlasSink

Example output:

Following are the required configs:
connector.class: MongoDbAtlasSink
name
kafka.auth.mode
kafka.api.key
kafka.api.secret
input.data.format
connection.host
connection.user
connection.password
database
tasks.max
topics

Step 3: Create the connector configuration file.

Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.

{
    "connector.class": "MongoDbAtlasSink",
    "name": "confluent-mongodb-sink",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<my-kafka-api-key>",
    "kafka.api.secret": "<my-kafka-api-secret>",
    "input.data.format" : "JSON",
    "connection.host": "<database-host-address>",
    "connection.user": "<my-username>",
    "connection.password": "<my-password>",
    "topics": "<kafka-topic-name>",
    "max.num.retries": "3",
    "retries.defer.timeout": "5000",
    "max.batch.size": "0",
    "database": "<database-name>",
    "collection": "<collection-name>",
    "tasks.max": "1"
}

Note the following property definitions:

  • "connector.class": Identifies the connector plugin name.
  • "name": Sets a name for your new connector.
  • "kafka.auth.mode": Identifies the connector authentication mode you want to use. There are two options: SERVICE_ACCOUNT or KAFKA_API_KEY (the default). To use an API key and secret, specify the configuration properties kafka.api.key and kafka.api.secret, as shown in the example configuration (above). To use a service account, specify the Resource ID in the property kafka.service.account.id=<service-account-resource-ID>. To list the available service account resource IDs, use the following command:

    confluent iam service-account list
    

    For example:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "input.data.format": Sets the input Kafka record value format (data coming from the Kafka topic). Valid entries are AVRO, JSON_SR, PROTOBUF, JSON, STRING, or BSON. You must have Confluent Cloud Schema Registry configured if using a schema-based message format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).

  • "connection.host": The MongoDB host. Use a hostname address and not a full URL. For example: cluster4-r5q3r7.gcp.mongodb.net.

  • "collection": The MongoDB collection name. For multiple topics, this is the default collection the topics are mapped to.

    The following are optional (with the exception of the number of tasks).

  • "max.num.retries": How many retries should be attempted on write errors. If not used, this property defaults to 3.

  • "retries.defer.timeout": How long (in milliseconds) a retry should get deferred. If not used, the default is 5000 ms.

  • "max.batch.size": The maximum number of sink records to batch together for processing. If not used, this property defaults to 0.

  • "delete.on.null.values": Whether the connector should delete documents with matching key values, when the value is null. If not used, this property defaults to false.

  • "doc.id.strategy": Sets the strategy to generate a unique document ID (_id). Valid entries are BsonOidStrategy, KafkaMetaDataStrategy, FullKeyStrategy, PartialKeyStrategy, PartialValueStrategy, ProvidedInKeyStrategy, ProvidedInValueStrategy, or UuidStrategy. To delete the document when the value is null, you must set the strategy to FullKeyStrategy, PartialKeyStrategy, or ProvidedInKeyStrategy. The default value is BsonOidStrategy. For more information, see DocumentIdAdder.

  • Depending on the selected strategy, add the appropriate Document ID strategy projection list:

    • "key.projection.type": For use with PartialKeyStrategy. Use either allowlist or blocklist to allow or block the custom key fields to be projected for ID strategy. If not used, this property defaults to none.
    • "key.projection.list": For use with PartialKeyStrategy. A comma-separated list of key fields to be projected for ID strategy.
    • "value.projection.type": For use with PartialValueStrategy. Use either allowlist or blocklist to allow or block the custom value fields to be projected for ID strategy. If not used, this property defaults to none.
    • "value.projection.list": For use with PartialValueStrategy. A comma-separated list of value fields to be projected for ID strategy.
  • "write.strategy": Sets the write model for bulk write operations. Valid entries are DefaultWriteModelStrategy, ReplaceOneDefaultStrategy, InsertOneDefaultStrategy, ReplaceOneBusinessKeyStrategy, DeleteOneDefaultStrategy, UpdateOneTimestampsStrategy, or UpdateOneBusinessKeyTimestampStrategy. If not used, this property defaults to DefaultWriteModelStrategy. For time-series collections, the DefaultWriteModelStrategy will internally default to InsertOneDefaultStrategy. For normal collections, it defaults to ReplaceOneDefaultStrategy.

  • "cdc.handler": Sets the class name of CDC handler to use for processing. You can capture CDC events with the MongoDB Kafka Sink connector and perform corresponding insert, update, and delete operations to a destination MongoDB cluster. Valid entries are None, MongoDbChangeStreamHandler, DebeziumMongoDbHandler, DebeziumMySqlHandler, DebeziumPostgresHandler, or QlikRdbmsHandler. If not used, this property defaults to None. For more information, see mongodb-sink-cdc.

  • "timeseries.timefield": Sets the name of the top-level time field that contains the date in each time-series document. Setting this property will create a time-series collection where each document will have a BSON date as the value for the time field. Time-series collections were introduced in MongoDB v5.0, which is only available for dedicated clusters in MongoDB Atlas.

  • "timeseries.timefield.auto.convert": Whether to convert the data in the time field to BSON date format. Supported formats for data include integer, long and string. If not used, this property defaults to false.

  • "timeseries.timefield.auto.convert.date.format": Sets the DateTimeFormatter format to convert the source data from. The setting expects the string representation to contain both date and time information and uses the Java DateTimeFormatter.ofPattern(pattern, locale) API for the conversion. If the string only contains date information, then the time since epoch is from the start of that day. If a string representation does not contain time-zone offset, then the setting interprets the extracted date and time as UTC. If not used, this property defaults to yyyy-MM-dd[['T'][ ]][HH:mm:ss[[.][SSSSSS][SSS]][ ]VV[ ]'['VV']'][HH:mm:ss[[.][SSSSSS][SSS]][ ]X][HH:mm:ss[[.][SSSSSS][SSS]]].

  • "timeseries.timefield.auto.convert.locale.language.tag": Sets the DateTimeFormatter locale language tag to use with the date pattern. See Language tags in HTML and XML for more information on constructing tags. If not used, this property defaults to en.

  • "timeseries.metafield": Sets the name of the top-level field that contains metadata in each time-series document. The metadata in the specified field should be data that is used to label a unique series of documents. The field can be of any type except array.

  • "timeseries.expire.after.seconds": Sets the number of seconds after which the document expires. MongoDB deletes expired documents automatically. If not used, this property defaults to 0, which means data will not be deleted automatically.

  • "ts.granularity": Sets the interval granularity for subsequent measurements for a time-series. Valid entries are None, seconds, minutes, or hours. If not used, this property defaults to None. For normal collections, None is the only applicable value. For time-series collections, all entries are applicable and None internally defaults to seconds.

  • "tasks.max": Sets the number of tasks for the connector. Refer to Confluent Cloud connector limitations for additional information.

Single Message Transforms: See the Single Message Transforms (SMT) documentation for details about adding SMTs using the CLI.

See Configuration Properties for all property values and definitions.
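
As a rough sketch of how the optional properties described above fit together, the following fragment could be merged into the configuration file so that a record with a null value deletes the matching document. It assumes the record key is suitable for use as the document ID; the property names and values are taken from the definitions above, and the required properties are unchanged.

```json
{
    "delete.on.null.values": "true",
    "doc.id.strategy": "FullKeyStrategy"
}
```

FullKeyStrategy is one of the three strategies listed above that allow deletes; PartialKeyStrategy or ProvidedInKeyStrategy could be substituted, depending on how the key is structured.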

Step 4: Load the configuration file and create the connector.

Enter the following command to load the configuration and start the connector:

confluent connect create --config <file-name>.json

For example:

confluent connect create --config mongo-db-sink.json

Example output:

Created connector confluent-mongodb-sink lcc-ix4dl

Step 5: Check the connector status.

Enter the following command to check the connector status:

confluent connect list

Example output:

ID          |            Name         | Status  | Type
+-----------+-------------------------+---------+------+
lcc-ix4dl   | confluent-mongodb-sink  | RUNNING | sink

Step 6: Check MongoDB

After the connector is running, verify that records are populating your MongoDB database.

Tip

When you launch a connector, a Dead Letter Queue topic is automatically created. See Dead Letter Queue for details.

Configuration Properties

Use the following configuration properties with this connector.

Note

These are properties for the managed cloud connector. If you are installing the connector locally for Confluent Platform, see the MongoDB Kafka Connector documentation.

How should we connect to your data?

name

Sets a name for your connector.

  • Type: string
  • Valid Values: A string at most 64 characters long
  • Importance: high

Input messages

input.data.format

Sets the input Kafka record value format. Valid entries are AVRO, JSON_SR, PROTOBUF, JSON, STRING, or BSON. You must have Confluent Cloud Schema Registry configured if using a schema-based message format such as AVRO, JSON_SR, or PROTOBUF.

  • Type: string
  • Importance: high
cdc.handler

The class name of the CDC handler to use for processing. You can capture CDC events with the MongoDB Kafka sink connector and perform corresponding insert, update, and delete operations to a destination MongoDB cluster.

  • Type: string
  • Default: None
  • Importance: low
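
A minimal sketch of enabling a CDC handler, assuming the topic carries MongoDB change stream events. The handler name is one of the valid entries listed for this property in the CLI quick start; the topic placeholder is illustrative only.

```json
{
    "topics": "<cdc-topic-name>",
    "cdc.handler": "MongoDbChangeStreamHandler"
}
```

The remaining required properties stay as they are; this fragment only shows the property that changes.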

Writes

delete.on.null.values

Whether or not the connector should try to delete documents based on key when value is null.

  • Type: boolean
  • Default: false
  • Importance: low
max.batch.size

The maximum number of sink records to batch together for processing.

  • Type: int
  • Default: 0
  • Valid Values: [0,…]
  • Importance: low
bulk.write.ordered

Whether the batches controlled by ‘max.batch.size’ must be written via ordered bulk writes.

  • Type: boolean
  • Default: true
  • Importance: low
rate.limiting.timeout

How long (in milliseconds) processing should wait before continuing after a rate limit is triggered.

  • Type: int
  • Default: 0
  • Importance: low
rate.limiting.every.n

The number of processed batches that will trigger rate limiting. The default value of 0 sets no rate limiting.

  • Type: int
  • Default: 0
  • Importance: low
write.strategy

The class that specifies the WriteModel to use for bulk writes.

  • Type: string
  • Default: DefaultWriteModelStrategy
  • Importance: low
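
To illustrate how the write properties above interact, the following fragment is one possible combination. The numeric values are arbitrary examples, not recommendations.

```json
{
    "max.batch.size": "100",
    "bulk.write.ordered": "false",
    "rate.limiting.every.n": "10",
    "rate.limiting.timeout": "1000"
}
```

Read against the definitions above, this would batch up to 100 sink records per bulk write, send each batch as an unordered bulk write, and pause processing for 1000 ms after every 10 processed batches.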

Kafka Cluster credentials

kafka.auth.mode

Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.

  • Type: string
  • Default: KAFKA_API_KEY
  • Valid Values: KAFKA_API_KEY, SERVICE_ACCOUNT
  • Importance: high
kafka.api.key
  • Type: password
  • Importance: high
kafka.service.account.id

The Service Account that will be used to generate the API keys to communicate with Kafka Cluster.

  • Type: string
  • Importance: high
kafka.api.secret
  • Type: password
  • Importance: high
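
As a sketch, a configuration that authenticates with a service account rather than an API key and secret would carry the following two properties. The resource ID is a placeholder; based on the quick start, these properties are used in place of kafka.api.key and kafka.api.secret.

```json
{
    "kafka.auth.mode": "SERVICE_ACCOUNT",
    "kafka.service.account.id": "<service-account-resource-ID>"
}
```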

Which topics do you want to get data from?

topics

Identifies the topic name or a comma-separated list of topic names.

  • Type: list
  • Importance: high

How should we connect to your MongoDB Atlas database?

connection.host

MongoDB Atlas connection host (e.g. confluent-test.mycluster.mongodb.net).

  • Type: string
  • Importance: high
connection.user

MongoDB Atlas connection user.

  • Type: string
  • Importance: high
connection.password

MongoDB Atlas connection password.

  • Type: password
  • Importance: high
database

MongoDB Atlas database name.

  • Type: string
  • Importance: high

Database details

collection

Collection name to write to. If the connector is sinking data from multiple topics, this is the default collection the topics are mapped to.

  • Type: string
  • Importance: medium

ID strategies

doc.id.strategy

The IdStrategy class name to use for generating a unique document id (_id).

  • Type: string
  • Default: BsonOidStrategy
  • Importance: low
doc.id.strategy.overwrite.existing

Whether the connector should overwrite existing values in the _id field when the strategy defined in doc.id.strategy is applied.

  • Type: boolean
  • Default: false
  • Importance: low
document.id.strategy.uuid.format

The BSON output format when using the UuidStrategy. Can be either String or Binary.

  • Type: string
  • Default: string
  • Importance: low
key.projection.type

For use with the PartialKeyStrategy. Allows custom key fields to be projected for the ID strategy. Use either AllowList or BlockList.

  • Type: string
  • Default: none
  • Importance: low
key.projection.list

For use with the PartialKeyStrategy. A comma-separated list of key field names to be projected for the ID strategy.

  • Type: string
  • Importance: low
value.projection.type

For use with the PartialValueStrategy. Allows custom value fields to be projected for the ID strategy. Use either AllowList or BlockList.

  • Type: string
  • Default: none
  • Importance: low
value.projection.list

For use with the PartialValueStrategy. A comma-separated list of value field names to be projected for the ID strategy.

  • Type: string
  • Importance: low
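
The projection properties above could be combined as in the following sketch, which builds the document ID from two fields of the record value. The field names order_id and customer_id are hypothetical. This page spells the projection type both as AllowList and as allowlist, so the exact casing accepted by the connector should be confirmed before use.

```json
{
    "doc.id.strategy": "PartialValueStrategy",
    "value.projection.type": "AllowList",
    "value.projection.list": "order_id,customer_id"
}
```

For a key-based ID, the equivalent sketch would use PartialKeyStrategy with key.projection.type and key.projection.list.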

Namespace mapping

namespace.mapper.class

The class that determines the namespace to write the sink data to. By default this will be based on the ‘database’ configuration and either the topic name or the ‘collection’ configuration.

  • Type: string
  • Default: DefaultNamespaceMapper
  • Importance: low
namespace.mapper.key.database.field

The key field to use as the destination database name.

  • Type: string
  • Importance: low
namespace.mapper.key.collection.field

The key field to use as the destination collection name.

  • Type: string
  • Importance: low
namespace.mapper.value.database.field

The value field to use as the destination database name.

  • Type: string
  • Importance: low
namespace.mapper.value.collection.field

The value field to use as the destination collection name.

  • Type: string
  • Importance: low
namespace.mapper.error.if.invalid

Whether to throw an error if the mapped field is missing or invalid. Defaults to false.

  • Type: boolean
  • Default: false
  • Importance: low
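
A possible field-based mapping is sketched below. It assumes that the managed connector accepts the mapper name FieldPathNamespaceMapper, which is the field-based mapper in the self-managed MongoDB Kafka connector; only DefaultNamespaceMapper is confirmed on this page, so treat the class name as an assumption to verify. The field name target_collection is hypothetical.

```json
{
    "namespace.mapper.class": "FieldPathNamespaceMapper",
    "namespace.mapper.value.collection.field": "target_collection",
    "namespace.mapper.error.if.invalid": "true"
}
```

With namespace.mapper.error.if.invalid set to true, a record whose mapped field is missing or invalid raises an error, as described in the property definition above.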

Server API

server.api.version

The server API version to use. Disabled by default.

  • Type: string
  • Importance: low
server.api.deprecation.errors

Sets whether the connector requires use of deprecated server APIs to be reported as errors.

  • Type: boolean
  • Default: false
  • Importance: low
server.api.strict

Sets whether the application requires strict server API version enforcement.

  • Type: boolean
  • Default: false
  • Importance: low
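
The following fragment sketches a configuration that pins the server API version and enables both enforcement options. The value "1" is an assumption based on the MongoDB Stable API version; this page does not list the valid values for server.api.version.

```json
{
    "server.api.version": "1",
    "server.api.strict": "true",
    "server.api.deprecation.errors": "true"
}
```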

Connection details

max.num.retries

How many retries should be attempted on write errors.

  • Type: int
  • Default: 3
  • Valid Values: [0,…]
  • Importance: low
retries.defer.timeout

How long (in milliseconds) a retry should be deferred.

  • Type: int
  • Default: 5000
  • Valid Values: [0,…]
  • Importance: low

Time Series configuration

timeseries.timefield

The name of the top-level field which contains the date in each time series document. Setting this config will create a time series collection where each document will have a BSON date as the value for the timefield.

  • Type: string
  • Default: “”
  • Importance: low
timeseries.timefield.auto.convert

Whether to convert the data in the field into a BSON Date format. Supported formats include integer, long, and string.

  • Type: boolean
  • Default: false
  • Importance: low
timeseries.timefield.auto.convert.date.format

The string pattern to convert the source data from. The setting expects the string representation to contain both date and time information and uses the Java DateTimeFormatter.ofPattern(pattern, locale) API for the conversion. If the string only contains date information, then the time since epoch is from the start of that day. If a string representation does not contain time-zone offset, then the setting interprets the extracted date and time as UTC.

  • Type: string
  • Default: yyyy-MM-dd[['T'][ ]][HH:mm:ss[[.][SSSSSS][SSS]][ ]VV[ ]'['VV']'][HH:mm:ss[[.][SSSSSS][SSS]][ ]X][HH:mm:ss[[.][SSSSSS][SSS]]]
  • Importance: low
timeseries.timefield.auto.convert.locale.language.tag

The DateTimeFormatter locale language tag to use with the date pattern.

  • Type: string
  • Default: en
  • Importance: low
timeseries.metafield

The name of the top-level field that contains metadata in each time series document. This field groups related data. It can be of any type except array.

  • Type: string
  • Default: “”
  • Importance: low
timeseries.expire.after.seconds

The number of seconds the data remains in MongoDB before MongoDB deletes it. Omitting this field means data will not be deleted automatically.

  • Type: int
  • Default: 0
  • Valid Values: [0,…]
  • Importance: low
ts.granularity

The expected interval between subsequent measurements for a time-series. Set this to None or leave it empty if the data is not time-series.

  • Type: string
  • Default: None
  • Importance: low
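
As an illustration of the time series properties above, the following fragment would write to a time-series collection. The field names ts and sensor_id are hypothetical, and the expiry and granularity values are examples only.

```json
{
    "timeseries.timefield": "ts",
    "timeseries.timefield.auto.convert": "true",
    "timeseries.metafield": "sensor_id",
    "timeseries.expire.after.seconds": "86400",
    "ts.granularity": "minutes"
}
```

Here the ts field of each record is converted to a BSON date, sensor_id labels each series, and documents expire after 86400 seconds (24 hours). If the time field is a string in a non-default layout, timeseries.timefield.auto.convert.date.format would also need to be set.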

Number of tasks for this connector

tasks.max
  • Type: int
  • Valid Values: [1,…]
  • Importance: high

Next Steps

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.
