Cluster Linking のコマンド¶
Looking for Confluent Cloud Cluster Linking docs? You are currently viewing Confluent Platform documentation. If you are looking for Confluent Cloud docs, check out Cluster Linking on Confluent Cloud.
重要
この機能はプレビュー機能として利用できます。プレビュー機能とは、開発者から早い段階でフィードバックを受けるために提供している Confluent Platform のコンポーネントのことです。この機能は、評価用、本稼働環境以外でのテスト用、あるいは Confluent にフィードバックを提供するために使用できます。
Confluent Enterprise 6.0.0 には新しい Kafka コマンド kafka-cluster-links が含まれています。このコマンドが正しく動作するには、送信先クラスターで Confluent Platform 6.0.0 以降が実行されており、Cluster Linking が送信先クラスターで有効にされている必要があります。
このセクションでは、Cluster Linking のコマンドと例について、さらに Cluster Linking に関連する Confluent Platform の CLI ツール について説明します。
クラスターリンクの構成オプションは、kafka-cluster-links
コマンドのフラグの値として使用できます。これらのオプションの一部は、以下のコマンド例のコンテキストの中で示されています。完全なリストは、「リンクプロパティ」にリファレンス形式で記載されています。
kafka-cluster-links コマンド¶
kafka-cluster-links
は、クラスター間のリンクを作成および管理するために使用します。
Confluent Platform の他の Kafka コマンドと同様に、--bootstrap-server
は kafka-cluster-links
コマンドの必須フラグです。
--bootstrap-server
(必須)クラスター内のブローカーの接続文字列で、
host:port
形式です(ブローカーが複数存在する場合はコンマ区切りリストを指定できます)。ミラートピックの作成先となる送信先クラスターを指定する必要があります。送信先クラスターは、Confluent Platform 6.0.0 以降を実行している必要があります。これは Cluster Linking をサポートするために必要です。- 型: string
- デフォルト: 空の文字列
以降のすべての kafka-cluster-links
の実装で --bootstrap-server
を使用します。
ちなみに
ほとんどの場合、--bootstrap-server
はコマンドの最初(kafka-cluster-links
の直後)または最後に指定できます。以下のセクションの例では、--bootstrap-server
が kafka-cluster-links
の直後に使用されています。「チュートリアル: トピックデータ共有での Cluster Linking の使用」の例では、コマンドの最後に使用されています。例外は、クラスターリンクを作成するコマンドです。このコマンドでは、--bootstrap-server
の値を少なくとも 2 つ指定する必要があります。
クラスターリンクの作成¶
コマンド例
kafka-cluster-links --bootstrap-server localhost:9092 \
--create \
--link-name example-link \
--config bootstrap.servers=example-1:9092,example-2:9092,example-3:9092
出力例
Cluster link 'example-link' creation successfully completed.
クラスターリンクを作成するには、kafka-cluster-links
を bootstrap-server と以下のフラグとともに使用します。
--link-name
(必須)作成するクラスターリンクの名前です。クラスター内で一意である必要があります。
- 型: string
(必須)送信先クラスターが送信元クラスターと通信する方法を指定するために、以下の(両方ではなく)いずれかのパラメーターを指定する必要があります。使用可能な構成は、クライアントを構成するために使用する構成です。これには、必須の bootstrap.servers
と他の必要なセキュリティおよび認可プロパティが含まれます。
--config
作成時のクラスターリンクに適用される構成のコンマ区切りリストです("キー=値" 形式)。このフラグを使用する場合、構成は直接コマンドラインで指定されます(次のフラグで説明する、ファイルに指定する方法とは対照的です)。角かっこを使用して、コンマを含む値をグループ化することができます。使用できる全構成のリストについては、「リンクプロパティ」を参照してください。
- 型: string
--config-file
クラスターリンクの 構成 が含まれているプロパティファイルです。これは、クラスターリンク構成の推奨される指定方法です。
- 型: string
--consumer-group-filters-json-file
consumer.offset.group.filters
の構成に使用する JSON ファイルのパスです。詳細については、「送信元クラスターから送信先クラスターへのコンシューマーグループの移行」を参照してください。- 型: string
「認可(ACL)」(サブセクション「送信元クラスター上のリンクの ACL」)で説明するように、クラスターリンクを作成するには ALTER CLUSTER
認可が必要です。
--validate-only
- 指定した場合、クラスターリンクを指定どおりに作成できるかどうかを検証します(ただし作成は行いません)。
たとえば、link-config.properties
という名前のファイルに、以下のようにセキュアクラスターリンク用の構成を指定したとします。
bootstrap.servers=example-1:9092,example-2:9092,example-3:9092
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="example-user" password="example-password"
security.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=https
この場合、以下のコマンドを使用してクラスターリンク example-link
を作成できます。
kafka-cluster-links --bootstrap-server localhost:9092 --create --link-name example-link --config-file link-config.properties
クラスターリンクのリストの表示¶
コマンド例
kafka-cluster-links --list --bootstrap-server localhost:9092
出力例
Link name: 'example-link', link ID: '123-some-link-id', cluster ID: '123-some-cluster-id'
既存のクラスターリンクのリストを表示できます。このコマンドでは、リンク名、リンク ID(内部で割り当てられる一意の ID)、およびリンクされたクラスターのクラスター ID が返されます。
ちなみに
クラスターリンクのリストを表示するには、kafka-cluster-links
を bootstrap-server と以下のフラグとともに使用します。
--link-name
指定した場合、指定されたクラスターリンクのリストのみを表示します。
- 型: string
--command-config
指定した場合、リンクされているすべてのトピックのリストを表示します。
- 型: string
--include-topics
指定した場合、リンクされているすべてのトピック(ミラートピック)のリストを表示します。
- 型: string
クラスターリンクのリストを表示するには、DESCRIBE CLUSTER
認可が必要です。
ミラートピックのリストの表示¶
コマンド例
kafka-cluster-links --list --link-name example-link --include-topics --bootstrap-server localhost:9092
出力例
Link name: 'example-link', link ID: '123-some-link-id', cluster ID: '123-some-cluster-id', topics: [example-topic]
ミラートピックのリストを表示するには、kafka-cluster-links --list
を使用します。このオプションについては、前のセクションで説明しました。この例では、--include-topics
フラグを使用してクラスターリンクのリストを表示します。
クラスターリンクの詳細の表示¶
コマンド例
kafka-configs --bootstrap-server localhost:9092 \
--describe \
--cluster-link example-link
出力例
Dynamic configs for cluster-link example-link are:
metadata.max.age.ms=300000 sensitive=false synonyms={}
...
クラスターリンクの詳細を表示するには、kafka-cluster-links
を bootstrap-server と以下のフラグとともに使用します。
--cluster-link
(必須)詳細を表示するクラスターリンクの名前です。
- 型: string
--command-config
AdminClient に渡される構成が含まれているプロパティファイルです。
- 型: string
クラスターリンクの詳細を表示するには、DESCRIBE CLUSTER
認可が必要です。
クラスターリンクの変更¶
コマンド例
kafka-configs --bootstrap-server localhost:9092 \
--alter \
--cluster-link example-link \
--add-config cluster.link.retry.timeout.ms=10000 \
--delete-config request.timeout.ms
出力例
Completed updating config for cluster-link example-link.
既存のリンクを変更するには、kafka-cluster-links
を bootstrap-server と以下のフラグとともに使用します。
--alter
- (必須)リンクを変更します。
--cluster-link
(必須)変更するクラスターリンクの名前です。
- 型: string
少なくとも以下のいずれかを指定する必要があります。
--add-config
コマンドラインで構成を "キー=値" 形式で追加します。角かっこを使用して、コンマを含む値をグループ化することができます。使用できる全構成のリストについては、「リンクプロパティ」を参照してください。
- 型: string
--add-config-file
追加する 構成 が含まれているプロパティファイルのパスです。
- 型: string
--delete-config
削除する構成キーのコンマ区切りリストです。
- 型: string
「認可(ACL)」で説明するように、リンクに関連付けられているクラスターを変更するには ALTER CLUSTER
認可が必要です。
ちなみに
クラスターリンクの構成はさまざまな側面を変更できますが、その 送信元クラスター (ブートストラップサーバー、ID など)や リンク名 を変更することはできません。
新しい構成を検証するためのリンクの一時停止と再開¶
動的に更新可能な構成(SSL など)の場合、古い構成が使用される接続と新しい構成が使用される接続とが混在する可能性があります。アクションに不要な接続は、再作成されないようになっています。API と CLI の出力には最新の構成が反映されていますが、これらは、kafka-configs --alter
操作後の真の構成の状態を表してない場合があります。
新しい構成が適用されていることを確認するには、cluster.link.paused
を使用してリンクを一時停止した後、一時停止を解除してリンクを再開してください。詳細については、「Cluster Linking の構成オプション」の cluster.link.paused
を参照してください。
クラスターリンクの削除¶
コマンド例
kafka-cluster-links --bootstrap-server localhost:9092 \
--delete \
--link-name example-link
出力例
Cluster link 'example-link' deletion successfully completed.
既存のリンクを削除するには、kafka-cluster-links
を bootstrap-server と以下のフラグとともに使用します。
--link-name
(必須)詳細を表示するクラスターリンクの名前です。
- 型: string
--command-config
AdminClient に渡される構成が含まれているプロパティファイルです。
- 型: string
--validate-only
- 指定した場合、クラスターリンクの削除を検証します(実際には削除しません)。
--force
- 現在リンクされているトピックがある場合でも強制的にリンクを削除します。
ちなみに
クラスターリンクにリンクされた未処理のトピックがある場合、強制的に削除(--force
)しなければ、リクエストが失敗することがあります。強制的に削除する場合、リンクされていたトピックのリンクがバックグラウンドで解除されます。
「認可(ACL)」で説明するように、クラスターリンクを削除するには、ALTER CLUSTER
認可が必要です。
注釈
クラスターリンクが削除されると、STOPPED
トピックの履歴も削除されます。昇格された、またはフェイルオーバーされたミラートピックの Last Source Fetch Offset
や Status Time
が必要な場合は、それらを保存してからクラスターリンクを削除してください。
ミラートピックの作成¶
ミラートピックは、別のトピック内のすべてのデータとメタデータを反映した読み取り専用トピックです。
ミラートピックの作成は通常のトピックの作成とほぼ同じですが、追加のタグが必要となります。ミラートピックが作成されると、ミラーはソースからデータを自動的にフェッチし、トピックに対してミラーリングが有効にされている限りこれを継続します。
ミラーリングの継続中にソーストピックが削除されると、ミラートピックは失敗の状態になります。この場合、ミラーリングを停止するか、またはトピックを削除する必要があります。
コマンド例
kafka-topics --bootstrap-server localhost:9092 \
--create \
--topic example-topic \
--link-name example-link \
--mirror-topic example-topic
出力例
Created topic example-topic.
ミラートピックを作成するには、kafka-cluster-links
を bootstrap-server と以下のフラグとともに使用します。
--topic
(必須)作成するトピックの名前です。
- 型: string
--link-name
(必須)ソーストピックからデータを取得するために使用するクラスターリンクの名前です。
- 型: string
--mirror-topic
(必須)クラスターリンクを通じてミラーリングするソーストピックの名前です。
- 型: string
ちなみに
- プレビューでは、
--mirror-topic
の値は元の--topic
の名前と一致している必要があります。 - ミラートピックのリストを表示する には、
--include-topics
フラグを指定してkafka-cluster-links --list
を実行します。
ミラートピックを作成するには、ALTER CLUSTER
認可が必要です。
kafka-mirrors コマンド¶
クラスターリンク上でミラーリングされるトピックについての情報を表示したり、トピックを管理したりするには、kafka-mirrors
を使用します。
Confluent Platform の他の Kafka コマンドと同様に、--bootstrap-server
は kafka-mirrors
コマンドの必須フラグです。
--bootstrap-server
(必須)クラスター内のブローカーの接続文字列で、
host:port
形式です(ブローカーが複数存在する場合はコンマ区切りリストを指定できます)。ミラートピックの作成先となる送信先クラスターを指定する必要があります。送信先クラスターは、Confluent Platform 6.0.0 以降を実行している必要があります。これは Cluster Linking をサポートするために必要です。- 型: string
- デフォルト: 空の文字列
以降のすべての kafka-mirrors
の実装で --bootstrap-server
を使用します。
トピックのミラーリングの停止¶
トピックのミラーリングを停止するには、kafka-mirrors --failover
または kafka-mirrors --promote
コマンドを使用します。どちらのコマンドもミラートピックを通常の書き込み可能トピックに変換しますが、それぞれ特定のユースケースを意図して設計されています。
promote
コマンドは移行を意図したものであるため、ミラーリングを停止する前に、別途いくつかの操作が実行されます。failover
コマンドはディザスターリカバリを意図したものであるため、その作用はすぐに現れ、自動的に成功します。dry-run
オプションを使用すると、コマンドを実際に実行する前に、その結果をプレビューできます。
以下に例を示します。
ちなみに
kafka-topics --alter --mirror-action stop
は、Confluent Platform 6.2.0 では promote
と failover
が採用されて非推奨となり、Confluent Platform 7.0.0 以降では利用できなくなります。以前のバージョンのドキュメントには、そのコマンドの構文が示されていますが、今後は promote
と failover
を使用することをお勧めします。
トピックの昇格¶
ミラーリングを停止してミラートピックを通常のトピックに変換する作業を正しい(一般に移行のシナリオに適した)プロセスで行うには、kafka-mirrors --promote
を使用します。このコマンドは、送信元トピックとミラートピック間のラグが 0 であることをチェックし、メタデータ(コンシューマーグループのオフセットとトピックの構成)の最終的な同期を 1 回実行した後で、ミラートピックを通常のトピックに変換します。
注釈
promote
コマンドでは、送信元トピックに対するデータの生成は停止されません。ラグが 0 であることのチェックに成功した後で、送信元トピックに対してプロデューサーから新たにデータが生成される可能性はあります。このケースでは、両者は分岐して 2 つの異なるトピックとなり、(それまでミラーリングされていた)通常のトピックにデータはミラーリングされません。
例
promote
を使用して、同じクラスターリンク上の複数のトピックまたは 1 つのトピックを指定し、ミラーリングを停止することができます。コマンドの構文は次のとおりです。
kafka-mirrors --promote --topics <destination-topic-1> .. <destination-topic-n> \
--link <link name> --bootstrap-server <host:port>
以下に示すのは、example-link
という名前のリンクを使用するトピック example-topic
のミラーリングを、kafka-mirrors --promote
を使用して停止する例です。
kafka-mirrors --promote --topics example-topic --link example-link --bootstrap-server localhost:9092
トピックのフェイルオーバー¶
直ちにミラーリングを停止し、ミラートピックを通常のトピックに変換するには、kafka-mirrors failover
を使用します。通常、これは送信元クラスターに予期せず障害が発生した場合のディザスターリカバリに適しています。failover
コマンドでは、別途のチェックや同期は実行されません。その働きは、promote
の "強制版" といえます。1 つのトピックまたは複数のトピックを指定して、ミラーリングを停止することができます。
例
failover
を使用して、同じクラスターリンク上の複数のトピックまたは 1 つのトピックを指定し、ミラーリングを停止することができます。コマンドの構文は次のとおりです。
kafka-mirrors --failover --topics <destination-topic-1> .. <destination-topic-n> \
--link <link name> --bootstrap-server <host:port>
以下に示すのは、example-link
という名前のリンクを使用するトピック example-topic
のミラーリングを、kafka-mirrors --failover
を使用して停止する例です。
kafka-mirrors --failover --topics example-topic --link example-link --bootstrap-server localhost:9092
ドライラン¶
promote
または failover
コマンドを実行する前にその結果をテストするには、--dry-run
フラグを追加します。その例を次に示します。
kafka-mirrors --failover --topics example-topic --link example-link --bootstrap-server localhost:9092 --dry-run
ミラートピックの一時停止と再開¶
pause(kafka-mirrors --pause
)コマンドと resume(kafka-mirrors --unpause
)コマンドを使用して一時的にミラーリングを一時停止したり再開したりできます。
ミラートピックを一時停止するには、次のようにします。
kafka-mirrors --pause --topics example-topic --link example-link --bootstrap-server localhost:9092
ミラートピックを再開するには、--unpause
を使用します。
kafka-mirrors --unpause --topics example-topic --link example-link --bootstrap-server localhost:9092
送信元クラスターから送信先クラスターへのコンシューマーグループの移行¶
リンク内でコンシューマーグループを移行するには、リンク構成に consumer.offset.sync.enable=true
を設定し、JSON ファイルにグループフィルターを指定します。そのファイルの名前を kafka-cluster-links コマンド の --consumer-group-filters-json-file
フラグの値として渡します。これは、クラスターリンクを作成するときか、または既存の構成のアップデートとして設定できます。
注釈
コンシューマーグループフィルターに含めるのは、送信先で使用されていないグループのみにしてください。そうすることで、送信先の他のコンシューマーによってコミットされたオフセットがシステムによってオーバーライドされるのを防ぐことができます。フィルターに含まれているグループが送信先でも使用されている場合、システムはそのフィルターを回避しようと試みますが、この場合、その保証はありません。オフセットが上書きされる可能性はあります。ミラートピックの "昇格" が正しく機能するためには、システムがオフセットをロールバックできることが必要ですが、グループが送信先コンシューマーによって使用されていると、ロールバックを実行できません。
この例は、"someGroup" グループを "broker-west" クラスターから "broker-east" クラスターに移行し、移行を実行する前に以下のフィルターセットですべてのオフセットを移行している状態になっていると仮定しています。
{"groupFilters": [
{
"name": "*",
"patternType": "LITERAL",
"filterType": "INCLUDE"
}
]}
コンシューマーグループを送信元クラスターから送信先クラスターに移行するには、以下の手順に従います。
送信元クラスターのコンシューマーを停止します。
consumer.offset.sync.ms
の 2 倍の期間だけ待機します。Cluster Linking レプリケーションが最後のコミット済みオフセットを超えていることを確認します。これは、以下のコマンドで確認できます。
送信元クラスターの CURRENT-OFFSET をチェックします。
kafka-consumer-groups --bootstrap-server broker-west:19091 --describe --group someGroup
出力は次のようになります。
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID someGroup west-offsets 0 100 100 0 - - -
送信先クラスターの LOG-END-OFFSET が、上の手順で記録した CURRENT-OFFSET 以上であることを確認します。
kafka-consumer-groups --bootstrap-server broker-east:19092 --describe --group someGroup
出力は次のようになります。
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID someGroup west-offsets 0 100 100 0
以下のコマンドを実行して、現在のオフセットが送信元と送信先で一致していることを検証します。
送信元クラスターの CURRENT-OFFSET をチェックします。
kafka-consumer-groups --bootstrap-server broker-west:19091 --describe --group someGroup
出力は次のようになります。
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID someGroup west-offsets 0 100 100 0 - - -
送信先クラスターの CURRENT-OFFSET をチェックします。
kafka-consumer-groups --bootstrap-server broker-east:19092 --describe --group someGroup
出力は次のようになります。
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID someGroup west-offsets 0 100 100 0
オフセット移行フィルターをアップデートして、移行処理からグループを削除します。
echo "consumer.offset.group.filters={\"groupFilters\": [ \ { \ \"name\": \"*\", \ \"patternType\": \"LITERAL\", \ \"filterType\": \"INCLUDE\" \ }, \ { \ \"name\": \"someGroup\", \ \"patternType\": \"LITERAL\", \ \"filterType\": \"EXCLUDE\" \ } \ ]}" > newFilters.properties kafka-configs --bootstrap-server broker-east:19092 --alter --cluster-link offsets-cluster-link --add-config-file newFilters.properties
送信先クラスターのコンシューマーを起動します。
おすすめの関連情報¶
- ブログ記事: How Krake Makes Floating Workloads on Confluent Platform
- Cluster Linking の構成オプション
- Confluent Platform の CLI ツール
- ブローカー構成の動的な変更 (
kafka-configs
コマンドの使い方を学ぶ場合) - Kafka コンシューマーグループのコマンドツール (
kafka-consumer-groups
コマンドの使い方を学ぶ場合) - チュートリアル: トピックデータ共有での Cluster Linking の使用
- Cluster Linking のセキュリティ