Cluster Linking のコマンド

Looking for Confluent Cloud Cluster Linking docs? You are currently viewing Confluent Platform documentation. If you are looking for Confluent Cloud docs, check out Cluster Linking on Confluent Cloud.

重要

この機能はプレビュー機能として利用できます。プレビュー機能とは、開発者から早い段階でフィードバックを受けるために提供している Confluent Platform のコンポーネントのことです。この機能は、評価用、本稼働環境以外でのテスト用、あるいは Confluent にフィードバックを提供するために使用できます。

Confluent Enterprise 6.0.0 には新しい Kafka コマンド kafka-cluster-links が含まれています。このコマンドが正しく動作するには、送信先クラスターで Confluent Platform 6.0.0 以降が実行されており、Cluster Linking が送信先クラスターで有効にされている必要があります。

このセクションでは、Cluster Linking のコマンドと例について、さらに Cluster Linking に関連する Confluent Platform の CLI ツール について説明します。

クラスターリンクの構成オプションは、kafka-cluster-links コマンドのフラグの値として使用できます。これらのオプションの一部は、以下のコマンド例のコンテキストの中で示されています。完全なリストは、「リンクプロパティ」にリファレンス形式で記載されています。

kafka-mirrors コマンド

クラスターリンク上でミラーリングされるトピックについての情報を表示したり、トピックを管理したりするには、kafka-mirrors を使用します。

Confluent Platform の他の Kafka コマンドと同様に、--bootstrap-serverkafka-mirrors コマンドの必須フラグです。

--bootstrap-server

(必須)クラスター内のブローカーの接続文字列で、host:port 形式です(ブローカーが複数存在する場合はコンマ区切りリストを指定できます)。ミラートピックの作成先となる送信先クラスターを指定する必要があります。送信先クラスターは、Confluent Platform 6.0.0 以降を実行している必要があります。これは Cluster Linking をサポートするために必要です。

  • 型: string
  • デフォルト: 空の文字列

以降のすべての kafka-mirrors の実装で --bootstrap-server を使用します。

トピックのミラーリングの停止

トピックのミラーリングを停止するには、kafka-mirrors --failover または kafka-mirrors --promote コマンドを使用します。どちらのコマンドもミラートピックを通常の書き込み可能トピックに変換しますが、それぞれ特定のユースケースを意図して設計されています。

  • promote コマンドは移行を意図したものであるため、ミラーリングを停止する前に、別途いくつかの操作が実行されます。
  • failover コマンドはディザスターリカバリを意図したものであるため、その作用はすぐに現れ、自動的に成功します。
  • dry-run オプションを使用すると、コマンドを実際に実行する前に、その結果をプレビューできます。

以下に例を示します。

ちなみに

kafka-topics --alter --mirror-action stop は、Confluent Platform 6.2.0 では promotefailover が採用されて非推奨となり、Confluent Platform 7.0.0 以降では利用できなくなります。以前のバージョンのドキュメントには、そのコマンドの構文が示されていますが、今後は promotefailover を使用することをお勧めします。

トピックの昇格

ミラーリングを停止してミラートピックを通常のトピックに変換する作業を正しい(一般に移行のシナリオに適した)プロセスで行うには、kafka-mirrors --promote を使用します。このコマンドは、送信元トピックとミラートピック間のラグが 0 であることをチェックし、メタデータ(コンシューマーグループのオフセットとトピックの構成)の最終的な同期を 1 回実行した後で、ミラートピックを通常のトピックに変換します。

注釈

promote コマンドでは、送信元トピックに対するデータの生成は停止されません。ラグが 0 であることのチェックに成功した後で、送信元トピックに対してプロデューサーから新たにデータが生成される可能性はあります。このケースでは、両者は分岐して 2 つの異なるトピックとなり、(それまでミラーリングされていた)通常のトピックにデータはミラーリングされません。

promote を使用して、同じクラスターリンク上の複数のトピックまたは 1 つのトピックを指定し、ミラーリングを停止することができます。コマンドの構文は次のとおりです。

kafka-mirrors --promote --topics <destination-topic-1> .. <destination-topic-n> \
--link <link name> --bootstrap-server <host:port>

以下に示すのは、example-link という名前のリンクを使用するトピック example-topic のミラーリングを、kafka-mirrors --promote を使用して停止する例です。

kafka-mirrors --promote --topics example-topic --link example-link --bootstrap-server localhost:9092

トピックのフェイルオーバー

直ちにミラーリングを停止し、ミラートピックを通常のトピックに変換するには、kafka-mirrors failover を使用します。通常、これは送信元クラスターに予期せず障害が発生した場合のディザスターリカバリに適しています。failover コマンドでは、別途のチェックや同期は実行されません。その働きは、promote の "強制版" といえます。1 つのトピックまたは複数のトピックを指定して、ミラーリングを停止することができます。

failover を使用して、同じクラスターリンク上の複数のトピックまたは 1 つのトピックを指定し、ミラーリングを停止することができます。コマンドの構文は次のとおりです。

kafka-mirrors --failover --topics <destination-topic-1> .. <destination-topic-n> \
--link <link name> --bootstrap-server <host:port>

以下に示すのは、example-link という名前のリンクを使用するトピック example-topic のミラーリングを、kafka-mirrors --failover を使用して停止する例です。

kafka-mirrors --failover --topics example-topic --link example-link --bootstrap-server localhost:9092

ドライラン

promote または failover コマンドを実行する前にその結果をテストするには、--dry-run フラグを追加します。その例を次に示します。

kafka-mirrors --failover --topics example-topic --link example-link --bootstrap-server localhost:9092 --dry-run

ミラートピックの一時停止と再開

pause(kafka-mirrors --pause)コマンドと resume(kafka-mirrors --unpause)コマンドを使用して一時的にミラーリングを一時停止したり再開したりできます。

ミラートピックを一時停止するには、次のようにします。

kafka-mirrors --pause --topics example-topic --link example-link --bootstrap-server localhost:9092

ミラートピックを再開するには、--unpause を使用します。

kafka-mirrors --unpause --topics example-topic --link example-link --bootstrap-server localhost:9092

送信元クラスターから送信先クラスターへのコンシューマーグループの移行

リンク内でコンシューマーグループを移行するには、リンク構成に consumer.offset.sync.enable=true を設定し、JSON ファイルにグループフィルターを指定します。そのファイルの名前を kafka-cluster-links コマンド--consumer-group-filters-json-file フラグの値として渡します。これは、クラスターリンクを作成するときか、または既存の構成のアップデートとして設定できます。

注釈

コンシューマーグループフィルターに含めるのは、送信先で使用されていないグループのみにしてください。そうすることで、送信先の他のコンシューマーによってコミットされたオフセットがシステムによってオーバーライドされるのを防ぐことができます。フィルターに含まれているグループが送信先でも使用されている場合、システムはそのフィルターを回避しようと試みますが、この場合、その保証はありません。オフセットが上書きされる可能性はあります。ミラートピックの "昇格" が正しく機能するためには、システムがオフセットをロールバックできることが必要ですが、グループが送信先コンシューマーによって使用されていると、ロールバックを実行できません。

この例は、"someGroup" グループを "broker-west" クラスターから "broker-east" クラスターに移行し、移行を実行する前に以下のフィルターセットですべてのオフセットを移行している状態になっていると仮定しています。

{"groupFilters": [
  {
    "name": "*",
    "patternType": "LITERAL",
    "filterType": "INCLUDE"
  }
]}

コンシューマーグループを送信元クラスターから送信先クラスターに移行するには、以下の手順に従います。

  1. 送信元クラスターのコンシューマーを停止します。

  2. consumer.offset.sync.ms の 2 倍の期間だけ待機します。

  3. Cluster Linking レプリケーションが最後のコミット済みオフセットを超えていることを確認します。これは、以下のコマンドで確認できます。

    • 送信元クラスターの CURRENT-OFFSET をチェックします。

      kafka-consumer-groups --bootstrap-server broker-west:19091 --describe --group someGroup
      

      出力は次のようになります。

      GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
      someGroup       west-offsets    0          100             100             0               -               -               -
      
    • 送信先クラスターの LOG-END-OFFSET が、上の手順で記録した CURRENT-OFFSET 以上であることを確認します。

      kafka-consumer-groups --bootstrap-server broker-east:19092 --describe --group someGroup
      

      出力は次のようになります。

      GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
      someGroup       west-offsets    0          100             100             0
      
  4. 以下のコマンドを実行して、現在のオフセットが送信元と送信先で一致していることを検証します。

    • 送信元クラスターの CURRENT-OFFSET をチェックします。

      kafka-consumer-groups --bootstrap-server broker-west:19091 --describe --group someGroup
      

      出力は次のようになります。

      GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
      someGroup       west-offsets    0          100             100             0               -               -               -
      
    • 送信先クラスターの CURRENT-OFFSET をチェックします。

      kafka-consumer-groups --bootstrap-server broker-east:19092 --describe --group someGroup
      

      出力は次のようになります。

      GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
      someGroup       west-offsets    0          100             100             0
      
  5. オフセット移行フィルターをアップデートして、移行処理からグループを削除します。

    echo "consumer.offset.group.filters={\"groupFilters\": [ \
      { \
        \"name\": \"*\", \
        \"patternType\": \"LITERAL\", \
        \"filterType\": \"INCLUDE\" \
      }, \
      { \
        \"name\": \"someGroup\", \
        \"patternType\": \"LITERAL\", \
        \"filterType\": \"EXCLUDE\" \
      } \
    ]}" > newFilters.properties
    kafka-configs --bootstrap-server broker-east:19092 --alter --cluster-link offsets-cluster-link --add-config-file newFilters.properties
    
  6. 送信先クラスターのコンシューマーを起動します。