Cluster Linking のコマンド¶
Looking for Confluent Cloud Cluster Linking docs? You are currently viewing Confluent Platform documentation. If you are looking for Confluent Cloud docs, check out Cluster Linking on Confluent Cloud.
Confluent Enterprise 7.0.0 には、Confluent Platform、Confluent Cloud、Kafka クラスターのデータとメタデータを Confluent Platform および Confluent Cloud クラスターと同期する、Cluster Linking と呼ばれる新機能が含まれています。
Cluster Linking は、2 つの重要な抽象化を使用してデータを Geo レプリケートします。
- クラスターリンク: "送信元クラスター" と "送信先クラスター" を接続します。
- ミラートピック: クラスターリンクの送信先クラスターにあるトピックで、送信元クラスターのトピックと同一のコピーです。
クラスターリンクとミラートピックの作成および管理には、Kafka REST v3 Proxy の REST API、またはターミナルで CLI コマンドを使用します。このセクションでは、クラスターリンクおよびミラートピックを作成および管理するためのコマンドについて説明します。
CLI コマンド¶
クラスターリンクの構成オプションは、kafka-cluster-links
コマンドのフラグの値として使用できます。これらのオプションの一部は、以下のコマンド例のコンテキストの中で示されています。完全なリストは、「リンクプロパティ」にリファレンス形式で記載されています。
kafka-cluster-links
は、クラスター間のリンクを作成および管理するために使用します。
Confluent Platform の他の Kafka コマンドと同様に、--bootstrap-server
は kafka-cluster-links
コマンドの必須フラグです。
--bootstrap-server
(必須)クラスター内のブローカーの接続文字列で、
host:port
形式です(ブローカーが複数存在する場合はコンマ区切りリストを指定できます)。ミラートピックの作成先となる送信先クラスターを指定する必要があります。送信先クラスターは、Confluent Platform 7.0.0 以降を実行している必要があります。これは Cluster Linking をサポートするために必要です。- 型: 文字列
- デフォルト: 空の文字列
以降のすべての kafka-cluster-links
の実装で --bootstrap-server
を使用します。
ちなみに
ほとんどの場合、--bootstrap-server
はコマンドの最初(kafka-cluster-links
の直後)または最後に指定できます。以下のセクションの例では、--bootstrap-server
が kafka-cluster-links
の直後に使用されています。
クラスターリンクの作成¶
コマンド例
kafka-cluster-links --bootstrap-server localhost:9092 \
--create \
--link example-link \
--config-file example-link.config
出力例
Cluster link 'example-link' creation successfully completed.
To create a cluster link, use kafka-cluster-links
along with bootstrap-server and the following flags. You must have ALTER CLUSTER
authorization to create a cluster link, as described in 認可(ACL)
(subsection, 送信元クラスターのリンクに対する ACL).
--link
(必須)作成するクラスターリンクの名前です。クラスター内で一意のクラスターリンク名である必要があります。
- 型: 文字列
--cluster-id
(必須)リンクする送信元クラスターの ID。クラスターの ID は CLI コマンド
kafka-cluster cluster-id
で調べることができます。- 型: 文字列
(必須)送信先クラスターが送信元クラスターと通信する方法を指定するために、以下の(両方ではなく)いずれかのパラメーターを指定する必要があります。使用可能な構成は、クライアントを構成するために使用する構成です。これには、必須の bootstrap.servers
と他の必要なセキュリティおよび認可プロパティが含まれます。
--config
作成時のクラスターリンクに適用される構成のコンマ区切りリストです("キー=値" 形式)。このフラグを使用する場合、構成は直接コマンドラインで指定されます(次のフラグで説明する、ファイルに指定する方法とは対照的です)。角かっこを使用して、コンマを含む値をグループ化することができます。使用できる全構成のリストについては、「リンクプロパティ」を参照してください。
- 型: 文字列
--config-file
クラスターリンクの 構成 が含まれているプロパティファイルです。これは、クラスターリンク構成の推奨される指定方法です。
- 型: 文字列
たとえば、link-config.properties
という名前のファイルに、以下のようにセキュアクラスターリンク用の構成を指定したとします。
bootstrap.servers=example-1:9092,example-2:9092,example-3:9092
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="example-user" password="example-password"
security.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=https
この場合、以下のコマンドを使用してクラスターリンク example-link
を作成できます。
kafka-cluster-links --bootstrap-server localhost:9092 --create --link example-link --config-file link-config.properties --cluster-id pz-s7W72Sdm7A11wzku9gA
オプションの構成:
--command-config
- AdminClient に渡される構成が含まれているプロパティファイルです。たとえば、認可および認証のセキュリティ認証情報が含まれます。
--consumer-group-filters-json
consumer.offset.group.filters
の構成に使用する JSON 文字列。詳細については、「送信元クラスターから送信先クラスターへのコンシューマーグループの移行」を参照してください。- 型: 文字列
--consumer-group-filters-json-file
consumer.offset.group.filters
の構成に使用する JSON ファイルのパスです。詳細については、「送信元クラスターから送信先クラスターへのコンシューマーグループの移行」を参照してください。- 型: 文字列
--acl-filters-json-file
acl.filters
の構成に使用する ACL フィルター JSON ファイルのパスです。詳細については、「送信元クラスターから送信先クラスターへの ACL の移行」を参照してください。- 型: 文字列
--validate-only
- 指定した場合、クラスターリンクを指定どおりに作成できるかどうかを検証します(ただし作成は行いません)。
--exclude-validate-link
- 指定した場合、送信元クラスターに到達できることを検証せずにリンクを作成します。これは、送信元クラスターがまだ実行されていない場合や到達可能でない場合にのみ便利です。送信元クラスターが実行中で利用可能な場合、検証機能はスキップされるため、このオプションの使用はお勧めしません。
--topic-filters-json
auto.create.mirror.topics.filters
の構成に使用する JSON 文字列。詳細については、「ミラートピック」を参照してください。--topic-filters-json-file
auto.create.mirror.topics.filters
の構成に使用する JSON ファイルのパス。詳細については、「ミラートピック」を参照してください。
クラスターリンクのリストの表示¶
コマンド例
kafka-cluster-links --list --bootstrap-server localhost:9092
出力例
Link name: 'example-link', link ID: '123-some-link-id', remote cluster ID: '123-some-cluster-id', local cluster ID: ', local cluster ID: '456-some-other-cluster-id'', remote cluster available: 'true'
既存のクラスターリンクのリストを表示できます。このコマンドでは、リンク名、リンク ID(内部で割り当てられる一意の ID)、リンクされたクラスターのクラスター ID、およびリンクされたクラスターが利用可能かどうかが返されます。
ちなみに
クラスターリンクのリストを表示するには、kafka-cluster-links
を bootstrap-server と以下のフラグとともに使用します。
--link
指定した場合、指定されたクラスターリンクのリストのみを表示します。
- 型: 文字列
--command-config
AdminClient に渡される構成が含まれているプロパティファイルです。たとえば、認可および認証のセキュリティ認証情報が含まれます。
- 型: 文字列
--include-topics
指定した場合、このクラスターリンクのすべてのミラートピックのリストを表示します。
- 型: 文字列
クラスターリンクのリストを表示するには、DESCRIBE CLUSTER
認可が必要です。
クラスターリンクの詳細の表示¶
コマンド例
kafka-configs --bootstrap-server localhost:9092 \
--describe \
--cluster-link example-link
出力例
Dynamic configs for cluster-link example-link are:
metadata.max.age.ms=300000 sensitive=false synonyms={}
...
クラスターリンクの詳細を表示するには、kafka-cluster-links
を bootstrap-server と以下のフラグとともに使用します。
--cluster-link
(必須)詳細を表示するクラスターリンクの名前です。
- 型: 文字列
--command-config
AdminClient に渡される構成が含まれているプロパティファイルです。たとえば、認可および認証のセキュリティ認証情報が含まれます。
- 型: 文字列
クラスターリンクの詳細を表示するには、DESCRIBE CLUSTER
認可が必要です。
クラスターリンクの変更¶
コマンド例
kafka-configs --bootstrap-server localhost:9092 \
--alter \
--cluster-link example-link \
--add-config cluster.link.retry.timeout.ms=10000 \
--delete-config request.timeout.ms
出力例
Completed updating config for cluster-link example-link.
To alter an existing link, use kafka-cluster-links
along with bootstrap-server and these flags.
--alter
- (必須)リンクを変更します。
--cluster-link
(必須)変更するクラスターリンクの名前です。
- 型: 文字列
--command-config
- AdminClient に渡される構成が含まれているプロパティファイルです。たとえば、認可および認証のセキュリティ認証情報が含まれます。
少なくとも以下のいずれかを指定する必要があります。
--add-config
コマンドラインで構成を "キー=値" 形式で追加します。角かっこを使用して、コンマを含む値をグループ化することができます。使用できる全構成のリストについては、「リンクプロパティ」を参照してください。
- 型: 文字列
--add-config-file
追加する 構成 が含まれているプロパティファイルのパスです。
- 型: 文字列
--delete-config
削除する構成キーのコンマ区切りリストです。
- 型: 文字列
「認可(ACL)」で説明するように、リンクに関連付けられているクラスターを変更するには ALTER CLUSTER
認可が必要です。
ちなみに
既存のクラスターリンクの構成を更新するときには、変更された構成のみを渡します。構成ファイルの完全なセットを渡すほうが簡単ではありますが、
--add-config-file
で構成ファイルを使用する場合は、更新する構成のみがそれに含まれていることに、特に留意してください。たとえば、my-update-configs.txt
には以下のものが含まれる可能性があります。consumer.offset.sync.ms=25384 topic.config.sync.ms=38254
クラスターリンクの構成はさまざまな側面を変更できますが、その 送信元クラスター (ブートストラップサーバー、送信元クラスター ID など)やプレフィックス、リンク名 を変更することはできません。
新しい構成を検証するためのリンクの一時停止と再開¶
動的に更新可能な構成(SSL など)の場合、古い構成が使用される接続と新しい構成が使用される接続とが混在する可能性があります。アクションに不要な接続は、再作成されないようになっています。API と CLI の出力には最新の構成が反映されていますが、これらは、kafka-configs --alter
操作後の真の構成の状態を表してない場合があります。
新しい構成が適用されていることを確認するには、cluster.link.paused
を使用してリンクを一時停止した後、一時停止を解除してリンクを再開してください。詳細については、「Cluster Linking の構成オプション」の cluster.link.paused
を参照してください。
クラスターリンクの削除¶
コマンド例
kafka-cluster-links --bootstrap-server localhost:9092 \
--delete \
--link example-link
出力例
Cluster link 'example-link' deletion successfully completed.
既存のリンクを削除するには、kafka-cluster-links
を bootstrap-server と以下のフラグとともに使用します。
--link
(必須)詳細を表示するクラスターリンクの名前です。
- 型: 文字列
--command-config
AdminClient に渡される構成が含まれているプロパティファイルです。たとえば、認可および認証のセキュリティ認証情報が含まれます。
- 型: 文字列
--validate-only
- 指定した場合、クラスターリンクの削除を検証します(実際には削除しません)。
--force
- 現在リンクされているミラートピックがある場合でも強制的にリンクを削除します。
ちなみに
クラスターリンクに未処理のミラートピックがある場合、強制的に削除(--force
)しなければ、リクエストが失敗することがあります。強制的に削除する場合、そのクラスターリンクに関連付けられていたミラートピックのリンクがバックグラウンドで解除されます。
「認可(ACL)」で説明するように、クラスターリンクを削除するには、ALTER CLUSTER
認可が必要です。
重要
- クラスターリンクが削除されると、
STOPPED
トピックの履歴も削除されます。昇格された、またはフェイルオーバーされたミラートピックのLast Source Fetch Offset
やStatus Time
が必要な場合は、それらを保存してからクラスターリンクを削除してください。 - ミラートピックがまだ存在しているクラスターリンクは削除できません(削除操作は失敗します)。
- Confluent for Kubernetes (CFK)を使用している場合にクラスターリンクリソースを削除すると、そのクラスターリンクに関連付けられたままのミラートピックは、
failover
API を使用して強制的に通常のトピックに変換されます。
ミラートピックの作成¶
ミラートピックは、別のトピック内のすべてのデータとメタデータを反映した読み取り専用トピックです。
CLI でミラートピックを作成するには、kafka-mirrors
ツールを使用します。ミラートピックが作成されると、ミラーリングでは自動的に送信元トピックからのデータのフェッチを開始します。
詳細については、「ミラートピック」を参照してください。
コマンド例
kafka-mirrors --create --mirror-topic example-topic \
--link demo-link \
--bootstrap-server localhost:9092
出力例
Created topic example-topic.
ミラートピックを作成するには、kafka-cluster-links
を bootstrap-server と以下のフラグとともに使用します。
--mirror-topic
(必須)作成するミラートピックの名前。これは、クラスターリンクを通じてミラーリングするソーストピックと正確に一致する必要があります。
- 型: 文字列
--link
(必須)ソーストピックからデータを取得するために使用するクラスターリンクの名前です。
- 型: 文字列
ちなみに
--mirror-topic
の値は元の--topic
の名前と一致している必要があります。- ミラートピックのリストを表示する には、
--include-topics
フラグを指定してkafka-cluster-links --list
を実行します。
--command-config
- AdminClient に渡される構成が含まれているプロパティファイルです。たとえば、認可および認証のセキュリティ認証情報が含まれます。
以下は、ミラートピック作成時のオプションの構成です。
--config
ミラートピックの作成時にオーバーライドする構成のコンマ区切りのリスト。オーバーライドする各構成は、
name=value
として指定する必要があります。ミラートピックに設定できる構成の詳細については、ミラートピックの「構成」を参照してください。- 型: 文字列
--replication-factor
作成されるミラートピックのレプリケーション係数。指定しない場合、"デフォルトは送信先クラスターのデフォルト" になります。送信元トピックのレプリケーション係数ではありません。
- 型: 文字列
ミラートピックを作成するには、ALTER CLUSTER
認可が必要です。
ミラートピックのリストの表示¶
クラスターのすべてのクラスターリンクのミラートピックを表示するには、kafka-mirrors --list
を使用します。このコマンドでは、特定のクラスターリンクのミラートピックのリスト、またはクラスターのすべてのクラスターリンクの全ミラートピックのリストを表示できます。
コマンド例
kafka-mirrors --list --bootstrap-server localhost:9092
出力例
topic1
topic2
topic3
topic4
コマンドに以下のパラメーターを追加できます。
--link
(任意)フィルタリングするクラスターリンクの名前。指定した場合、このクラスターリンクのミラートピックのみのリストが表示されます。
- 型: 文字列
--include-stopped
- (任意)このフラグを追加した場合は、以前はミラートピックであったものの
promote
またはfailover
コマンドによって停止されたトピックもリストに含まれます。このフラグは引数を取りません。
ミラートピックの詳細表示¶
クラスターリンク上でミラーリングされるトピックについての情報を表示したり、トピックを管理したりするには、kafka-mirrors
を使用します。
kafka-mirrors --describe --bootstrap-server pkc-nwnyk.us-west-2.aws.confluent.cloud:9092 --command-config lkc-rn220.config --link onprem-to-cloud
Topic: web.orders.modified LinkName: onprem-to-cloud LinkId: 2c3dee72-f54b-4bb3-9694-cf29caaf3637 MirrorTopic: web.orders.modified State: ACTIVE StateTime: 2021-11-10 15:33:29
Partition: 0 State: ACTIVE DestLogEndOffset: 114123 LastFetchSourceHighWatermark: 114123 Lag: 0 TimeSinceLastFetchMs: 8385101
Partition: 1 State: ACTIVE DestLogEndOffset: 115278 LastFetchSourceHighWatermark: 115278 Lag: 0 TimeSinceLastFetchMs: 8387954
Partition: 2 State: ACTIVE DestLogEndOffset: 112210 LastFetchSourceHighWatermark: 112210 Lag: 0 TimeSinceLastFetchMs: 8508856
Partition: 3 State: ACTIVE DestLogEndOffset: 120887 LastFetchSourceHighWatermark: 120887 Lag: 0 TimeSinceLastFetchMs: 8389749
Partition: 4 State: ACTIVE DestLogEndOffset: 109225 LastFetchSourceHighWatermark: 109225 Lag: 0 TimeSinceLastFetchMs: 8385101
Partition: 5 State: ACTIVE DestLogEndOffset: 111669 LastFetchSourceHighWatermark: 111669 Lag: 0 TimeSinceLastFetchMs: 8387954
Topic: web.orders LinkName: onprem-to-cloud LinkId: 2c3dee72-f54b-4bb3-9694-cf29caaf3637 MirrorTopic: web.orders State: ACTIVE StateTime: 2021-11-10 15:33:29
Partition: 0 State: ACTIVE DestLogEndOffset: 294760 LastFetchSourceHighWatermark: 294760 Lag: 0 TimeSinceLastFetchMs: 8387954
Partition: 1 State: ACTIVE DestLogEndOffset: 285862 LastFetchSourceHighWatermark: 285862 Lag: 0 TimeSinceLastFetchMs: 8508856
Partition: 2 State: ACTIVE DestLogEndOffset: 284891 LastFetchSourceHighWatermark: 284891 Lag: 0 TimeSinceLastFetchMs: 8389749
Partition: 3 State: ACTIVE DestLogEndOffset: 285982 LastFetchSourceHighWatermark: 285982 Lag: 0 TimeSinceLastFetchMs: 8385101
Partition: 4 State: ACTIVE DestLogEndOffset: 277379 LastFetchSourceHighWatermark: 277379 Lag: 0 TimeSinceLastFetchMs: 8387954
Partition: 5 State: ACTIVE DestLogEndOffset: 283731 LastFetchSourceHighWatermark: 283731 Lag: 0 TimeSinceLastFetchMs: 8508856
Topic: inventory.shipments LinkName: onprem-to-cloud LinkId: 2c3dee72-f54b-4bb3-9694-cf29caaf3637 MirrorTopic: inventory.shipments State: ACTIVE StateTime: 2021-11-10 15:51:21
Confluent Platform の他の Kafka コマンドと同様に、--bootstrap-server
は kafka-mirrors
コマンドの必須フラグです。
--bootstrap-server
(必須)クラスター内のブローカーの接続文字列で、
host:port
形式です(ブローカーが複数存在する場合はコンマ区切りリストを指定できます)。ミラートピックの作成先となる送信先クラスターを指定する必要があります。送信先クラスターは、Confluent Platform 6.0.0 以降を実行している必要があります。これは Cluster Linking をサポートするために必要です。- 型: 文字列
- デフォルト: 空の文字列
コマンドに以下のパラメーターを追加できます。
--link
(任意)フィルタリングするクラスターリンクの名前。指定した場合、このクラスターリンクのミラートピックのみのリストが表示されます。
- 型: 文字列
--include-stopped
- (任意)このフラグを追加した場合は、以前はミラートピックであったものの
promote
またはfailover
コマンドによって停止されたトピックもリストに含まれます。このフラグは引数を取りません。
トピックのミラーリングの停止¶
トピックのミラーリングを停止するには、kafka-mirrors --failover
または kafka-mirrors --promote
コマンドを使用します。どちらのコマンドもミラートピックを通常の書き込み可能トピックに永続的に変換しますが、それぞれ特定のユースケースを意図して設計されています。
promote
コマンドは移行を意図したものであるため、ミラーリングを停止する前に、別途いくつかの検証と操作が実行されます。failover
コマンドはディザスターリカバリを意図したものであるため、すぐに反映され、常に成功します。他の操作は実行されません。dry-run
オプションを使用すると、コマンドを実際に実行する前に、その結果をプレビューできます。
promote
および failover
コマンドでは、--topics
フラグとコンマ区切りのトピック名のリストを使用してトピック名のリストを渡すことによって、複数のトピックでコマンドを同時に実行できます。1 トピックのみをプロモートする場合でも、--topics
(複数形)フラグを使用します。その例を次に示します。
--topics topic1
--topics topic1,topic2,topic3
その他の例を以下に示します。
トピックの昇格¶
ミラーリングを停止してミラートピックを通常のトピックに変換する作業を正しい(一般に移行のシナリオに適した)プロセスで行うには、kafka-mirrors --promote
を使用します。このコマンドは、送信元トピックとミラートピック間のラグが 0 であることをチェックし、メタデータ(コンシューマーグループのオフセットとトピックの構成)の最終的な同期を 1 回実行した後で、ミラートピックを通常のトピックに変換します。
注釈
promote
コマンドでは、送信元トピックに対するデータの生成は停止されません。ラグが 0 であることのチェックに成功した後で、送信元トピックに対してプロデューサーから新たにデータが生成される可能性はあります。このケースでは、両者は分岐して 2 つの異なるトピックとなり、(それまでミラーリングされていた)通常のトピックにデータはミラーリングされません。- promote コマンド実行時に、送信元トピックとミラートピックとの間にラグがあった場合、promote コマンドは失敗します。
例
promote
を使用して、同じクラスターリンク上の複数のトピックまたは 1 つのトピックを指定し、ミラーリングを停止することができます。コマンドの構文は次のとおりです。
kafka-mirrors --promote --topics <destination-topic-1> .. <destination-topic-n> \
--link <link name> --bootstrap-server <host:port>
以下に示すのは、example-link
という名前のリンクを使用するトピック example-topic
のミラーリングを、kafka-mirrors --promote
を使用して停止する例です。
kafka-mirrors --promote --topics example-topic --link example-link --bootstrap-server localhost:9092
トピックのフェイルオーバー¶
直ちにミラーリングを停止し、ミラートピックを通常のトピックに変換するには、kafka-mirrors failover
を使用します。通常、これは送信元クラスターに予期せず障害が発生した場合のディザスターリカバリに適しています。failover
コマンドでは、別途のチェックや同期は実行されません。その働きは、promote
の "強制版" といえます。1 つのトピックまたは複数のトピックを指定して、ミラーリングを停止することができます。
例
failover
を使用して、同じクラスターリンク上の複数のトピックまたは 1 つのトピックを指定し、ミラーリングを停止することができます。コマンドの構文は次のとおりです。
kafka-mirrors --failover --topics <destination-topic-1> .. <destination-topic-n> \
--link <link name> --bootstrap-server <host:port>
以下に示すのは、example-link
という名前のリンクを使用するトピック example-topic
のミラーリングを、kafka-mirrors --failover
を使用して停止する例です。
kafka-mirrors --failover --topics example-topic --link example-link --bootstrap-server localhost:9092
ドライラン¶
promote
または failover
コマンドを実行する前にその結果をテストするには、--dry-run
フラグを追加します。その例を次に示します。
kafka-mirrors --failover --topics example-topic --link example-link --bootstrap-server localhost:9092 --dry-run
ミラートピックの一時停止と再開¶
pause(kafka-mirrors --pause
)コマンドと resume(kafka-mirrors --unpause
)コマンドを使用して一時的にミラーリングを一時停止したり再開したりできます。
ミラートピックを一時停止するには、次のようにします。
kafka-mirrors --pause --topics example-topic --link example-link --bootstrap-server localhost:9092
ミラートピックを再開するには、--unpause
を使用します。
kafka-mirrors --unpause --topics example-topic --link example-link --bootstrap-server localhost:9092
送信元クラスターから送信先クラスターへのコンシューマーグループの移行¶
リンク内でコンシューマーグループを移行するには、リンク構成に consumer.offset.sync.enable=true
を設定し、JSON ファイルにグループフィルターを指定します。そのファイルの名前を CLI コマンド の --consumer-group-filters-json-file
フラグの値として渡します。これは、クラスターリンクを作成するときか、または既存の構成のアップデートとして設定できます。
注釈
コンシューマーグループフィルターに含めるのは、送信先で使用されていないグループのみにしてください。そうすることで、送信先の他のコンシューマーによってコミットされたオフセットがシステムによってオーバーライドされるのを防ぐことができます。フィルターに含まれているグループが送信先でも使用されている場合、システムはそのフィルターを回避しようと試みますが、この場合、その保証はありません。オフセットが上書きされる可能性はあります。ミラートピックの "昇格" が正しく機能するためには、システムがオフセットをロールバックできることが必要ですが、グループが送信先コンシューマーによって使用されていると、ロールバックを実行できません。
この例は、"someGroup" グループを "broker-west" クラスターから "broker-east" クラスターに移行し、移行を実行する前に以下のフィルターセットですべてのオフセットを移行している状態になっていると仮定しています。
{"groupFilters": [
{
"name": "*",
"patternType": "LITERAL",
"filterType": "INCLUDE"
}
]}
コンシューマーグループを送信元クラスターから送信先クラスターに移行するには、以下の手順に従います。
送信元クラスターのコンシューマーを停止します。
consumer.offset.sync.ms
の 2 倍の期間だけ待機します。Cluster Linking レプリケーションが最後のコミット済みオフセットを超えていることを確認します。これは、以下のコマンドで確認できます。
送信元クラスターの CURRENT-OFFSET をチェックします。
kafka-consumer-groups --bootstrap-server broker-west:19091 --describe --group someGroup
出力は次のようになります。
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID someGroup west-offsets 0 100 100 0 - - -
送信先クラスターの LOG-END-OFFSET が、上の手順で記録した CURRENT-OFFSET 以上であることを確認します。
kafka-consumer-groups --bootstrap-server broker-east:19092 --describe --group someGroup
出力は次のようになります。
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID someGroup west-offsets 0 100 100 0
以下のコマンドを実行して、現在のオフセットが送信元と送信先で一致していることを検証します。
送信元クラスターの CURRENT-OFFSET をチェックします。
kafka-consumer-groups --bootstrap-server broker-west:19091 --describe --group someGroup
出力は次のようになります。
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID someGroup west-offsets 0 100 100 0 - - -
送信先クラスターの CURRENT-OFFSET をチェックします。
kafka-consumer-groups --bootstrap-server broker-east:19092 --describe --group someGroup
出力は次のようになります。
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID someGroup west-offsets 0 100 100 0
オフセット移行フィルターをアップデートして、移行処理からグループを削除します。
echo "consumer.offset.group.filters={\"groupFilters\": [ \ { \ \"name\": \"*\", \ \"patternType\": \"LITERAL\", \ \"filterType\": \"INCLUDE\" \ }, \ { \ \"name\": \"someGroup\", \ \"patternType\": \"LITERAL\", \ \"filterType\": \"EXCLUDE\" \ } \ ]}" > newFilters.properties kafka-configs --bootstrap-server broker-east:19092 --alter --cluster-link offsets-cluster-link --add-config-file newFilters.properties
送信先クラスターのコンシューマーを起動します。
REST API コマンド¶
Cluster Linking REST API については、REST API Proxy v3 のドキュメント に記載されています。
おすすめの記事¶
- Cluster Linking の構成オプション
- Confluent Platform の CLI ツール
- ブローカー構成の動的な変更 (
kafka-configs
コマンドの使い方を学ぶ場合) - Kafka コンシューマーグループのコマンドツール (
kafka-consumer-groups
コマンドの使い方を学ぶ場合) - チュートリアル: トピックデータ共有での Cluster Linking の使用
- Cluster Linking のセキュリティ