Cluster Linking のデモ(Docker)¶
Looking for Confluent Cloud Cluster Linking docs? You are currently viewing Confluent Platform documentation. If you are looking for Confluent Cloud docs, check out Cluster Linking on Confluent Cloud.
重要
この機能はプレビュー機能として利用できます。プレビュー機能とは、開発者から早い段階でフィードバックを受けるために提供している Confluent Platform のコンポーネントのことです。この機能は、評価用、本稼働環境以外でのテスト用、あるいは Confluent にフィードバックを提供するために使用できます。
これは、一般的なユースケースのコンテキストでの Cluster Linking とその機能の実践的なデモです。スクリプトと README は、GitHub の demo-scene/cluster-linking で入手できます。
デモの内容¶
このデモを完了すると、2 つのクラスターを構成し、さまざまなユースケースを対象に Cluster Linking を使用してクラスター間でデータを移行できるようになります。
このチュートリアルでは、以下のことを行います。
- トピックリンクの作成
- クラスターリンクの詳細を表示する方法の学習
- トピックデータを共有するための基本的なトピックミラーリングの例の実行
- コマンドターミナルで 2 つのウィンドウを開き、送信元クラスターでメッセージを生成し、送信先クラスターでミラートピックから消費すること
- コンシューマーを west クラスターから east クラスターに移行し、これをモニタリングする方法を学ぶこと
- リンクの停止およびデモの終了
セットアップと前提条件¶
このデモには、Docker イメージを取得し、それぞれ独自の ZooKeeper を持つ 2 つの Kafka クラスターをセットアップする Docker Compose ファイルが含まれています。
- ZooKeeper
- Kafka
- Confluent Server
複数のスクリプトが含まれています。これらは、構成の設定、コマンドの実行、ユースケースのデモに使用します。
- 前提条件:
- Docker
- Docker バージョン 1.11 またはそれ以降が インストールされ動作している。
- Docker Compose が インストール済みである 。Docker Compose は、Docker for Mac ではデフォルトでインストールされます。
- Docker メモリーに最小でも 6 GB が割り当てられている。Docker Desktop for Mac を使用しているとき、Docker メモリーの割り当てはデフォルトで 2 GB です。Docker のデフォルトの割り当てを 6 GB に変更できます。Preferences、Resources、Advanced の順に移動します。
- インターネット接続
- Confluent Platform で現在サポートされる オペレーティングシステム
- Docker でのネットワークと Kafka
- 内部コンポーネントと外部コンポーネントが Docker ネットワークに通信できるように、ホストとポートを構成します。詳細については、こちらの記事 を参照してください。
- (オプション)`curl <https://curl.se/>`__.
- 以下の手順では、Docker Compose ファイルをダウンロードします。ダウンロードにはさまざまな方法がありますが、この手順では、ファイルのダウンロードに使用できる明示的な curl コマンドを説明します。
- Docker
サービスの起動¶
Confluent demo-scene リポジトリのクローンを GitHub から作成し、 cluster-linking/
サブディレクトリで作業します。ここには、このチュートリアルでコンパイルし実行するサンプルコードが含まれます。
ちなみに
次の git clone
サンプルは SSH を使用します。 Git 構成で HTTPS を設定している場合、代わりに git clone https://github.com/confluentinc/demo-scene.git
を使用します。
git clone git@github.com:confluentinc/demo-scene.git
cd cluster-linking/
送信元のトピックから読み取るコンシューマーグループのセットアップ¶
新しいコマンドウィンドウを開き、west クラスターで
west-trades
を消費するコンシューマーグループを作成します。./scripts/6-setup-consumer.sh
set-up-consumer スクリプト は、以下を実行します。
- オフセットを自動的にコミットするプロパティを使用してグループを構成
- グループ
someGroup
に名前を付ける。 - west クラスター(送信元)上の
west-trades
トピックから読み取るコンシューマーをセットアップして実行する。
コンシューマーグループがミラートピックからメッセージ 1 ~ 100 を読み取ると、出力は以下のようになります。
==> Consume from west cluster, west-trades and commit offsets (source cluster) 1 2 3 ... 99 100
"ラグ" ウィンドウで、
kafka-consumer-groups
コマンドを実行して送信元クラスターと送信先クラスターの両方でオフセットを検証します(オフセットは一致するはずです)。west クラスター(送信元)のオフセットを取得するには、以下のコマンドを実行します。
docker-compose exec broker-west kafka-consumer-groups \ --bootstrap-server broker-west:19091 \ --describe \ --group someGroup
送信元クラスターの出力は以下のようになります。
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID someGroup west-trades 0 100 100 0 consumer-someGroup-1-8161b2c7-b9a1-4a81-b2ab-bd58a7a0b2e6 /172.24.0.5 consumer-someGroup-1
east クラスター(送信先)のオフセットを取得するには、以下のコマンドを実行します。
docker-compose exec broker-east kafka-consumer-groups \ --bootstrap-server broker-east:19092 \ --describe \ --group someGroup
送信先クラスターの出力は以下のようになります。
Consumer group 'someGroup' has no active members. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID someGroup west-trades 0 100 100 0 - - -
ちなみに
オフセットが移行されるまで最大 10 秒かかる場合があります。
送信元から送信先へのグループの移行¶
オフセット移行をアップデートして、west クラスターから east クラスターへのコンシューマーオフセットの移行を停止します。
migrate-one-cg スクリプト は、オフセット移行とクラスターリンクをアップデートして移行を実行し、より多くのメッセージを
west-trades
トピックに対して生成して、それらを east のコンシューマーグループから消費します。./scripts/7-migrate-one-cg.sh
出力メッセージを参照し、スクリプトが以下のタスクを完了したことを確認します。
consumer.offset.group.filters
をアップデートして、コンシューマーオフセットから除外するフィルターをsomeGroup
に対して設定する。kafka-configs
コマンドを使用して east からのクラスターリンクをアップデートし、someGroup
コンシューマーオフセットの移行を除外する。- west クラスターで別の 100 件のメッセージを生成する。
- east クラスターの新しいコンシューマーを消費する。
- 一方から他方に移行されたコンシューマーオフセットをモニタリングする。
==> Stop migrating the consumer group someGroup via the west link Completed updating config for cluster-link west-cluster-link. ==> Produce 100 more messages to the source topic >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ==> Consume from east cluster, west-trades and commit offsets (destination cluster) 101 102 ... 197 198 199 200 ^CProcessed a total of 100 messages ==> Monitor that the consumer offsets have correctly been migrated ==> West Cluster Consumer group 'someGroup' has no active members. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID someGroup west-trades 0 100 200 100 - - - ==> East Cluster Consumer group 'someGroup' has no active members. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID someGroup west-trades 0 200 200 0 - - -
"ラグ" ウィンドウで、
kafka-consumer-groups
コマンドを両方のクラスターで再実行して、broker-east
の移行されたコンシューマーグループがオフセット 200 に完全に追い付いていることを検証します。まず、west クラスターで実行します(これは生成先のクラスターであるため、プロデューサーが動作していることだけを検証します)。
docker-compose exec broker-west kafka-consumer-groups \ --bootstrap-server broker-west:19091 \ --describe \ --group someGroup
送信元クラスターの出力は以下のようになります。
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID someGroup west-trades 0 200 200 0 consumer-someGroup-1-8161b2c7-b9a1-4a81-b2ab-bd58a7a0b2e6 /172.24.0.5 consumer-someGroup-1
次に、
broker-east
が追い付いていることを検証します。docker-compose exec broker-east kafka-consumer-groups \ --bootstrap-server broker-east:19092 \ --describe \ --group someGroup
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID someGroup west-trades 0 200 200 0 consumer-someGroup-1-dff88343-140b-481b-acb2-e86d0713e180 /172.24.0.4 consumer-someGroup-1
この時点ではメッセージを生成していないため、両方が一致し、ラグも存在しないはずです。
ミラートピックから書き込み可能トピックへのトピックの変更¶
stop-link スクリプト を実行して、ミラートピックから書き込み可能トピックにトピックを変更します。
スクリプトは kafka-replica-status
を使用してミラートピックを表示し、kafka-topics --alter --mirror-action stop
を使用してリンクを停止して、再び kafka-replica-status
を使用して変更をモニタリングします。
./scripts/8-stop-link.sh
リンクが停止し、ミラートピックが書き込み可能トピックに変更されると、出力は以下のようになります。
==> Using replica status to see mirrored topic
Topic Partition Replica ClusterLink IsLeader IsObserver IsIsrEligible IsInIsr IsCaughtUp LastCaughtUpLagMs LastFetchLagMs LogStartOffset LogEndOffset
__consumer_offsets 0 1 - true false true true true 0 0 0 0
__consumer_offsets 1 1 - true false true true true 0 0 0 0
...
__consumer_offsets 49 1 - true false true true true 0 0 0 0
_confluent-license 0 1 - true false true true true 0 0 0 0
west-trades 0 1 - true false true true true 0 0 0 500
west-trades 0 1 west-cluster-link true false true true true -7 -7 0 500
==> Stop west-link
Topic 'west-trades's mirror was successfully stopped.
==> Monitor the change in mirrored topic status
Topic Partition Replica ClusterLink IsLeader IsObserver IsIsrEligible IsInIsr IsCaughtUp LastCaughtUpLagMs LastFetchLagMs LogStartOffset LogEndOffset
__consumer_offsets 0 1 - true false true true true 0 0 0 0
__consumer_offsets 1 1 - true false true true true 0 0 0 0
...
__consumer_offsets 49 1 - true false true true true 0 0 0 0
_confluent-license 0 1 - true false true true true 0 0 0 0
west-trades 0 1 - true false true true true 0 0 0 500
終了後の手順¶
実行中のプロデューサーまたはコンシューマーをクリーンシャットダウンするには、それぞれのコマンドウィンドウで Ctrl キーを押しながら C キーを押します。(忘れた場合は、シャットダウンスクリプトによって停止されます)。
shutdown スクリプト を実行して、Docker コンテナを停止して削除します。
./scripts/9-shutdown.sh
出力は以下のようになります。
Error response from daemon: No such container: pumba-latency Stopping broker-west ... done Stopping broker-east ... done Stopping zookeeper-west ... done Stopping zookeeper-east ... done Removing broker-west ... done Removing broker-east ... done Removing zookeeper-west ... done Removing zookeeper-east ... done Removing network clusterlinking_n1
詳細については、Docker の公式ドキュメントを参照してください。