.. _mongodb-source-connector: Debezium MongoDB Source Connector --------------------------------- `Debezium’s MongoDB Connector`_ can monitor a MongoDB replica set or a MongoDB sharded cluster for document changes in databases and collections, recording those changes as events in |ak-tm| topics. The connector automatically handles the addition or removal of shards in a sharded cluster, changes in membership of each replica set, elections within each replica set, and dynamically adjusts when communication issues occur. The Debezium MongoDB connector uses MongoDB's `oplog`_ to capture changes. Since it makes use of MongoDB's `replication mechanism`_, the connector works only with MongoDB replica sets or sharded clusters. The Debezium MongoDB connector is not capable of monitoring the changes of a standalone MongoDB server, since standalone servers do not have an oplog. The connector will work if the standalone server is converted to a replica set with one member. * Confluent supports MongoDB connector version 0.9.3 and later. * Confluent supports using this connector with MongoDB 3.4 or later. Install the MongoDB Connector ============================= .. include:: ../includes/connector-install.rst .. codewithvars:: bash confluent-hub install debezium/debezium-connector-mongodb:latest .. include:: ../includes/connector-install-version.rst .. codewithvars:: bash confluent-hub install debezium/debezium-connector-mongodb:0.9.4 License ======= The Debezium MongoDB connector is an open source connector and does not require a Confluent Enterprise License. Configure a Replication Mechanism on MongoDB ============================================ Initiate a Replica Set ^^^^^^^^^^^^^^^^^^^^^^ The Debezium MongoDB connector can capture changes from a single MongoDB replica set. In production replica sets it is recommended to have at least three members, the connector can work with fewer members, it doesn’t care how many members are in the replica set. To use the Debezium MongoDB connector with a replica set, simply provide the addresses of one or more replica set servers as seed addresses via the connector’s mongodb.hosts property. The connector will use these seeds to connect to the replica set, and then once connected will get from the replica set the complete set of members and which member is primary. The connector will start a task to connect to the primary and capture the changes from the primary’s oplog. When the replica set elects a new primary, the task will automatically switch over to the new primary. With at least one MongoDB server running, start a container with this image to verify if the replica set is initiated. If it is not initiated, initiate the replica set and add all of the servers to it. Start the container with the name of the replica set in the ``$REPLICASET`` environment variable, using links named like ``MONGO_n`` (where n=1,2,3, etc.) for each of the MongoDB servers that are to be in the replica set. .. codewithvars:: bash docker run -it --name mongo-init --rm -e REPLICASET=rs0 --link data1:mongo1 --link data2:mongo2 --link data3:mongo3 debezium/mongo-initiator Initiate a Sharded Cluster ^^^^^^^^^^^^^^^^^^^^^^^^^^ A MongoDB sharded cluster consists of: * a separate replica set that acts as the cluster’s configuration server * one or more shards, each deployed as a replica set * one or more routers (mongos) to which clients connect and that routes requests to the appropriate shards To use the Debezium MongoDB connector with a sharded cluster: * Configure the connector with the host addresses of the configuration server replica set. The container can be added to the replica set as a shard to one or more MongoDB routers. For example, consider three MongoDB servers running in containers ``shardA1``, ``shardA2``, and ``shardA3``, and two MongoDB routers running in containers ``router1`` and ``router2``. The following command will ensure that ``shardA1``, ``shardA2``, and ``shardA3`` are properly initiated as replica set shardA, and that the ``shardA`` replica set is added as a shard to the routers router1 and router2. .. codewithvars:: bash docker run -it --name mongo-init --rm -e REPLICASET=shardA --link shardA1:mongo1 --link shardA2:mongo2 --link shardA3:mongo3 --link router1 --link router2 debezium/mongo-initiator * More shard replica sets can be added by running more containers. For example: .. codewithvars:: bash docker run -it --name mongo-init --rm -e REPLICASET=shardB --link shardB1:mongo1 --link shardB2:mongo2 When the connector connects to this replica set, it acts as the configuration server for a sharded cluster and discovers the information about each replica set used as a shard in the cluster The connector starts up a separate task to capture the changes from each replica set. If new shards are added to the cluster or existing shards removed, the connector will automatically adjust its tasks accordingly. .. _mongodb-source-connector-quickstart: Quick Start =========== `Debezium’s MongoDB Connector`_ can monitor a MongoDB replica set or a MongoDB sharded cluster for document changes in databases and collections, recording those changes as events in |ak| topics. Install the Connector ^^^^^^^^^^^^^^^^^^^^^ Refer to the `Debezium tutorial`_ if you want to use Docker images to set up |ak|, |zk| and |kconnect|. For the following tutorial, you need to have a local |cp| installation. Navigate to your |cp| installation directory and enter the following command to install the connector: .. codewithvars:: bash confluent-hub install debezium/debezium-connector-mongodb:0.9.4 Adding a new connector plugin requires restarting Connect. Use the Confluent CLI to restart Connect. .. codewithvars:: bash |confluent_stop| connect && |confluent_start| connect Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq Starting zookeeper zookeeper is [UP] Starting kafka kafka is [UP] Starting schema-registry schema-registry is [UP] Starting kafka-rest kafka-rest is [UP] Starting connect connect is [UP] Check if the MongoDB plugin has been installed correctly and picked up by the plugin loader. .. codewithvars:: bash curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep mongodb "io.debezium.connector.mongodb.MongoDbConnector" Set up MongoDB using Docker (Optional) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ If you do not have a native installation of MongoDB, you may use the following command to launch MongoDB using a Docker image. .. codewithvars:: bash # Configure the MongoDB data directory mkdir -p path/to/project/data/db # Pull the Docker image docker pull mongo # Run the container, where `mongodb` is the name assigned to the conatiner docker run --name mongodb -v $(pwd)/data/db:/data/db -p 27017:27017 -d mongo --replSet debezium # Start a new bash process in running container docker exec -it mongodb bash # Start the mongo process mongo # Initialize MongoDB replica set docker exec -it mongodb mongo --eval 'rs.initiate({_id: "debezium", members:[{_id: 0, host: "localhost:27017"}]})' # Create a user profile use admin db.createUser( { user: "debezium", pwd: "dbz", roles: ["dbOwner"] } ) # Insert a record use inventory db.customers.insert([ { _id : 1005, first_name : 'Bob', last_name : 'Hopper', email : 'thebob@example.com' } ]); # View records db.customers.find().pretty(); Start the Debezium MongoDB connector ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Create the file ``register-mongodb.json`` to store the following connector configuration: :: { "name": "inventory-connector", "config": { "connector.class" : "io.debezium.connector.mongodb.MongoDbConnector", "tasks.max" : "1", "mongodb.hosts" : "debezium/localhost:27017", "mongodb.name" : "dbserver1", "mongodb.user" : "debezium", "mongodb.password" : "dbz", "database.history.kafka.bootstrap.servers" : "localhost:9092" } } Start the connector. :: # Start MongoDB connector curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mongodb.json Start your |ak| consumer ^^^^^^^^^^^^^^^^^^^^^^^^ Start the consumer in a new terminal session. .. codewithvars:: bash |confluent_consume| dbserver1.inventory.customers|dash| --from-beginning Start MongoDB bash and enter queries to insert or modify records in the database. When you enter queries in your MongoDB bash to add or modify records in the database, messages are displayed on your consumer terminal to reflect those records. .. codewithvars:: bash db.customers.insert([ { _id : 1008, first_name : 'Jim', last_name : 'Colbert', email : 'thejim@example.com' } ]); Clean up resources ^^^^^^^^^^^^^^^^^^ Delete the connector and stop Confluent services. .. codewithvars:: bash curl -X DELETE localhost:8083/connectors/inventory-connector |confluent_stop| Stop the MongoDB container. .. codewithvars:: bash docker stop mongodb .. note:: Portions of the information provided here derives from documentation originally produced by the `Debezium Community <https://debezium.io/>`_. Work produced by Debezium is licensed under `Creative Commons 3.0 <https://creativecommons.org/licenses/by/3.0/>`_. Additional Documentation ======================== .. toctree:: :maxdepth: 1 :titlesonly: mongodb_source_connector_config .. _Debezium’s MongoDB Connector: https://debezium.io/docs/connectors/mongodb/ .. _Debezium tutorial: https://github.com/debezium/debezium-examples/tree/master/tutorial#using-mongodb .. _Debezium’s MongoDB Connector: https://debezium.io/docs/connectors/mongodb/ .. _replication mechanism: https://docs.mongodb.com/manual/replication/ .. _oplog: https://docs.mongodb.com/manual/core/replica-set-oplog/