Cluster Linking の構成オプション¶
Looking for Confluent Cloud Cluster Linking docs? You are currently viewing Confluent Platform documentation. If you are looking for Confluent Cloud docs, check out Cluster Linking on Confluent Cloud.
Cluster Linking と Confluent for Kubernetes の使用¶
Cluster Linking を、Kubernetes 用の Confluent でデプロイされた Confluent Platform とともに使用できます。
Confluent for Kubernetes 2.2 では、組み込みの Cluster Linking サポートがリリースされました。CFK ドキュメントの「Confluent for Kubernetes を使用する Cluster Linking」セクションを参照してください。
CFK の以前のバージョンで Cluster Linking を構成するには、Kafka カスタムリソースの configOverrides
を使用します。configOverrides
の使用の詳細については、CFK ドキュメントの「構成のオーバーライド」を参照してください。
その例を次に示します。
apiVersion: platform.confluent.io/v1beta1
kind: Kafka
metadata:
name: kafka
namespace: confluent
spec:
replicas: 3
image:
application: confluentinc/cp-server:7.1.1
init: confluentinc/confluent-init-container:2.0.1
configOverrides:
server:
- confluent.cluster.link.enable=true # Enable Cluster Linking
Cluster Linking と Ansible の使用¶
You can use Cluster Linking with Confluent Platform deployed with Ansible .
To configure Cluster Linking with Ansible, add to the
kafka_broker_custom_properties
section in the inventory as describe in
Configure Confluent Platform with Ansible.
その例を次に示します。
all:
vars:
kafka_broker_custom_properties:
confluent.cluster.link.enable: "true"
リンクプロパティ¶
クラスターリンクには複数の構成を使用できます。以下のセクションでは、 CLI コマンド を使用してこれらを設定する方法について説明し、使用できるプロパティのリストを示します。
クラスターリンクに対するプロパティの設定¶
個々のクラスターリンクのそれぞれに構成を設定することができます。そのためには、構成を "キー=値" ペアとしてプロパティファイルに指定し、以下のいずれかを使用してそのファイルを引数として CLI コマンド に渡します。
--config-file
フラグ(初めてリンクを作成する場合)--add-config-file
フラグ(既存のリンクの構成をアップデートする場合)
代わりに、以下のいずれかを使用して、コマンドラインで直接 "キー=値" ペアを指定することでクラスターリンクのプロパティを指定またはアップデートすることもできます。
--config
フラグ(初めてリンクを作成する場合)--add-config
フラグ(既存のリンクの構成をアップデートする場合)
ちなみに
既存のクラスターリンクの構成を更新するときには、変更された構成のみを渡します。構成ファイルの完全なセットを渡すほうが簡単ではありますが、
--add-config-file
で構成ファイルを使用する場合は、更新する構成のみがそれに含まれていることに、特に留意してください。たとえば、my-update-configs.txt
には以下のものが含まれる可能性があります。consumer.offset.sync.ms=25384 topic.config.sync.ms=38254
クラスターリンクの構成はさまざまな側面を変更できますが、その 送信元クラスター (ブートストラップサーバー、送信元クラスター ID など)やプレフィックス、リンク名 を変更することはできません。
リンクプロパティをファイルおよびコマンドラインで指定する例とコマンド構文は、「クラスターリンクの作成」と「クラスターリンクの変更」、および「チュートリアル: トピックデータ共有での Cluster Linking の使用」に記載されています。
構成オプション¶
クラスターリンクを指定するために使用するプロパティは以下のとおりです。
フィルター(ACL 同期、コンシューマーオフセット同期、ミラートピック自動作成)を持つ機能を最初に有効にしてから無効にした場合は、既存のフィルターがすべて、クラスターリンクからクリア(削除)されます。
acl.filters
移行する ACL のリストを指定する JSON 文字列です。ACL を
acl.filters.json
ファイルに定義し、そのファイル名を引数として--acl-filters-json-file
フラグに渡します。ACL を JSON ファイルに定義する方法の例については、「送信元クラスターから送信先クラスターへの ACL の移行」を参照してください。- 型: string
- デフォルト: ""
注釈
acl.filters
に情報を入力するには、「送信元クラスターから送信先クラスターへの ACL の移行」の説明に従って、ACL を指定する JSON ファイルをコマンドラインで渡します。acl.sync.enable
ACL を移行するかどうかを指定します。詳細については、「送信元クラスターから送信先クラスターへの ACL の移行」を参照してください。
- 型: ブール値
- デフォルト: false
acl.sync.ms
ACL をリフレッシュする頻度をミリ秒単位で指定します(ACL 移行が有効にされている場合)。デフォルトは 5000 ミリ秒(5 秒)です。
- 型: int
- デフォルト: 5000
auto.create.mirror.topics.enable
- 送信元クラスターのトピックを基にミラートピックを自動作成するかどうかを指定します。「true」に設定した場合、ミラートピックは自動作成されます。このオプションを「false」に設定すると、ミラートピック作成が無効になり、既存のフィルターがある場合はクリアされます。このオプションの詳細については、「ミラートピックの自動作成」を参照してください。
auto.create.mirror.topics.filters
- 1 つのプロパティ
topicFilters
を持つ JSON オブジェクト。適用されるフィルターの配列が含まれており、どのトピックをミラーリングするかを示します。このオプションの詳細については、「ミラートピックの自動作成」を参照してください。 cluster.link.paused
クラスターリンクが実行中であるか一時停止されているか。デフォルト値は false です。
- 型: ブール値
- デフォルト: false
cluster.link.retry.timeout.ms
失敗を再試行せず、パーティションが失敗としてマークされるまでのミリ秒数です。ソーストピックが削除され、このタイムアウト以内に再作成されない場合、リンクには古いトピックのレコードに加えて新しいトピックのレコードが含まれる場合があります。
- 型: int
- デフォルト: 300000(5 分)
availability.check.ms
送信元クラスターが利用可能かどうかをクラスターリンクがチェックする頻度。クラスターリンクがチェックする頻度をミリ秒単位で指定します。
- 型: int
- デフォルト: 60000(1 分)
クラスターリンクは、送信元クラスターがデータのミラーリングにまだ利用できる状態であるかどうかを定期的にチェックします。送信元クラスターが(停止、災害などで)利用できない状態に陥った場合、クラスターリンクは、そのステータスおよび対応するミラートピックのステータスを更新することでその旨を伝えます。
availability.check.ms
は availability.check.consecutive.failure.threshold と連動します。
availability.check.consecutive.failure.threshold
送信元クラスターに対する可用性チェックの不合格が連続して許容される回数。この回数を超えると、クラスターリンクのステータスが
SOURCE_UNAVAILABLE
になります。- 型: int
- デフォルト: 5
たとえばデフォルト(5)を使用した場合、送信元クラスターは、チェックの結果が連続 5 回不合格と判定されると利用不可と判断されます。さらに、availability.check.ms がそのデフォルトである 1 分に設定されていて、チェックの結果が 5 回連続で不合格だった場合、クラスターリンクは 5 分後に
SOURCE_UNAVAILABLE
と表示されることになります。connections.max.idle.ms
アイドル接続タイムアウトです。サーバーソケットプロセッサースレッドは、これより長い時間アイドル状態になっている接続を閉じます。
- 型: int
- デフォルト: 600000
connections.max.idle.ms
Used only for source-initiated links. Set this to INBOUND on the destination cluster’s link (which you create first). Set this to OUTBOUND on the source cluster’s link (which you create second). You must use this in combination with
link.mode
. This property should only be set for source-initiated cluster links.- 型: string
- Default: OUTBOUND
consumer.offset.group.filters
移行するコンシューマーグループのリストを指定するための JSON です。詳細については、「送信元クラスターから送信先クラスターへのコンシューマーグループの移行」を参照してください。
- 型: string
- デフォルト: ""
注釈
コンシューマーグループフィルターに含めるのは、送信先で使用されていないグループのみにしてください。そうすることで、送信先の他のコンシューマーによってコミットされたオフセットがシステムによってオーバーライドされるのを防ぐことができます。フィルターに含まれているグループが送信先でも使用されている場合、システムはそのフィルターを回避しようと試みますが、この場合、その保証はありません。オフセットが上書きされる可能性はあります。ミラートピックの "昇格" が正しく機能するためには、システムがオフセットをロールバックできることが必要ですが、グループが送信先コンシューマーによって使用されていると、ロールバックを実行できません。
consumer.offset.sync.enable
コンシューマーオフセットを送信元クラスターから移行するかどうかを指定します。
これを設定して Cluster Linking を実行し、その後で無効にすると、フィルターがクラスターリンクからクリア(削除)されます。
- 型: ブール値
- デフォルト: false
consumer.offset.sync.ms
コンシューマーオフセットを同期する頻度をミリ秒単位で指定します(有効にされている場合)。
- 型: int
- デフォルト: 30000
num.cluster.link.fetchers
クラスターリンクのソースブローカーからメッセージをレプリケートするために使用するフェッチャースレッドの数です。
- 型: int
- デフォルト: 1
topic.config.sync.ms
トピック構成をリフレッシュする頻度をミリ秒単位で指定します。
- 型: int
- デフォルト: 5000
local.listener.name
- ソース開始クラスターリンクの場合は、送信元クラスターのクラスターリンクで使用される代替リスナーです。詳細については、「Cluster Linking におけるリスナーについて」を参照してください。
ssl.provider
Used only for source-initiated links. Set this to DESTINATION on the destination cluster’s link (which you create first). Set this to SOURCE on the source cluster’s link (which you create second). You must use this in combination with
connection.mode
. This property should only be set for source-initiated cluster links.- 型: string
- Default: DESTINATION
共通構成オプション¶
以下の共通プロパティは Cluster Linking に固有のものではありませんが、クラスターリンクのセットアップと管理に特に関係があります。これらは Confluent Platform のクライアント、ブローカー、およびセキュリティ構成に共通するプロパティです。これらについては、示されている各リンク先のセクションで詳しく説明します。
クライアントの構成¶
AdminClient
全構成のリストについては、「AdminClient の構成」を参照してください。
bootstrap.servers
client.dns.lookup
metadata.max.age.ms
retry.backoff.ms
request.timeout.ms
クラスターリンクレプリケーションの構成¶
以下の構成オプションについては、「Kafka ブローカーの構成」を参照してください。
replica.fetch.backoff.ms
replica.fetch.max.bytes
replica.fetch.min.bytes
replica.fetch.response.max.bytes
replica.fetch.wait.max.ms
replica.socket.receive.buffer.bytes
replica.socket.timeout.ms
クライアント SASL および SSL の構成¶
sasl.client.callback.handler.class
sasl.jaas.config
sasl.kerberos.kinit.cmd
sasl.kerberos.min.time.before.relogin
sasl.kerberos.service.name
sasl.kerberos.ticket.renew.jitter
sasl.kerberos.ticket.renew.window.factor
sasl.login.callback.handler.class
sasl.login.class
sasl.login.refresh.buffer.seconds
sasl.login.refresh.min.period.seconds
sasl.login.refresh.window.factor
sasl.login.refresh.window.jitter
sasl.mechanism
security.protocol
ssl.cipher.suites
ssl.enabled.protocols
ssl.endpoint.identification.algorithm
ssl.engine.factory.class
ssl.key.password
ssl.keymanager.algorithm
ssl.keystore.location
ssl.keystore.password
ssl.keystore.type
ssl.protocol
ssl.provider
ssl.secure.random.implementation
ssl.trustmanager.algorithm
ssl.truststore.location
ssl.truststore.password
ssl.truststore.type
Control Center の必須構成¶
Cluster Linking では、Confluent Control Center と通信して Control Center UI でミラートピックを適切に表示するために、組み込みの v3 Confluent REST Proxy が必要です。REST 構成が実装されていない場合、ミラートピックは、Control Center に通常のトピックとして表示され、情報が不正確になります(「既知の制限とベストプラクティス」も参照してください)。
Control Center のプロパティファイルでの REST エンドポイントの構成¶
Control Center で Cluster Linking を使用する場合は、Control Center クラスターに REST エンドポイントを構成し、ブローカーで HTTP サーバーを有効にする必要があります。すべてのブローカーでこれが適切に構成されていない場合、Cluster Linking に Confluent Control Center からアクセスできません。
適切な Control Center プロパティファイル($CONFLUENT_HOME/etc/confluent-control-center/control-center-dev.properties
または control-center.properties
など)で、confluent.controlcenter.streams.cprest.url
を使用して controlcenter.cluster
の REST エンドポイントを定義します。デフォルトは以下に示すように http://localhost:8090
です。
# Kafka REST endpoint URL
confluent.controlcenter.streams.cprest.url="http://localhost:8090"
各ブローカーの関連 URL を特定します。クラスターに複数のブローカーがある場合は、コンマ区切りのリストを使用します。
参考
「Control Center の構成リファレンス」の confluent.controlcenter.streams.cprest.url
Kafka ブローカーでの REST エンドポイントの認証の構成(セキュアなセットアップ)¶
ちなみに
- Cluster Linking では Metadata Service(MDS)やセキュリティを実行する必要はありませんが、セキュリティを構成する場合は、RBAC の MDS クライアント構成を示す以下の例から始めることができます。
confluent.http.server.listeners
の代わりに(Metadata Service を有効にする)confluent.metadata.server.listeners
を使用して API リクエストをリッスンできます。confluent.metadata.server.listeners
またはconfluent.http.server.listeners
を使用できますが、両方を使用することはできません。リスナーが HTTPS を使用する場合は、適切な SSL 構成パラメーターも設定する必要があります。詳細については、「Admin REST APIs の構成オプション」を参照してください。
セキュアなセットアップで Cluster Linking を実行するには、Kafka ブローカーの各 server.properties
ファイルで REST エンドポイントの認証を構成する必要があります。Kafka ブローカーファイルにこれらの構成がない場合、Control Center はセキュアなセットアップの Cluster Linking にアクセスできません。
最小限でも、以下の構成が必要になります。
# EmbeddedKafkaRest: HTTP Auth Configuration
kafka.rest.kafka.rest.resource.extension.class=io.confluent.kafkarest.security.KafkaRestSecurityResourceExtension
kafka.rest.rest.servlet.initializor.classes=io.confluent.common.security.jetty.initializer.InstallBearerOrBasicSecurityHandler
以下は、ブローカーの server.properties
ファイルでの Kafka RBAC 用 MDS クライアント構成の例です。
# EmbeddedKafkaRest: Kafka Client Configuration
kafka.rest.bootstrap.servers=<host:port>, <host:port>, <host:port>
kafka.rest.client.security.protocol=SASL_PLAINTEXT
# EmbeddedKafkaRest: HTTP Auth Configuration
kafka.rest.kafka.rest.resource.extension.class=io.confluent.kafkarest.security.KafkaRestSecurityResourceExtension
kafka.rest.rest.servlet.initializor.classes=io.confluent.common.security.jetty.initializer.InstallBearerOrBasicSecurityHandler
kafka.rest.public.key.path=<rbac_enabled_public_pem_path>
# EmbeddedKafkaRest: MDS Client configuration
kafka.rest.confluent.metadata.bootstrap.server.urls=<host:port>, <host:port>, <host:port>
kafka.rest.ssl.truststore.location=<truststore_location>
kafka.rest.ssl.truststore.password=<password>
kafka.rest.confluent.metadata.http.auth.credentials.provider=BASIC
kafka.rest.confluent.metadata.basic.auth.user.info=<user:password>
kafka.rest.confluent.metadata.server.urls.max.age.ms=60000
kafka.rest.client.confluent.metadata.server.urls.max.age.ms=60000
参考
- Admin REST APIs のセキュリティ
- REST Proxy のセキュリティ
- 「Confluent Platform のデモ(cp-demo)」のオンプレミスのチュートリアル(「セキュリティ」セクションで提供されている、さまざまな種類の構成の例)
Cluster Linking の無効化¶
Confluent Enterprise バージョン 7.0.0 以降を実行しているクラスターで Cluster Linking を無効にするには、以下の行を送信先クラスターのブローカー構成($CONFLUENT_HOME/etc/server.properties
など)に追加します。
confluent.cluster.link.enable=false
これで、そのクラスターを送信先とするクラスターリンク、またはそのクラスターを送信元とした "ソース開始クラスターリンク" の作成が無効になります。このクラスターを送信元とする送信先開始クラスターリンクの作成は無効にならない点に注意してください。
Cluster Linking is not available as a dynamic configuration. You must either
enable it before starting the brokers, or to enable it on a running cluster, set
the configuration confluent.cluster.link.enable=true
on the brokers and
restart them to perform a rolling update.
Cluster Linking におけるリスナーについて¶
フォワード接続では、どのリスナーが接続を受け付けるかはターゲットサーバーが把握しており、ターゲットサーバーによって、そのリスナーと接続とが関連付けられます。その接続にメタデータリクエストが到着すると、サーバーは、リスナーに対応するメタデータを返します。
たとえば Confluent Cloud で、外部リスナーのクライアントが topicA
のリーダーを要求すると必ずリーダーの外部エンドポイントが返されます。クライアントに内部エンドポイントが返されることはありません。接続に基づいてリスナー名が認識されるためです。
リバース接続では、ターゲットサーバー(つまり、送信元クラスター)によって接続が確立されます。接続がリバースされた場合、そのリバース接続をどのリスナーに関連付けるか、たとえば、リーダーリクエストで送信先に返すべきエンドポイントをこのターゲットサーバーが把握している必要があります。
デフォルトでは、リンクが作成された送信元クラスターに基づいてリスナーが関連付けられます。使用される外部リスナーは通常 1 つであるため、ほとんどの場合、これで事足ります。Confluent Cloud では、このデフォルトが使用され、ユーザーがそれをオーバーライドすることはできません。
セルフマネージド型 Confluent Platform では、デフォルトのリスナー/接続の関連付けをユーザーが必要に応じてオーバーライドできます。こうして、内部リスナーに送信元リンクを作成しながらも、リバース接続に外部リスナーを関連付けるという柔軟性が実現されています。
local.listener.name
構成は、送信元クラスターのリスナー名を表します。デフォルトでは、送信元リンクの作成に使用されたリスナーになります。別のリスナーを使用する必要がある場合は、明示的にそれを構成する必要があります。Confluent Cloud が送信元である場合は、これが外部リスナー(デフォルト)となり、オーバーライドすることはできません。
送信先については、リスナーは、bootstrap.servers によって決定され、オーバーライドすることはできません。