.. _mongodb-source-connector:

Debezium MongoDB Source Connector
---------------------------------

`Debezium’s MongoDB Connector`_ can monitor a MongoDB replica set or a MongoDB sharded cluster for document changes in databases and collections, recording those changes as events in |ak-tm| topics. The connector automatically handles the addition or removal of shards in a sharded cluster, changes in membership of each replica set, and elections within each replica set, and it adjusts dynamically when communication issues occur.

The Debezium MongoDB connector uses MongoDB's `oplog`_ to capture changes. Since it makes use of MongoDB's `replication mechanism`_, the connector works only with MongoDB replica sets or sharded clusters. 

The Debezium MongoDB connector is not capable of monitoring the changes of a standalone MongoDB server, since standalone servers do not have an oplog. The connector will work if the standalone server is converted to a replica set with one member.

* Confluent supports MongoDB connector version 0.9.3 and later. 
* Confluent supports using this connector with MongoDB 3.4 or later. 

Install the MongoDB Connector
=============================

.. include:: ../includes/connector-install.rst

.. codewithvars:: bash

   confluent-hub install debezium/debezium-connector-mongodb:latest

.. include:: ../includes/connector-install-version.rst

.. codewithvars:: bash

   confluent-hub install debezium/debezium-connector-mongodb:0.9.4
   
License
=======

The Debezium MongoDB connector is an open source connector and does not require a Confluent Enterprise License.


Configure a Replication Mechanism on MongoDB
============================================

Initiate a Replica Set
^^^^^^^^^^^^^^^^^^^^^^

The Debezium MongoDB connector can capture changes from a single MongoDB replica set. Production replica sets should have at least three members, but the connector works with any number of members.

To use the Debezium MongoDB connector with a replica set, provide the addresses of one or more replica set servers as seed addresses in the connector’s ``mongodb.hosts`` property. The connector uses these seeds to connect to the replica set. Once connected, it retrieves the complete set of members and identifies which member is primary. The connector then starts a task that connects to the primary and captures the changes from the primary’s oplog. When the replica set elects a new primary, the task automatically switches over to the new primary.
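
As a minimal sketch, the seed addresses can be assembled into a ``mongodb.hosts`` value with a small shell helper. The helper name ``mongodb_hosts`` and the host names are hypothetical, and the ``<replica-set>/<host:port>,<host:port>`` form is an assumption based on the quick start configuration in this document (``debezium/localhost:27017``).

.. codewithvars:: bash

   # Hypothetical helper: join a replica set name and seed addresses into
   # the <replica-set>/<host:port>,<host:port> form (assumed format).
   mongodb_hosts() {
     rs_name="$1"
     shift
     seeds=""
     for seed in "$@"; do
       # Add a comma only between seeds, never before the first one
       seeds="${seeds:+$seeds,}$seed"
     done
     printf '%s/%s\n' "$rs_name" "$seeds"
   }

   # Example with assumed host names
   mongodb_hosts rs0 mongo1:27017 mongo2:27017 mongo3:27017

A single seed is enough for the connector to discover the other members, so listing more seeds mainly helps when the first one is unreachable at startup.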

With at least one MongoDB server running, start a container from the ``debezium/mongo-initiator`` image to check whether the replica set has been initiated. If it has not, the container initiates the replica set and adds all of the servers to it.

Start the container with the name of the replica set in the ``$REPLICASET`` environment variable, and link each MongoDB server that belongs in the replica set under an alias of the form ``mongoN`` (where N is 1, 2, 3, and so on).

.. codewithvars:: bash

   docker run -it --name mongo-init --rm -e REPLICASET=rs0 --link data1:mongo1 --link data2:mongo2 --link data3:mongo3 debezium/mongo-initiator
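
The link numbering can be illustrated with a short helper that prints the initiator command for any number of members. This is only a sketch: the function name ``initiator_cmd`` is hypothetical, and it echoes the command rather than running it, so the result can be reviewed first.

.. codewithvars:: bash

   # Hypothetical helper: print the initiator command for a replica set,
   # linking each container under the aliases mongo1, mongo2, and so on.
   initiator_cmd() {
     rs_name="$1"
     shift
     links=""
     n=1
     for container in "$@"; do
       links="$links --link $container:mongo$n"
       n=$((n + 1))
     done
     echo "docker run -it --name mongo-init --rm -e REPLICASET=$rs_name$links debezium/mongo-initiator"
   }

   # Prints the same command as the example above
   initiator_cmd rs0 data1 data2 data3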

Initiate a Sharded Cluster
^^^^^^^^^^^^^^^^^^^^^^^^^^

A MongoDB sharded cluster consists of:

* a separate replica set that acts as the cluster’s configuration server
* one or more shards, each deployed as a replica set
* one or more routers (mongos) to which clients connect and that route requests to the appropriate shards

To use the Debezium MongoDB connector with a sharded cluster, configure the connector with the host addresses of the configuration server replica set.

The ``debezium/mongo-initiator`` container can also add a replica set as a shard to one or more MongoDB routers. For example, consider three MongoDB servers running in containers ``shardA1``, ``shardA2``, and ``shardA3``, and two MongoDB routers running in containers ``router1`` and ``router2``. The following command ensures that ``shardA1``, ``shardA2``, and ``shardA3`` are properly initiated as replica set ``shardA``, and that the ``shardA`` replica set is added as a shard to the routers ``router1`` and ``router2``.

.. codewithvars:: bash

   docker run -it --name mongo-init --rm -e REPLICASET=shardA --link shardA1:mongo1 --link shardA2:mongo2 --link shardA3:mongo3 --link router1 --link router2 debezium/mongo-initiator

More shard replica sets can be added by running more containers. For example:

.. codewithvars:: bash

   docker run -it --name mongo-init --rm -e REPLICASET=shardB --link shardB1:mongo1 --link shardB2:mongo2 --link router1 --link router2 debezium/mongo-initiator

When the connector connects to the configuration server replica set, it detects that the replica set is acting as the configuration server for a sharded cluster and discovers the information about each replica set used as a shard in the cluster.

The connector starts up a separate task to capture the changes from each replica set. If new shards are added to the cluster or existing shards removed, the connector will automatically adjust its tasks accordingly.

.. _mongodb-source-connector-quickstart:

Quick Start
===========

`Debezium’s MongoDB Connector`_ can monitor a MongoDB replica set or a MongoDB sharded cluster for document changes in databases and collections, recording those changes as events in |ak| topics.

Install the Connector
^^^^^^^^^^^^^^^^^^^^^

Refer to the `Debezium tutorial`_ if you want to use Docker images to set up |ak|, |zk|, and
|kconnect|. The following tutorial requires a local |cp| installation.

Navigate to your |cp| installation directory and enter the following
command to install the connector:

.. codewithvars:: bash

   confluent-hub install debezium/debezium-connector-mongodb:0.9.4

Adding a new connector plugin requires restarting Connect. Use the
Confluent CLI to restart Connect.

.. codewithvars:: bash

   |confluent_stop| connect && |confluent_start| connect
   Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
   Starting zookeeper
   zookeeper is [UP]
   Starting kafka
   kafka is [UP]
   Starting schema-registry
   schema-registry is [UP]
   Starting kafka-rest
   kafka-rest is [UP]
   Starting connect
   connect is [UP]

Check if the MongoDB plugin has been installed correctly and picked
up by the plugin loader.

.. codewithvars:: bash

   curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep mongodb 
   "io.debezium.connector.mongodb.MongoDbConnector"

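For scripted checks, the same test can be wrapped in a helper that reports success through its exit status. The sketch below uses a hypothetical function name, ``has_mongodb_plugin``, and an illustrative response body; it assumes the worker listens on ``localhost:8083`` as in the command above.

.. codewithvars:: bash

   # Hypothetical helper: succeed only if the plugin list read from stdin
   # contains the Debezium MongoDB connector class.
   has_mongodb_plugin() {
     grep -qF '"io.debezium.connector.mongodb.MongoDbConnector"'
   }

   # Against a running worker (assumed to listen on localhost:8083):
   #   curl -sS localhost:8083/connector-plugins | has_mongodb_plugin && echo "MongoDB plugin found"

   # Offline check against an illustrative response body
   sample='[{"class":"io.debezium.connector.mongodb.MongoDbConnector","type":"source","version":"0.9.4"}]'
   if printf '%s\n' "$sample" | has_mongodb_plugin; then
     echo "MongoDB plugin found"
   fi
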
Set up MongoDB using Docker (Optional)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

If you do not have a native installation of MongoDB, you can use the following commands to launch MongoDB from a Docker image, initialize a single-member replica set, and add sample data.

.. codewithvars:: bash
   
   # Create the MongoDB data directory and change to the project directory,
   # so that $(pwd)/data/db in the run command resolves to it
   mkdir -p path/to/project/data/db
   cd path/to/project

   # Pull the Docker image
   docker pull mongo

   # Run the container, where `mongodb` is the name assigned to the container
   docker run --name mongodb -v $(pwd)/data/db:/data/db -p 27017:27017 -d mongo --replSet debezium

   # Initialize the MongoDB replica set
   docker exec -it mongodb mongo --eval 'rs.initiate({_id: "debezium", members:[{_id: 0, host: "localhost:27017"}]})'

   # Start a new bash process in the running container
   docker exec -it mongodb bash

   # Start the mongo shell; the remaining commands are entered in this shell
   mongo

   // Create a user profile
   use admin
   db.createUser(
     {
       user: "debezium",
       pwd: "dbz",
       roles: ["dbOwner"]
     }
   )

   // Insert a record
   use inventory
   db.customers.insert([
     { _id : 1005, first_name : 'Bob', last_name : 'Hopper', email : 'thebob@example.com' }
   ]);

   // View records
   db.customers.find().pretty();
   

Start the Debezium MongoDB connector
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Create the file ``register-mongodb.json`` to store the following connector configuration:

::

   {
     "name": "inventory-connector",
     "config": {
       "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
       "tasks.max": "1",
       "mongodb.hosts": "debezium/localhost:27017",
       "mongodb.name": "dbserver1",
       "mongodb.user": "debezium",
       "mongodb.password": "dbz",
       "database.history.kafka.bootstrap.servers": "localhost:9092"
     }
   }

Start the connector:

::

   # Start the MongoDB connector
   curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mongodb.json
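
After registering the connector, it can be useful to confirm that it is running. The sketch below extracts the connector-level state from the status document returned by the |kconnect| REST endpoint ``/connectors/inventory-connector/status``. The helper name ``connector_state`` and the sample document are illustrative, and the ``sed`` expression assumes that ``state`` is the first field of the ``connector`` object.

.. codewithvars:: bash

   # Hypothetical helper: print the connector-level state (for example
   # RUNNING or FAILED) from a status document read on stdin. With jq
   # installed, jq -r '.connector.state' is the more robust choice.
   connector_state() {
     sed -n 's/.*"connector" *: *{ *"state" *: *"\([A-Z_]*\)".*/\1/p'
   }

   # Against a running worker (assumed to listen on localhost:8083):
   #   curl -sS localhost:8083/connectors/inventory-connector/status | connector_state

   # Offline check against an illustrative status document
   sample='{"name":"inventory-connector","connector":{"state":"RUNNING","worker_id":"127.0.0.1:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"127.0.0.1:8083"}],"type":"source"}'
   printf '%s\n' "$sample" | connector_state

A state other than ``RUNNING`` usually means the worker logs are the next place to look.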


Start your |ak| consumer
^^^^^^^^^^^^^^^^^^^^^^^^

Start the consumer in a new terminal session.

.. codewithvars:: bash

   |confluent_consume| dbserver1.inventory.customers|dash| --from-beginning

Open the mongo shell and enter queries that insert or modify records in the database. Each change appears as a message in your consumer terminal.

.. codewithvars:: bash

   db.customers.insert([
     { _id : 1008, first_name : 'Jim', last_name : 'Colbert', email : 'thejim@example.com' }
   ]);
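
The topic consumed in this section, ``dbserver1.inventory.customers``, combines the connector's ``mongodb.name`` value with the database and collection names. As a small sketch of that naming pattern (the helper name ``topic_for`` is hypothetical), the topic for any other collection can be derived the same way:

.. codewithvars:: bash

   # Hypothetical helper: compose the topic name for a collection from the
   # connector's logical name (mongodb.name), the database, and the collection.
   topic_for() {
     printf '%s.%s.%s\n' "$1" "$2" "$3"
   }

   # The quick start values yield the topic used by the consumer above
   topic_for dbserver1 inventory customers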


Clean up resources
^^^^^^^^^^^^^^^^^^
Delete the connector and stop Confluent services.

.. codewithvars:: bash

   curl -X DELETE localhost:8083/connectors/inventory-connector
   |confluent_stop|

Stop the MongoDB container.

.. codewithvars:: bash

   docker stop mongodb

.. note::
   
   Portions of the information provided here derive from documentation originally
   produced by the `Debezium Community <https://debezium.io/>`_. Work produced
   by Debezium is licensed under `Creative Commons 3.0 <https://creativecommons.org/licenses/by/3.0/>`_.
   
Additional Documentation
========================

.. toctree::
   :maxdepth: 1
   :titlesonly:

   mongodb_source_connector_config
   
.. _Debezium’s MongoDB Connector: https://debezium.io/docs/connectors/mongodb/
.. _Debezium tutorial: https://github.com/debezium/debezium-examples/tree/master/tutorial#using-mongodb
.. _replication mechanism: https://docs.mongodb.com/manual/replication/
.. _oplog: https://docs.mongodb.com/manual/core/replica-set-oplog/