Self-Balancing Clusters デモ(Docker)

このクイックスタートでは、Docker コンテナーを使用した Confluent Server での Self-Balancing Clusters の機能のデモを示します。Self-Balancing Clusters はブローカー間でデータを移動し、多様な分配目標とパフォーマンス目標に従って、ワークロードを均等にします。このデモを終了すると、Self-Balancing Clusters を正しく実行して、ブローカーの追加や削除後にデータのバランスを調整できるようになります。

デモについて

Self-Balancing により、次の方法で Kafka クラスターの管理を簡素化します。

  • クラスターで負荷が不均等に分散しているとき、パフォーマンスを最適化するため、Self-Balancing によりパーティションのバランスが自動的に調整されます。
  • 新しいブローカーがクラスターに追加されたとき、Self-Balancing によりそのブローカーにパーティションが自動的に配置されます。
  • オペレーターがブローカーを削除するときに Self-Balancing API を呼び出し、対象ブローカーをシャットダウンして対応パーティションを移動できます。
  • ブローカーが一定時間停止しているとき、Self-Balancing により該当パーティションが他のブローカーに自動的に再割り当てされます。

このクイックスタートでは、上記の各シナリオに対する実践的な例を示します。

セットアップと対象範囲

このデモでは、次の Docker イメージを取得します。

  • ZooKeeper
  • Kafka
  • Confluent Control Center (コマンドラインの他に UI でもテストする場合)
前提条件:
  • Docker
    • Docker バージョン 1.11 またはそれ以降が インストールされ動作している
    • Docker Compose が インストール済みである 。Docker Compose は、Docker for Mac ではデフォルトでインストールされます。
    • Docker メモリーに最小でも 6 GB が割り当てられている。Docker Desktop for Mac を使用しているとき、Docker メモリーの割り当てはデフォルトで 2 GB です。Docker のデフォルトの割り当てを 6 GB に変更できます。PreferencesResourcesAdvanced の順に移動します。
  • インターネット接続
  • Confluent Platform で現在サポートされる オペレーティングシステム
  • Docker でのネットワークと Kafka
    • 内部コンポーネントと外部コンポーネントが Docker ネットワークに通信できるように、ホストとポートを構成します。詳細については、こちらの記事 を参照してください。
  • (オプション)`curl <https://curl.se/>`__.
    • 以下の手順では、Docker Compose ファイルをダウンロードします。ダウンロードにはさまざまな方法がありますが、この手順では、ファイルのダウンロードに使用できる明示的な curl コマンドを説明します。
  • API を呼び出す curl コマンド( 縮小 シナリオで使用)では、jq--silent フラグと組み合わせて使用すると、きれいに書式設定された出力結果が得られます。

各サービスの起動

Confluent demo-scene リポジトリのクローンを GitHub から作成し、 demo-scene/self-balancing サブディレクトリで作業します。ここには、このチュートリアルでコンパイルし実行するサンプルコードが含まれます。

ちなみに

次の git clone サンプルは SSH を使用します。 Git 構成で HTTPS を設定している場合、代わりに git clone https://github.com/confluentinc/demo-scene.git を使用します。

git clone git@github.com:confluentinc/demo-scene.git
cd self-balancing/

デモシナリオの実行

次の演習では、クラスターのさまざまな不均衡を、実際によくあるシナリオでモデル化して作成する方法を説明します。それぞれの場合で、Self-Balancing を使用してクラスターをモニタリングし、自動バランス調整を実行します。

不均等な負荷

ブローカー間で不均等に配分されたトピックデータのあるトピックを作成します。

  1. docker-compose をフォアグラウンドで実行して、ZooKeeper、3 つの Confluent Platform ブローカー、および Control Center を起動します。

    docker-compose -f kafka-0-1-2.yml up
    

    ちなみに

    これをフォアグラウンドで実行すると、クラスターのステートについて関心がある情報のログをモニタリングできます。新しいコマンドウィンドウを開き、次のステップでコマンドを実行します。

  2. トピックを作成し、レプリカを明示的に割り当てて、ブローカー 2 にレプリカがない状態でデータを強制的に配分して負荷を不均等にします。

    kafka-topics \
    --bootstrap-server localhost:9092 \
    --create \
    --topic sbc \
    --replica-assignment 0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1
    
  3. 1 時間ごとに、約 1 MB/秒のレートでデータを生成します。

    kafka-producer-perf-test \
    --producer-props bootstrap.servers=localhost:9092 \
    --topic sbc \
    --record-size 1000 \
    --throughput 1000 \
    --num-records 3600000
    
  4. トピックで変更の監視を始めます。

    watch kafka-topics \
        --bootstrap-server localhost:9092 \
        --describe \
        --topic sbc
    

    この監視ではレプリカの割り当て、同期中のレプリカ、およびレプリケーションスロットルの詳細など関連情報の変化を表示します。

    省略することも可能ですが、Control Center を使用して同じ変化を監視します。Control Center を使用するにはウェブブラウザーで http://localhost:9021/ にアクセスします。

  5. Self-Balancing でバランス調整が開始するのを待ちます。

    Self-Balancing は、ディスクの使用、リーダー数、パーティション数、ネットワークのスループット、他の変数などのデータをサンプリングします。このデータを集約し、情報に基づいてパーティションを再割り当てする方法を決定します。

    重要

    • Self-Balancing では、起動してから十分なサンプルを収集して(必要な場合に)バランス調整計画を生成するまでに、10 ~ 20 分かかります。
    • Self-Balancing では、クラスターのパーティション数が大きく変わるとき、以前のサンプルが無効になります。クラスターの現在のステートを正確に反映していない可能性があるからです。

    Self-Balancing がサンプリングを続けている間、次のメッセージがログに定期的に表示されます。

    INFO Skipping proposal precomputing because load monitor does not have enough snapshots.
    

    サンプリングが完了した後に、バランス調整処理が始まります。

    ログをモニタリングし、watch kafka-topics コマンドで調整しているときの変化を確認します。

拡張

2 つのブローカーをクラスターに追加し、Self-Balancing によりパーティションが配置されるのを監視します。

  1. 新しいコマンドウィンドウで、次のファイルを指定して docker-compose を実行し、2 つのブローカーを前のセットアップに追加します(チュートリアルの最初の部分からのプロセスはどれも停止しないでください)。

    docker-compose -f kafka-0-1-2.yml -f kafka-3-4.yml up --no-recreate
    
  2. Self-Balancing によるクラスターのバランス調整を監視します。

    Self-Balancing はサンプリングされたデータを使用でき、バランス調整はほとんど即座に開始されます。

    ログには、クラスターのバランスがとれていないことだけではなく、新しいブローカーがバランス調整されていることを Self-Balancing が検出する方法が反映されます。これが重要なのは、Self-Balancing に confluent.balancer.heal.uneven.load.trigger という構成パラメーターがあり、ANY_UNEVEN_LOAD または EMPTY_BROKER を設定できるからです。

    • EMPTY_BROKER では、Self-Balancing が空のブローカーを検出したときにのみバランス調整します(この拡張シナリオです)。
    • ANY_UNEVEN_LOAD は原因にかかわらず、負荷が均等ではないときにバランス調整します(前述の 不均等な負荷のシナリオ です)。

縮小

ブローカーをクラスターから削除して、Self-Balancing がクラスターをシャットダウンし、パーティションを移動するのを監視します。

  1. ブローカーの削除をトリガーします。

    この例では、Apache Kafka® の REST API を使用します。

    まず、次の curl コマンドで、REST API が対応できるすべてのクラスターを含むコレクションを返します。

    curl localhost:8090/kafka/v3/clusters
    

    この場合は、クラスターは1 つだけです。data[0].cluster_id の値がクラスターの ID であることに注意してください。

    ブローカーの削除をトリガーする複数のオプションがあります。

    • 次のコマンドではブローカー 3 の削除をトリガーします(例の <cluster_id> を前の手順のクラスター ID で置換します)。

      curl -X DELETE localhost:8090/kafka/v3/clusters/:<cluster_id>/brokers/3
      

      これにより 202 Accepted が返ります。

    • 代わりに、 kafka-remove-brokers でこの手順を実行できます。これは Confluent Platform 6.0.0 以降の Self-Balancing で利用できます。

      kafka-remove-brokers \
        --bootstrap-server localhost:9092 \
        --delete \
        --broker-id 3
      
    • 最後に、Confluent Control Center の Brokers overview ページでもブローカーを削除できます。

  2. Self-Balancing がブローカーを削除するのを監視します。

    Self-Balancing はサンプリングされたデータを使用でき、削除はほとんど即座に開始されます。ただし、削除は時間のかかる処理です(データの移動が関係するためです)。

    削除処理の進行中に、次のコマンドを実行して partition_reassignment_status および broker_shutdown_status を取得します(<cluster_id> を使用するクラスター ID で置換します)。

    curl localhost:8090/kafka/v3/clusters/:<cluster_id>/remove-broker-tasks/3
    

    モニタリングの代替オプションとして、--describe フラグを kafka-remove-brokers コマンドに使用することもできます。以下に例を示します。

    kafka-remove-brokers \
      --bootstrap-server localhost:9092 \
      --describe \
      --broker-id 3
    

ブローカーのエラー

ブローカーのエラーのシミュレーションを実行し、Self-Balancing がこの状況に対応するのを監視します。

  1. コンテナーを停止します。たとえば、ブローカー 4 を停止するには以下のとおり実行します。

    docker container stop $(docker container ls -q --filter name=kafka4)
    
  2. Self-Balancing が新しいレプリカを作成するのを監視します。

    Self-Balancing はサンプリングされたデータを使用でき、新しいレプリカの作成はほとんど即座に開始されます。

    ちなみに

    このデモでは、 confluent.balancer.heal.broker.failure.threshold.ms5000 に設定されています。これは、Self-Balancing がわずか 5 秒後にブローカーの停止を判断するということです。これは本稼働環境には適切でありません。一般的に、タイムアウトは 30 分から 1 時間(もしくは、実際の環境に合ったもの)に設定します。ただし、ここではデモを実行しやすくするため短くします。

強制停止

Docker を使って実行しているとき、Docker コンテナーとイメージを必要に応じて停止し、削除できます。

  1. docker container ls を使用して、動作しているすべての Docker コンテナーのリストを表示します。
  2. docker container stop を使用して、動作しているコンテナーを停止します。
  3. docker images ls を実行して、イメージのリストを表示します。
  4. docker image rm を実行して、システムで維持しないイメージを削除します。

docker-compose up -d コマンドを使用して、いつでもコンテナーを再構築し再起動できます(イメージを削除した場合、このコマンドで再取得されます)。

詳細については、Docker の公式ドキュメントを参照してください。