重要
このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。
Confluent Cloud クラスターでのトピックの移行¶
Confluent Cloud は現在、複数のクラスタータイプを提供しています。Confluent Cloud クラスター間でトピックデータを移行する場合、次の 2 つのオプションがあります。
- 「Cluster Linking によるデータ移行」の説明に従って Cluster Linking を使用する(推奨)
- 以下のセクションの説明に従って Confluent Replicator を使用する(従来の方法)
このドキュメントでは、Replicator を使用して、最小のアプリケーションダウンタイムで既存の Confluent Cloud 専用クラスターをスタンダードクラスターに移行する方法について説明します。
使用可能なクラスタータイプについて詳しくは、「クラスタータイプごとの Confluent Cloud の機能と制限」を参照してください。
ちなみに
- この手順ではトピックデータのみを扱います。クラスターの他の要素(実行中のコネクター、ACL、Streams アプリケーション、ksqlDB ストリーム、スキーマなど)はこの手順とは別に移行する必要があります。これについては、このワークフローの中でも適宜説明しています。
- テスト目的で、セルフマネージド型の開発クラスターのトピックを Confluent Cloud に移行 することもできます。
注釈
新しいバージョンの Replicator を使用して、古いバージョンの Kafka クラスターのデータを Confluent Cloud にレプリケートすることはできません。具体的に言うと、Replicator バージョン 5.4.0 以降を使用して、Apache Kafka® v0.10.2 以前のクラスターや Confluent Platform v3.2.0 以前のクラスターのデータを Confluent Cloud にレプリケートすることはできません。これらの古いバージョンのクラスターを使用している場合は、アップグレードできるようになるまで、Replicator 5.0.x を使用して Confluent Cloud にレプリケートしてください。次の点に留意したうえで、適切にアップグレードを計画してください。
- Confluent Platform 3.2 以降に含まれている Kafka Connect ワーカーは、Confluent Platform 3.0 以降に含まれている Kafka ブローカーに対応しています(「コンポーネント間の適合性」を参照)。
- Confluent Platform 5.0.x のサポート終了日は 2020 年 7 月 31 日です(「サポートされているバージョンおよび相互運用性」を参照)。
Replicator のデプロイオプション¶
トピックデータを移行するには、3 つのモードのいずれかで Replicator を実行します。これらのモードは機能的には同じですが、どのような状況から開始するかに応じて、特定のモードが他のモードよりも適した選択肢になります。
Replicator のモード | 利点とシナリオ |
---|---|
分散 Connect クラスター内のコネクターとして(VM 上で)動作する | 送信先クラスターに、既に使用している Connect クラスターがある場合には、このモードが理想的です。 |
VM 上に置かれたパッケージ化された実行可能ファイルとして動作する | 使いやすい 3 つの構成ファイル(レプリケーター用、コンシューマー用、プロデューサー用)を分離して、Connect クラスターを明示的に構成しなくて済むようにします。 |
Kubernetes 上に置かれたパッケージ化された実行可能ファイルとして動作する | 上記と似ていますが、分離された 1 つのタスクであるほうが開始しやすい場合があります。既に Kubernetes でタスクを管理している場合は、このモードが理想的です。 |
前提条件タスク¶
トピックの移行を開始する前に、ネットワークとセキュリティの考慮事項について考慮し、トピック以外のメタデータを保存し、クラスター情報を収集してください。これらの前提条件について、以下で説明します。
場所を選択する¶
Confluent Cloud クラスターにネットワーク経由でアクセスできる仮想プライベートクラウド(VPC)に、仮想マシン(VM)または Kubernetes(k8s) インスタンスをデプロイする必要があります。ベストプラクティスとして、Replicator は、送信先クラスターのできる限り近くで実行するようにしてください。つまり、Confluent Cloud の場合は、送信先の Confluent Cloud デプロイ環境と同じリージョンで Replicator クラスターを実行する必要があります。
注釈
Confluent Cloud クラスターのいずれかに VPC ピアリングが構成されている場合は、Confluent Cloud とピアリングされている VPC のいずれかに Replicator をデプロイする必要があります。
クラスター情報を調べる¶
Replicator の構成には、以下の情報が必要です。
送信元クラスターと送信先クラスターの API キーとシークレット。
これらの情報を取得するには、Confluent Cloud Console の Kafka API keys に移動します。
bootstrap.servers
URL。この情報を取得するには、Cloud Console の Cluster Settings に移動し、
bootstrap.servers
の隣の URL をメモします。
切り替える前に、トピック以外のメタデータを保存して新規クラスターをセットアップする¶
Replicator は、ksqlDB ジョブ、スキーマ、ACL、コネクター、およびサービスアカウントを新規クラスターに同期しません。
Replicator をオンにしてトピックデータをレプリケートする前に、このようなトピック以外のメタデータで移行するものをすべてメモし、新規クラスター上に手動で再作成してください。
オフセットを変換するためにコンシューマーにタイムスタンプインターセプターを構成する¶
Java コンシューマーアプリケーションに、タイムスタンプを使用してオフセットを自動的に変換する ConsumerTimestampsInterceptor
を構成します。これによって、コンシューマーが、送信先クラスターで、送信元クラスターで中断した場所からデータを消費できるようになります。
interceptor.classes=io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor
重要
timestamp-interceptor
はコンシューマーにのみ設定できます。- Replicator にこのインタセプターを設定しないでください。
- Connect ワーカーでインターセプターを設定する場合は注意が必要です。インターセプターが有効に機能するためには、ワーカーがシンクコネクターを実行していること、また、それらのコネクターがオフセット管理のために Kafka を使用していることが必要です。
このインターセプターは、timestamp-interceptor-current.jar
の kafka-connect-replicator
の下にあります。この JAR をコンシューマーの CLASSPATH
に指定しておく必要があります。
ちなみに
- JAR ファイルの場所は、プラットフォームと Confluent インストール のタイプによって異なります。たとえば、zip.tar を使用した Mac OS インストールの場合、デフォルトでは、
timestamp-interceptor-<version>.jar
は Confluent ディレクトリの/share/java/kafka-connect-replicator
にあります。
timestamp-interceptor
は、以下の Confluent Maven リポジトリにあります。
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
Maven プロジェクトに、以下の依存関係を含めてください。
<dependency>
<groupId>io.confluent</groupId>
<artifactId>timestamp-interceptor</artifactId>
<version>current</version>
</dependency>
ConsumerTimestampsInterceptor
は、オフセット変換のために必要なタイムスタンプ情報を保持するために、新規トピック __consumer_timestamps
に書き込みを行います。Replicator は、この新規トピックから読み取ったタイムスタンプを使用して、セカンダリクラスターで正確なオフセットを判別します。
重要
ConsumerTimestampsInterceptor
は、送信元クラスターの __consumer_timestamps
トピックに対するプロデューサーであるため、適切なセキュリティ構成を必要とします。これらの構成は、timestamps.producer.
プレフィックスを使用して指定します。たとえば、timestamps.producer.security.protocol=SSL
などと指定します。セキュリティ構成の詳細については、以下を参照してください。
インターセプターには、__consumer_timestamps
トピックについての ACL も必要になります。__consumer_timestamps
トピックに対する WRITE と DESCRIBE の操作がコンシューマーのプリンシパルに必要になります。
詳細については、以下を参照してください。
- コンシューマーのオフセット変換について
- ホワイトペーパー『Disaster Recovery for Multi-Datacenter Apache Kafka Deployments』のコンシューマーのオフセットとタイムスタンプの保持に関する説明。
ACL を構成する¶
Replicator に対して、送信元クラスターの Kafka データを読み取り、送信先 Confluent Cloud クラスターに Kafka データを書き込むことを認可する必要があります。Replicator は、スーパーユーザーの認証情報ではなく、Confluent Cloud サービスアカウントを使用して実行する必要があります。そのため、Confluent CLI を使用して、Confluent Cloud で Replicator に対応するサービスアカウント ID に対して適切な ACL を構成する必要があります。Replicator ACL の詳細については、「セキュリティ」を参照してください。
confluent kafka acl create --allow --service-account <service-account-id> --operation CREATE --topic <replicated-topic>
confluent kafka acl create --allow --service-account <service-account-id> --operation WRITE --topic <replicated-topic>
confluent kafka acl create --allow --service-account <service-account-id> --operation READ --topic <replicated-topic>
confluent kafka acl create --allow --service-account <service-account-id> --operation DESCRIBE --topic <replicated-topic>
confluent kafka acl create --allow --service-account <service-account-id> --operation DESCRIBE-CONFIGS --topic <replicated-topic>
confluent kafka acl create --allow --service-account <service-account-id> --operation ALTER-CONFIGS --topic <replicated-topic>
confluent kafka acl create --allow --service-account <service-account-id> --operation DESCRIBE --cluster-scope
また、以下のライセンストピックの ACL も設定する必要があります。
confluent kafka acl create --allow --service-account <service-account-id> --operation READ --topic _confluent-command
confluent kafka acl create --allow --service-account <service-account-id> --operation WRITE --topic _confluent-command
confluent kafka acl create --allow --service-account <service-account-id> --operation DESCRIBE --topic _confluent-command
ライセンストピックは本来デフォルトで作成されますが、なんらかの理由でこのトピックがまだ存在しない場合は、DESCRIBE に加え、CREATE の ACL をクラスターに設定する必要があります。DESCRIBE は上記で既に設定していますが、目的を明確にするため、ここで改めて言及しています。これらの ACL を --cluster-scope
で設定します。このフラグに引数はありません。これにより、使用されているクラスターに ACL が設定されます。(使用されているクラスターを確認するには、confluent kafka cluster list
と入力します。アスタリスクが表示されているクラスターが使用されています。別のクラスターを使用するには、confluent kafka cluster use <cluster-id>
と入力します。)
confluent kafka acl create --allow --service-account <service-account-id> --operation CREATE --cluster-scope
confluent kafka acl create --allow --service-account <service-account-id> --operation DESCRIBE --cluster-scope
ちなみに
- Replicator を実行可能ファイルとして実行する場合は、Replicator は内部の Connect ワーカーで実行されるので、「ワーカーの ACL 要件」で説明している ACL が必要になります。
- Replicator をコネクターとして実行する場合は、特に指定がない限り、Replicator の構成は Connect クラスターの構成を継承します(上記と同じ ACL が必要になります)。
Replicator を構成および実行してトピックを移行する¶
すべての前提条件タスクが完了したら、Replicator を構成および実行して、トピックを移行します。
使用する Replicator モード に応じて、以下のいずれかの方法を選択してください。
ちなみに
下記のデプロイ方法のどれを使用するかにかかわらず、こちらの ガイドを使用して、Replicator をさらにチューニングすることができます。
Replicator を実行可能ファイルとして実行する¶
ちなみに
- これは、VM ベースの手法と Kubernetes ベースの手法の両方に適用されます。
- Replicator の実行可能ファイルは、ネットワークに接続されている任意の仮想ノード、クラウドノード、または物理マシン(デスクトップまたはノート PC)で実行できます。
プロパティの構成¶
実行可能ファイルには、3 つの構成ファイル(コンシューマー、プロデューサー、レプリケーション)があります。これらのファイルに関する最小限の構成変更を以下に示します。
consumer.properties
ssl.endpoint.identification.algorithm=https sasl.mechanism=PLAIN bootstrap.servers=<source bootstrap-server> retry.backoff.ms=500 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<secret>"; security.protocol=SASL_SSL
producer.properties
ssl.endpoint.identification.algorithm=https sasl.mechanism=PLAIN bootstrap.servers=<destination bootstrap-server> sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<secret>"; security.protocol=SASL_SSL
replication.properties
-replication.properties
では特別な構成は必要ありません。
Replicator を実行する¶
Replicator の実行可能ファイルを実行して、トピックを移行します。
./bin/replicator --cluster.id replicator --consumer.config consumer.properties --producer.config producer.properties --topic.regex '.*'
ちなみに
Replicator に関するドキュメント の「Replicator を実行可能ファイルとして実行する」および「Replicator 実行可能ファイルのコマンドラインパラメーター」も参照してください。
Replicator をコネクターとして実行する¶
コネクターの JSON は以下のようになります。
{
"name": "replicate-topic",
"config": {
"connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
"key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
"value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
"src.kafka.ssl.endpoint.identification.algorithm":"https",
"src.kafka.sasl.mechanism":"PLAIN",
"src.kafka.request.timeout.ms":"20000",
"src.kafka.bootstrap.servers":"<source bootstrap server>",
"src.kafka.retry.backoff.ms":"500",
"src.kafka.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<secret>\";",
"src.kafka.security.protocol":"SASL_SSL",
"dest.kafka.ssl.endpoint.identification.algorithm":"https",
"dest.kafka.sasl.mechanism":"PLAIN",
"dest.kafka.request.timeout.ms":"20000",
"dest.kafka.bootstrap.servers":"<destination bootstrap server>",
"dest.kafka.retry.backoff.ms":"500",
"dest.kafka.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<secret>\";",
"dest.kafka.security.protocol":"SASL_SSL",
"dest.topic.replication.factor":"3",
"topic.regex":".*"
}
}
まだ構成していない場合は、分散 Connect クラスターを以下に示すように適切に構成してください。
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
bootstrap.servers=<dest bootstrap server>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<secret>";
security.protocol=SASL_SSL
producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.bootstrap.servers=<dest bootstrap server>
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<secret>";
producer.security.protocol=SASL_SSL
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
group.id=connect-replicator
config.storage.topic=connect-configs1
offset.storage.topic=connect-offsets1
status.storage.topic=connect-statuses1
plugin.path=<confluent install dir>/share/java
ちなみに
Replicator に関するドキュメント の「Replicator をコネクターとして実行する」も参照してください。
Kubernetes で Replicator の Docker コンテナーを実行する¶
既存のシークレットを削除します(存在する場合)。
kubectl delete secret replicator-secret-props
変更されている場合は、構成を再生成します。(「クイックスタート」を参照してください。)
新しいシークレットをアップロードします。
kubectl create secret generic replicator-secret-props --from-file=/tmp/replicator/
ポッドを再ロードします。
kubectl apply -f container/replicator-deployment.yaml
replicator-deployment.yaml
の例を以下に示します。apiVersion: extensions/v1beta1 kind: Deployment metadata: name: repl-exec-connect-cluster spec: replicas: 1 template: metadata: labels: app: replicator-app spec: containers: - name: confluent-replicator image: confluentinc/cp-enterprise-replicator-executable env: - name: CLUSTER_ID value: "replicator-k8s" - name: CLUSTER_THREADS value: "1" - name: CONNECT_GROUP_ID value: "containerized-repl" # Note: This is to avoid _overlay errors_ . You could use /etc/replicator/ here instead. - name: REPLICATION_CONFIG value: "/etc/replicator-config/replication.properties" - name: PRODUCER_CONFIG value: "/etc/replicator-config/producer.properties" - name: CONSUMER_CONFIG value: "/etc/replicator-config/consumer.properties" volumeMounts: - name: replicator-properties mountPath: /etc/replicator-config/ volumes: - name: replicator-properties secret: secretName: "replicator-secret-props" defaultMode: 0666
ステータスを確認します。
kubectl get pods kubectl logs <pod-name> -f
ちなみに
Google Kubernetes Engine(GKE)にデプロイされた Confluent Platform でデータを Confluent Cloud クラスターにレプリケートする サンプルデモ も参照してください。
クライアントを新規クラスターに切り替える¶
Replicator がセットアップされたら、次のようにしてクライアントを新規クラスターに切り替えます。
Confluent Cloud Console の Replication タブでレプリケーションラグを表示して、送信元から送信先にデータがレプリケートされていることを確認します。
プロデューサーを停止します。
コンシューマーラグがゼロになるまで待ちます。
これをモニタリングするには、こちら の説明に従って
bin/kafka-consumer-groups
を実行するか、Cloud Console で Consumer Lag に移動してコンシューマーグループを選択します。Replicator のコンシューマーグループのラグを参照し、レプリケーターラグがゼロになるまで待ってから、新規クラスターを指すようにコンシューマーを再起動します。
- レプリケーターラグをモニタリングするには、こちら の説明に従って
bin/kafka-consumer-groups
を実行するか、Cloud Console の Consumer Lag に移動してコンシューマーグループを選択します。 - コンシューマーを再起動したら、コンシューマーグループのメトリクス の説明に従って
last-heartbeat-seconds-ago
を使用して、コンシューマーが実行されていることを確認します。
- レプリケーターラグをモニタリングするには、こちら の説明に従って
すべてのコンシューマーが稼働状態になるまで待ちます。
新規クラスターでプロデューサーを開始します。
プロデューサーの outgoing-byte-rate メトリクス の説明に従って、
outgoing-byte-rate
をモニタリングします。送信先クラスターでラグをモニタリングします。
これをモニタリングするには、Cloud Console でクラスターのアクティビティモニターを使用します。
また、コンシューマーグループのメトリクス の説明に従って、送信先クラスターで
bin/kafka-consumer-groups
を実行することもできます。
クイックスタート¶
このクイックスタートでは、以下を前提としています。
- Amazon Web Services(AWS) 上にある 2 つの Confluent Cloud クラスター間でトピックを移行する。
- Amazon EC2 の Ubuntu VM で Replicator を実行する。
- Replicator を実行可能ファイルとして実行する。
実際のシナリオでは、他のクラウドプラットフォームのノード(たとえば、GCP と Google Cloud Console や Microsoft Azure など)からクラスターを移行する場合があります。その場合も大まかな手順は同じです。
クラウドインスタンスをセットアップする¶
Java をインストールします。
sudo apt-get install default-jre
「Ubuntu および Debian で Systemd を使用する手動インストール」の説明に従って、APT を使用して Confluent Platform をフルインストールします。
プロパティの構成¶
コンシューマー、プロデューサー、およびレプリケーション用の 3 つの構成ファイルがあります。これらのファイルに関する最小の構成変更を以下に示します。
ちなみに
以下の bootstrap.servers
と sasl.jaas.config
は、Confluent Cloud クラスターの対応する値に置き換えてください。
consumer.properties
bootstrap.servers=<source bootstrap server> ssl.endpoint.identification.algorithm=https sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>"; security.protocol=SASL_SSL
producer.properties
bootstrap.servers=<destination bootstrap server> ssl.endpoint.identification.algorithm=https sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>"; security.protocol=SASL_SSL
replication.properties
topic.whitelist
の "Movies" は、送信元クラスターからレプリケートするトピックに置き換えてください。topic.whitelist=Movies topic.rename.format=${topic}-replica topic.auto.create=true topic.timestamp.type=CreateTime dest.topic.replication.factor=3
ちなみに
上記の例では、topic.rename.format
、topic.auto.create
、および topic.timestamp.type
にデフォルト値を設定しています。そのため、実際にはこれらのプロパティを指定する必要はありません。値を変更する場合は、そのプロパティとカスタム値を replication.properties
に指定してください。
Replicator を実行する¶
Replicator の実行可能ファイルを実行して、トピックを移行します。
confluent-5.3.0/bin/replicator \
--consumer.config ./diyrepl/consumer.properties \
--producer.config ./diyrepl/producer.properties \
--cluster.id replicator \
--replication.config ./diyrepl/replication.properties
トピックの移行を検証する¶
トピックの移行が正常に行われたことを検証するには、Confluent CLI に ログイン してコマンドを実行し、トピックデータがターゲットクラスターにレプリケートされていることを確認します。
送信元クラスターと送信先クラスターの現在の内容を確認したら、クライアントを新規クラスターに切り替え、コンシューマーを実行して新規クラスターの送信先トピックから読み取りを行います。
送信先トピックの現在の内容を確認する¶
クラスターをリスト表示して、送信先クラスター ID をメモします。
confluent kafka cluster list
送信先クラスターを選択します。
confluent kafka cluster use <ID-for-destination-cluster>
コンシューマーを実行して、送信先トピック(たとえば、
Movies-replica
など)から読み取りを行います。confluent kafka topic consume --from-beginning 'Movies-replica'
送信元トピックの現在の内容を確認する¶
送信先クラスターを選択します。
confluent kafka cluster use <ID-for-source-cluster>
コンシューマーを実行して、送信元のトピック(たとえば、
Movies
など)から読み取りを行います。confluent kafka topic consume --from-beginning 'Movies'
新規クラスターに切り替える¶
「クライアントを新規クラスターに切り替える」の説明に従って、プロデューサーとコンシューマーを新規クラスターに切り替えます。
ログと送信先トピックを確認する¶
実行中の Replicator タスクの出力で、処理内容を示すログを確認します。
コンシューマーを実行して、送信先トピックから再度読み取りを行います。
confluent kafka topic consume --from-beginning 'Movies'
開発クラスターから Confluent Cloud へのトピックの移行¶
開発クラスターは、Confluent Cloud にデプロイする前に Confluent Platform の機能を試すのに最適な環境です。ただし、通常、開発クラスターでは回復性の要件が緩和されているため、Confluent Cloud に移行するときに問題が生じることがあります。
開発クラスターのトピックデータを Confluent Cloud に移行する場合に、Replicator を使用して移行することができます。手順は、クラウドのデプロイ環境間でトピックを移行する手順(こちら を参照)と同じですが、以下の追加の考慮事項を検討する必要があります。
Confluent Cloud からのトピックの移行¶
Replicator を使用して、トピックとスキーマを Confluent Cloud から移行することができます。
重要
Confluent Cloud からトピックをレプリケートする場合、送信先クラスターでトピックを事前に作成し、topic.auto.create=false
および topic.config.sync=false
を設定する必要があります。
回復性¶
Confluent Cloud トピックは、回復性を確保するために、以下の構成を使用してチューニングされています。
replication.factor=3
min.insync.replicas=2
5.5 以降では、dest.topic.replication.factor
を 3 に設定し、topic.auto.create
を有効にする必要があります。
注釈
デフォルトでは、Replicator はトピックデータのレプリケート時にトピック構成を保持しますが、開発クラスターで Confluent Cloud の要件(たとえば、replication.factor
3 に対応するには、3 台以上のブローカーが必要です)が満たされていることはほとんどありません。
これに対処するには、以下に示すように、Replicator のトピック構成同期機能とトピック自動作成機能を無効にします。
topic.config.sync=false
topic.auto.create=false
ライセンス設定¶
デフォルトでは、Replicator は送信先クラスターに Confluent Platform ライセンスを保管します。