Important

You are viewing documentation for an older version of Confluent Platform.

Debezium MongoDB Source Connector

Debezium’s MongoDB Connector can monitor a MongoDB replica set or a MongoDB sharded cluster for document changes in databases and collections, recording those changes as events in Apache Kafka® topics. The connector automatically handles the addition or removal of shards in a sharded cluster, changes in the membership of each replica set, and elections within each replica set, and it adjusts dynamically when communication issues occur.

The Debezium MongoDB connector uses MongoDB’s oplog to capture changes. Since it makes use of MongoDB’s replication mechanism, the connector works only with MongoDB replica sets or sharded clusters.

The Debezium MongoDB connector is not capable of monitoring the changes of a standalone MongoDB server, since standalone servers do not have an oplog. The connector will work if the standalone server is converted to a replica set with one member.

  • Confluent supports MongoDB connector version 0.9.3 and later.
  • Confluent supports using this connector with MongoDB 3.4 or later.

Install the MongoDB Connector

You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.

confluent-hub install debezium/debezium-connector-mongodb:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install debezium/debezium-connector-mongodb:0.9.4

License

The Debezium MongoDB connector is an open source connector and does not require a Confluent Enterprise License.

Configure a Replication Mechanism on MongoDB

Initiate a Replica Set

The Debezium MongoDB connector can capture changes from a single MongoDB replica set. Production replica sets should have at least three members, but the connector does not depend on the number of members and also works with fewer.

To use the Debezium MongoDB connector with a replica set, provide the addresses of one or more replica set servers as seed addresses in the connector’s mongodb.hosts property. The connector uses these seeds to connect to the replica set and, once connected, obtains the complete set of members and learns which member is primary. The connector then starts a task that connects to the primary and captures changes from the primary’s oplog. When the replica set elects a new primary, the task automatically switches over to the new primary.
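The value of mongodb.hosts combines the replica set name with the seed addresses. The following is a minimal sketch of how that value can be assembled, assuming the `<replica-set>/<host:port>` form used in the Quick Start configuration and assuming that additional seeds are separated by commas. The helper name mongodb_hosts and the host names are hypothetical.

```shell
# Hypothetical helper: join a replica set name and one or more seed
# addresses into the "<replica-set>/<host:port>,<host:port>" form.
mongodb_hosts() {
  local replica_set="$1"
  shift
  # "$*" joins the remaining arguments with the first character of IFS
  local IFS=','
  printf '%s/%s\n' "$replica_set" "$*"
}

# Example: three seed addresses for a replica set named rs0
mongodb_hosts rs0 mongo1:27017 mongo2:27017 mongo3:27017
```

A single seed is enough for the connector to discover the other members, so listing more than one mainly helps when a seed happens to be unreachable at startup.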

With at least one MongoDB server running, start a container from the debezium/mongo-initiator image to check whether the replica set is initiated. If it is not, the container initiates the replica set and adds all of the servers to it.

Start the container with the name of the replica set in the $REPLICASET environment variable, and link each MongoDB server that is to be in the replica set using a link alias of the form mongoN (where N = 1, 2, 3, and so on).

docker run -it --name mongo-init --rm -e REPLICASET=rs0 --link data1:mongo1 --link data2:mongo2 --link data3:mongo3 debezium/mongo-initiator

Initiate a Sharded Cluster

A MongoDB sharded cluster consists of:

  • a separate replica set that acts as the cluster’s configuration server
  • one or more shards, each deployed as a replica set
  • one or more routers (mongos) to which clients connect and that routes requests to the appropriate shards

To use the Debezium MongoDB connector with a sharded cluster:

  • Configure the connector with the host addresses of the configuration server replica set.

The debezium/mongo-initiator container can also add the replica set as a shard to one or more MongoDB routers. For example, consider three MongoDB servers running in containers shardA1, shardA2, and shardA3, and two MongoDB routers running in containers router1 and router2. The following command ensures that shardA1, shardA2, and shardA3 are properly initiated as replica set shardA, and that the shardA replica set is added as a shard to the routers router1 and router2.

docker run -it --name mongo-init --rm -e REPLICASET=shardA --link shardA1:mongo1 --link shardA2:mongo2 --link shardA3:mongo3 --link router1 --link router2 debezium/mongo-initiator

  • More shard replica sets can be added by running more containers. For example:

docker run -it --name mongo-init --rm -e REPLICASET=shardB --link shardB1:mongo1 --link shardB2:mongo2 --link router1 --link router2 debezium/mongo-initiator

When the connector connects to the configuration server replica set, it detects that the replica set is acting as the configuration server for a sharded cluster and discovers information about each replica set used as a shard in the cluster.

The connector starts up a separate task to capture the changes from each replica set. If new shards are added to the cluster or existing shards removed, the connector will automatically adjust its tasks accordingly.
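As an illustration of the points above, the following sketch writes a connector configuration for a sharded cluster. It assumes a configuration server replica set named cfg on hosts configsvr1 to configsvr3, and two shards (shardA and shardB); these names, the port, the connector name, and the file name are all hypothetical. Setting tasks.max to at least the number of shards is an assumption based on the connector using one task per replica set.

```shell
# Hypothetical names: the replica set "cfg", the hosts configsvr1..3,
# the port, and the output file name are placeholders for illustration.
cat > register-mongodb-sharded.json <<'EOF'
{
  "name": "inventory-sharded-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max": "2",
    "mongodb.hosts": "cfg/configsvr1:27019,configsvr2:27019,configsvr3:27019",
    "mongodb.name": "dbserver1"
  }
}
EOF
```

Note that mongodb.hosts points at the configuration server replica set, not at the individual shards or the mongos routers; the shard replica sets are discovered from it.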

Quick Start

Debezium’s MongoDB Connector can monitor a MongoDB replica set or a MongoDB sharded cluster for document changes in databases and collections, recording those changes as events in Kafka topics.

Install the Connector

Refer to the Debezium tutorial if you want to use Docker images to set up Kafka, ZooKeeper and Connect. For the following tutorial, you need to have a local Confluent Platform installation.

Navigate to your Confluent Platform installation directory and enter the following command to install the connector:

confluent-hub install debezium/debezium-connector-mongodb:0.9.4

Adding a new connector plugin requires restarting Connect. Use the Confluent CLI to restart Connect.

confluent stop connect && confluent start connect
Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]

Check if the MongoDB plugin has been installed correctly and picked up by the plugin loader.

curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep mongodb
"io.debezium.connector.mongodb.MongoDbConnector"

Set up MongoDB using Docker (Optional)

If you do not have a native installation of MongoDB, you can use the following commands to launch MongoDB from a Docker image.

# Configure the MongoDB data directory, then work from the project directory
# so that $(pwd)/data/db below resolves to the directory just created
mkdir -p path/to/project/data/db
cd path/to/project

# Pull the Docker image
docker pull mongo

# Run the container, where `mongodb` is the name assigned to the container
docker run --name mongodb -v $(pwd)/data/db:/data/db -p 27017:27017 -d mongo --replSet debezium

# Initialize the MongoDB replica set (run from the host)
docker exec -it mongodb mongo --eval 'rs.initiate({_id: "debezium", members:[{_id: 0, host: "localhost:27017"}]})'

# Start a new bash process in the running container
docker exec -it mongodb bash

# Start the mongo shell inside the container
mongo

// The following commands are entered in the mongo shell

// Create a user profile
use admin
db.createUser(
  {
    user: "debezium",
    pwd: "dbz",
    roles: ["dbOwner"]
  }
)

// Insert a record
use inventory
db.customers.insert([
  { _id : 1005, first_name : 'Bob', last_name : 'Hopper', email : 'thebob@example.com' }
]);

// View records
db.customers.find().pretty();

Start the Debezium MongoDB connector

Create the file register-mongodb.json to store the following connector configuration:

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max": "1",
    "mongodb.hosts": "debezium/localhost:27017",
    "mongodb.name": "dbserver1",
    "mongodb.user": "debezium",
    "mongodb.password": "dbz",
    "database.history.kafka.bootstrap.servers": "localhost:9092"
  }
}

Start the connector.

# Start MongoDB connector
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mongodb.json
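After the POST request returns, it can be useful to confirm that the connector and its task are running. The sketch below builds the URL of the status endpoint of the Kafka Connect REST API, assuming Connect listens on localhost:8083 as in the command above. The helper name connector_status_url is hypothetical, and the commented-out query assumes that jq is installed.

```shell
# Hypothetical helper: build the Kafka Connect status URL for a connector.
# The second argument (Connect host:port) defaults to localhost:8083.
connector_status_url() {
  printf 'http://%s/connectors/%s/status\n' "${2:-localhost:8083}" "$1"
}

# Print the status URL for the connector registered above
connector_status_url inventory-connector

# With Connect running, query the endpoint and print the connector state:
# curl -sS "$(connector_status_url inventory-connector)" | jq '.connector.state'
```

A healthy connector reports the state RUNNING; a FAILED state usually comes with a stack trace in the same response.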

Start your Kafka consumer

Start the consumer in a new terminal session.

confluent consume dbserver1.inventory.customers --from-beginning
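The topic name in the command above is made up of the connector's logical name (the mongodb.name property), the database name, and the collection name, joined by periods. A minimal sketch of that naming pattern follows; the helper name topic_name is hypothetical.

```shell
# Hypothetical helper: derive the topic name for a collection from the
# connector's logical name (mongodb.name), the database, and the collection.
topic_name() {
  printf '%s.%s.%s\n' "$1" "$2" "$3"
}

# Topic for the customers collection of the inventory database
topic_name dbserver1 inventory customers
```

Each monitored collection gets its own topic, so consuming changes from another collection means substituting its database and collection names.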

In the mongo shell, enter queries that insert or modify records in the database. Each change is displayed as a message in your consumer terminal.

db.customers.insert([
 { _id : 1008, first_name : 'Jim', last_name : 'Colbert', email : 'thejim@example.com' }
 ]);

Clean up resources

Delete the connector and stop Confluent services.

curl -X DELETE localhost:8083/connectors/inventory-connector
confluent stop

Stop the MongoDB container.

docker stop mongodb

Note

Portions of the information provided here derive from documentation originally produced by the Debezium Community. Work produced by Debezium is licensed under Creative Commons 3.0.
