Confluent Platform の Cluster Linking ドキュメントをお探しですか? このページは Confluent Cloud における Cluster Linking の説明です。Confluent Platform のドキュメントをお探しの場合は、Confluent Platform の「Cluster Linking」 をご覧ください。
ディザスターリカバリとフェイルオーバー¶
はじめに¶
Cluster Linking を使用してディザスターリカバリ戦略をデプロイすることにより、パブリッククラウドプロバイダーの機能停止など、予測できない災害が発生したときのデータの損失とダウンタイムを最小限に抑え、ミッションクリティカルなアプリケーションの可用性と信頼性を高めることができます。このドキュメントでは、ディザスターリカバリ戦略の設計とフェイルオーバーの実行方法について説明します。
Cluster Linking を使用できるクラスターについては、「サポートされるクラスターのタイプ」を参照してください。
Cluster Linking を使用したディザスターリカバリの目的¶
まず、"プライマリ" クラスターには、そのトピックのデータのほか、コンシューマーオフセットや ACL などの運用メタデータが格納されます。それらのトピックにデータを送信するプロデューサーとそれらのトピックからデータを読み取るコンシューマーがアプリケーションの核となります。
Cluster Linking を使用すると、プライマリクラスターとは異なるリージョンまたはクラウドにディザスターリカバリ("DR")クラスターを作成できます。プライマリクラスターが停止した場合も、DR クラスターにデータとメタデータの最新のコピーが残っています。プロデューサーとコンシューマーは DR クラスターに切り替え、短いダウンタイムと最小限のデータ損失で稼働し続けることができます。こうしてアプリケーションは、災害の発生時も自社や顧客へのサービスを継続することができます。
ディザスターリカバリクラスターのセットアップ¶
災害が発生したときにいつでもディザスターリカバリ("DR")クラスターを使用できるよう備えるためには、プライマリクラスターのトピックデータ、コンシューマーグループオフセット、ACL について、その最新のコピーを DR クラスターに用意する必要があります。
- コンシューマーがまだ消費していないメッセージを処理できるよう、DR クラスターには最新のトピックデータが必要です。ラグが発生しているコンシューマーは、メッセージの損失を最小限にトピックデータの処理を継続できます。今後作成するコンシューマーでは、災害前に生成されたデータを一切失わずに履歴データを処理できます。これによって災害発生時の目標復旧時点("RPO")を短くすることができます。
- DR クラスターに切り替わったときにコンシューマーが、中断したところからメッセージ処理を続行できるよう、DR クラスターには最新のコンシューマーグループオフセットが必要です。コンシューマーが重複して読み取るメッセージの数をできるだけ少なくすることで、アプリケーションのダウンタイムを最小限に抑えることができます。これによって目標復旧時間("RTO")を短くすることができます。
- スイッチオーバーしたときには既にプロデューサーとコンシューマーに DR クラスターへの接続が認可されているような状態にするには、DR クラスターに最新の ACL が必要です。あらかじめ ACL を設定し、最新の状態にしておくことは、RTO の短縮にもつながります。
Cluster Linking で使用するディザスターリカバリクラスターをセットアップするには、以下の手順に従います。
- 必要に応じて、DR クラスターとして使用する、パブリックインターネットを備えた新しい専用 Confluent Cloud クラスターを異なるリージョンまたはクラウドプロバイダーに作成します。
- プライマリクラスターから DR クラスターへのクラスターリンクを作成します。クラスターリンクには、次の構成を適用する必要があります。
- コンシューマーオフセットの同期を有効にします。一部のコンシューマーグループのみを DR クラスターにフェイルオーバーする場合は、フィルターを使用して該当するコンシューマーグループ名のみを選択してください。それ以外の場合、すべてのコンシューマーグループ名を同期します。
- ACL の同期を有効にします。一部の Kafka クライアントのみを DR クラスターにフェイルオーバーする場合は、フィルターを使用して該当するクライアントのみを選択してください。それ以外の場合、すべての ACL を同期します。
- クラスターリンクを使用し、プライマリクラスターの各トピックについて、DR クラスターにミラートピックを作成します。一部のトピックについてのみ DR が必要である場合は、該当するトピックのミラートピックのみを作成します。
以上の手順を行ったうえで、プライマリクラスターのデータとメタデータに対する最新の変更を反映したディザスターリカバリクラスターを作成します。
DR を必要とする新しいトピックをプライマリクラスターに作成したら、そのつど、そのミラートピックを DR クラスターに作成してください。
ちなみに
各 Kafka クライアントには、その接続先のクラスターごとに API キーとシークレットが必要となります。RTO を短くするため、DR クラスターの API キーをあらかじめ作成してコンテナーに格納し、その DR クラスターに接続する Kafka クライアントがキーを取得できるようにしておいてください。
ディザスターリカバリクラスターのモニタリング¶
プライマリクラスターが災害に見舞われたとき、データの損失を最小限に抑えるためには、ディザスターリカバリ("DR")クラスターがプライマリクラスターの最新の情報を常に把握しておく必要があります。Cluster Linking は "非同期" のプロセスであるため、"ラグ"、つまりプライマリクラスターには存在するものの、DR クラスターにはまだミラーリングされていないメッセージが生じることがあります。災害が発生すると、ラグのあるデータは失われるおそれがあります。
Metrics API を使用したラグのモニタリング¶
災害の発生中にミラートピックで失われるおそれのあるデータの量は、組み込みのメトリクスを使用して DR クラスターのラグをモニタリングすることで確認できます。ミラートピックのパーティションでラグの生じるメッセージの推定最大数は、Metrics API の ミラーラグメトリック によってレポートされます。
CLI でのラグの確認¶
特定時点におけるミラートピックのラグを Confluent CLI で確認する方法は 2 つあります。
confluent kafka mirror list
を実行すると、送信先クラスター上のすべてのミラートピックが一覧表示されます。その中のMax Per Partition Mirror Lag
という列に、各ミラートピックのパーティションにおけるラグの最大量が示されています。--link
フラグや--mirror-status
フラグを使用すると、特定のクラスターリンクやミラートピックのステータスを条件としてフィルター処理を行うことができます。confluent kafka mirror describe <mirror-topic> --link <link-name>
を実行すると、そのミラートピックの各パーティションについての詳細な情報が表示されます。その中のPartition Mirror Lag
という列に、各パーティションの推定ラグが示されています。
REST API でのラグの照会¶
Confluent Community REST API は、ミラートピックのパーティションとそのエンドポイントにおけるラグのリストを返します。
/kafka/v3/clusters/<destination-cluster-id>/links/<link-name>/mirrors
は、クラスターリンクのすべてのミラートピックを返します。/kafka/v3/clusters/<destination-cluster-id>/links/<link-name>/mirrors/<mirror-topic>
は、指定されたミラートピックのみを返します。
パーティションとラグのリストの形式は次のとおりです。
"mirror_lags": [
{
"partition": 0,
"lag": 24
},
{
"partition": 1,
"lag": 42
},
{
"partition": 2,
"lag": 15
},
...
],
チュートリアル¶
このユースケースを確認するために、Cluster Linking を使用して、トピックと単純なコマンドラインコンシューマーおよびプロデューサーをオリジナルクラスターからディザスターリカバリ("DR")クラスターにフェイルオーバーします。
前提条件¶
- 既に Confluent CLI がある場合は、最新の状態であることを確認します。 既に CLI がインストールされている場合は、新しい Cluster Linking コマンドとツールを備えた最新バージョンであることを確認します。既に Confluent Cloud CLI がある場合は、
ccloud update --major
を実行すると、利用可能なアップデートと confluent v2.0 への直接アップグレードのプロンプトが表示されます。既に新しい統合 CLI がある場合はconfluent update
を実行します。詳細については、クイックスタートの前提条件の「Confluent CLI の最新バージョンの取得」を参照してください。 - 「概要」の「コマンドと前提条件」に記載された手順に従います。これらの手順には、まだお持ちでない方のために、最新バージョンの Confluent Cloud を取得する最も簡単な方法が記されているほか、Cluster Linking コマンドの概要が簡単に紹介されています。
- DR クラスターは、パブリックインターネットエンドポイントを持つ 専用 クラスターであることが必要です。
- オリジナルクラスターには、パブリックインターネットエンドポイントに接続された、ベーシック、スタンダード、または 専用 クラスターを使用できます。これらのクラスターがまだ存在しない場合は、Cloud Console または Confluent CLI で作成できます。
注釈
有効な Confluent Platform クラスターと有効な Confluent Cloud クラスターの間でフェイルオーバーを使用できます。Confluent Cloud クラスターと Confluent Platform クラスターには 統合 Confluent CLI を使用する必要があります。インストール手順と新しい統合 CLI の概要については、こちら を参照してください。
チュートリアルの内容¶
このチュートリアルでは、Confluent CLI Cluster Linking コマンドを使用した DR クラスターの作成とそれに対するフェイルオーバーを解説します。
将来のリリースで、Cluster Linking コマンドの REST API が利用できるようになる可能性があります。
最初に、DR クラスター(送信先)へのクラスターリンクを構築し、関連するすべてのトピック、ACL、およびコンシューマーグループオフセットをミラーリングします。これは、"定常状態" のセットアップです。
次に、オリジナル(送信元)クラスターを停止させてみます。この場合、プロデューサー、コンシューマー、クラスターリンクはオリジナルクラスターとやり取りできなくなります。
次に、DR クラスターのミラートピックを通常のトピックに変換するフェイルオーバーコマンドを呼び出します。
最後に、プロデューサーとコンシューマーを DR クラスターに移動して、運用を続けます。これで、DR クラスターが新しい Source of Truth(信頼できる唯一の情報源)になりました。
定常状態のセットアップ¶
使用するクラスターを作成または選択します。¶
Confluent Cloud ウェブ UI にログオンします。
まだクラスターを作成していない場合は、「Confluent Cloud でのクラスターの作成」の説明に従い、同じ環境内に 2 つのクラスターを作成します。
これらの少なくとも一方は 専用クラスター である必要があります。これが DR クラスター(ミラートピックのホストとなる "送信先")として動作します。
オリジナルクラスターまたは送信元クラスターのタイプはどれでもかまいません。すぐに識別できるよう、送信元クラスターはベーシッククラスターまたはスタンダードクラスターとし、それらのクラスターに ORIGINAL と DR という名前を付けることをお勧めします。
必要な情報の記録¶
ちなみに
この情報を常時監視するには、Confluent CLI のコマンド出力をテキストファイルに単に保存するか、シェル環境変数を使用するのが最も簡単です。そのようにする場合は、後でファイルを削除するか、セキュリティコードだけをより安全なストレージに移動するなどして、API キーとシークレットを必ず保護してください。その方法を含め、時間短縮のためのヒントについては、「概要」の「CLI の高度なヒント」に詳しく説明されています。
このウォークスルーでは、Confluent CLI のコマンドから次の情報にアクセスできる必要があります。
オリジナルクラスターのクラスター ID (このチュートリアルでは
<original-cluster-id>
)。Confluent Cloud のクラスター ID を取得するには、Confluent CLI から以下のコマンドを入力します。以下の例では、オリジナルクラスターの ID はlkc-xkd1g
です。confluent kafka cluster list
オリジナルクラスター用のブートストラップサーバー。これを取得するには、次のコマンドを入力します。
<original-cluster-id>
は、送信元クラスターのクラスター ID に置き換えてください。confluent kafka cluster describe <original-cluster-id>
この出力は以下のようになります。以下の出力例で、ブートストラップサーバーの値は
SASL_SSL://pkc-9kyp5.us-east-1.aws.confluent.cloud:9092
です。このチュートリアルでは、この値を<original-bootstrap-server>
として参照します。+--------------+---------------------------------------------------------+ | Id | lkc-xkd1g | | Name | AWS US | | Type | DEDICATED | | Ingress | 50 | | Egress | 150 | | Storage | Infinite | | Provider | aws | | Availability | single-zone | | Region | us-east-1 | | Status | UP | | Endpoint | SASL_SSL://pkc-9kyp5.us-east-1.aws.confluent.cloud:9092 | | ApiEndpoint | https://pkac-rrwjk.us-east-1.aws.confluent.cloud | | RestEndpoint | https://pkc-9kyp5.us-east-1.aws.confluent.cloud:443 | | ClusterSize | 1 | +--------------+---------------------------------------------------------+
DR クラスターのクラスター ID (
<DR-cluster-id>
)。これは、オリジナルクラスターの ID と同じように、confluent kafka cluster list
コマンドで取得できます。confluent kafka cluster list
DR クラスターのブートストラップサーバー (
<DR-bootstrap-server>
)。これは、オリジナルクラスターのブートストラップサーバーと同じように、describe コマンドを使用して取得できます。<DR-cluster-id>
は送信先クラスター ID に置き換えてください。confluent kafka cluster describe <DR-cluster-id>
オリジナルクラスターと DR クラスター間のクラスターリンクの作成¶
最初に、ソースクラスターのトピック、ACL、およびコンシューマーグループオフセットを送信先クラスターにミラーリングするクラスターリンクを作成します。
クラスターリンクが送信元クラスターのトピックにアクセスするための権限のセットアップ¶
クラスターリンクには、送信元クラスター上の適切なトピックを読み取るための権限が必要です。これらの権限を付与するために、以下の 2 つのメカニズムを作成します。
- クラスターリンク用の サービスアカウント。サービスアカウントは、Confluent Cloud で、Confluent Cloud リソースへのアクセスを必要とするアプリケーションとエンティティをグループ化するために使用されます。
- クラスターリンクのサービスアカウントとソースクラスターに関連付けられている API キーとシークレット。リンクによって、トピック情報とメッセージがフェッチされる際に、ソースクラスターでの認証にこの API キーが使用されます。サービスアカウントは多数の API キーを所有できますが、ここで必要なのは 1 つだけです。
これらのリソースを作成するには、以下を実行します。
このクラスターリンク用のサービスアカウントを作成します。
confluent iam service-account create Cluster-Linking-Demo --description "For the cluster link created for the DR failover tutorial"
この出力は以下のようになります。
+-------------+-----------+ | Id | 254122 | | Resource ID | sa-lqxn16 | | Name | ... | | Description | ... | +-------------+-----------+
ID フィールド(このチュートリアルでは
<service-account-id>
)を保存します。API キーとシークレットを作成します。
confluent api-key create --resource <original-cluster-id> --service-account <service-account-id>
注釈
このキーとシークレットを安全なところに保管します。 クラスターリンクを作成する場合は、この API キーとシークレットをリンクに指定する必要があるので、クラスターリンク自体に保存されます。
クラスターリンクによるソースクラスターのトピックの読み取りを許可します。クラスターリンクのサービスアカウントに、すべてのトピックに対する READ および DESCRIBE_CONFIGS を実行できる ACL を付与します。
confluent kafka acl create --allow --service-account <service-account-id> --operation READ --operation DESCRIBE_CONFIGS --topic "*" --cluster <original-cluster-id>
ちなみに
上記の例では、特定のトピックの代わりにアスタリスク(
--topic "*"
)を使用することで、すべてのトピックの読み取りアクセスを許可しています。必要な場合は、ミラー対象のトピックを特定のセットに絞り込むこともできます。たとえば、クラスターリンクに対して、"clicks" プレフィックスで始まるすべてのトピックの読み取りを許可するには、以下を実行できます。confluent kafka acl create --allow --service-account <service-account-id> --operation READ --operation DESCRIBE_CONFIGS --topic "clicks" --prefix --cluster <original-cluster-id>
ソースの ACL を送信先クラスターに同期できるようにします。
これで、障害が発生しても、コンシューマー、プロデューサー、その他のサービスが実行を継続できます。
このためには、クラスターリンクのサービスアカウントに、ソースクラスターに対して DESCRIBE を実行する ACL を付与する必要があります。
confluent kafka acl create --allow --service-account <service-account-id> --operation DESCRIBE --cluster-scope --cluster <original-cluster-id>
ミラートピックのコンシューマーグループオフセットをこのクラスターリンクに同期できるようにします。これで、コンシューマーは中断したオフセットから再開できます。
このためには、2 つの ACL セットがクラスターに必要です。
クラスターリンクのサービスアカウントに、ソース(オリジナル)クラスターのトピックに対する DESCRIBE、コンシューマーグループに対する READ および DESCRIBE を実行するための適切な ACL を付与します。
confluent kafka acl create --allow --service-account <service-account-id> --operation DESCRIBE --topic "*" --cluster <original-cluster-id>
confluent kafka acl create --allow --service-account <service-account-id> --operation READ --operation DESCRIBE --consumer-group "*" --cluster <original-cluster-id>
クラスターリンクのサービスアカウントに、送信先(DR)クラスターのトピックに対する READ および ALTER を実行するための ACL と、そのコンシューマーグループに対する READ を実行するための ACL を付与します。
confluent kafka acl create --allow --service-account <service-account-id> --operation READ --operation ALTER --topic "*" --cluster <DR-cluster-id>
confluent kafka acl create --allow --service-account <service-account-id> --operation READ --consumer-group "*" --cluster <DR-cluster-id>
クラスターリンクの作成¶
ACL の同期とコンシューマーグループオフセットの同期を有効にする構成ファイルを作成します。このファイルにリンクのセキュリティ認証情報も含めます。
これを行うには、
dr-link.config
という名前の新規ファイルに以下の行をコピーし、<api-key>
と<api-secret>
を、作成したキーとシークレットに置き換えます。consumer.offset.sync.enable=true consumer.offset.group.filters={"groupFilters": [{"name": "*","patternType": "LITERAL","filterType": "INCLUDE"}]} consumer.offset.sync.ms=1000 acl.sync.enable=true acl.sync.ms=1000 acl.filters={ "aclFilters": [ { "resourceFilter": { "resourceType": "any", "patternType": "any" }, "accessFilter": { "operation": "any", "permissionType": "any" } } ] } topic.config.sync.ms=1000 security.protocol=SASL_SSL sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<api-secret>";
この構成で注意する点があります。それらは以下を実行します。
- これらのミラートピックのすべてのコンシューマーグループ(
*
)のオフセットを同期します。これをフィルター処理するには、包含("filterType": "INCLUDE"
)または除外("filterType": "EXCLUDE"
)する正確なトピック名("patternType": "LITERAL"
)またはプレフィックス("patternType": "PREFIX"
)を渡します。 - コンシューマー、プロデューサー、その他のサービスが DR クラスターのトピックにアクセスできるように、オリジナルクラスター上のすべての ACL(
*
)を同期します。コンシューマーグループオフセットの同期とは異なり、これはミラートピックに限らず他のトピックの ACL も同期できます。これをフィルター処理するには、包含("filterType": "INCLUDE"
)または除外("filterType": "EXCLUDE"
)する正確なトピック名("patternType": "LITERAL"
)またはプレフィックス("patternType": "PREFIX"
)を渡します。 - コンシューマーオフセット、ACL、トピック構成を 1000 ミリ秒ごとに同期します。したがって、これらについて、より新しい値をリンクできます。これと引き換えにリンクのデータスループットが高くなり、トピックデータの最大スループットが低下する可能性があります。値を変えて試し、クラスターリンクに最適な値を設定することをお勧めします。
- ファイルの最後の行は、
sasl.jaas.config
で始まり、セミコロン(;
)で終わります。例示したように、全体が 1 行で入力されている必要があります。
- これらのミラートピックのすべてのコンシューマーグループ(
以下のように
confluent kafka link create <flags>
コマンドを使ってクラスターリンクを作成します。この例では、クラスターリンクの名前は
dr-link
です。confluent kafka link create dr-link \ --cluster <dr-cluster-id> \ --source-cluster-id <original-cluster-id> \ --source-bootstrap-server <original-bootstrap-server> \ --config-file dr-link.config
このコマンドが成功した場合、
Created cluster link "dr-link".
というメッセージが返されます。また、リンクが作成されたことは、既存のリンクをリスト表示することでも確認できます。
confluent kafka link list --cluster <DR-cluster-id>
オリジナルクラスターのソーストピックと DR クラスターのミラートピックの構成¶
オリジナルクラスターに
dr-topic
というトピックを作成します。このデモでは便宜上、このトピックをパーティション 1 つだけ(
--partitions 1
)で作成します。パーティションを 1 つだけにすると、オリジナルクラスターから DR クラスターでコンシューマーオフセットがどのように同期されるかがわかりやすくなります。confluent kafka topic create dr-topic --partitions 1 --cluster <original-cluster-id>
以下の例のように、
Created topic "dr-topic"
というメッセージが表示されます。> confluent kafka topic create dr-topic --partitions 1 --cluster lkc-xkd1g Created topic "dr-topic".
これはトピックをリスト表示することで検証できます。
confluent kafka topic list --cluster <original-cluster-id>
DR クラスターに
dr-topic
のミラートピックを作成します。confluent kafka mirror create dr-topic --link <link-name> --cluster <DR-cluster-id>
以下の例のように、
Created mirror topic "dr-topic"
というメッセージが表示されます。> confluent kafka mirror create dr-topic --link dr-link --cluster lkc-r68yp Created mirror topic "dr-topic".
ちなみに
現在のプレビューリリースでは、CLI または API コマンドでミラートピックを 1 つずつ作成する必要があります。今後のリリースでは、送信元クラスターで新しいトピックが作成されるたびに自動的にミラートピックを作成できるサービスが Cluster Linking に追加される可能性があります。この場合は、プレフィックスに基づいてトピックをフィルター処理できる可能性があります。この機能は、DR クラスターに DR トピックを自動作成する場合に役立ちます。
この時点では、オリジナルクラスターにトピックが 1 つあり、そのデータ、ACL、およびコンシューマーグループオフセットがすべて DR クラスターのミラートピックにミラーリングされています。
オリジナルクラスターでのデータの生成および消費¶
このセクションでは、オリジナルクラスターへのデータ生成とオリジナルクラスターからのデータ消費を行うアプリケーションをシミュレーションします。Confluent CLI を使用して、そのための関数を消費および生成します。
CLI ベースのデモクライアントとするサービスアカウントを作成します。
confluent iam service-account create CLI --description "From CLI"
出力は以下のようになります。
+-------------+-----------+ | Id | 254262 | | Resource ID | sa-ldr3w1 | | Name | CLI | | Description | From CLI | +-------------+-----------+
上の例では、
<cli-service-account-id>
は 254262 です。オリジナルクラスターに、このサービスアカウントの API キーとシークレットを作成し、
<original-CLI-api-key>
と<original-CLI-api-secret>
として保存します。confluent api-key create --resource <original-cluster-id> --service-account <cli-service-account-id>
DR クラスターに、このサービスアカウントの API キーとシークレットを作成し、
<DR-CLI-api-key>
と<DR-CLI-api-secret>
として保存します。confluent api-key create --resource <DR-cluster-id> --service-account <cli-service-account-id>
CLI サービスアカウントに、オリジナルクラスターでメッセージを生成および消費できる ACL を付与します。
confluent kafka acl create --service-account <cli-service-account-id> --allow --operation READ --operation DESCRIBE --operation WRITE --topic "*" --cluster <original-cluster-id>
confluent kafka acl create --service-account <cli-service-account-id> --allow --operation DESCRIBE --operation READ --consumer-group "*" --cluster <original-cluster-id>
これで、オリジナルクラスターでデータを生成および消費できるようになりました。
元のクラスターではオリジナル API キーを使用することを CLI に指示します。
confluent api-key use <original-cluster-api-key> --resource <original-cluster-id>
トピックに 1 から 5 の数字を生成します。
seq 1 5 | confluent kafka topic produce dr-topic --cluster <original-cluster-id>
以下のように出力されますが、^C キーまたは ^D キーを押さなくてもコマンドは終了します。
Starting Kafka Producer. ^C or ^D to exit
ちなみに
Kafka クラスターに接続できないことを示す
unable to connect to Kafka cluster
というエラーメッセージが返された場合は、1 ~ 2 分待ってから再試行してください。最近作成された Kafka クラスターと API キーについては、リソースの準備が整うまでに数分かかる場合があります。CLI コンシューマーを開始して、
dr-topic
トピックから読み取りを行い、それにcli-consumer
という名前を付けます。このコマンド内で、フラグ
--from-beginning
を渡して、コンシューマーにオフセット0
から開始するよう指示します。confluent kafka topic consume dr-topic --group cli-consumer --from-beginning
コンシューマーが 5 個のメッセージをすべて読み取ったら、Ctrl + C キーを押してコンシューマーを終了します("+" はキーを同時に押すことを意味します)。
コンシューマーがフェイルオーバーで正しいオフセットから再開することを確認するために、コンシューマーで人為的にコンシューマーラグを発生させます。
トピックに 6 から 10 の数字を生成します。
seq 6 10 | confluent kafka topic produce dr-topic
以下のように出力されますが、^C キーまたは ^D キーを押さなくてもコマンドは終了します。
Starting Kafka Producer. ^C or ^D to exit
これで、オリジナルクラスターのトピックに 10 個のメッセージを生成しました。cli-consumer が消費したのは 5 個だけです。
ミラーリングラグのモニタリング¶
Cluster Linking は非同期プロセスであるため、ソースクラスターと送信先クラスターの間でミラーリングラグが発生する可能性があります。
以下のコマンドで、DR トピックのパーティション単位でどのようなミラーリングラグが生じているかを確認できます。
confluent kafka mirror describe dr-topic --link dr-link --cluster <dr-cluster-id>
LinkName | MirrorTopicName | Partition | PartitionMirrorLag | SourceTopicName | MirrorStatus | StatusTimeMs
+-----------+-----------------+-----------+--------------------+-----------------+--------------+---------------+
dr-link | dr-topic | 0 | 0 | dr-topic | ACTIVE | 1624030963587
dr-link | dr-topic | 1 | 0 | dr-topic | ACTIVE | 1624030963587
dr-link | dr-topic | 2 | 0 | dr-topic | ACTIVE | 1624030963587
dr-link | dr-topic | 3 | 0 | dr-topic | ACTIVE | 1624030963587
dr-link | dr-topic | 4 | 0 | dr-topic | ACTIVE | 1624030963587
dr-link | dr-topic | 5 | 0 | dr-topic | ACTIVE | 1624030963587
また、Confluent Cloud Metrics を通じてラグとミラーリングメトリクスをモニタリングできます。以下の 2 つのメトリクスが表示されます。
- MaxLag は、ミラーリング中のパーティションでの最大ラグ(メッセージ数)を示します。トピック単位、リンク単位で利用できます。これにより、フェイルオーバーの時点で、どの程度のデータがオリジナルクラスターにのみ存在するかを大まかに把握できます。
- ミラーリングスループットは、リンク単位またはトピック単位でどの程度データがミラーリングされているかを示します。
クラスターリンクとミラートピックのリスト表示¶
ワークフローのさまざまなポイントで、リンクやミラートピックなど、Cluster Linking リソースのリストを取得することが必要になる場合があります。たとえば、モニタリングの目的で行うことや、フェイルオーバーまたは移行を開始する前に行うことが考えられます。
アクティブクラスター上のクラスターリンクをリスト表示するには
confluent kafka link list
リンク上またはクラスター上のミラートピックをリスト表示できます。
特定のクラスターリンク上のミラートピックをリスト表示するには
confluent kafka mirror list --link <link-name> --cluster <cluster-id>
特定のクラスター上のすべてのミラートピックをリスト表示するには
confluent kafka mirror list --cluster <cluster-id>
DR クラスターへのフェイルオーバーのシミュレーション¶
災害時は、オリジナルクラスターに到達できなくなるのが普通です。
このセクションでは、DR クラスターで運用を再開するために行う手順を説明します。
フェイルオーバーのドライランを実行して、実際にはコマンドを実行せずに結果をプレビューします。その方法は、コマンドの最後に
--dry-run
フラグを追加するだけです。confluent kafka mirror failover <mirror-topic-name> --link <link-name> --cluster <DR-cluster-id> --dry-run
その例を次に示します。
confluent kafka mirror failover dr-topic --link dr-link --cluster <DR-cluster-id> --dry-run
ミラートピックを停止し、通常の書き込み可能なトピックに変換します。
confluent kafka mirror failover <mirror-topic-name> --link <link-name> --cluster <DR-cluster-id>
この例のミラートピック名とリンク名は次のとおりです。
confluent kafka mirror failover dr-topic --link dr-link --cluster <DR-cluster-id>
想定される出力を以下に示します。
MirrorTopicName | Partition | PartitionMirrorLag | ErrorMessage | ErrorCode ------------------------------------------------------------------------- dr-topic | 0 | 0 | |
stop コマンドは元に戻せません。ミラートピックを通常のトピックに変更した後でミラートピックに戻すことはできません。再度ミラートピックにする場合は、いったん削除して、ミラートピックとして再作成する必要があります。
これで、DR クラスターでデータを生成および消費できるようになりました。
DR クラスターの API キーを使用するように CLI を設定します。
confluent api-key use <DR-CLI-api-key> --resource <DR-cluster-id>
トピックで 11 から 15 の数字を生成して、書き込み可能なトピックであることを確認します。
seq 11 15 | confluent kafka topic produce dr-topic --cluster <DR-cluster-id>
以下のように出力されますが、^C キーまたは ^D キーを押さなくてもコマンドは終了します。
Starting Kafka Producer. ^C or ^D to exit
コンシューマーグループを DR クラスターに "移動" し、DR クラスターの
dr-topic
から消費します。confluent kafka topic consume dr-topic --group cli-consumer --cluster <DR-cluster-id>
コンシューマーは、数字 6 で消費を開始するはずです。オリジナルクラスターはここで中断されたからです。想定どおりの出力であれば、コンシューマーオフセットが正しく同期されたことが示されます。数字 15 まで消費されます。これが DR クラスターに生成した最後のメッセージです。
数字 15 が表示されたら、Ctrl + C キーを押してコンシューマーを終了します。
想定される出力を以下に示します。
Starting Kafka Consumer. ^C or ^D to exit 6 7 8 9 10 11 12 13 14 15 ^CStopping Consumer.
これで CLI プロデューサーとコンシューマーを DR クラスターにフェイルオーバーできました。ここでスムーズに運用が継続されます。
おすすめのリソース¶
- ディザスターリカバリに関するウェビナー: Demo Series: Learn the Confluent Q3 '21 Release
- このチュートリアルでは、ディザスターリカバリの特定のユースケースを説明しました。「クラスター、リージョン、クラウド間でのデータの共有」では、クラスター、リージョン、クラウドの境界内のトピック間データ共有または境界を越えたトピック間データ共有に関するチュートリアルを取り上げています。こちらも Cluster Linking の基本的なユースケースです。
- 「ミラートピック」では、Cluster Linking のこの機能のコンセプトの概要について説明します。