Confluent Cloud の Cluster Linking ドキュメントをお探しですか? このページは Confluent Platform における Cluster Linking の説明です。Confluent Cloud のドキュメントをお探しの場合は、Confluent Cloud の「Cluster Linking」 をご覧ください。
Cluster Linking のデモ(Docker)¶
重要
この機能はプレビュー機能として利用できます。プレビュー機能とは、開発者から早い段階でフィードバックを受けるために提供している Confluent Platform のコンポーネントのことです。この機能は、評価用、本稼働環境以外でのテスト用、あるいは Confluent にフィードバックを提供するために使用できます。
これは、一般的なユースケースのコンテキストでの Cluster Linking とその機能の実践的なデモです。スクリプトと README は、GitHub の demo-scene/cluster-linking で入手できます。
デモの内容¶
このデモを完了すると、2 つのクラスターを構成し、さまざまなユースケースを対象に Cluster Linking を使用してクラスター間でデータを移行できるようになります。
このチュートリアルでは、以下のことを行います。
- トピックリンクの作成
- クラスターリンクの詳細を表示する方法の学習
- トピックデータを共有するための基本的なトピックミラーリングの例の実行
- コマンドターミナルで 2 つのウィンドウを開き、送信元クラスターでメッセージを生成し、送信先クラスターでミラートピックから消費すること
- コンシューマーを west クラスターから east クラスターに移行し、これをモニタリングする方法を学ぶこと
- リンクの停止およびデモの終了
セットアップと前提条件¶
このデモには、Docker イメージを取得し、それぞれ独自の ZooKeeper を持つ 2 つの Kafka クラスターをセットアップする Docker Compose ファイルが含まれています。
- ZooKeeper
- Kafka
- Confluent Server
複数のスクリプトが含まれています。これらは、構成の設定、コマンドの実行、ユースケースのデモに使用します。
- 前提条件:
- Docker
- Docker バージョン 1.11 以降が インストールされ動作している 。
- Docker Compose が インストール済みである 。Docker Compose は、Docker for Mac ではデフォルトでインストールされます。
- Docker メモリーに最小でも 6 GB が割り当てられている。Docker Desktop for Mac を使用しているとき、Docker メモリーの割り当てはデフォルトで 2 GB です。Docker のデフォルトの割り当てを 6 GB に変更できます。Preferences、Resources、Advanced の順に移動します。
- インターネット接続
- Confluent Platform で現在サポートされる オペレーティングシステム
- Docker でのネットワークと Kafka
- 内部コンポーネントと外部コンポーネントの両方で Docker ネットワークに通信できるように、ホストとポートを構成します。詳細については、こちらの記事 を参照してください。
- (オプション)`curl <https://curl.se/>`__.
- 以下の手順では、Docker Compose ファイルをダウンロードします。ダウンロードにはさまざまな方法がありますが、この手順では、ファイルのダウンロードに使用できる明示的な curl コマンドを説明します。
- Docker
サービスの起動¶
Confluent demo-scene リポジトリのクローンを GitHub から作成し、 cluster-linking/
サブディレクトリで作業します。ここには、このチュートリアルでコンパイルし実行するサンプルコードが含まれます。
ちなみに
次の git clone
サンプルは SSH を使用します。 Git 構成で HTTPS を設定している場合、代わりに git clone https://github.com/confluentinc/demo-scene.git
を使用します。
git clone git@github.com:confluentinc/demo-scene.git
cd cluster-linking/
送信元のトピックから読み取るコンシューマーグループのセットアップ¶
新しいコマンドウィンドウを開き、west クラスターで
west-trades
を消費するコンシューマーグループを作成します。./scripts/6-setup-consumer.sh
set-up-consumer スクリプト は、以下を実行します。
- オフセットを自動的にコミットするプロパティを使用してグループを構成
- グループ
someGroup
に名前を付ける。 - west クラスター(送信元)上の
west-trades
トピックから読み取るコンシューマーをセットアップして実行する。
コンシューマーグループがミラートピックからメッセージ 1 ~ 100 を読み取ると、出力は以下のようになります。
==> Consume from west cluster, west-trades and commit offsets (source cluster) 1 2 3 ... 99 100
"ラグ" ウィンドウで、
kafka-consumer-groups
コマンドを実行して送信元クラスターと送信先クラスターの両方でオフセットを検証します(オフセットは一致するはずです)。west クラスター(送信元)のオフセットを取得するには、以下のコマンドを実行します。
docker-compose exec broker-west kafka-consumer-groups \ --bootstrap-server broker-west:19091 \ --describe \ --group someGroup
送信元クラスターの出力は以下のようになります。
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID someGroup west-trades 0 100 100 0 consumer-someGroup-1-8161b2c7-b9a1-4a81-b2ab-bd58a7a0b2e6 /172.24.0.5 consumer-someGroup-1
east クラスター(送信先)のオフセットを取得するには、以下のコマンドを実行します。
docker-compose exec broker-east kafka-consumer-groups \ --bootstrap-server broker-east:19092 \ --describe \ --group someGroup
送信先クラスターの出力は以下のようになります。
Consumer group 'someGroup' has no active members. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID someGroup west-trades 0 100 100 0 - - -
ちなみに
オフセットが移行されるまで最大 10 秒かかる場合があります。
送信元から送信先へのグループの移行¶
オフセット移行をアップデートして、west クラスターから east クラスターへのコンシューマーオフセットの移行を停止します。
migrate-one-cg スクリプト は、オフセット移行とクラスターリンクをアップデートして移行を実行し、より多くのメッセージを
west-trades
トピックに対して生成して、それらを east のコンシューマーグループから消費します。./scripts/7-migrate-one-cg.sh
出力メッセージを参照し、スクリプトが以下のタスクを完了したことを確認します。
consumer.offset.group.filters
をアップデートして、コンシューマーオフセットから除外するフィルターをsomeGroup
に対して設定する。kafka-configs
コマンドを使用して east からのクラスターリンクをアップデートし、someGroup
コンシューマーオフセットの移行を除外する。- west クラスターで別の 100 件のメッセージを生成する。
- east クラスターの新しいコンシューマーを消費する。
- 一方から他方に移行されたコンシューマーオフセットをモニタリングする。
==> Stop migrating the consumer group someGroup via the west link Completed updating config for cluster-link west-cluster-link. ==> Produce 100 more messages to the source topic >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ==> Consume from east cluster, west-trades and commit offsets (destination cluster) 101 102 ... 197 198 199 200 ^CProcessed a total of 100 messages ==> Monitor that the consumer offsets have correctly been migrated ==> West Cluster Consumer group 'someGroup' has no active members. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID someGroup west-trades 0 100 200 100 - - - ==> East Cluster Consumer group 'someGroup' has no active members. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID someGroup west-trades 0 200 200 0 - - -
"ラグ" ウィンドウで、
kafka-consumer-groups
コマンドを両方のクラスターで再実行して、broker-east
の移行されたコンシューマーグループがオフセット 200 に完全に追い付いていることを検証します。まず、west クラスターで実行します(これは生成先のクラスターであるため、プロデューサーが動作していることだけを検証します)。
docker-compose exec broker-west kafka-consumer-groups \ --bootstrap-server broker-west:19091 \ --describe \ --group someGroup
送信元クラスターの出力は以下のようになります。
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID someGroup west-trades 0 100 200 0 consumer-someGroup-1-8161b2c7-b9a1-4a81-b2ab-bd58a7a0b2e6 /172.24.0.5 consumer-someGroup-1
次に、
broker-east
が追い付いていることを検証します。docker-compose exec broker-east kafka-consumer-groups \ --bootstrap-server broker-east:19092 \ --describe \ --group someGroup
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID someGroup west-trades 0 200 200 0 consumer-someGroup-1-dff88343-140b-481b-acb2-e86d0713e180 /172.24.0.4 consumer-someGroup-1
この時点ではメッセージを生成していないため、両方が一致し、ラグも存在しないはずです。
ミラートピックから書き込み可能トピックへのトピックの変更¶
stop-link スクリプト を実行して、ミラートピックから書き込み可能トピックにトピックを変更します。
スクリプトは kafka-replica-status
を使用してミラートピックを表示し、kafka-topics --alter --mirror-action stop
を使用してリンクを停止して、再び kafka-replica-status
を使用して変更をモニタリングします。
./scripts/8-stop-link.sh
リンクが停止し、ミラートピックが書き込み可能トピックに変更されると、出力は以下のようになります。
==> Using replica status to see mirrored topic
Topic Partition Replica ClusterLink IsLeader IsObserver IsIsrEligible IsInIsr IsCaughtUp LastCaughtUpLagMs LastFetchLagMs LogStartOffset LogEndOffset
__consumer_offsets 0 1 - true false true true true 0 0 0 0
__consumer_offsets 1 1 - true false true true true 0 0 0 0
...
__consumer_offsets 49 1 - true false true true true 0 0 0 0
_confluent-license 0 1 - true false true true true 0 0 0 0
west-trades 0 1 - true false true true true 0 0 0 500
west-trades 0 1 west-cluster-link true false true true true -7 -7 0 500
==> Stop west-link
Topic 'west-trades's mirror was successfully stopped.
==> Monitor the change in mirrored topic status
Topic Partition Replica ClusterLink IsLeader IsObserver IsIsrEligible IsInIsr IsCaughtUp LastCaughtUpLagMs LastFetchLagMs LogStartOffset LogEndOffset
__consumer_offsets 0 1 - true false true true true 0 0 0 0
__consumer_offsets 1 1 - true false true true true 0 0 0 0
...
__consumer_offsets 49 1 - true false true true true 0 0 0 0
_confluent-license 0 1 - true false true true true 0 0 0 0
west-trades 0 1 - true false true true true 0 0 0 500
ちなみに
Docker のデモでは、非推奨となった kafka-topics --alter --mirror-action stop
コマンドが使用されています。このコマンドは、Confluent Platform 6.0.0 リリースに固定されているため、このデモにおいても引き続き正しく機能します。新しい promote
コマンドと failover
コマンドを使用してミラーリングを停止する方法について詳しくは、構成リファレンスの「トピックのミラーリングの停止」およびチュートリアルの「書き込み可能にするためのミラートピックの切り替え」を参照してください。
終了後の手順¶
実行中のプロデューサーまたはコンシューマーを明確にシャットダウンするには、それぞれのコマンドウィンドウで Ctrl キーを押しながら C キーを押します。(忘れた場合は、シャットダウンスクリプトによって停止されます)。
shutdown スクリプト を実行して、Docker コンテナを停止して削除します。
./scripts/9-shutdown.sh
出力は以下のようになります。
Error response from daemon: No such container: pumba-latency Stopping broker-west ... done Stopping broker-east ... done Stopping zookeeper-west ... done Stopping zookeeper-east ... done Removing broker-west ... done Removing broker-east ... done Removing zookeeper-west ... done Removing zookeeper-east ... done Removing network clusterlinking_n1
詳細については、Docker の公式ドキュメントを参照してください。