Debezium MongoDB Source Connector for Confluent Platform¶
Debezium の MongoDB Connector は、MongoDB レプリカセットまたは MongoDB シャーディングクラスター内のデータベースおよびコレクションのドキュメントの変更をモニタリングし、その変更をイベントとして Apache Kafka® トピックに記録することができます。コネクターは、シャーディングクラスター内のシャードの追加または削除、各レプリカセットのメンバーシップの変更、各レプリカセット内の選出を自動的に処理し、通信の問題が発生したときに動的に調整します。
Debezium MongoDB コネクターは、MongoDB の oplog を使用して変更を取り込みます。これは MongoDB の レプリケーションメカニズム を利用するので、コネクターは MongoDB レプリカセットまたはシャーディングクラスターでのみ機能します。
スタンドアロンサーバーには oplog がないため、Debezium MongoDB コネクターは、スタンドアロンの MongoDB サーバーの変更をモニタリングすることができません。このコネクターは、1 つのメンバーを持つレプリカセットにスタンドアロンサーバーが変換される場合に機能します。
- Confluent では MongoDB コネクターのバージョン 0.9.3 以降がサポートされています。
- Confluent では MongoDB 3.4 以降でのこのコネクターの使用がサポートされています。
機能¶
Debezium MongoDB Source Connector には、以下の機能があります。
少なくとも 1 回のデリバリー¶
このコネクターによって、レコードが Kafka のトピックに少なくとも 1 回は配信されることが保証されます。コネクターを再起動した場合、Kafka のトピックに重複レコードが存在している可能性があります。
1 つのタスクをサポート¶
Debezium MongoDB Source Connector は、1 つのタスクのみの実行をサポートしています。
MongoDB コネクターのインストール¶
このコネクターは、Confluent Hub クライアントのインストール手順 に従ってインストールできます。ZIP ファイルを手動でダウンロードすることもできます。
前提条件¶
注釈
このコネクターは、Connect が実行されるすべてのマシンにインストールする必要があります。
Confluent Hub クライアント のインストール。
注釈
これは、Confluent Enterprise とともにデフォルトでインストールされます。
コネクターの最新(
latest
)バージョンのインストール。コネクターの
latest
バージョンをインストールするには、Confluent Platform のインストールディレクトリに移動し、次のコマンドを実行します。confluent-hub install debezium/debezium-connector-mongodb:latest
特定のバージョンをインストールするには、次の例に示すように
latest
をバージョン番号に置き換えます。confluent-hub install debezium/debezium-connector-mongodb:0.9.4
コネクターの手動インストール¶
コネクターの ZIP ファイル をダウンロードして展開し、コネクターの手動インストール 手順 に従ってください。
構成プロパティ¶
このコネクターの構成プロパティを網羅した一覧については、「 MongoDB Source Connector (Debezium) 構成プロパティ 」を参照してください。
ライセンス¶
Debezium MongoDB コネクターは、オープンソースコネクターであり、Confluent エンタープライズライセンスは不要です。
MongoDB でのレプリケーションメカニズムの構成¶
レプリカセットの開始¶
Debezium MongoDB コネクターは、単一の MongoDB レプリカセットから変更を取り込みます。本稼働環境のレプリカセットには少なくとも 3 つのメンバーを含めることが推奨されていますが、コネクターではレプリカセットのメンバーの数は重要ではなく、より少ないメンバーで機能します。
Debezium MongoDB コネクターをレプリカセットで使用するには、コネクターの mongodb.hosts プロパティで 1 台以上のレプリカセットサーバーのアドレスをシードアドレスとして指定するだけです。コネクターはレプリカセットへの接続にこれらのシードを使用し、接続されたら、レプリカセットからメンバーの完全なセットとプライマリメンバーを取得します。コネクターは、プライマリに接続するタスクを起動し、プライマリの oplog から変更を取り込みます。レプリカセットが新しいプライマリを選択すると、タスクは自動的に新しいプライマリに切り替わります。
MongoDB サーバーが少なくとも 1 台稼働している状態でこのイメージを使ってコンテナーを起動し、レプリカセットが開始されたかどうかを確認します。レプリカセットが開始されていなければ開始し、それにすべてのサーバーを追加します。
$REPLICASET
環境変数のレプリカセットの名前を持つコンテナーを起動します。レプリカセットに入る各 MongoDB サーバーには、MONGO_n
(n は 1、2、3..)のような名前のリンクを使用します。
docker run -it --name mongo-init --rm -e REPLICASET=rs0 --link data1:mongo1 --link data2:mongo2 --link data3:mongo3 debezium/mongo-initiator
シャーディングクラスターの開始¶
MongoDB シャーディングクラスターは、以下のもので構成されます。
- クラスターの構成サーバーとして動作する個別のレプリカセット
- 1 つ以上のシャード(それぞれレプリカセットとしてデプロイされる)
- クライアントの接続先となり、要求を適切なシャードにルーティングする 1 つ以上のルーター(mongos)
Debezium MongoDB コネクターをシャーディングクラスターで使用するには、次のようにします。
- 構成サーバーのレプリカセットのホストアドレスでコネクターを構成します。
コンテナーは、1 つ以上の MongoDB ルーターにシャードとしてレプリカセットに追加することができます。たとえば、3 台の MongoDB サーバーがコンテナー shardA1
、shardA2
、および shardA3
で稼働し、2 台の MongoDB ルーターがコンテナー router1
および router2
で稼働しているとします。以下のコマンドを実行すると、shardA1
、shardA2
、および shardA3
がレプリカセット shardA として適切に開始され、shardA
レプリカセットはルーター router1 および router2 にシャードとして追加されます。
docker run -it --name mongo-init --rm -e REPLICASET=shardA --link shardA1:mongo1 --link shardA2:mongo2 --link shardA3:mongo3 --link router1 --link router2 debezium/mongo-initiator
- さらにコンテナーを実行することで、シャードレプリカセットをさらに追加することができます。以下に例を示します。
docker run -it --name mongo-init --rm -e REPLICASET=shardB --link shardB1:mongo1 --link shardB2:mongo2
コネクターは、このレプリカセットに接続すると、シャーディングクラスターの構成サーバーとして動作し、クラスター内でシャードとして使用されている各レプリカセットの情報を検出します。
コネクターは、個別のタスクを起動して、各レプリカセットから変更を取り込みます。クラスターに新しいシャードが追加されたり、既存のシャードが削除されたりすると、コネクターはそれに応じて自動的にタスクを調整します。
クイックスタート¶
Debezium’s MongoDB Connector は、MongoDB レプリカセットまたは MongoDB シャーディングクラスター内のデータベースおよびコレクションのドキュメントの変更をモニタリングし、その変更をイベントとして Kafka トピックに記録することができます。
注釈
Kafka Connect を Confluent Cloud に接続する方法の例については、「分散クラスター」を参照してください。
コネクターのインストール¶
Docker イメージを使用して Kafka、ZooKeeper および Connect をセットアップする場合は、Debezium tutorial を参照してください。以下のチュートリアルでは、Confluent Platform をローカルにインストールしておく必要があります。
Confluent Platform のインストールディレクトリに移動し、次のコマンドを入力して、コネクターをインストールします。
confluent-hub install debezium/debezium-connector-mongodb:0.9.4
新しいコネクタープラグインを追加した場合は、Connect の再起動が必要です。Confluent CLI を使用して Connect を再起動します。
ちなみに
Confluent CLI 開発用コマンドのコマンド構文が、5.3.0 で変更されています。該当するコマンドは confluent local
に移行されています。たとえば、confluent start
の構文は、confluent local services start
に変わりました。詳しくは、「confluent local」を参照してください。
confluent local services connect stop && confluent local services connect start
Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
MongoDB プラグインが正しくインストールされ、プラグインローダーによって選択されていることを確認します。
curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep mongodb
"io.debezium.connector.mongodb.MongoDbConnector"
Docker を使用した MongoDB のセットアップ(オプション)¶
MongoDB がネイティブインストールされていない場合は、次のコマンドを使用し、Docker イメージを使用して MongoDB を起動することができます。
# Configure the MongoDB data directory
mkdir -p path/to/project/data/db
# Pull the Docker image
docker pull mongo
# Run the container, where `mongodb` is the name assigned to the conatiner
docker run --name mongodb -v $(pwd)/data/db:/data/db -p 27017:27017 -d mongo --replSet debezium
# Start a new bash process in running container
docker exec -it mongodb bash
# Start the mongo process
mongo
# Initialize MongoDB replica set
docker exec -it mongodb mongo --eval 'rs.initiate({_id: "debezium", members:[{_id: 0, host: "localhost:27017"}]})'
# Create a user profile
use admin
db.createUser(
{
user: "debezium",
pwd: "dbz",
roles: ["dbOwner"]
}
)
# Insert a record
use inventory
db.customers.insert([
{ _id : 1005, first_name : 'Bob', last_name : 'Hopper', email : 'thebob@example.com' }
]);
# View records
db.customers.find().pretty();
Debezium MongoDB コネクターの起動¶
ファイル register-mongodb.json
を作成して、次のコネクター構成を保存します。
{
"name": "inventory-connector",
"config": {
"connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max" : "1",
"mongodb.hosts" : "debezium/localhost:27017",
"mongodb.name" : "dbserver1",
"mongodb.user" : "debezium",
"mongodb.password" : "dbz",
}
}
コネクターを起動します。
# Start MongoDB connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mongodb.json
Kafka コンシューマーの起動¶
新しいターミナルセッションでコンシューマーを起動します。
confluent local services kafka consume dbserver1.inventory.customers --from-beginning
MongoDB bash を開始し、データベースにレコードを挿入、またはデータベースのレコードを変更するクエリを入力します。MongoDB bash でクエリを入力してデータベースにレコードを追加、またはデータベースのレコードを変更すると、そのレコードを反映したメッセージがコンシューマーのターミナルに表示されます。
db.customers.insert([
{ _id : 1008, first_name : 'Jim', last_name : 'Colbert', email : 'thejim@example.com' }
]);
リソースのクリーンアップ¶
コネクターを削除し、Confluent サービスを停止します。
curl -X DELETE localhost:8083/connectors/inventory-connector
confluent local stop
MongoDB コンテナーを停止します。
docker stop mongodb
注釈
ここで提供されている情報の一部は、 Debezium Community で最初に生成されたドキュメントから派生したものです。Debezium によって生成された作品は、Creative Commons 3.0 でライセンスされています。