Replicator の概要

Replicator

Confluent Replicator を使用すると、1 つの Kafka クラスターから別のクラスターにトピックを信頼性の高い方法で簡単にレプリケートできます。メッセージのコピーに加えて、Replicator は必要に応じてトピックを作成し、送信元クラスターにトピック構成を保持します。これには、パーティションの数、レプリケーション係数、および個々のトピックに対して指定された構成のオーバーライドが含まれます。Replicator は、コネクターとして実装されます。

機能

Replicator では、以下の機能をサポートします。

  • ホワイトリスト、ブラックリスト、および正規表現を使用したトピックの選択。
  • 一致するパーティション数、レプリケーション係数、およびトピック構成のオーバーライドによる送信先クラスターの動的なトピックの作成。
  • 新しいパーティションが送信元クラスターに追加された場合のトピックの自動サイズ変更。
  • トピック構成が送信元クラスターで変更された場合のトピックの自動再構成。
  • タイムスタンプの保持重複または循環メッセージ反復を防ぐための来歴ヘッダーの使用、および コンシューマーオフセット変換 (Confluent Platform 5.0.1 以降でサポートされている)。
  • 既存のデータセンターで MirrorMaker から Replicator に移行 できます(Confluent Platform 5.0.0 以降)。MirrorMaker から Replicator への移行は、以前のバージョンの Confluent Platform ではサポートされていません(5.5.0 より前)。
  • 少なくとも 1 回のデリバリー、つまり Replicator コネクターによって、レコードが Kafka のトピックに少なくとも 1 回は配信されることが保証されます。コネクターを再起動した場合、Kafka のトピックに重複レコードが存在している可能性があります。

マルチデータセンターユースケース

Replicator は、複数のクラスター間および複数のデータセンターにデプロイできます。マルチデータセンターデプロイにより、以下のようなユースケースが可能になります。

  • アクティブ/アクティブの地理的位置指定デプロイ: ユーザーは近くのデータセンターにアクセスして、低いレイテンシと高いパフォーマンスを実現するためにアーキテクチャを最適化することができます
  • アクティブ/パッシブの災害リカバリ(DR)デプロイ : データセンターで全体的または部分的な障害が発生した場合でも、アプリケーションをフェイルオーバーして Confluent Platform を別のデータセンターで使用できます
  • 一元的分析 : 複数の Apache Kafka® クラスターからのデータを 1 つの場所に集約して、組織全体の分析を行います
  • クラウド移行 : Kafka を使用して、オンプレミスのアプリケーションとクラウドのデプロイ間でデータを同期します

Kafka トピック内のイベントを 1 つのクラスターから別のクラスターにレプリケートする機能は、Confluent のマルチデータセンターアーキテクチャの基盤です。

レプリケーションは、Confluent Replicator またはオープンソースの Kafka MirrorMaker を使用して実行できます。Replicator は、トピックデータのレプリケーションに加えて、Schema Registry 内の スキーマの移行 にも使用できます。

このドキュメントでは、Replicator について重点的に説明します。具体的には、 アーキテクチャクイックスタートチュートリアル、Replicator を異なるコンテキストで 構成して実行する 方法、 チューニングとモニタリングクラスター間フェイルオーバー などについて取り上げます。さらに、 MirrorMaker から Replicator に移行する 方法に関するセクションもあります。

デプロイ戦略に関する一般的な考え方の中には MirrorMaker にも適用できるものがありますが、主な関心が MirrorMaker にある場合は、Kafka のドキュメントで「Mirroring data between clusters」を参照してください。

アーキテクチャ

以下の図に、Replicator のアーキテクチャを示します。Replicator は、Kafka Connect API とワーカーを使用して、高可用性、負荷分散、および一元管理を実現します。

../../_images/replicator_components.ja.png

Replicator のアーキテクチャ

ちなみに

Replicator は送信先クラスターまたは送信元クラスターの近くにデプロイすることができ、いずれの方法でも正常に動作します。ただし、ベストプラクティスとして、送信先クラスターの近くに Replicator をデプロイすることをお勧めします。これにより、ネットワーク上での信頼性とパフォーマンスが高まります。したがって、送信先クラスターが Confluent Cloud の場合、Confluent Cloud クラスターと同じリージョン内のインスタンスに Replicator をデプロイすることをお勧めします。ただし、送信元クラスターで外部接続が許可されていない場合は、送信元クラスターに Replicator をデプロイできます(「Confluent Cloud クラスター間のトピックの移行」を参照してください)。

デプロイの例

一般的なマルチデータセンターデプロイでは、別個のデータセンターに配置された、地理的に分散している 2 つの Kafka クラスターのデータは、別のデータセンターに配置された別のクラスターへと集約されます。コピーされるデータのオリジンは "送信元" クラスターと呼ばれ、コピーされるデータのターゲットは "送信先" と呼ばれます。

各送信元クラスターでは、別個の Replicator インスタンスが必要です。便宜上、集約データセンターに配置された同じ Connect クラスター内でそれらを実行できます。

../../_images/replicator.ja.png

集約クラスターへのレプリケーション

作業開始のためのガイドライン

Replicator を使用したマルチデータセンターデプロイを構成するには、以下のガイドラインに従います。

  1. Replicator クイックスタート を使用して 2 つの Kafka クラスター間のレプリケーションをセットアップします。
  2. マルチデータセンター環境で Replicator と他の Confluent Platform コンポーネントを インストールして構成する 方法について学習します。
  3. 本稼働環境で Replicator を実行する前に、モニタリングおよびチューニングガイド を参照します。
  4. 災害シナリオの発生時の回復性を高めるように複数の Kafka クラスターを設計および構成するための実用的なガイドについては、ディザスターリカバリ に関するホワイトペーパーを参照してください。このホワイトペーパーでは、フェイルオーバーとフェイルバックを行い、最終的にリカバリを成功させるための計画について説明しています。

デモとサンプル

Replicator クイックスタート を完了したら、以下に示すマルチデータセンターデプロイでの Replicator の実践的な例を確認してください。これらのデモは、GitHub からダウンロードして実行することができます。以下の図を参照して、必要なデプロイシナリオに対応する Replicator の例を判断してください。

../../_images/replicator-demos.png
  1. Kafka オンプレミスから Kafka オンプレミスへ
    • Docker での Replicator デモ: データセンター間でデータの双方向コピーを行う Replicator の 2 つのインスタンスを含む、アクティブ/アクティブのマルチデータセンター設計の完全に自動化された例
    • スキーマ変換: Replicator を使用して、Schema Registry に格納されているスキーマを 1 つのクラスターから別のクラスターに移動する例
    • Confluent Platform デモ: Kafka ストリーミング ETL を Replicator とともにデプロイしてデータをレプリケートする例
  2. Kafka オンプレミスから Confluent Cloud へ
  3. GKE の Kafka から Confluent Cloud へ
  4. Confluent Cloud から Confluent Cloud へ

トピック名の変更

デフォルトでは、Replicator は送信元クラスターと送信先クラスターの両方で同じトピック名を使用するように構成されています。これが適切に機能するのは、単一のクラスターからレプリケートする場合のみです。複数のクラスターから単一の送信先にデータをコピーする(つまり集約ユースケースの)場合、送信元クラスターのトピック間に構成の違いがある場合に備えて、送信元クラスターごとに別個のトピックを使用する必要があります。

同じ Kafka クラスターを送信元と送信先として使用できるのは、レプリケートされるトピック名が異なっている場合に限ります。同じクラスター内では一般に Kafka の組み込みのレプリケーションを使用するため、これは推奨されるパターンではありませんが、役に立つ場合もあります(テストなど)。

Confluent Platform 5.0 以降、Replicator は 来歴ヘッダー を使用して循環レプリケーションを防止します。これにより、2 つの Replicator インスタンスが実行され、1 つは DC1 から DC2 に、もう 1 つは DC2 から DC1 にレプリケートするように構成されている場合、Replicator は DC2 にレプリケートされたメッセージが DC1 に(またはその逆に)レプリケートされないようにすることができます。その結果、Replicator は各方向で安全に実行されます。

Replicator を使用すると、異なるデータセンター内のアプリケーションが同名のトピックにアクセスできますが、クライアントアプリケーションを設計する際は、いくつかの要因を考慮したトピック命名方法を採用する必要があります。

複数のデータセンターにわたって同じトピック名を使用する場合、この構成では以下のことに注意してください。

  • プロデューサーはリモートクラスターからのコミットの確認応答を待機せず、Replicator はローカルでコミットされたデータをデータセンター間で非同期でコピーします。
  • 各データセンターのプロデューサーが同名のトピックに書き込む場合、"グローバルな順序付け" は存在しません。そのため、異なるデータセンターのプロデューサーから生成されたデータについては、メッセージの順序が保証されません。
  • 安定した状態で、同じ名前のトピックから読み取る、同じグループ ID を持つコンシューマーグループが各データセンターに存在する場合、それらは各データセンターで同じメッセージを再処理します。

各データセンターで同じトピック名を使用しない方がよい場合もあります。たとえば、以下のような場合です。

  • Replicator が 5.0.1 より前のバージョンを実行している。
  • Kafka ブローカーが、メッセージヘッダーをまだサポートしていない Kafka 0.11 より前のバージョンを実行している。
  • Kafka ブローカーは Kafka バージョン 0.11 以降を実行しているが、ヘッダーを使用するための最小要件である log.message.format.version=2.0 を満たしていない。
  • クライアントアプリケーションが複数のデータセンターにまたがって同名のトピックを処理するように設計されていない。

これらの場合、ディザスターリカバリ に関するホワイトペーパーの付録「Topic Naming Strategies to Prevent Cyclic Repetition」を参照してください。

メタデータの定期アップデート

Replicator は、送信元クラスター内のトピックを定期的にチェックして、レプリケートする必要がある新しいトピックが存在するかどうか、および構成変更(パーティション数の増加など)が存在するかどうかを確認します。このチェックの頻度は、コネクター構成の metadata.max.age.ms 設定で制御します。デフォルトは 2 分に設定されています。これは、コネクターが送信元クラスターに不要な負荷を追加しないようにすると同時に、構成変更に対して妥当なタイミングで応答できるようにするためです。この設定値を小さくすると変更をより迅速に検知できますが、トピックの作成と再構成は(一般的に)比較的まれであるため、これは推奨されません。

セキュリティと ACL の構成

ACL の概要

重要

TLS に対応した ZooKeeper クォーラムは、Replicator ではサポートされていません。TLS 対応の ZooKeeper を使用して Replicator を実行するには、Replicator の ZooKeeper 関連の接続詳細をすべて削除します。

Replicator は、送信元クラスターと送信先クラスターの両方で SSL を介したセキュア Kafka による通信をサポートしています。Replicator は、SSL または SASL を使用した認証もサポートしています。送信元クラスターと送信先クラスターで異なるセキュリティ構成を使用できます。

ここで説明するすべてのプロパティは付加的です(つまり SSL 暗号化と SASL 平文認証プロパティの両方を適用できます)。ただし、security.protocol は例外です。以下の表で、このプロパティの有効な値を確認してください。

暗号化 認証 security.protocol
SSL なし SSL
SSL SSL SSL
SSL SASL SASL_SSL
プレーンテキスト SASL SASL_PLAINTEXT

送信元および送信先 Kafka への Replicator 接続は、以下を使用して構成できます。

ZooKeeper を構成するには、起動時にその JAAS ファイルの名前を JVM/Java 仮想マシンパラメーターとして渡します。

export KAFKA_OPTS="-Djava.security.auth.login.config=etc/kafka/zookeeper_jaas.conf"
bin/zookeeper-server-start etc/kafka/zookeeper.properties

重要

送信元および送信先 ZooKeeper は、同じ認証情報で保護する必要があります。

送信元クラスターでセキュリティを構成するには、「ソース Kafka: セキュリティ」のコネクター構成を参照してください。送信先クラスターでセキュリティを構成するには、「送信先 Kafka: セキュリティ」コネクター構成と、こちら の Connect ワーカーの構成を参照してください。

参考

Replicator の必須のセキュリティ構成パラメーターをまとめて確認するには、GitHub の confluentinc/examples の docker-compose 環境を試してください。

SASL または SSL 認証を使用し、ACL が送信元、送信先、またはその両方で有効にされている場合、以降の各セクションで説明されている ACL が Replicator に必要となります。

ACL は、Confluent Platform (Confluent Platform CLI コマンドリファレンス)と /cloud/current/Confluent Cloud)の両方に用意されています。

ACL の構成の詳細については、「ACL を使用した認可」を参照してください。

プリンシパルユーザー(Confluent Platform)とサービスアカウント(Confluent Cloud)

ここでは、Confluent Platform と Confluent Cloud の両方を対象に、ACL を構成するためのコマンドを取り上げます。

Confluent Platform で、サービスプリンシパル に ACL を関連付けます。

Confluent Cloud で、Confluent Cloud サービスアカウント に ACL を関連付けます。

Confluent Cloud のサービスアカウントを作成するには、次のコマンドを実行します。

ccloud service-account create <service-account-name> --description "<descripton of the service account"

その例を次に示します。

ccloud service-account create my-first-cluster-test-acls --description "test ACLs on Cloud"

+-------------+----------------------------+
| Id          |                     123456 |
| Resource ID | ab-123abc                  |
| Name        | my-first-cluster-test-acls |
| Description | test ACLs on Cloud         |
+-------------+----------------------------+

このサービスアカウント ID は、以降のコマンドで ACL を作成する際に使用します。

ちなみに

サービスアカウントの ID は、ccloud service-account list でそれらを一覧表示することによって取得できます。

ライセンス管理のための ACL

ライセンスの管理には、次の ACL が必要になります。

クラスター リソース 操作
送信先(または confluent.topic.bootstrap.servers が構成されている他のクラスター) トピック: _confluent-command All

Confluent Platform で上記の ACL を構成するためのコマンドを次に示します。

kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<destination principal> --operation ALL --topic _confluent-command

送信元クラスターからの読み取りを実行するための ACL

送信元クラスターからの読み取りには、次の ACL が必要になります。

クラスター リソース 操作
送信元 CLUSTER Describe  
送信元 トピック: Replicator で Describe がレプリケートされるすべてのトピック  
送信元 トピック: Replicator で Read がレプリケートされるすべてのトピック  
送信元 グループ: コンシューマーグループ名は、Replicator 名または src.consumer.group.id プロパティによって決定されます 読み取り

Confluent Platform で上記の ACL を構成するためのコマンドを次に示します。

kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<source principal> --operation DESCRIBE --cluster
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<source principal> --operation DESCRIBE --topic <source topic>
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<source principal> --operation READ --topic <source topic>

送信先クラスターへの書き込みを実行するための ACL

送信先クラスターへの書き込みには、次の ACL が必要になります。

クラスター リソース 操作
送信先 クラスター Describe
送信先 トピック - Replicator がレプリケートするすべてのトピック Describe
送信先 トピック - Replicator がレプリケートするすべてのトピック 書き込み
送信先 トピック - Replicator がレプリケートするすべてのトピック 読み取り

Confluent Platform で上記の ACL を構成するためのコマンドを次に示します。

kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<destination principal> --operation DESCRIBE --cluster
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<destination principal> --operation DESCRIBE --topic <destination topic>
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<destination principal> --operation WRITE --topic <destination topic>
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<destination principal> --operation READ --topic <destination topic>

トピック作成と構成同期のための ACL

Replicator のトピック作成と構成同期の機能を使用する場合(デフォルトで有効にされています)、次の ACL が必要になります。

クラスター リソース 操作
送信元 トピック - Replicator がレプリケートするすべてのトピック DescribeConfigs
送信先 トピック - Replicator がレプリケートするすべてのトピック Create
送信先 トピック - Replicator がレプリケートするすべてのトピック DescribeConfigs
送信先 トピック - Replicator がレプリケートするすべてのトピック AlterConfigs

トピックの作成と構成の同期に関する構成オプションについては、「送信先トピック」を参照してください。

Confluent Platform で上記の ACL を構成するためのコマンドを次に示します。

kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<source principal> --operation DESCRIBECONFIGS --topic <source topic>
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<destination principal> --operation DESCRIBECONFIGS --topic <destination topic>
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<destination principal> --operation CREATE --cluster
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<destination principal> --operation ALTERCONFIGS --cluster

オフセット変換のための ACL

Replicator のオフセット変換機能を使用する場合(デフォルトで有効にされています)、次の ACL が必要になります。

クラスター リソース 操作
送信元 トピック: __consumer_timestamps All
送信先 グループ: 変換されるすべてのコンシューマーグループ All

オフセット変換に関する構成オプションについては、「consumer_offset_translation」を参照してください。

Confluent Platform で上記の ACL を構成するためのコマンドを次に示します。

kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<source principal> --operation ALL --topic {_consumer_timestamps}
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<source principal> --operation ALL --topic {<group name>}

タイムスタンプインターセプターのための ACL

Replicator タイムスタンプインターセプターを備えたクライアントにも、以下の ACL を付与する必要があります。

クラスター リソース 操作
送信元 トピック: __consumer_timestamps 書き込み
送信元 トピック: __consumer_timestamps Describe

Confluent Platform でこれらを構成するためのコマンドを次に示します。

kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<source principal> --operation WRITE --topic __consumer_timestamps
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<source principal> --operation DESCRIBE --topic __consumer_timestamps

送信元オフセット管理のための ACL

Replicator の送信元オフセット管理機能を使用する場合(デフォルトで有効にされています)、次の ACL が必要になります。

クラスター リソース 操作
送信元 グループ: コンシューマーグループ名は、Replicator 名または src.consumer.group.id プロパティによって決定されます All

オフセット管理に関連した構成オプションについては、Replicator 構成リファレンスの「オフセット管理」を参照してください。

Confluent Platform で上記の ACL を構成するためのコマンドを次に示します。

kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<source principal> --operation ALL --group <group name>

RBAC を使用した Replicator

RBAC を使用する場合は、「Kafka クライアントの構成」で説明されているように、Replicator クライアントでトークン認証を使用する必要があります。これらの構成のには、通常の Replicator プレフィックスである src.kafka.dest.kafka. を付ける必要があります。RBAC を有効にした送信元および送信先クラスターの構成の例を以下に示します。

src.kafka.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  username="sourceUser \
  password="xxx" \
  metadataServerUrls="http://sourceHost:8090";
src.kafka.security.protocol=SASL_PLAINTEXT
src.kafka.sasl.mechanism=OAUTHBEARER
src.kafka.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler

dest.kafka.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  username="destUser \
  password="xxx" \
  metadataServerUrls="http://destHost:8090";
dest.kafka.security.protocol=SASL_PLAINTEXT
dest.kafka.sasl.mechanism=OAUTHBEARER
dest.kafka.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler

Replicator の実行可能ファイルに対しては、プレフィックスを付けず、--consumer.config--producer.config で指定されるファイル内に記述する必要があります。例を以下に示します。

# in --consumer.config
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  username="sourceUser \
  password="xxx" \
  metadataServerUrls="http://sourceHost:8090";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler

# in --producer.config
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  username="destUser \
  password="xxx" \
  metadataServerUrls="http://destHost:8090";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler

上記の構成では、Metadata Service(MDS)が実行されている必要があります。この詳細については、「Metadata Service (MDS) の構成」を参照してください。

重要

以降のロールバインディングは、次の前提に基づいています。

  • 2 つの "独立" した RBAC デプロイがあること(DC ごとに 1 つ)
  • 各 RBAC デプロイで、そのデプロイにローカルな他のすべてのサービスがサポートされること(Connect、Schema Registry など)
  • RBAC ロールバインディングがレプリケートされないこと(Replicator はこれをサポートしていません)

Replicator では、以下に示すロールバインディングが必要です。

バッキング Connect クラスター用

クラスター リソース ロール
送信先 グループ: group.id プロパティで指定される名前 ResourceOwner
送信先 トピック: offset.storage.topic プロパティで指定される名前(デフォルトは connect-offsets) ResourceOwner
送信先 トピック: config.storage.topic プロパティで指定される名前(デフォルトは connect-configs) ResourceOwner
送信先 トピック: status.storage.topic プロパティで指定される名前(デフォルトは connect-status) ResourceOwner

ライセンス管理用

クラスター リソース ロール
送信先(または confluent.topic.bootstrap.servers が構成されている他のクラスター) トピック: _confluent-command DeveloperRead
送信先(または confluent.topic.bootstrap.servers が構成されている他のクラスター) トピック: _confluent-command DeveloperWrite
送信先(または confluent.topic.bootstrap.servers が構成されている他のクラスター) トピック: _confluent-command DeveloperManage

送信元クラスターからの読み込み用

クラスター リソース ロール
送信元 トピック: Replicator でレプリケートされるすべてのトピック DeveloperRead
送信元 トピック: Replicator でレプリケートされるすべてのトピック DeveloperManage

送信先クラスターへの書き込み用

クラスター リソース ロール
送信先 トピック: Replicator でレプリケートされるすべてのトピック ResourceOwner

重要

Replicator のトピック構成同期機能(デフォルトで有効)を使用しない場合は、ResourceOwner の代わりに次のロールを使用できます。

クラスター リソース ロール
送信先 トピック: Replicator でレプリケートされるすべてのトピック DeveloperRead
送信先 トピック: Replicator でレプリケートされるすべてのトピック DeveloperWrite
送信先 トピック: Replicator でレプリケートされるすべてのトピック DeveloperManage

Replicator のオフセット変換機能を使用する場合(デフォルトで有効にされています)

クラスター リソース ロール
送信元 トピック: __consumer_timestamps DeveloperRead
送信元 トピック: __consumer_timestamps DeveloperManage
送信先 グループ: 変換されるすべてのコンシューマーグループ(対象がわからない場合は、リテラル * を使用してすべてを許可) DeveloperRead

また、Replicator タイムスタンプインターセプターを使用する送信元クラスターのすべてのコンシューマーには以下が必要です。

クラスター リソース ロール
送信元 トピック: __consumer_timestamps DeveloperWrite
送信元 トピック: __consumer_timestamps DeveloperManage

オフセット変換に関する構成オプションについては、「コンシューマーオフセット変換」を参照してください。

Replicator の送信元オフセット管理機能を使用する場合(デフォルトで有効にされています)

クラスター リソース ロール
送信元 グループ: コンシューマーグループ名は、Replicator 名または src.consumer.group.id プロパティによって決定されます ResourceOwner

Replicator のスキーマ移行機能(デフォルトで無効)を使用する場合

クラスター リソース ロール
送信元 トピック: 基盤の Schema Registry トピック(デフォルトは _schemas) DeveloperRead
送信先 クラスター: Schema Registry クラスター ClusterAdmin

RBAC の構成の詳細については、「ロールベースアクセス制御を使用した認可」を参照してください。

スキーマを含むメッセージのレプリケート

重要

このセクションでは、2 つのアクティブな Schema Registry インスタンス間でのスキーマのレプリケーションについて説明します。送信先 Schema Registry インスタンスがアクティブであることが必須ではない場合は、このセクションの代わりに「スキーマの移行(Confluent Cloud とセルフマネージド型)」で説明するプロセスに従ってください。

スキーマが関連付けられているメッセージをレプリケートする場合、メッセージデータだけではなく関連付けられているスキーマもレプリケートする必要があります。Replicator では、Connect コンバーターを使用して、レプリケーション中にレプリケートされるメッセージに関連付けられているスキーマをフェッチし、これを送信先 Schema Registry に登録することにより、スキーマのレプリケーションを実現します。

最初に、Replicator は受信メッセージのスキーマをフェッチする必要があります。このために、src.key.converter および src.value.converter コネクター構成を使用します。これらは、送信元クラスターに関連付けられている Schema Registry インスタンスにアクセスするように構成される必要があります。

src.key.converter=io.confluent.connect.avro.AvroConverter
src.key.converter.schema.registry.url=http://source:8081
src.value.converter=io.confluent.connect.avro.AvroConverter
src.value.converter.schema.registry.url=http://source:8081

次に、スキーマを送信先 Schema Registry に登録する必要があります。このために、key.converter および value.converter コネクター構成を使用します。これらは、送信先クラスターに関連付けられている Schema Registry インスタンスにアクセスするように構成される必要があります。

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://destination:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://destination:8081

コンバーターの詳細については、「コンバーター」を参照してください。

重要

このアプローチでは、スキーマは Replicator によって認識された順番に登録され、送信元 Schema Registry に登録された順番に登録されるとは限りません。これにより、互換性の問題が発生する可能性があります。順序付けの違いを許容するために、送信先のサブジェクトのモードを FULL または NONE に設定することをお勧めします。

Schema Validation および Replicator

デフォルトでは、Replicator は topic.config.sync=true として構成されます。送信元クラスターに Confluent Server 上の Schema Validation が有効になっているトピック(confluent.value.schema.validation=true)がある場合、Replicator は、このプロパティを送信先クラスターのレプリケートされたトピックにコピーします。

Replicator を使用してブローカーの 1 つのクラスターから別のクラスターにデータをレプリケートするときは、オーバーヘッドを抑えるために、セカンダリクラスターでの追加の検証を避けることが一般的です。

そのためには、送信先にレプリケートする前に送信元クラスターで Schema Validation を無効にするか、Replicator で topic.config.sync=false を設定し、必要な構成は送信先クラスターのブローカープロパティファイルで明示的に設定するようにします。

要件

大まかに言うと、Replicator はコンシューマーグループと同じように動作し、送信元クラスターからレプリケートされたトピックのパーティションはコネクターのタスク間で分割されます。Replicator は、送信元クラスターを定期的にポーリングして、レプリケートされるトピックの構成の変更とパーティションの数をチェックします。それに応じてトピックを作成するか、構成をアップデートすることで送信先クラスターをアップデートします。これが正しく機能するには、以下のことが必要です。

  • 送信元クラスターと送信先クラスターは Apache Kafka® または Confluent Platform である必要があります。バージョンの互換性については、「コネクターの相互運用性」を参照してください。
  • Replicator のバージョンは、デプロイ先となる Kafka Connect のバージョンと一致している必要があります。たとえば、Replicator 6.2 は Kafka Connect 6.2 にのみデプロイできます。
  • Replicator のプリンシパルには、送信先クラスターでトピックを作成および変更するためのアクセス許可が必要です。バージョン 4.0 以前では、対応する ZooKeeper への書き込みアクセス許可が必要です。それより後のバージョンでは、こちら で説明している ACL が必要です。
  • 送信元クラスターと送信先クラスターのデフォルトトピック構成が一致している必要があります。一般に、ブローカー固有の設定(broker.id など)を除いて、両方のクラスターで同じブローカー構成を使用する必要があります。
  • 送信先 Kafka クラスターには送信元クラスターと同様の容量が存在する必要があります。特に、Replicator はトピックのレプリケーション係数を送信元クラスターに保持するため、少なくとも使用する最大レプリケーション係数と同じ数のブローカーが存在する必要があります。存在しない場合、送信先クラスターに同じレプリケーション係数をサポートする容量が確保されるまでトピックの作成が失敗します。この場合、トピックの作成はコネクターによって自動的に再試行されます。そのため、送信先クラスターに十分な数のブローカーが確保されるとすぐにレプリケーションが開始されます。
  • Replicator プロパティファイル内の dest.kafka.bootstrap.servers 送信先接続設定は、複数の送信元クラスターを使用する場合でも、単一の送信先クラスターを使用するように構成する必要があります。たとえば、このセクションの始めに示した図には、異なるデータセンターに配置されている 2 つの送信元クラスターのターゲットが単一の "集約" 送信先クラスターであることが示されています。集約送信先クラスターには、関連付けられているすべての送信元クラスターの合計と同様の容量が存在する必要があることに注意してください。
  • Confluent Platform バージョン 5.3.0 以降では、Confluent Replicator に Kafka Connect のエンタープライズエディションが必要です。Confluent Platform 5.3.0 以降では、Replicator は Connect のコミュニティエディションをサポートしていません。Connect のエンタープライズエディションは、Confluent Platform オンプレミスバンドルの一部としてインストールできます。詳細については、「本稼働環境」および「Confluent Platform のクイックスタート(ローカルインストール)」を参照してください(セルフマネージド型 Confluent Platform を選択してください)。エンタープライズ Connect のデモは、「Confluent Platform のクイックスタート(Docker)」および Docker Hub の confluentinc/cp-server-connect で公開されています。

ちなみに

パフォーマンスを最大限に高めるために、Replicator をできるだけ送信先クラスターに近い場所で実行して、Replicator 内での Kafka Connect の操作のための接続のレイテンシを小さくしてください。

互換性

データ転送では、Replicator は Java クライアントと同じ互換性マトリックスを維持します。詳細については、「Kafka Java クライアント」を参照してください。ただし、一部の Replicator 機能には以下のような異なる互換性要件があります。

  • スキーマ変換では、送信元クラスターと送信先クラスターの両方で Confluent 5.2.0 以降が実行されている必要があります。
  • オフセット変換では、送信元クラスターと送信先クラスターの両方で Confluent 5.1.0 以降が実行されている必要があります。
  • トピックの自動作成と構成の同期を行うには、送信先クラスターが送信元クラスターよりも新しいバージョンである必要があります。

注釈

新しいバージョンの Replicator を使用して、古いバージョンの Kafka クラスターのデータを Confluent Cloud にレプリケートすることはできません。具体的には、Replicator バージョン 5.4.0 以降を使用して、Apache Kafka® v0.10.2 以前のクラスターや Confluent Platform v3.2.0 以前のクラスターのデータを Confluent Cloud にレプリケートすることはできません。これらの古いバージョンのクラスターを使用している場合は、アップグレードできるようになるまで、Replicator 5.0.x を使用して Confluent Cloud にレプリケートしてください。次の点に留意したうえで、適切にアップグレードを計画してください。

既知の問題

  • バージョン 5.0.4、5.1.4、5.2.3、5.3.1、または 5.4.0 以降で ConsumerTimestampsInterceptor を備えたコンシューマーがある場合、Replicator もこれらのバージョンの 1 つで実行されている必要があります。既知の問題により、Replicator が前述のバージョンより古いバージョンで実行されている場合、SerializationException が発生し、エラー Size of data received by LongDeserializer is not 8 でタスクが失敗する可能性があります。
  • バージョン 5.3.0 以降で Replicator を実行している場合は、connect.protocol=eager を設定してください。既知の問題により、デフォルトの connect.protocol=compatible または connect.protocol=sessioned を使用すると、タスクのバランス調整とレコードの重複の問題が発生する可能性があります。

Replicator コネクター

Replicator is implemented as a Kafka connector, and listed in Supported Connectors. Some general information related to connectors may apply in some cases, but most of the information you'll need to work with Replicator is in this Replicator specific documentation.

重要

このコネクターは、Confluent Platform にネイティブにバンドルされています。Confluent Platform をインストールして実行している場合、インストールに必要な追加の手順はありません。

Confluent Platform using only Confluent Community components を使用している場合、Confluent Hub クライアント を使用してコネクターをインストールする(推奨)か、ZIP ファイルを手動でダウンロードできます。

MirrorMaker

MirrorMaker は、2 つの Kafka クラスター間でデータをコピーするためのスタンドアロンツールです。詳細については、Kafka ドキュメントの「Mirroring data between clusters」を参照してください。

Confluent Replicator は、トピック構成とデータを処理するより包括的なソリューションです。また、Kafka Connect および Confluent Control Center との統合により、可用性、拡張性、および操作性を向上させます。詳細については、クイックスタートの「チュートリアル: クラスターの境界を越えたデータのレプリケーション」を試し、「MirrorMaker から Replicator への移行」を参照してください。

おすすめの関連情報

スタートガイド

リファレンス

移行ガイド

  • Confluent Cloud クラスター間のトピックの移行: Replicator を使用して 1 つのクラウドクラスターから別のクラウドクラスターにトピックデータを移行する方法を説明しています。
  • スキーマの移行: Replicator を使用してセルフマネージド型 Schema Registry を Confluent Cloud に移行する方法を説明しています。