Cluster Linking のコマンド

Looking for Confluent Cloud Cluster Linking docs? You are currently viewing Confluent Platform documentation. If you are looking for Confluent Cloud docs, check out Cluster Linking on Confluent Cloud.

Confluent Enterprise 7.0.0 には、Confluent Platform、Confluent Cloud、Kafka クラスターのデータとメタデータを Confluent Platform および Confluent Cloud クラスターと同期する、Cluster Linking と呼ばれる新機能が含まれています。

Cluster Linking は、2 つの重要な抽象化を使用してデータを Geo レプリケートします。

  • クラスターリンク: "送信元クラスター" と "送信先クラスター" を接続します。
  • ミラートピック: クラスターリンクの送信先クラスターにあるトピックで、送信元クラスターのトピックと同一のコピーです。

クラスターリンクとミラートピックの作成および管理には、Kafka REST v3 Proxy の REST API、またはターミナルで CLI コマンドを使用します。このセクションでは、クラスターリンクおよびミラートピックを作成および管理するためのコマンドについて説明します。

CLI コマンド

クラスターリンクの構成オプションは、kafka-cluster-links コマンドのフラグの値として使用できます。これらのオプションの一部は、以下のコマンド例のコンテキストの中で示されています。完全なリストは、「リンクプロパティ」にリファレンス形式で記載されています。

kafka-cluster-links は、クラスター間のリンクを作成および管理するために使用します。

Confluent Platform の他の Kafka コマンドと同様に、--bootstrap-serverkafka-cluster-links コマンドの必須フラグです。

--bootstrap-server

(必須)クラスター内のブローカーの接続文字列で、host:port 形式です(ブローカーが複数存在する場合はコンマ区切りリストを指定できます)。ミラートピックの作成先となる送信先クラスターを指定する必要があります。送信先クラスターは、Confluent Platform 7.0.0 以降を実行している必要があります。これは Cluster Linking をサポートするために必要です。

  • 型: 文字列
  • デフォルト: 空の文字列

以降のすべての kafka-cluster-links の実装で --bootstrap-server を使用します。

ちなみに

ほとんどの場合、--bootstrap-server はコマンドの最初(kafka-cluster-links の直後)または最後に指定できます。以下のセクションの例では、--bootstrap-serverkafka-cluster-links の直後に使用されています。

ミラートピックの作成

ミラートピックは、別のトピック内のすべてのデータとメタデータを反映した読み取り専用トピックです。

CLI でミラートピックを作成するには、kafka-mirrors ツールを使用します。ミラートピックが作成されると、ミラーリングでは自動的に送信元トピックからのデータのフェッチを開始します。

詳細については、「ミラートピック」を参照してください。

コマンド例

kafka-mirrors --create --mirror-topic example-topic \
--link demo-link \
--bootstrap-server localhost:9092

出力例

Created topic example-topic.

ミラートピックを作成するには、kafka-cluster-linksbootstrap-server と以下のフラグとともに使用します。

--mirror-topic

(必須)作成するミラートピックの名前。これは、クラスターリンクを通じてミラーリングするソーストピックと正確に一致する必要があります。

  • 型: 文字列
--link

(必須)ソーストピックからデータを取得するために使用するクラスターリンクの名前です。

  • 型: 文字列

ちなみに

  • --mirror-topic の値は元の --topic の名前と一致している必要があります。
  • ミラートピックのリストを表示する には、--include-topics フラグを指定して kafka-cluster-links --list を実行します。
--command-config
AdminClient に渡される構成が含まれているプロパティファイルです。たとえば、認可および認証のセキュリティ認証情報が含まれます。

以下は、ミラートピック作成時のオプションの構成です。

--config

ミラートピックの作成時にオーバーライドする構成のコンマ区切りのリスト。オーバーライドする各構成は、name=value として指定する必要があります。ミラートピックに設定できる構成の詳細については、ミラートピックの「構成」を参照してください。

  • 型: 文字列
--replication-factor

作成されるミラートピックのレプリケーション係数。指定しない場合、"デフォルトは送信先クラスターのデフォルト" になります。送信元トピックのレプリケーション係数ではありません。

  • 型: 文字列

ミラートピックを作成するには、ALTER CLUSTER 認可が必要です。

ミラートピックのリストの表示

クラスターのすべてのクラスターリンクのミラートピックを表示するには、kafka-mirrors --list を使用します。このコマンドでは、特定のクラスターリンクのミラートピックのリスト、またはクラスターのすべてのクラスターリンクの全ミラートピックのリストを表示できます。

コマンド例

kafka-mirrors --list --bootstrap-server localhost:9092

出力例

topic1
topic2
topic3
topic4

コマンドに以下のパラメーターを追加できます。

--link

(任意)フィルタリングするクラスターリンクの名前。指定した場合、このクラスターリンクのミラートピックのみのリストが表示されます。

  • 型: 文字列
--include-stopped
(任意)このフラグを追加した場合は、以前はミラートピックであったものの promote または failover コマンドによって停止されたトピックもリストに含まれます。このフラグは引数を取りません。

ミラートピックの詳細表示

クラスターリンク上でミラーリングされるトピックについての情報を表示したり、トピックを管理したりするには、kafka-mirrors を使用します。

kafka-mirrors --describe --bootstrap-server pkc-nwnyk.us-west-2.aws.confluent.cloud:9092 --command-config lkc-rn220.config --link onprem-to-cloud

Topic: web.orders.modified  LinkName: onprem-to-cloud LinkId: 2c3dee72-f54b-4bb3-9694-cf29caaf3637  MirrorTopic: web.orders.modified  State: ACTIVE StateTime: 2021-11-10 15:33:29
  Partition: 0  State: ACTIVE DestLogEndOffset: 114123  LastFetchSourceHighWatermark: 114123  Lag: 0  TimeSinceLastFetchMs: 8385101
  Partition: 1  State: ACTIVE DestLogEndOffset: 115278  LastFetchSourceHighWatermark: 115278  Lag: 0  TimeSinceLastFetchMs: 8387954
  Partition: 2  State: ACTIVE DestLogEndOffset: 112210  LastFetchSourceHighWatermark: 112210  Lag: 0  TimeSinceLastFetchMs: 8508856
  Partition: 3  State: ACTIVE DestLogEndOffset: 120887  LastFetchSourceHighWatermark: 120887  Lag: 0  TimeSinceLastFetchMs: 8389749
  Partition: 4  State: ACTIVE DestLogEndOffset: 109225  LastFetchSourceHighWatermark: 109225  Lag: 0  TimeSinceLastFetchMs: 8385101
  Partition: 5  State: ACTIVE DestLogEndOffset: 111669  LastFetchSourceHighWatermark: 111669  Lag: 0  TimeSinceLastFetchMs: 8387954
Topic: web.orders LinkName: onprem-to-cloud LinkId: 2c3dee72-f54b-4bb3-9694-cf29caaf3637  MirrorTopic: web.orders State: ACTIVE StateTime: 2021-11-10 15:33:29
  Partition: 0  State: ACTIVE DestLogEndOffset: 294760  LastFetchSourceHighWatermark: 294760  Lag: 0  TimeSinceLastFetchMs: 8387954
  Partition: 1  State: ACTIVE DestLogEndOffset: 285862  LastFetchSourceHighWatermark: 285862  Lag: 0  TimeSinceLastFetchMs: 8508856
  Partition: 2  State: ACTIVE DestLogEndOffset: 284891  LastFetchSourceHighWatermark: 284891  Lag: 0  TimeSinceLastFetchMs: 8389749
  Partition: 3  State: ACTIVE DestLogEndOffset: 285982  LastFetchSourceHighWatermark: 285982  Lag: 0  TimeSinceLastFetchMs: 8385101
  Partition: 4  State: ACTIVE DestLogEndOffset: 277379  LastFetchSourceHighWatermark: 277379  Lag: 0  TimeSinceLastFetchMs: 8387954
  Partition: 5  State: ACTIVE DestLogEndOffset: 283731  LastFetchSourceHighWatermark: 283731  Lag: 0  TimeSinceLastFetchMs: 8508856
Topic: inventory.shipments  LinkName: onprem-to-cloud LinkId: 2c3dee72-f54b-4bb3-9694-cf29caaf3637  MirrorTopic: inventory.shipments  State: ACTIVE StateTime: 2021-11-10 15:51:21

Confluent Platform の他の Kafka コマンドと同様に、--bootstrap-serverkafka-mirrors コマンドの必須フラグです。

--bootstrap-server

(必須)クラスター内のブローカーの接続文字列で、host:port 形式です(ブローカーが複数存在する場合はコンマ区切りリストを指定できます)。ミラートピックの作成先となる送信先クラスターを指定する必要があります。送信先クラスターは、Confluent Platform 6.0.0 以降を実行している必要があります。これは Cluster Linking をサポートするために必要です。

  • 型: 文字列
  • デフォルト: 空の文字列

コマンドに以下のパラメーターを追加できます。

--link

(任意)フィルタリングするクラスターリンクの名前。指定した場合、このクラスターリンクのミラートピックのみのリストが表示されます。

  • 型: 文字列
--include-stopped
(任意)このフラグを追加した場合は、以前はミラートピックであったものの promote または failover コマンドによって停止されたトピックもリストに含まれます。このフラグは引数を取りません。

トピックのミラーリングの停止

トピックのミラーリングを停止するには、kafka-mirrors --failover または kafka-mirrors --promote コマンドを使用します。どちらのコマンドもミラートピックを通常の書き込み可能トピックに永続的に変換しますが、それぞれ特定のユースケースを意図して設計されています。

  • promote コマンドは移行を意図したものであるため、ミラーリングを停止する前に、別途いくつかの検証と操作が実行されます。
  • failover コマンドはディザスターリカバリを意図したものであるため、すぐに反映され、常に成功します。他の操作は実行されません。
  • dry-run オプションを使用すると、コマンドを実際に実行する前に、その結果をプレビューできます。

promote および failover コマンドでは、--topics フラグとコンマ区切りのトピック名のリストを使用してトピック名のリストを渡すことによって、複数のトピックでコマンドを同時に実行できます。1 トピックのみをプロモートする場合でも、--topics (複数形)フラグを使用します。その例を次に示します。

--topics topic1

--topics topic1,topic2,topic3

その他の例を以下に示します。

トピックの昇格

ミラーリングを停止してミラートピックを通常のトピックに変換する作業を正しい(一般に移行のシナリオに適した)プロセスで行うには、kafka-mirrors --promote を使用します。このコマンドは、送信元トピックとミラートピック間のラグが 0 であることをチェックし、メタデータ(コンシューマーグループのオフセットとトピックの構成)の最終的な同期を 1 回実行した後で、ミラートピックを通常のトピックに変換します。

注釈

  • promote コマンドでは、送信元トピックに対するデータの生成は停止されません。ラグが 0 であることのチェックに成功した後で、送信元トピックに対してプロデューサーから新たにデータが生成される可能性はあります。このケースでは、両者は分岐して 2 つの異なるトピックとなり、(それまでミラーリングされていた)通常のトピックにデータはミラーリングされません。
  • promote コマンド実行時に、送信元トピックとミラートピックとの間にラグがあった場合、promote コマンドは失敗します。

promote を使用して、同じクラスターリンク上の複数のトピックまたは 1 つのトピックを指定し、ミラーリングを停止することができます。コマンドの構文は次のとおりです。

kafka-mirrors --promote --topics <destination-topic-1> .. <destination-topic-n> \
--link <link name> --bootstrap-server <host:port>

以下に示すのは、example-link という名前のリンクを使用するトピック example-topic のミラーリングを、kafka-mirrors --promote を使用して停止する例です。

kafka-mirrors --promote --topics example-topic --link example-link --bootstrap-server localhost:9092

トピックのフェイルオーバー

直ちにミラーリングを停止し、ミラートピックを通常のトピックに変換するには、kafka-mirrors failover を使用します。通常、これは送信元クラスターに予期せず障害が発生した場合のディザスターリカバリに適しています。failover コマンドでは、別途のチェックや同期は実行されません。その働きは、promote の "強制版" といえます。1 つのトピックまたは複数のトピックを指定して、ミラーリングを停止することができます。

failover を使用して、同じクラスターリンク上の複数のトピックまたは 1 つのトピックを指定し、ミラーリングを停止することができます。コマンドの構文は次のとおりです。

kafka-mirrors --failover --topics <destination-topic-1> .. <destination-topic-n> \
--link <link name> --bootstrap-server <host:port>

以下に示すのは、example-link という名前のリンクを使用するトピック example-topic のミラーリングを、kafka-mirrors --failover を使用して停止する例です。

kafka-mirrors --failover --topics example-topic --link example-link --bootstrap-server localhost:9092

ドライラン

promote または failover コマンドを実行する前にその結果をテストするには、--dry-run フラグを追加します。その例を次に示します。

kafka-mirrors --failover --topics example-topic --link example-link --bootstrap-server localhost:9092 --dry-run

ミラートピックの一時停止と再開

pause(kafka-mirrors --pause)コマンドと resume(kafka-mirrors --unpause)コマンドを使用して一時的にミラーリングを一時停止したり再開したりできます。

ミラートピックを一時停止するには、次のようにします。

kafka-mirrors --pause --topics example-topic --link example-link --bootstrap-server localhost:9092

ミラートピックを再開するには、--unpause を使用します。

kafka-mirrors --unpause --topics example-topic --link example-link --bootstrap-server localhost:9092

送信元クラスターから送信先クラスターへのコンシューマーグループの移行

リンク内でコンシューマーグループを移行するには、リンク構成に consumer.offset.sync.enable=true を設定し、JSON ファイルにグループフィルターを指定します。そのファイルの名前を CLI コマンド--consumer-group-filters-json-file フラグの値として渡します。これは、クラスターリンクを作成するときか、または既存の構成のアップデートとして設定できます。

注釈

コンシューマーグループフィルターに含めるのは、送信先で使用されていないグループのみにしてください。そうすることで、送信先の他のコンシューマーによってコミットされたオフセットがシステムによってオーバーライドされるのを防ぐことができます。フィルターに含まれているグループが送信先でも使用されている場合、システムはそのフィルターを回避しようと試みますが、この場合、その保証はありません。オフセットが上書きされる可能性はあります。ミラートピックの "昇格" が正しく機能するためには、システムがオフセットをロールバックできることが必要ですが、グループが送信先コンシューマーによって使用されていると、ロールバックを実行できません。

この例は、"someGroup" グループを "broker-west" クラスターから "broker-east" クラスターに移行し、移行を実行する前に以下のフィルターセットですべてのオフセットを移行している状態になっていると仮定しています。

{"groupFilters": [
  {
    "name": "*",
    "patternType": "LITERAL",
    "filterType": "INCLUDE"
  }
]}

コンシューマーグループを送信元クラスターから送信先クラスターに移行するには、以下の手順に従います。

  1. 送信元クラスターのコンシューマーを停止します。

  2. consumer.offset.sync.ms の 2 倍の期間だけ待機します。

  3. Cluster Linking レプリケーションが最後のコミット済みオフセットを超えていることを確認します。これは、以下のコマンドで確認できます。

    • 送信元クラスターの CURRENT-OFFSET をチェックします。

      kafka-consumer-groups --bootstrap-server broker-west:19091 --describe --group someGroup
      

      出力は次のようになります。

      GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
      someGroup       west-offsets    0          100             100             0               -               -               -
      
    • 送信先クラスターの LOG-END-OFFSET が、上の手順で記録した CURRENT-OFFSET 以上であることを確認します。

      kafka-consumer-groups --bootstrap-server broker-east:19092 --describe --group someGroup
      

      出力は次のようになります。

      GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
      someGroup       west-offsets    0          100             100             0
      
  4. 以下のコマンドを実行して、現在のオフセットが送信元と送信先で一致していることを検証します。

    • 送信元クラスターの CURRENT-OFFSET をチェックします。

      kafka-consumer-groups --bootstrap-server broker-west:19091 --describe --group someGroup
      

      出力は次のようになります。

      GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
      someGroup       west-offsets    0          100             100             0               -               -               -
      
    • 送信先クラスターの CURRENT-OFFSET をチェックします。

      kafka-consumer-groups --bootstrap-server broker-east:19092 --describe --group someGroup
      

      出力は次のようになります。

      GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
      someGroup       west-offsets    0          100             100             0
      
  5. オフセット移行フィルターをアップデートして、移行処理からグループを削除します。

    echo "consumer.offset.group.filters={\"groupFilters\": [ \
      { \
        \"name\": \"*\", \
        \"patternType\": \"LITERAL\", \
        \"filterType\": \"INCLUDE\" \
      }, \
      { \
        \"name\": \"someGroup\", \
        \"patternType\": \"LITERAL\", \
        \"filterType\": \"EXCLUDE\" \
      } \
    ]}" > newFilters.properties
    kafka-configs --bootstrap-server broker-east:19092 --alter --cluster-link offsets-cluster-link --add-config-file newFilters.properties
    
  6. 送信先クラスターのコンシューマーを起動します。

REST API コマンド

Cluster Linking REST API については、REST API Proxy v3 のドキュメント に記載されています。