Debezium MongoDB Source Connector for Confluent Platform

Debezium’s MongoDB connector can monitor a MongoDB replica set or a MongoDB sharded cluster for document changes in databases and collections, recording those changes as events in Apache Kafka® topics. The connector automatically handles the addition or removal of shards in a sharded cluster, changes in membership of each replica set, elections within each replica set, and dynamically adjusts when communication issues occur.

The Debezium MongoDB connector uses MongoDB’s oplog to capture changes. Because it relies on MongoDB’s replication mechanism, the connector works only with MongoDB replica sets or sharded clusters.

The Debezium MongoDB connector cannot monitor a standalone MongoDB server, because standalone servers do not have an oplog. The connector does work if the standalone server is converted to a single-member replica set.

  • Confluent supports MongoDB connector version 0.9.3 and later.
  • Confluent supports using this connector with MongoDB 3.4 or later.

Features

The Debezium MongoDB Source connector includes the following features:

At least once delivery

This connector guarantees that records are delivered at least once to the Kafka topic. If the connector restarts, there may be some duplicate records in the Kafka topic.

Supports one task

The Debezium MongoDB Source Connector supports running only one task.

Automatic topic creation

The connector automatically creates the internal database history Kafka topic if it doesn’t exist.

Install the MongoDB Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent connect plugin install debezium/debezium-connector-mongodb:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent connect plugin install debezium/debezium-connector-mongodb:<version-number>
    
  • The Debezium MongoDB Source connector has specific ACL requirements. To verify that you meet them, see ACL requirements for Debezium Source connectors.

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

Configuration Properties

For a complete list of configuration properties for this connector, see Configuration Reference for Debezium MongoDB Source Connector for Confluent Platform.

License

The Debezium MongoDB connector is an open source connector and does not require a Confluent Enterprise License.

Configure a Replication Mechanism on MongoDB

Initiate a replica set

The Debezium MongoDB connector can capture changes from a single MongoDB replica set. Production replica sets should have at least three members, but the connector itself works with any number of members.

To use the Debezium MongoDB connector with a replica set, set the mongodb.hosts property to the addresses of one or more replica set servers, which act as seed addresses. The connector uses these seeds to connect to the replica set. Once connected, it retrieves the complete set of members from the replica set, including which member is primary. The connector then starts a task that connects to the primary member and captures changes from the primary’s oplog. When the replica set elects a new primary, the task switches over to the new primary.
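The mongodb.hosts value is a single string. The Quick Start in this document uses the form <replica-set-name>/<host>:<port> (debezium/localhost:27017). Assuming that several seeds are written as a comma-separated host:port list after the same replica set prefix, the following minimal sketch assembles the value in shell. The function name mongodb_hosts and the seed host names are hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper. Assumed format: <replica-set>/<host:port>[,<host:port>...]
mongodb_hosts() {
  local replica_set="$1"
  shift
  # "$*" joins the remaining arguments (the seeds) with the first character of IFS.
  local IFS=','
  printf '%s/%s\n' "$replica_set" "$*"
}

mongodb_hosts rs0 mongo1:27017 mongo2:27017 mongo3:27017   # prints rs0/mongo1:27017,mongo2:27017,mongo3:27017
```

Listing more than one seed lets the connector reach the replica set even if one of the seed servers is down when the connector starts.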

With at least one MongoDB server running, start a container from the debezium/mongo-initiator image. The container checks whether the replica set has been initiated; if it has not, the container initiates the replica set and adds all of the servers to it.

Start the container with the name of the replica set in the REPLICASET environment variable, and link each MongoDB server that should belong to the replica set under the aliases mongo1, mongo2, mongo3, and so on, as in the following example:

docker run -it --name mongo-init --rm -e REPLICASET=rs0 --link data1:mongo1 --link data2:mongo2 --link data3:mongo3 debezium/mongo-initiator
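The --link flags follow a fixed pattern: the Nth server gets the alias mongoN. A minimal sketch that generates those flags from a list of container names; mongo_links is a hypothetical helper, and data1 to data3 are the container names from the command above.

```shell
#!/usr/bin/env bash
# Hypothetical helper: emit one "--link <container>:mongoN" pair per container,
# numbering the aliases mongo1, mongo2, ... in argument order.
mongo_links() {
  local n=1 container
  local -a args=()
  for container in "$@"; do
    args+=(--link "${container}:mongo${n}")
    n=$((n + 1))
  done
  # Join the collected flags with single spaces.
  printf '%s\n' "${args[*]}"
}

mongo_links data1 data2 data3   # prints --link data1:mongo1 --link data2:mongo2 --link data3:mongo3
```

The output is meant to be word-split, so pass it unquoted, for example docker run -it --name mongo-init --rm -e REPLICASET=rs0 $(mongo_links data1 data2 data3) debezium/mongo-initiator. This only works when the container names contain no spaces.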

Initiate a sharded cluster

A MongoDB sharded cluster consists of:

  • a separate replica set that acts as the cluster’s configuration server
  • one or more shards, each deployed as a replica set
  • one or more routers (mongos) to which clients connect and which route requests to the appropriate shards

To use the Debezium MongoDB connector with a sharded cluster:

  • Configure the connector with the host addresses of the configuration server replica set.

The debezium/mongo-initiator container can also initiate a shard replica set and add it as a shard to one or more MongoDB routers. For example, consider three MongoDB servers running in containers shardA1, shardA2, and shardA3, and two MongoDB routers running in containers router1 and router2. The following command ensures that shardA1, shardA2, and shardA3 are initiated as replica set shardA, and that shardA is added as a shard to router1 and router2.

docker run -it --name mongo-init --rm -e REPLICASET=shardA --link shardA1:mongo1 --link shardA2:mongo2 --link shardA3:mongo3 --link router1 --link router2 debezium/mongo-initiator
You can add more shard replica sets by running more containers. For example:
docker run -it --name mongo-init --rm -e REPLICASET=shardB --link shardB1:mongo1 --link shardB2:mongo2 --link router1 --link router2 debezium/mongo-initiator

When the connector connects to the configuration server replica set, it discovers each replica set that is used as a shard in the cluster.

The connector starts a separate task to capture the changes from each shard replica set. When shards are added to or removed from the cluster, the connector adjusts its tasks automatically.
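For a sharded cluster, mongodb.hosts holds seeds for the configuration server replica set rather than for any shard. The following sketch writes a registration file in the same shape as the Quick Start example. The connector name, the replica set name configrs, the hosts config1 to config3, port 27019 (MongoDB's default port for config servers), and the logical name shardedcluster1 are all hypothetical.

```shell
#!/usr/bin/env bash
# Write a hypothetical registration file for a sharded cluster.
# mongodb.hosts points at the configuration server replica set (configrs), not at a shard.
cat > register-mongodb-sharded.json <<'EOF'
{
  "name": "sharded-inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "configrs/config1:27019,config2:27019,config3:27019",
    "mongodb.name": "shardedcluster1"
  }
}
EOF
```

You could then register it with the same curl command the Quick Start uses, substituting this file name.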

Quick Start

Debezium’s MongoDB connector can monitor a MongoDB replica set or a MongoDB sharded cluster for document changes in databases and collections, recording those changes as events in Kafka topics.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Install the connector

Refer to the Debezium tutorial if you want to use Docker images to set up Kafka, ZooKeeper and Connect. For the following tutorial, you need to have a local Confluent Platform installation. Note that as of Confluent Platform 7.5, ZooKeeper is deprecated for new deployments. Confluent recommends KRaft mode for new deployments.

confluent connect plugin install debezium/debezium-connector-mongodb:0.9.4

Adding a new connector plugin requires restarting Connect. Use the Confluent CLI to restart Connect.

confluent local services connect stop && confluent local services connect start
Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]

Check if the MongoDB plugin has been installed correctly and picked up by the plugin loader.

curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep mongodb
"io.debezium.connector.mongodb.MongoDbConnector"

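If you script this check, for instance in a setup script that should stop when the plugin is missing, you can test for the class name instead of reading the output. A minimal sketch; has_mongodb_plugin is a hypothetical helper that reads the /connector-plugins JSON on standard input.

```shell
#!/usr/bin/env bash
# Hypothetical helper: succeed (exit 0) only if the connector-plugins JSON
# read from stdin contains the Debezium MongoDB connector class name.
has_mongodb_plugin() {
  grep -qF '"io.debezium.connector.mongodb.MongoDbConnector"'
}

# Usage (requires a running Connect worker on localhost:8083):
#   curl -sS localhost:8083/connector-plugins | has_mongodb_plugin \
#     || echo "MongoDB connector plugin not found" >&2
```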
Set up MongoDB using Docker (optional)

If you do not have a native installation of MongoDB, you may use the following command to launch MongoDB using a Docker image.

# Configure the MongoDB data directory (run these commands from your project directory)
mkdir -p data/db

# Pull the Docker image. The 4.4 image still includes the legacy mongo shell used below;
# newer images ship only mongosh.
docker pull mongo:4.4

# Run the container, where mongodb is the name assigned to the container
docker run --name mongodb -v "$(pwd)/data/db:/data/db" -p 27017:27017 -d mongo:4.4 --replSet debezium

# Initialize the MongoDB replica set
docker exec -it mongodb mongo --eval 'rs.initiate({_id: "debezium", members:[{_id: 0, host: "localhost:27017"}]})'

# Start the mongo shell in the running container
docker exec -it mongodb mongo

// In the mongo shell: create a user
use admin
db.createUser(
{
user: "debezium",
pwd: "dbz",
roles: ["dbOwner"]
}
)

// Insert a record
use inventory
db.customers.insert([
{ _id : 1005, first_name : 'Bob', last_name : 'Hopper', email : 'thebob@example.com' }
]);

// View records
db.customers.find().pretty();

Start the Debezium MongoDB connector

Create the file register-mongodb.json to store the following connector configuration:

{
  "name": "inventory-connector",
  "config": {
    "connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max" : "1",
    "mongodb.hosts" : "debezium/localhost:27017",
    "mongodb.name" : "dbserver1",
    "mongodb.user" : "debezium",
    "mongodb.password" : "dbz"
  }
}
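A syntax error such as a trailing comma makes the file invalid JSON, and the REST request that submits it fails. A minimal sketch that checks the file locally first, assuming python3 is on the PATH (its standard json.tool module is used only as a validator); check_json is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Hypothetical helper: report whether the named file parses as JSON.
# Returns non-zero for invalid JSON. Assumes python3 is installed.
check_json() {
  if python3 -m json.tool "$1" > /dev/null; then
    echo "valid JSON: $1"
  else
    echo "invalid JSON: $1" >&2
    return 1
  fi
}

# Usage: check_json register-mongodb.json
```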

Start the connector.

# Start MongoDB connector
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mongodb.json
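To confirm that the connector started, you can query the Kafka Connect REST API endpoint /connectors/<name>/status. A minimal sketch that extracts the connector state from that response, assuming python3 is available; connector_state is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read a Kafka Connect status response on stdin and
# print the connector's state (for example RUNNING or FAILED).
# Assumes python3 is installed.
connector_state() {
  python3 -c 'import json, sys; print(json.load(sys.stdin)["connector"]["state"])'
}

# Usage (requires the connector registered above):
#   curl -sS localhost:8083/connectors/inventory-connector/status | connector_state
```

The same response also lists each task's state under "tasks", which is worth checking when the connector reports RUNNING but no events arrive.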

Start your Kafka consumer

Start the consumer in a new terminal session.

confluent local services kafka consume dbserver1.inventory.customers --from-beginning
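The topic name follows Debezium's naming convention for this connector, <mongodb.name>.<database>.<collection>: the logical name dbserver1 from the configuration file plus the inventory database and customers collection give dbserver1.inventory.customers. A small sketch that derives a topic name the same way; change_topic is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Hypothetical helper: build the change-event topic name from the connector's
# logical name (mongodb.name), the database, and the collection.
change_topic() {
  local logical_name="$1" database="$2" collection="$3"
  printf '%s.%s.%s\n' "$logical_name" "$database" "$collection"
}

change_topic dbserver1 inventory customers   # prints dbserver1.inventory.customers
```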

Open the mongo shell and insert or modify records in the database. Each change appears as a message in the consumer terminal. For example:

db.customers.insert([
 { _id : 1008, first_name : 'Jim', last_name : 'Colbert', email : 'thejim@example.com' }
 ]);

Clean up resources

Delete the connector and stop Confluent services.

curl -X DELETE localhost:8083/connectors/inventory-connector
confluent local services stop

Stop the MongoDB container.

docker stop mongodb

Note

Portions of the information provided here derive from documentation originally produced by the Debezium Community. Work produced by Debezium is licensed under Creative Commons 3.0.