Cluster Linking のコマンド¶
Looking for Confluent Cloud Cluster Linking docs? You are currently viewing Confluent Platform documentation. If you are looking for Confluent Cloud docs, check out Cluster Linking on Confluent Cloud.
重要
この機能はプレビュー機能として利用できます。プレビュー機能とは、開発者から早い段階でフィードバックを受けるために提供している Confluent Platform のコンポーネントのことです。この機能は、評価用、本稼働環境以外でのテスト用、あるいは Confluent にフィードバックを提供するために使用できます。
Confluent Enterprise 6.0.0 には新しい Kafka コマンド kafka-cluster-links が含まれています。このコマンドが正しく動作するには、送信先クラスターで Confluent Platform 6.0.0 以降が実行されており、Cluster Linking が送信先クラスターで有効にされている必要があります。
このセクションでは、Cluster Linking のコマンドと例について、さらに Cluster Linking に関連する Confluent Platform の CLI ツール について説明します。
クラスターリンクの構成オプションは、kafka-cluster-links
コマンドのフラグの値として使用できます。これらのオプションの一部は、以下のコマンド例のコンテキストの中で示されています。完全なリストは、「リンクプロパティ」にリファレンス形式で記載されています。
kafka-cluster-links コマンド¶
kafka-cluster-links
は、クラスター間のリンクを作成および管理するために使用します。
Confluent Platform の他の Kafka コマンドと同様に、--bootstrap-server
は kafka-cluster-links
コマンドの必須フラグです。
--bootstrap-server
(必須)クラスター内のブローカーの接続文字列で、
host:port
形式です(ブローカーが複数存在する場合はコンマ区切りリストを指定できます)。ミラートピックの作成先となる送信先クラスターを指定する必要があります。送信先クラスターは、Confluent Platform 6.0.0 以降を実行している必要があります。これは Cluster Linking をサポートするために必要です。- 型: string
- デフォルト: 空の文字列
以降のすべての kafka-cluster-links
で --bootstrap-server
を使用します。
ちなみに
ほとんどの場合、--bootstrap-server
はコマンドの最初(kafka-cluster-links
の直後)または最後に指定できます。以下のセクションの例では、--bootstrap-server
が kafka-cluster-links
の直後に使用されています。「Cluster Linking のチュートリアル」の例では、コマンドの最後に使用されています。例外は、クラスターリンクを作成するコマンドです。このコマンドでは、--bootstrap-server
の値を少なくとも 2 つ指定する必要があります。
クラスターリンクの作成¶
コマンド例
kafka-cluster-links --bootstrap-server localhost:9092 \
--create \
--link-name example-link \
--config bootstrap.servers=example-1:9092,example-2:9092,example-3:9092
出力例
Cluster link 'example-link' creation successfully completed.
クラスターリンクを作成するには、kafka-cluster-links
を bootstrap-server と以下のフラグとともに使用します。
--link-name
(必須)作成するクラスターリンクの名前です。クラスター内で一意である必要があります。
- 型: string
(必須)送信先クラスターが送信元クラスターと通信する方法を指定するために、以下の(両方ではなく)いずれかのパラメーターを指定する必要があります。使用可能な構成は、クライアントを構成するために使用する構成です。これには、必須の bootstrap.servers
と他の必要なセキュリティおよび認可プロパティが含まれます。
--config
作成時のクラスターリンクに適用される構成のコンマ区切りリストです("キー=値" 形式)。このフラグを使用する場合、構成は直接コマンドラインで指定されます(次のフラグで説明する、ファイルに指定する方法とは対照的です)。角かっこを使用して、コンマを含む値をグループ化することができます。使用できる全構成のリストについては、「リンクプロパティ」を参照してください。
- 型: string
--config-file
クラスターリンクの 構成 が含まれているプロパティファイルです。これは、クラスターリンク構成の推奨される指定方法です。
- 型: string
--consumer-group-filters-json-file
consumer.offset.group.filters
の構成に使用する JSON ファイルのパスです。詳細については、「送信元クラスターから送信先クラスターへのコンシューマーグループの移行」を参照してください。- 型: string
「認可(ACL)」(サブセクション「送信元クラスター上のリンクの ACL」)で説明するように、クラスターリンクを作成するには ALTER CLUSTER
認可が必要です。
--validate-only
- 指定した場合、クラスターリンクを指定どおりに作成できるかどうかを検証します(ただし作成は行いません)。
たとえば、link-config.properties
という名前のファイルに、以下のようにセキュアクラスターリンク用の構成を指定したとします。
bootstrap.servers=example-1:9092,example-2:9092,example-3:9092
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="example-user" password="example-password"
security.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=https
この場合、以下のコマンドを使用してクラスターリンク example-link
を作成できます。
kafka-cluster-links --bootstrap-server localhost:9092 --create --link-name example-link --config-file link-config.properties
クラスターリンクのリストの表示¶
コマンド例
kafka-cluster-links --list --bootstrap-server localhost:9092
出力例
Link name: 'example-link', link ID: '123-some-link-id', cluster ID: '123-some-cluster-id'
既存のクラスターリンクのリストを表示できます。このコマンドでは、リンク名、リンク ID(内部で割り当てられる一意の ID)、およびリンクされたクラスターのクラスター ID が返されます。
ちなみに
クラスターリンクのリストを表示するには、kafka-cluster-links
を bootstrap-server と以下のフラグとともに使用します。
--link-name
指定した場合、指定されたクラスターリンクのリストのみを表示します。
- 型: string
--command-config
指定した場合、リンクされているすべてのトピックのリストを表示します。
- 型: string
--include-topics
指定した場合、リンクされているすべてのトピック(ミラートピック)のリストを表示します。
- 型: string
クラスターリンクのリストを表示するには、DESCRIBE CLUSTER
認可が必要です。
ミラートピックのリストの表示¶
コマンド例
kafka-cluster-links --list --link-name example-link --include-topics --bootstrap-server localhost:9092
出力例
Link name: 'example-link', link ID: '123-some-link-id', cluster ID: '123-some-cluster-id', topics: [example-topic]
ミラートピックのリストを表示するには、kafka-cluster-links --list
を使用します。このオプションについては、前のセクションで説明しました。この例では、--include-topics
フラグを使用してクラスターリンクのリストを表示します。
クラスターリンクの詳細の表示¶
コマンド例
kafka-configs --bootstrap-server localhost:9092 \
--describe \
--cluster-link example-link
出力例
Dynamic configs for cluster-link example-link are:
metadata.max.age.ms=300000 sensitive=false synonyms={}
...
クラスターリンクの詳細を表示するには、kafka-cluster-links
を bootstrap-server と以下のフラグとともに使用します。
--cluster-link
(必須)詳細を表示するクラスターリンクの名前です。
- 型: string
--command-config
AdminClient に渡される構成が含まれているプロパティファイルです。
- 型: string
クラスターリンクの詳細を表示するには、DESCRIBE CLUSTER
認可が必要です。
クラスターリンクの変更¶
コマンド例
kafka-configs --bootstrap-server localhost:9092 \
--alter \
--cluster-link example-link \
--add-config cluster.link.retry.timeout.ms=10000 \
--delete-config request.timeout.ms
出力例
Completed updating config for cluster-link example-link.
既存のリンクを変更するには、kafka-cluster-links
を bootstrap-server と以下のフラグとともに使用します。
--alter
- (必須)リンクを変更します。
--cluster-link
(必須)変更するクラスターリンクの名前です。
- 型: string
少なくとも以下のいずれかを指定する必要があります。
--add-config
コマンドラインで構成を "キー=値" 形式で追加します。角かっこを使用して、コンマを含む値をグループ化することができます。使用できる全構成のリストについては、「リンクプロパティ」を参照してください。
- 型: string
--add-config-file
追加する 構成 が含まれているプロパティファイルのパスです。
- 型: string
--delete-config
削除する構成キーのコンマ区切りリストです。
- 型: string
「認可(ACL)」で説明するように、リンクに関連付けられているクラスターを変更するには ALTER CLUSTER
認可が必要です。
新しい構成を検証するためのリンクの一時停止と再開¶
動的に更新可能な構成(SSL など)の場合、古い構成が使用される接続と新しい構成が使用される接続とが混在する可能性があります。アクションに不要な接続は、再作成されないようになっています。API と CLI の出力には最新の構成が反映されていますが、これらは、kafka-configs --alter
操作後の真の構成の状態を表してない場合があります。
新しい構成が適用されていることを確認するには、cluster.link.paused
を使用してリンクを一時停止した後、一時停止を解除してリンクを再開してください。詳細については、「Cluster Linking の構成オプション」の cluster.link.paused
を参照してください。
クラスターリンクの削除¶
コマンド例
kafka-cluster-links --bootstrap-server localhost:9092 \
--delete \
--link-name example-link
出力例
Cluster link 'example-link' deletion successfully completed.
既存のリンクを削除するには、kafka-cluster-links
を bootstrap-server と以下のフラグとともに使用します。
--link-name
(必須)詳細を表示するクラスターリンクの名前です。
- 型: string
--command-config
AdminClient に渡される構成が含まれているプロパティファイルです。
- 型: string
--validate-only
- 指定した場合、クラスターリンクの削除を検証します(実際には削除しません)。
--force
- 現在リンクされているトピックがある場合でも強制的にリンクを削除します。
ちなみに
クラスターリンクにリンクされた未処理のトピックがある場合、強制的に削除(--force
)しなければ、リクエストが失敗することがあります。強制的に削除する場合、リンクされていたトピックのリンクがバックグラウンドで解除されます。
「認可(ACL)」で説明するように、クラスターリンクを削除するには、ALTER CLUSTER
認可が必要です。
ミラートピックの作成¶
トピックミラーは、別のトピック内のすべてのデータとメタデータを反映した読み取り専用トピックです。
ミラートピックの作成は通常のトピックの作成とほぼ同じですが、追加のタグが必要となります。ミラートピックが作成されると、ミラーはソースからデータを自動的にフェッチし、トピックに対してミラーリングが有効にされている限りこれを継続します。
ミラーリングの継続中にソーストピックが削除されると、ミラートピックは失敗の状態になります。この場合、ミラーリングを停止するか、またはトピックを削除する必要があります。
コマンド例
kafka-topics --bootstrap-server localhost:9092 \
--create \
--topic example-topic \
--link-name example-link \
--mirror-topic example-topic
出力例
Created topic example-topic.
ミラートピックを作成するには、kafka-cluster-links
を bootstrap-server と以下のフラグとともに使用します。
--topic
(必須)作成するトピックの名前です。
- 型: string
--link-name
(必須)ソーストピックからデータを取得するために使用するクラスターリンクの名前です。
- 型: string
--mirror-topic
(必須)クラスターリンクを通じてミラーリングするソーストピックの名前です。
- 型: string
ちなみに
- プレビューでは、
--mirror-topic
の値は元の--topic
の名前と一致している必要があります。 - ミラートピックのリストを表示する には、
--include-topics
フラグを指定してkafka-cluster-links --list
を実行します。
ミラートピックを作成するには、ALTER CLUSTER
認可が必要です。
トピックのミラーリングの停止¶
コマンド例
kafka-topics --bootstrap-server localhost:9092 \
--alter --mirror-action stop \
--topic example-topic
出力例
Topic 'example-topic's mirror was successfully stopped.
トピックのミラーリングを停止するには、kafka-cluster-links
を bootstrap-server と以下のフラグとともに使用します。
--topic
(必須)作成するトピックの名前です。
- 型: string
トピックのミラーリングを停止するには、ALTER CLUSTER
認可が必要です。
送信元クラスターから送信先クラスターへのコンシューマーグループの移行¶
リンク内でコンシューマーグループを移行するには、リンク構成に consumer.offset.sync.enable=true
を設定し、JSON ファイルにグループフィルターを指定します。そのファイルの名前を kafka-cluster-links コマンド の --consumer-group-filters-json-file
フラグの値として渡します。これは、クラスターリンクを作成するときか、または既存の構成のアップデートとして設定できます。
この例は、"someGroup" グループを "broker-west" クラスターから "broker-east" クラスターに移行し、移行を実行する前に以下のフィルターセットですべてのオフセットを移行している状態になっていると仮定しています。
{"groupFilters": [
{
"name": "*",
"patternType": "LITERAL",
"filterType": "INCLUDE"
}
]}
コンシューマーグループを送信元クラスターから送信先クラスターに移行するには、以下の手順に従います。
送信元クラスターのコンシューマーを停止します。
consumer.offset.sync.ms
の 2 倍の期間だけ待機します。Cluster Linking レプリケーションが最後のコミット済みオフセットを超えていることを確認します。これは、以下のコマンドで確認できます。
送信元クラスターの CURRENT-OFFSET をチェックします。
kafka-consumer-groups --bootstrap-server broker-west:19091 --describe --group someGroup
出力は次のようになります。
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID someGroup west-offsets 0 100 100 0 - - -
送信先クラスターの LOG-END-OFFSET が、上の手順で記録した CURRENT-OFFSET 以上であることを確認します。
kafka-consumer-groups --bootstrap-server broker-east:19092 --describe --group someGroup
出力は次のようになります。
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID someGroup west-offsets 0 100 100 0
以下のコマンドを実行して、現在のオフセットが送信元と送信先で一致していることを検証します。
送信元クラスターの CURRENT-OFFSET をチェックします。
kafka-consumer-groups --bootstrap-server broker-west:19091 --describe --group someGroup
出力は次のようになります。
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID someGroup west-offsets 0 100 100 0 - - -
送信先クラスターの CURRENT-OFFSET をチェックします。
kafka-consumer-groups --bootstrap-server broker-east:19092 --describe --group someGroup
出力は次のようになります。
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID someGroup west-offsets 0 100 100 0
オフセット移行フィルターをアップデートして、移行処理からグループを削除します。
echo "consumer.offset.group.filters={\"groupFilters\": [ \ { \ \"name\": \"*\", \ \"patternType\": \"LITERAL\", \ \"filterType\": \"INCLUDE\" \ }, \ { \ \"name\": \"someGroup\", \ \"patternType\": \"LITERAL\", \ \"filterType\": \"EXCLUDE\" \ } \ ]}" > newFilters.properties kafka-configs --bootstrap-server broker-east:19092 --alter --cluster-link offsets-cluster-link --add-config-file newFilters.properties
送信先クラスターのコンシューマーを起動します。
おすすめの関連情報¶
- ブログ記事: How Krake Makes Floating Workloads on Confluent Platform
- Cluster Linking の構成オプション
- Confluent Platform の CLI ツール
- ブローカー構成の動的な変更 (
kafka-configs
コマンドの使い方を学ぶ場合) - Kafka コンシューマーグループのコマンドツール (
kafka-consumer-groups
コマンドの使い方を学ぶ場合)
- ブローカー構成の動的な変更 (
- Cluster Linking のチュートリアル
- Cluster Linking のセキュリティ