Confluent for Kubernetes を使用した Cluster Linking¶
Cluster Linking 機能を使用すると、Kafka クラスターをまとめて直接接続し、1 つのクラスター(送信元)から別のクラスター(送信先)にトピックをミラーリングすることができます。この機能により、マルチデータセンター、マルチクラスター、およびハイブリッドクラウドのデプロイを簡単に構築できるようになります。
Confluent for Kubernetes (CFK)には、クラスターリンクの定義、アップデート、管理を行うための宣言型 API、ClusterLink のカスタムリソース定義(CRD)が用意されています。
クラスターリンクの構成は、送信先クラスターで行います。送信元クラスターでの変更は必要ありません。
要件¶
クラスターリンクを作成する前に、次の要件を満たしていることを確認してください。
- 送信先クラスターは、Confluent Platform クラスターまたは Confluent Cloud クラスターである必要があります。
- クラスターリンクは、Admin REST APIs を使用して作成され、管理されます。また、Admin REST APIs が送信先クラスターで実行されている必要があります。送信先クラスターが CFK で管理されている場合は、「Confluent Admin REST API」にある Admin REST APIs のセットアップ手順を参照してください。
- 送信先 Kafka は、送信元 Kafka にとってはコンシューマーです。したがって、送信元 Kafka でセキュリティ設定が有効になっている場合は、送信先クラスターで認証/暗号化を構成して、送信元クラスターと通信する必要があります。必要な手順については、「送信先の Kafka のセキュリティの構成」を参照してください。
クラスターリンクの作成¶
新しい ClusterLink カスタムリソース(CR)を送信先クラスターで作成してクラスターリンクを作成し、kubectl apply -f <Cluster Link CR>
コマンドで CR を適用します。
apiVersion: platform.confluent.io/v1beta1
kind: ClusterLink
metadata:
name: clusterlink --- [1]
namespace: --- [2]
spec:
destinationKafkaCluster:
kafkaRestClassRef:
name: --- [3]
namespace: --- [4]
sourceKafkaCluster: --- [5]
consumerGroupFilters: --- [6]
aclFilters: --- [7]
configs: --- [8]
mirrorTopics: --- [9]
[1](必須)ClusterLink CR の名前を定義します。
[2] ClusterLink CR の名前空間を定義します。
[3](必須) Confluent Admin REST API カスタムリソース(KafkaRestClass CR) の名前。KafkaRestClass CR は、送信先クラスターでクラスターリンクに必要です。
[4](省略可)
[5](必須)送信元クラスターに関する情報を指定します。「送信元 Kafka クラスターの指定」を参照してください。
[6](省略可)送信元クラスターから送信先クラスターに移行されるコンシューマーグループの配列。「コンシューマーグループフィルターの定義」を参照してください。
[7](省略可)送信元クラスターから送信先クラスターに移行されるアクセス制御リスト(ACL)の配列。「ACL フィルターの定義」を参照してください。
[8](省略可)クラスターリンクを作成するための追加構成のマップ。
その例を次に示します。
spec: configs: connections.max.idle.ms: "620000" cluster.link.retry.timeout.ms: "200000"
構成オプションのリストについては、「Cluster Linking の構成オプション」を参照してください。
[9](省略可)ミラートピックの配列。「ミラートピックの作成」を参照してください。
送信元 Kafka クラスターの指定¶
送信元 Kafka クラスターの接続情報を構成します。
spec:
sourceKafkaCluster:
bootstrapEndpoint: --- [1]
kafkaRestClassRef:
name: --- [2]
namespace: --- [3]
clusterID: --- [4]
[1] 送信元 Kafka が実行されているエンドポイント。
[2] 送信元 Kafka クラスター上の KafkaRestClass の CR の名前。
[3] 送信元 Kafka クラスターの名前空間。
[4] 送信元 Kafka クラスターのクラスター ID。
clusterID
と Kafka REST クラス名 [2] の両方を指定している場合は、このclusterID
の値が Kafka REST クラス名 [2] よりも優先されます。クラスター ID は、適切なフラグとともに
curl
またはkafka-cluster
コマンドを使用して取得できます。その例を次に示します。curl https://<cluster url>:8090/kafka/v3/clusters/ -kv
kafka-cluster cluster-id --bootstrap-server kafka.operator.svc.cluster.local:9092 --config /tmp/kafka.properties
送信元 Kafka クラスターに対する SASL/PLAIN 認証
spec:
sourceKafkaCluster:
authentication:
type: plain --- [1]
jaasConfig:
secretRef: --- [2]
jaasConfigPassThrough: --- [3]
secretRef: --- [4]
directoryPathInContainer:--- [5]
[1](必須)
[2](省略可) 参照されるシークレットで、ユーザー名とパスワードを入力します。
[3](省略可)カスタムのログインハンドラーを使用するなど、カスタマイズが必要な場合は、CFK による自動構成を使用せずに構成を直接指定することができます。
[4] Kafka に対する認証用の Kubernetes シークレットの名前に設定します。
[5] Vault によって必要な Kafka 認証情報が挿入されるコンテナーのディレクトリパスに設定します。
Vault を使用する際の認証情報と必要なアノテーションを提供する方法については、「Confluent Platform アプリケーションの CR へのシークレットの提供」を参照してください。
jaasConfig
および jaasConfigPassThrough
の詳細については、「Configure Confluent components to authenticate to Kafka using SASL/PLAIN」を参照してください。
送信元 Kafka クラスターに対する mTLS 認証
spec:
sourceKafkaCluster:
authentication:
type: mtls --- [1]
tls:
enabled: true --- [2]
secretRef: --- [3]
- [1](必須)
- [2](必須) TLS 構成については、次のセクションを参照してください。
- [3](必須)証明書を格納するシークレットの名前。証明書の詳細については、「Confluent for Kubernetes でのネットワーク暗号化の構成」を参照してください。
送信元 Kafka クラスター用の TLS 暗号化
spec:
sourceKafkaCluster:
tls:
enabled: true --- [1]
secretRef: --- [2]
- [1](必須)
- [2](必須) 証明書を格納するシークレットの名前。証明書の詳細については、「Confluent for Kubernetes でのネットワーク暗号化の構成」を参照してください。
コンシューマーグループフィルターの定義¶
コンシューマーグループフィルター により、送信元クラスターから送信先クラスターに移行する一連のコンシューマーグループが指定されます。
コンシューマーグループフィルターを定義するには、ClusterLink の CR で次のプロパティを設定します。
spec:
consumerGroupFilters:
- name: --- [1]
filterType: --- [2]
patternType: --- [3]
- [1](省略可)このフィルターに関連付けられたコンシューマーグループの名前。指定しない場合は、デフォルトでワイルドカード(
*
)が使用されます。 - [2](必須) このフィルターのタイプ。
INCLUDE
またはEXCLUDE
を指定します。 - [3](省略可)リソースのパターンが、
PREFIXED
とLITERAL
のどちらであるかを定義します。デフォルトではLITERAL
です。
次の例では、"someGroup" というコンシューマーグループを除き、すべてのコンシューマーグループを、送信元クラスターから送信先クラスターに移行するよう指定しています。
spec:
consumerGroupFilters:
- name: "*"
patternType: LITERAL
filterType: INCLUDE
- name: "someGroup"
patternType: LITERAL
filterType: EXCLUDE
ACL フィルターの定義¶
ACL フィルター は、送信元クラスターから送信先クラスターに移行する ACL のリストを指定します。
ACL フィルターを定義するには、ClusterLink の CR で次のプロパティを設定します。
spec:
aclFilters:
- accessFilter: --- [1]
host: --- [2]
operation: --- [3]
permissionType: --- [4]
principal: --- [5]
resourceFilter: --- [6]
name: --- [7]
patternType: --- [8]
resourceType: --- [9]
- [1] ACL のアクセスフィルターを指定します。
- [2] 処理の発生元になることができるホスト。デフォルト値は、すべてのホストに一致する
*
です。 - [3](必須)フィルターの処理タイプを指定します。
ANY
、または Confluent ACL ドキュメント で定義されているリソースタイプに基づく処理を指定できます。 - [4](必須)フィルターの権限のタイプ。指定可能なオプションは、
any
、allow
、deny
です。 - [5] プリンシパルの名前。デフォルト値は
*
です。 - [6] この ACL フィルターのリソースを指定します。リソースには、クラスター、グループ、Kafka トピック、トランザクション ID、委任トークンがあります。
- [7] このフィルターに関連付けられたリソースの名前。デフォルト値は
*
です。 - [8](必須)リソースのパターン。指定可能なオプションは、
prefixed
、literal
、any
、match
です。 - [9](必須)フィルターのタイプ。指定可能なオプションは、
any
、cluster
、group
、topic
、transactionId
、delegationToken
です。
送信先の Kafka のセキュリティの構成¶
セキュリティが有効になっている場合は、パスワードエンコーダーシークレットを使用して送信先の Kafka ブローカーを構成して、SASL/PLAIN、mTLS、または TLS モードの証明書とパスワードなど、重要なリンク構成を暗号化する必要があります。
パスワードエンコーダーシークレットの設定の詳細については、「パスワードエンコーダーシークレットの管理」を参照してください。
シークレットエンコーダーと、エンコーダーシークレットのローテーションを行う方法の詳細については、「パスワード構成の動的なアップデート」を参照してください。
クラスターリンクの編集¶
構成オプションのリストについては、「Cluster Linking の構成オプション」を参照してください。
ClusterLink の CR をアップデートし、以下を実行して変更を適用します。
kubectl apply -f <ClusterLink CR>
クラスターリンクの削除¶
クラスターリンクを削除すると、そのクラスターリンクで管理されていたすべてのミラートピックがフェイルオーバーされ、それらのトピックのために KafkaTopic の CR が作成されます。
When you delete a cluster link, the history of any STOPPED
topics is also
deleted. If you need the Last Source Fetch Offset
or the Status Time
of
your promoted or failed-over mirror topics, ensure you save those before you
delete the cluster link.
クラスターリンクを削除するには、以下を実行します。
kubectl delete -f <Cluster Link CR>
クラスターリンクの説明¶
以下の情報を、クラスターリンクのステータスから取得できます。
- クラスターリンク ID
- クラスターリンク名
- 送信先 Kafka クラスター ID
- 送信元 Kafka クラスター ID
- ミラートピックの数
- クラスターのすべてのミラートピックのステータス
送信先 Kafka クラスターのクラスターリンクのリストを表示するには、以下を実行します。
kubectl get clusterlink <cluster link name> -oyaml
クラスターリンクでは、プロモートされたトピックがミラートピックではなくなっていても、それらのトピックに関する情報が維持されます。
または、次のコマンドを使用して、クラスターリンクのステータスを取得することができます。
kubectl explain clusterlink.status
ミラートピックの作成¶
ミラートピックは、クラスターリンクによって作成され、所有されます。
ミラートピックは、送信先クラスター上で、一意の名前とともに、新しいトピックとして作成されます。クラスターリンクの送信元クラスター上に、同じ名前のトピックが存在していることが必要です。これが、ミラートピックの送信元トピックとなります。
既存の読み取りおよび書き込み可能トピックをミラートピックに変換することはできません。
For details on mirror topic creation in Confluent Platform, see Mirror Topic Creation.
ミラートピックを作成するには、以下を実行します。
ClusterLink の CR で以下を設定します。
spec: mirrorTopics: - name: --- [1] state: --- [2] configs: --- [3]
- [1](必須)送信元トピックの名前。これと同じ名前のトピックが、クラスターリンクの送信元クラスター上に存在する必要があります。
- [2](省略可) このミラートピックの作成時のステート。指定可能な値は、
PAUSE
、PROMOTE
、FAILOVER
、ACTIVE
です。ステートを定義しない場合は、デフォルト値であるACTIVE
となります。 - [3](省略可)構成オプションのリストについては、「Cluster Linking の構成オプション」を参照してください。
以下を実行して変更を適用します。
kubectl apply -f <Cluster Link CR>
ミラートピックの変更¶
ミラートピックをプロモート、フェイルオーバー、一時停止することができます。
- トピックのプロモート: ミラーリングを停止し、送信先のトピックを書き込み可能にするときは、トピックをプロモートします。ラグがゼロであるかどうかを確認し、トピックのプロモート前に最終同期を行います。
- トピックのフェイルオーバー: これは、プロモートに似ていますが、ラグがゼロになるまで待機しません。
To learn more about what happens to a Confluent Platform mirror topic when promoted or failed over, see Converting a Mirror Topic to a Normal Topic.
For the available states and statues of a mirror topic in Confluent Platform, see Mirror Topic States and Statuses.
ミラートピックのプロモートまたはフェイルオーバーを行うと、KafkaTopic の CR が CFK によって作成されます。新しい KafkaTopic の CR の名前は次のとおりです。送信先クラスターのトピック名は変わらず、ミラートピックの名前と同じままです。
clink-toLowerCase(<topic-name>)-<First section of the ClusterLink object UID>
たとえば、ミラートピック myMirrorTopic
をプロモートすると、新しい KafkaTopic の CR の名前は次のようになります。
clink-mymirrortopic-0154a475
新しい KafkaTopic の CR の名前が 63 文字を超える場合は、64 文字目以降が切り捨てられます。
アクティブなミラートピックを変更してプロモート、フェイルオーバー、一時停止を行うには、次の手順に従います。
ClusterLink の CR の
mirrorTopics
セクションを変更します。spec: mirrorTopics: - name: --- [1] state: --- [2]
- [1](必須)
- [2](省略可) このミラートピックのステート。指定可能な値は、
PAUSE
、PROMOTE
、FAILOVER
、ACTIVE
です。
以下を実行して変更を適用します。
kubectl apply -f clusterlink.yaml
ミラートピックの削除¶
For the process of deleting a mirror topic in Confluent Platform, see Mirror Topic Deletion.
For the available states and statues of a mirror topic in Confluent Platform, see Mirror Topic States and Statuses.
ミラートピックを削除するには、次の手順に従います。
ClusterLink の CR にあるリストからミラートピックを削除します。
spec: mirrorTopics: - name: --- [1]
- [1] ミラートピックの名前とその他のプロパティを削除します。
以下を実行して変更を適用します。
kubectl apply -f clusterlink.yaml