Cluster Linking のデモ(Docker)

Looking for Confluent Cloud Cluster Linking docs? You are currently viewing Confluent Platform documentation. If you are looking for Confluent Cloud docs, check out Cluster Linking on Confluent Cloud.

重要

この機能はプレビュー機能として利用できます。プレビュー機能とは、開発者から早い段階でフィードバックを受けるために提供している Confluent Platform のコンポーネントのことです。この機能は、評価用、本稼働環境以外でのテスト用、あるいは Confluent にフィードバックを提供するために使用できます。

これは、一般的なユースケースのコンテキストでの Cluster Linking とその機能の実践的なデモです。スクリプトと README は、GitHub の demo-scene/cluster-linking で入手できます。

デモの内容

このデモを完了すると、2 つのクラスターを構成し、さまざまなユースケースを対象に Cluster Linking を使用してクラスター間でデータを移行できるようになります。

このチュートリアルでは、以下のことを行います。

  • トピックリンクの作成
  • クラスターリンクの詳細を表示する方法の学習
  • トピックデータを共有するための基本的なトピックミラーリングの例の実行
  • コマンドターミナルで 2 つのウィンドウを開き、送信元クラスターでメッセージを生成し、送信先クラスターでミラートピックから消費すること
  • コンシューマーを west クラスターから east クラスターに移行し、これをモニタリングする方法を学ぶこと
  • リンクの停止およびデモの終了
../../_images/cluster-linking-diagram.ja.png

セットアップと前提条件

このデモには、Docker イメージを取得し、それぞれ独自の ZooKeeper を持つ 2 つの Kafka クラスターをセットアップする Docker Compose ファイルが含まれています。

  • ZooKeeper
  • Kafka
  • Confluent Server

複数のスクリプトが含まれています。これらは、構成の設定、コマンドの実行、ユースケースのデモに使用します。

前提条件:
  • Docker
    • Docker バージョン 1.11 またはそれ以降が インストールされ動作している
    • Docker Compose が インストール済みである 。Docker Compose は、Docker for Mac ではデフォルトでインストールされます。
    • Docker メモリーに最小でも 6 GB が割り当てられている。Docker Desktop for Mac を使用しているとき、Docker メモリーの割り当てはデフォルトで 2 GB です。Docker のデフォルトの割り当てを 6 GB に変更できます。PreferencesResourcesAdvanced の順に移動します。
  • インターネット接続
  • Confluent Platform で現在サポートされる オペレーティングシステム
  • Docker でのネットワークと Kafka
    • 内部コンポーネントと外部コンポーネントが Docker ネットワークに通信できるように、ホストとポートを構成します。詳細については、こちらの記事 を参照してください。
  • (オプション)`curl <https://curl.se/>`__.
    • 以下の手順では、Docker Compose ファイルをダウンロードします。ダウンロードにはさまざまな方法がありますが、この手順では、ファイルのダウンロードに使用できる明示的な curl コマンドを説明します。

サービスの起動

Confluent demo-scene リポジトリのクローンを GitHub から作成し、 cluster-linking/ サブディレクトリで作業します。ここには、このチュートリアルでコンパイルし実行するサンプルコードが含まれます。

ちなみに

次の git clone サンプルは SSH を使用します。 Git 構成で HTTPS を設定している場合、代わりに git clone https://github.com/confluentinc/demo-scene.git を使用します。

git clone git@github.com:confluentinc/demo-scene.git
cd cluster-linking/

ミラートピックの共有

  1. Docker Compose を起動します。

    docker-compose up -d
    

    これにより、最新の Docker イメージが取得され、コンテナが起動します。

    Creating network "cluster-linking_n1" with the default driver
    Pulling zookeeper-west (confluentinc/cp-zookeeper:latest)...
    latest: Pulling from confluentinc/cp-zookeeper
    0fd3b5213a9b: Pull complete
    aebb8c556853: Pull complete
    ...
    db19045b67cf: Pull complete
    ea44f5056484: Pull complete
    Digest: sha256:d5ba29dbd01ad6f8ebd7c83be89d4949c830b7d0d503d80b882ac739e7974067
    Status: Downloaded newer image for confluentinc/cp-server:latest
    Creating zookeeper-west ... done
    Creating zookeeper-east ... done
    Creating broker-west    ... done
    Creating broker-east    ... done
    
  2. トピック、クラスターリンク、およびミラートピックを作成します。

    以下のスクリプトを実行して、west クラスターにトピックを作成し、west クラスターを east クラスターにリンクして、ソーストピックのトピックデータを送信先トピックにミラーリングします。

    ./scripts/2-create-links-topics.sh
    

    出力は以下のようになります。

    ==> Create West Demo Topic
    Created topic west-trades.
    
    ==> Create East -> West link
    Cluster link 'west-cluster-link' creation successfully completed.
    
    ==> Create an east mirror of west-trades
    Created topic west-trades.
    

    これを完了するために、create-links-topics スクリプト は以下を実行します。

    • kafka-cluster-links (送信元上) : west-cluster-link というクラスターリンクを作成し、コンシューマーオフセットからミラーリングするためのコンシューマーグループを定義するグループフィルター(JSON ファイル内)を渡します(すべてレプリケートされます)。さらに、リンク先となるクラスターのブートストラップサーバー(broker-west )を識別し、コンシューマーオフセット同期を有効にして、オフセットを 10 秒ごとに同期します。
    • kafka-cluster-links --list: 作成されたリンクを表示します。
    • kafka-topics create: 送信先クラスター(broker-east)に west-trades トピックのミラーを作成します。

    ちなみに

    このプレビューではトピック名の変更はサポートされていません。元のトピック名とミラートピック名は同じである必要があります。

  3. list-links-and-lag スクリプト を実行します。このスクリプトは、 kafka-configs コマンドを実行してリンクとトピックの詳細を表示します。

    ./scripts/3-list-links-and-lag.sh
    

    出力は以下のようになります。

    ==> List cluster links
    
    Link name: 'west-cluster-link', link ID: 'c0f902c9-9fb9-4495-8b71-e9ae04d73264', cluster ID: 'P-Le2LkxTISct-OApmmaFg'
    
    ==> Link Metrics
    
    
    ==> Monitor MaxLag
    
    west-cluster-link: 0
    

    ちなみに

    このデモは、リンクメトリクスを生成するようにセットアップされていません。クラスターリンクのメトリクスを収集して表示する方法については、「クラスターメトリクスのモニタリングおよびリンクの最適化」と「チュートリアル: トピックデータ共有での Cluster Linking の使用」を参照してください。

  4. 3 つのコマンドウィンドウセッションを開きます。1 つは west クラスター用、1 つは east クラスター用、1 つはこの 2 つの間のラグをモニタリングするために使用します(すべてのセッションを必ず同じリポジトリと作業ディレクトリ demo-scene/cluster-linking に配置してください)。

    • east クラスター(ミラートピックがある送信先)用のウィンドウで、run-consumer スクリプト を実行して、ミラートピック west-trades からメッセージを消費します。

      ./scripts/4-run-consumer.sh
      

      コンシューマーが起動すると、以下のメッセージが表示されます。

      ==> Consume from east cluster, west-trades
      
    • west クラスター(元のトピックがある送信元)用のウィンドウで、run-producer スクリプト を実行して、west クラスター上のトピック west-trades に対して 100 件のメッセージを生成します。

      ./scripts/5-run-producer.sh
      

      プロデューサーが実行されると、以下のメッセージが表示されます(このプロデューサーはメッセージを自動生成してからシャットダウンし、プロンプトに戻ります)。

      ==> Produce: West -> East west-trades
      >>>> $
      
    • east クラスターのコンシューマーコマンドウィンドウに戻って確認します。コンシューマー(送信先上)がミラートピックから読み取ると、以下のような出力になります。

      ==> Consume from east cluster, west-trades
      
      1
      2
      3
      ...
      99
      100
      

      Ctrl キーを押しながら C キーを押してこのコンシューマーをシャットダウンし、プロンプトに戻ります。

送信元のトピックから読み取るコンシューマーグループのセットアップ

  1. 新しいコマンドウィンドウを開き、west クラスターで west-trades を消費するコンシューマーグループを作成します。

    ./scripts/6-setup-consumer.sh
    

    set-up-consumer スクリプト は、以下を実行します。

    • オフセットを自動的にコミットするプロパティを使用してグループを構成
    • グループ someGroup に名前を付ける。
    • west クラスター(送信元)上の west-trades トピックから読み取るコンシューマーをセットアップして実行する。

    コンシューマーグループがミラートピックからメッセージ 1 ~ 100 を読み取ると、出力は以下のようになります。

    ==> Consume from west cluster, west-trades and commit offsets (source cluster)
    
    1
    2
    3
    ...
    99
    100
    
  2. "ラグ" ウィンドウで、kafka-consumer-groups コマンドを実行して送信元クラスターと送信先クラスターの両方でオフセットを検証します(オフセットは一致するはずです)。

    • west クラスター(送信元)のオフセットを取得するには、以下のコマンドを実行します。

      docker-compose exec broker-west kafka-consumer-groups  \
         --bootstrap-server broker-west:19091 \
         --describe \
         --group someGroup
      

      送信元クラスターの出力は以下のようになります。

      GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                               HOST            CLIENT-ID
      someGroup       west-trades     0          100             100             0               consumer-someGroup-1-8161b2c7-b9a1-4a81-b2ab-bd58a7a0b2e6 /172.24.0.5     consumer-someGroup-1
      
    • east クラスター(送信先)のオフセットを取得するには、以下のコマンドを実行します。

      docker-compose exec broker-east kafka-consumer-groups  \
         --bootstrap-server broker-east:19092 \
         --describe \
         --group someGroup
      

      送信先クラスターの出力は以下のようになります。

      Consumer group 'someGroup' has no active members.
      
      GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
      someGroup       west-trades     0          100             100             0               -               -               -
      

    ちなみに

    オフセットが移行されるまで最大 10 秒かかる場合があります。

送信元から送信先へのグループの移行

  1. オフセット移行をアップデートして、west クラスターから east クラスターへのコンシューマーオフセットの移行を停止します。

    migrate-one-cg スクリプト は、オフセット移行とクラスターリンクをアップデートして移行を実行し、より多くのメッセージを west-trades トピックに対して生成して、それらを east のコンシューマーグループから消費します。

    ./scripts/7-migrate-one-cg.sh
    

    出力メッセージを参照し、スクリプトが以下のタスクを完了したことを確認します。

    • consumer.offset.group.filters をアップデートして、コンシューマーオフセットから除外するフィルターを someGroup に対して設定する。
    • kafka-configs コマンドを使用して east からのクラスターリンクをアップデートし、someGroup コンシューマーオフセットの移行を除外する。
    • west クラスターで別の 100 件のメッセージを生成する。
    • east クラスターの新しいコンシューマーを消費する。
    • 一方から他方に移行されたコンシューマーオフセットをモニタリングする。
    ==> Stop migrating the consumer group someGroup via the west link
    Completed updating config for cluster-link west-cluster-link.
    ==> Produce 100 more messages to the source topic
    >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    ==> Consume from east cluster, west-trades and commit offsets (destination cluster)
    101
    102
    ...
    197
    198
    199
    200
    ^CProcessed a total of 100 messages
    ==> Monitor that the consumer offsets have correctly been migrated
    ==> West Cluster
    Consumer group 'someGroup' has no active members.
    GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
    someGroup       west-trades     0          100             200             100             -               -               -
    ==> East Cluster
    Consumer group 'someGroup' has no active members.
    GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
    someGroup       west-trades     0          200             200             0               -               -               -
    
  2. "ラグ" ウィンドウで、kafka-consumer-groups コマンドを両方のクラスターで再実行して、broker-east の移行されたコンシューマーグループがオフセット 200 に完全に追い付いていることを検証します。

    • まず、west クラスターで実行します(これは生成先のクラスターであるため、プロデューサーが動作していることだけを検証します)。

      docker-compose exec broker-west kafka-consumer-groups  \
         --bootstrap-server broker-west:19091 \
         --describe \
         --group someGroup
      

      送信元クラスターの出力は以下のようになります。

      GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                               HOST            CLIENT-ID
      someGroup       west-trades     0          100             200             0               consumer-someGroup-1-8161b2c7-b9a1-4a81-b2ab-bd58a7a0b2e6 /172.24.0.5     consumer-someGroup-1
      
    • 次に、broker-east が追い付いていることを検証します。

      docker-compose exec broker-east kafka-consumer-groups  \
         --bootstrap-server broker-east:19092 \
         --describe \
         --group someGroup
      
      GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                               HOST            CLIENT-ID
      someGroup       west-trades     0          200             200             0               consumer-someGroup-1-dff88343-140b-481b-acb2-e86d0713e180 /172.24.0.4     consumer-someGroup-1
      

      この時点ではメッセージを生成していないため、両方が一致し、ラグも存在しないはずです。

ミラートピックから書き込み可能トピックへのトピックの変更

stop-link スクリプト を実行して、ミラートピックから書き込み可能トピックにトピックを変更します。

スクリプトは kafka-replica-status を使用してミラートピックを表示し、kafka-topics --alter --mirror-action stop を使用してリンクを停止して、再び kafka-replica-status を使用して変更をモニタリングします。

./scripts/8-stop-link.sh

リンクが停止し、ミラートピックが書き込み可能トピックに変更されると、出力は以下のようになります。

==> Using replica status to see mirrored topic

Topic              Partition Replica ClusterLink       IsLeader IsObserver IsIsrEligible IsInIsr IsCaughtUp LastCaughtUpLagMs LastFetchLagMs LogStartOffset LogEndOffset
 __consumer_offsets 0         1       -                 true     false      true          true    true       0                 0              0              0
 __consumer_offsets 1         1       -                 true     false      true          true    true       0                 0              0              0
 ...
 __consumer_offsets 49        1       -                 true     false      true          true    true       0                 0              0              0
 _confluent-license 0         1       -                 true     false      true          true    true       0                 0              0              0
 west-trades        0         1       -                 true     false      true          true    true       0                 0              0              500
 west-trades        0         1       west-cluster-link true     false      true          true    true       -7                -7             0              500

 ==> Stop west-link
 Topic 'west-trades's mirror was successfully stopped.

 ==> Monitor the change in mirrored topic status
 Topic              Partition Replica ClusterLink IsLeader IsObserver IsIsrEligible IsInIsr IsCaughtUp LastCaughtUpLagMs LastFetchLagMs LogStartOffset LogEndOffset
 __consumer_offsets 0         1       -           true     false      true          true    true       0                 0              0              0
 __consumer_offsets 1         1       -           true     false      true          true    true       0                 0              0              0
 ...
 __consumer_offsets 49        1       -           true     false      true          true    true       0                 0              0              0
 _confluent-license 0         1       -           true     false      true          true    true       0                 0              0              0
 west-trades        0         1       -           true     false      true          true    true       0                 0              0              500

ちなみに

Docker のデモでは、非推奨となった kafka-topics --alter --mirror-action stop コマンドが使用されています。このコマンドは、Confluent Platform 6.0.0 リリースに固定されているため、このデモにおいても引き続き正しく機能します。新しい promote コマンドと failover コマンドを使用してミラーリングを停止する方法について詳しくは、構成リファレンスの「トピックのミラーリングの停止」およびチュートリアルの「書き込み可能にするためのミラートピックの切り替え」を参照してください。

強制停止

  1. 実行中のプロデューサーまたはコンシューマーをクリーンシャットダウンするには、それぞれのコマンドウィンドウで Ctrl キーを押しながら C キーを押します。(忘れた場合は、シャットダウンスクリプトによって停止されます)。

  2. shutdown スクリプト を実行して、Docker コンテナを停止して削除します。

    ./scripts/9-shutdown.sh
    

    出力は以下のようになります。

    Error response from daemon: No such container: pumba-latency
    Stopping broker-west    ... done
    Stopping broker-east    ... done
    Stopping zookeeper-west ... done
    Stopping zookeeper-east ... done
    Removing broker-west    ... done
    Removing broker-east    ... done
    Removing zookeeper-west ... done
    Removing zookeeper-east ... done
    Removing network clusterlinking_n1
    

詳細については、Docker の公式ドキュメントを参照してください。