重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

Confluent Cloud クラスターでのトピックの移行

Confluent Cloud は現在、複数のクラスタータイプを提供しています。Confluent Cloud クラスター間でトピックデータを移行する場合、次の 2 つのオプションがあります。

  • Cluster Linking によるデータ移行」の説明に従って Cluster Linking を使用する(推奨)
  • 以下のセクションの説明に従って Confluent Replicator を使用する(従来の方法)

このドキュメントでは、Replicator を使用して、最小のアプリケーションダウンタイムで既存の Confluent Cloud 専用クラスターをスタンダードクラスターに移行する方法について説明します。

使用可能なクラスタータイプについて詳しくは、「クラスタータイプごとの Confluent Cloud の機能と制限」を参照してください。

ちなみに

  • この手順ではトピックデータのみを扱います。クラスターの他の要素(実行中のコネクター、ACL、Streams アプリケーション、ksqlDB ストリーム、スキーマなど)はこの手順とは別に移行する必要があります。これについては、このワークフローの中でも適宜説明しています。
  • テスト目的で、セルフマネージド型の開発クラスターのトピックを Confluent Cloud に移行 することもできます。

注釈

新しいバージョンの Replicator を使用して、古いバージョンの Kafka クラスターのデータを Confluent Cloud にレプリケートすることはできません。具体的に言うと、Replicator バージョン 5.4.0 以降を使用して、Apache Kafka® v0.10.2 以前のクラスターや Confluent Platform v3.2.0 以前のクラスターのデータを Confluent Cloud にレプリケートすることはできません。これらの古いバージョンのクラスターを使用している場合は、アップグレードできるようになるまで、Replicator 5.0.x を使用して Confluent Cloud にレプリケートしてください。次の点に留意したうえで、適切にアップグレードを計画してください。

Replicator のデプロイオプション

トピックデータを移行するには、3 つのモードのいずれかで Replicator を実行します。これらのモードは機能的には同じですが、どのような状況から開始するかに応じて、特定のモードが他のモードよりも適した選択肢になります。

Replicator のモード 利点とシナリオ
分散 Connect クラスター内のコネクターとして(VM 上で)動作する 送信先クラスターに、既に使用している Connect クラスターがある場合には、このモードが理想的です。
VM 上に置かれたパッケージ化された実行可能ファイルとして動作する 使いやすい 3 つの構成ファイル(レプリケーター用、コンシューマー用、プロデューサー用)を分離して、Connect クラスターを明示的に構成しなくて済むようにします。
Kubernetes 上に置かれたパッケージ化された実行可能ファイルとして動作する 上記と似ていますが、分離された 1 つのタスクであるほうが開始しやすい場合があります。既に Kubernetes でタスクを管理している場合は、このモードが理想的です。

前提条件タスク

トピックの移行を開始する前に、ネットワークとセキュリティの考慮事項について考慮し、トピック以外のメタデータを保存し、クラスター情報を収集してください。これらの前提条件について、以下で説明します。

場所を選択する

Confluent Cloud クラスターにネットワーク経由でアクセスできる仮想プライベートクラウド(VPC)に、仮想マシン(VM)または Kubernetes(k8s) インスタンスをデプロイする必要があります。ベストプラクティスとして、Replicator は、送信先クラスターのできる限り近くで実行するようにしてください。つまり、Confluent Cloud の場合は、送信先の Confluent Cloud デプロイ環境と同じリージョンで Replicator クラスターを実行する必要があります。

注釈

Confluent Cloud クラスターのいずれかに VPC ピアリングが構成されている場合は、Confluent Cloud とピアリングされている VPC のいずれかに Replicator をデプロイする必要があります。

クラスター情報を調べる

Replicator の構成には、以下の情報が必要です。

  • 送信元クラスターと送信先クラスターの API キーとシークレット。

    これらの情報を取得するには、Confluent Cloud Console の Kafka API keys に移動します。

  • bootstrap.servers URL。

    この情報を取得するには、Cloud Console の Cluster Settings に移動し、bootstrap.servers の隣の URL をメモします。

切り替える前に、トピック以外のメタデータを保存して新規クラスターをセットアップする

Replicator は、ksqlDB ジョブ、スキーマ、ACL、コネクター、およびサービスアカウントを新規クラスターに同期しません。

Replicator をオンにしてトピックデータをレプリケートする前に、このようなトピック以外のメタデータで移行するものをすべてメモし、新規クラスター上に手動で再作成してください。

オフセットを変換するためにコンシューマーにタイムスタンプインターセプターを構成する

Java コンシューマーアプリケーションに、タイムスタンプを使用してオフセットを自動的に変換する ConsumerTimestampsInterceptor を構成します。これによって、コンシューマーが、送信先クラスターで、送信元クラスターで中断した場所からデータを消費できるようになります。

interceptor.classes=io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor

重要

  • timestamp-interceptor はコンシューマーにのみ設定できます。
  • Replicator にこのインタセプターを設定しないでください。
  • Connect ワーカーでインターセプターを設定する場合は注意が必要です。インターセプターが有効に機能するためには、ワーカーがシンクコネクターを実行していること、また、それらのコネクターがオフセット管理のために Kafka を使用していることが必要です。

このインターセプターは、timestamp-interceptor-current.jarkafka-connect-replicator の下にあります。この JAR をコンシューマーの CLASSPATH に指定しておく必要があります。

ちなみに

  • JAR ファイルの場所は、プラットフォームと Confluent インストール のタイプによって異なります。たとえば、zip.tar を使用した Mac OS インストールの場合、デフォルトでは、timestamp-interceptor-<version>.jar は Confluent ディレクトリの /share/java/kafka-connect-replicator にあります。

timestamp-interceptor は、以下の Confluent Maven リポジトリにあります。

<repository>
  <id>confluent</id>
  <url>https://packages.confluent.io/maven/</url>
</repository>

Maven プロジェクトに、以下の依存関係を含めてください。

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>timestamp-interceptor</artifactId>
    <version>current</version>
</dependency>

ConsumerTimestampsInterceptor は、オフセット変換のために必要なタイムスタンプ情報を保持するために、新規トピック __consumer_timestamps に書き込みを行います。Replicator は、この新規トピックから読み取ったタイムスタンプを使用して、セカンダリクラスターで正確なオフセットを判別します。

重要

ConsumerTimestampsInterceptor は、送信元クラスターの __consumer_timestamps トピックに対するプロデューサーであるため、適切なセキュリティ構成を必要とします。これらの構成は、timestamps.producer. プレフィックスを使用して指定します。たとえば、timestamps.producer.security.protocol=SSL などと指定します。セキュリティ構成の詳細については、以下を参照してください。

インターセプターには、__consumer_timestamps トピックについての ACL も必要になります。__consumer_timestamps トピックに対する WRITE と DESCRIBE の操作がコンシューマーのプリンシパルに必要になります。

詳細については、以下を参照してください。

ACL を構成する

Replicator に対して、送信元クラスターの Kafka データを読み取り、送信先 Confluent Cloud クラスターに Kafka データを書き込むことを認可する必要があります。Replicator は、スーパーユーザーの認証情報ではなく、Confluent Cloud サービスアカウントを使用して実行する必要があります。そのため、Confluent CLI を使用して、Confluent Cloud で Replicator に対応するサービスアカウント ID に対して適切な ACL を構成する必要があります。Replicator ACL の詳細については、「セキュリティ」を参照してください。

confluent kafka acl create --allow --service-account <service-account-id> --operation CREATE --topic <replicated-topic>
confluent kafka acl create --allow --service-account <service-account-id> --operation WRITE --topic <replicated-topic>
confluent kafka acl create --allow --service-account <service-account-id> --operation READ --topic <replicated-topic>
confluent kafka acl create --allow --service-account <service-account-id> --operation DESCRIBE --topic <replicated-topic>
confluent kafka acl create --allow --service-account <service-account-id> --operation DESCRIBE-CONFIGS --topic <replicated-topic>
confluent kafka acl create --allow --service-account <service-account-id> --operation ALTER-CONFIGS --topic <replicated-topic>
confluent kafka acl create --allow --service-account <service-account-id> --operation DESCRIBE --cluster-scope

また、以下のライセンストピックの ACL も設定する必要があります。

confluent kafka acl create --allow --service-account <service-account-id> --operation READ --topic _confluent-command
confluent kafka acl create --allow --service-account <service-account-id> --operation WRITE --topic _confluent-command
confluent kafka acl create --allow --service-account <service-account-id> --operation DESCRIBE --topic _confluent-command

ライセンストピックは本来デフォルトで作成されますが、なんらかの理由でこのトピックがまだ存在しない場合は、DESCRIBE に加え、CREATE の ACL をクラスターに設定する必要があります。DESCRIBE は上記で既に設定していますが、目的を明確にするため、ここで改めて言及しています。これらの ACL を --cluster-scope で設定します。このフラグに引数はありません。これにより、使用されているクラスターに ACL が設定されます。(使用されているクラスターを確認するには、confluent kafka cluster list と入力します。アスタリスクが表示されているクラスターが使用されています。別のクラスターを使用するには、confluent kafka cluster use <cluster-id> と入力します。)

confluent kafka acl create --allow --service-account <service-account-id> --operation CREATE --cluster-scope
confluent kafka acl create --allow --service-account <service-account-id> --operation DESCRIBE --cluster-scope

ちなみに

  • Replicator を実行可能ファイルとして実行する場合は、Replicator は内部の Connect ワーカーで実行されるので、「ワーカーの ACL 要件」で説明している ACL が必要になります。
  • Replicator をコネクターとして実行する場合は、特に指定がない限り、Replicator の構成は Connect クラスターの構成を継承します(上記と同じ ACL が必要になります)。

Replicator を構成および実行してトピックを移行する

すべての前提条件タスクが完了したら、Replicator を構成および実行して、トピックを移行します。

使用する Replicator モード に応じて、以下のいずれかの方法を選択してください。

ちなみに

下記のデプロイ方法のどれを使用するかにかかわらず、こちらの ガイドを使用して、Replicator をさらにチューニングすることができます。

Replicator を実行可能ファイルとして実行する

ちなみに

  • これは、VM ベースの手法と Kubernetes ベースの手法の両方に適用されます。
  • Replicator の実行可能ファイルは、ネットワークに接続されている任意の仮想ノード、クラウドノード、または物理マシン(デスクトップまたはノート PC)で実行できます。

プロパティの構成

実行可能ファイルには、3 つの構成ファイル(コンシューマー、プロデューサー、レプリケーション)があります。これらのファイルに関する最小限の構成変更を以下に示します。

  • consumer.properties

    ssl.endpoint.identification.algorithm=https
    sasl.mechanism=PLAIN
    bootstrap.servers=<source bootstrap-server>
    retry.backoff.ms=500
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<secret>";
    security.protocol=SASL_SSL
    
  • producer.properties

    ssl.endpoint.identification.algorithm=https
    sasl.mechanism=PLAIN
    bootstrap.servers=<destination bootstrap-server>
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<secret>";
    security.protocol=SASL_SSL
    
  • replication.properties - replication.properties では特別な構成は必要ありません。

Replicator を実行する

Replicator の実行可能ファイルを実行して、トピックを移行します。

./bin/replicator --cluster.id replicator --consumer.config consumer.properties --producer.config producer.properties --topic.regex '.*'

Replicator をコネクターとして実行する

コネクターの JSON は以下のようになります。

{
"name": "replicate-topic",
"config": {
    "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
    "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
    "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
    "src.kafka.ssl.endpoint.identification.algorithm":"https",
    "src.kafka.sasl.mechanism":"PLAIN",
    "src.kafka.request.timeout.ms":"20000",
    "src.kafka.bootstrap.servers":"<source bootstrap server>",
    "src.kafka.retry.backoff.ms":"500",
    "src.kafka.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<secret>\";",
    "src.kafka.security.protocol":"SASL_SSL",
    "dest.kafka.ssl.endpoint.identification.algorithm":"https",
    "dest.kafka.sasl.mechanism":"PLAIN",
    "dest.kafka.request.timeout.ms":"20000",
    "dest.kafka.bootstrap.servers":"<destination bootstrap server>",
    "dest.kafka.retry.backoff.ms":"500",
    "dest.kafka.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<secret>\";",
    "dest.kafka.security.protocol":"SASL_SSL",
    "dest.topic.replication.factor":"3",
    "topic.regex":".*"
    }
}

まだ構成していない場合は、分散 Connect クラスターを以下に示すように適切に構成してください。

ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
bootstrap.servers=<dest bootstrap server>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<secret>";
security.protocol=SASL_SSL
producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.bootstrap.servers=<dest bootstrap server>
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<secret>";
producer.security.protocol=SASL_SSL
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
group.id=connect-replicator
config.storage.topic=connect-configs1
offset.storage.topic=connect-offsets1
status.storage.topic=connect-statuses1
plugin.path=<confluent install dir>/share/java

Kubernetes で Replicator の Docker コンテナーを実行する

  1. 既存のシークレットを削除します(存在する場合)。

    kubectl delete secret replicator-secret-props
    
  2. 変更されている場合は、構成を再生成します。(「クイックスタート」を参照してください。)

  3. 新しいシークレットをアップロードします。

    kubectl create secret generic replicator-secret-props --from-file=/tmp/replicator/
    
  4. ポッドを再ロードします。

    kubectl apply -f container/replicator-deployment.yaml
    

    replicator-deployment.yaml の例を以下に示します。

    apiVersion: extensions/v1beta1
    kind: Deployment
    metadata:
      name: repl-exec-connect-cluster
    spec:
      replicas: 1
      template:
        metadata:
          labels:
            app: replicator-app
        spec:
          containers:
            - name: confluent-replicator
              image: confluentinc/cp-enterprise-replicator-executable
              env:
                - name: CLUSTER_ID
                  value: "replicator-k8s"
                - name: CLUSTER_THREADS
                  value: "1"
                - name: CONNECT_GROUP_ID
                  value: "containerized-repl"
    
                  # Note: This is to avoid _overlay errors_ . You could use /etc/replicator/ here instead.
                - name: REPLICATION_CONFIG
                  value: "/etc/replicator-config/replication.properties"
                - name: PRODUCER_CONFIG
                  value: "/etc/replicator-config/producer.properties"
                - name: CONSUMER_CONFIG
                  value: "/etc/replicator-config/consumer.properties"
              volumeMounts:
                - name: replicator-properties
                  mountPath: /etc/replicator-config/
          volumes:
            - name: replicator-properties
              secret:
                secretName: "replicator-secret-props"
                defaultMode: 0666
    
  5. ステータスを確認します。

    kubectl get pods kubectl logs <pod-name> -f
    

ちなみに

Google Kubernetes Engine(GKE)にデプロイされた Confluent Platform でデータを Confluent Cloud クラスターにレプリケートする サンプルデモ も参照してください。

クライアントを新規クラスターに切り替える

Replicator がセットアップされたら、次のようにしてクライアントを新規クラスターに切り替えます。

  1. Confluent Cloud Console の Replication タブでレプリケーションラグを表示して、送信元から送信先にデータがレプリケートされていることを確認します。

  2. プロデューサーを停止します。

  3. コンシューマーラグがゼロになるまで待ちます。

    これをモニタリングするには、こちら の説明に従って bin/kafka-consumer-groups を実行するか、Cloud Console で Consumer Lag に移動してコンシューマーグループを選択します。

  4. Replicator のコンシューマーグループのラグを参照し、レプリケーターラグがゼロになるまで待ってから、新規クラスターを指すようにコンシューマーを再起動します。

    • レプリケーターラグをモニタリングするには、こちら の説明に従って bin/kafka-consumer-groups を実行するか、Cloud Console の Consumer Lag に移動してコンシューマーグループを選択します。
    • コンシューマーを再起動したら、コンシューマーグループのメトリクス の説明に従って last-heartbeat-seconds-ago を使用して、コンシューマーが実行されていることを確認します。
  5. すべてのコンシューマーが稼働状態になるまで待ちます。

  6. 新規クラスターでプロデューサーを開始します。

    プロデューサーの outgoing-byte-rate メトリクス の説明に従って、outgoing-byte-rate をモニタリングします。

  7. 送信先クラスターでラグをモニタリングします。

    • これをモニタリングするには、Cloud Console でクラスターのアクティビティモニターを使用します。

      ../_images/cloud-migrate-cluster-activity-monitor.png
    • また、コンシューマーグループのメトリクス の説明に従って、送信先クラスターで bin/kafka-consumer-groups を実行することもできます。

クイックスタート

このクイックスタートでは、以下を前提としています。

  • Amazon Web Services(AWS) 上にある 2 つの Confluent Cloud クラスター間でトピックを移行する。
  • Amazon EC2 の Ubuntu VM で Replicator を実行する。
  • Replicator を実行可能ファイルとして実行する。

実際のシナリオでは、他のクラウドプラットフォームのノード(たとえば、GCP と Google Cloud ConsoleMicrosoft Azure など)からクラスターを移行する場合があります。その場合も大まかな手順は同じです。

クラウドインスタンスをセットアップする

  1. Java をインストールします。

    sudo apt-get install default-jre
    
  2. Ubuntu および Debian で Systemd を使用する手動インストール」の説明に従って、APT を使用して Confluent Platform をフルインストールします。

プロパティの構成

コンシューマー、プロデューサー、およびレプリケーション用の 3 つの構成ファイルがあります。これらのファイルに関する最小の構成変更を以下に示します。

ちなみに

以下の bootstrap.serverssasl.jaas.config は、Confluent Cloud クラスターの対応する値に置き換えてください。

  • consumer.properties

    bootstrap.servers=<source bootstrap server>
    ssl.endpoint.identification.algorithm=https
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";
    security.protocol=SASL_SSL
    
  • producer.properties

    bootstrap.servers=<destination bootstrap server>
    ssl.endpoint.identification.algorithm=https
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";
    security.protocol=SASL_SSL
    
  • replication.properties

    topic.whitelist の "Movies" は、送信元クラスターからレプリケートするトピックに置き換えてください。

    topic.whitelist=Movies
    
    topic.rename.format=${topic}-replica
    topic.auto.create=true
    topic.timestamp.type=CreateTime
    
    dest.topic.replication.factor=3
    

ちなみに

上記の例では、topic.rename.formattopic.auto.create、および topic.timestamp.type にデフォルト値を設定しています。そのため、実際にはこれらのプロパティを指定する必要はありません。値を変更する場合は、そのプロパティとカスタム値を replication.properties に指定してください。

Replicator を実行する

Replicator の実行可能ファイルを実行して、トピックを移行します。

confluent-5.3.0/bin/replicator \
  --consumer.config ./diyrepl/consumer.properties \
  --producer.config ./diyrepl/producer.properties \
  --cluster.id replicator  \
  --replication.config ./diyrepl/replication.properties

トピックの移行を検証する

トピックの移行が正常に行われたことを検証するには、Confluent CLI に ログイン してコマンドを実行し、トピックデータがターゲットクラスターにレプリケートされていることを確認します。

送信元クラスターと送信先クラスターの現在の内容を確認したら、クライアントを新規クラスターに切り替え、コンシューマーを実行して新規クラスターの送信先トピックから読み取りを行います。

送信先トピックの現在の内容を確認する

  1. クラスターをリスト表示して、送信先クラスター ID をメモします。

    confluent kafka cluster list
    
  2. 送信先クラスターを選択します。

    confluent kafka cluster use <ID-for-destination-cluster>
    
  3. コンシューマーを実行して、送信先トピック(たとえば、Movies-replica など)から読み取りを行います。

    confluent kafka topic consume --from-beginning 'Movies-replica'
    

送信元トピックの現在の内容を確認する

  1. 送信先クラスターを選択します。

    confluent kafka cluster use <ID-for-source-cluster>
    
  2. コンシューマーを実行して、送信元のトピック(たとえば、Movies など)から読み取りを行います。

    confluent kafka topic consume --from-beginning 'Movies'
    

送信元トピックに新規データを追加する

送信元トピックにデータを生成します。

confluent kafka topic produce Movies

新規クラスターに切り替える

クライアントを新規クラスターに切り替える」の説明に従って、プロデューサーとコンシューマーを新規クラスターに切り替えます。

ログと送信先トピックを確認する

  1. 実行中の Replicator タスクの出力で、処理内容を示すログを確認します。

  2. コンシューマーを実行して、送信先トピックから再度読み取りを行います。

    confluent kafka topic consume --from-beginning 'Movies'
    

開発クラスターから Confluent Cloud へのトピックの移行

開発クラスターは、Confluent Cloud にデプロイする前に Confluent Platform の機能を試すのに最適な環境です。ただし、通常、開発クラスターでは回復性の要件が緩和されているため、Confluent Cloud に移行するときに問題が生じることがあります。

開発クラスターのトピックデータを Confluent Cloud に移行する場合に、Replicator を使用して移行することができます。手順は、クラウドのデプロイ環境間でトピックを移行する手順(こちら を参照)と同じですが、以下の追加の考慮事項を検討する必要があります。

Confluent Cloud からのトピックの移行

Replicator を使用して、トピックとスキーマを Confluent Cloud から移行することができます。

重要

Confluent Cloud からトピックをレプリケートする場合、送信先クラスターでトピックを事前に作成し、topic.auto.create=false および topic.config.sync=false を設定する必要があります。

回復性

Confluent Cloud トピックは、回復性を確保するために、以下の構成を使用してチューニングされています。

replication.factor=3
min.insync.replicas=2

5.5 以降では、dest.topic.replication.factor を 3 に設定し、topic.auto.create を有効にする必要があります。

注釈

デフォルトでは、Replicator はトピックデータのレプリケート時にトピック構成を保持しますが、開発クラスターで Confluent Cloud の要件(たとえば、replication.factor 3 に対応するには、3 台以上のブローカーが必要です)が満たされていることはほとんどありません。

これに対処するには、以下に示すように、Replicator のトピック構成同期機能とトピック自動作成機能を無効にします。

topic.config.sync=false
topic.auto.create=false

ライセンス設定

デフォルトでは、Replicator は送信先クラスターに Confluent Platform ライセンスを保管します。