スキーマの移行

Looking for Schema Management Confluent Cloud docs? You are currently viewing Confluent Platform documentation. If you are looking for Confluent Cloud docs, check out Schema Management on Confluent Cloud.

Confluent Platform 5.2.0 以降では、Replicator を使用して、Confluent Schema Registry に格納されているスキーマを別のクラスターに移行することができます。移行先は、セルフマネージド型クラスターでも Confluent Cloud 内のクラスターでもかまいません。

このページでは主に、自己管理型クラスターから Confluent Cloud へのスキーマの移行について取り上げていますが、冒頭の短いセクションには、自己管理型クラスター間でスキーマを移行する詳細なデモへのリンクを掲載しています。最後の「おすすめの関連情報」には、Schema Registry の構成オプションへのリンクを記載しており、それらの情報はどちらのシナリオでも参考にしていただけます。

ちなみに

現在、Confluent Replicator を使用してスキーマを移行する場合、送信元クラスターとして Confluent Cloud はサポートされません。Confluent Cloud は、送信先クラスターとしてのみ使用できます。別の方法として、Schema Registry の REST API を使用してスキーマを移行し、必要なデプロイを行うこともできます。スキーマに関する Confluent Cloud の制限やストレージ領域の管理について詳しくは、「Confluent Cloud の Schema Registry」の API リファレンスを参照してください。

注釈

新しいバージョンの Replicator を使用して、古いバージョンの Kafka クラスターのデータを Confluent Cloud にレプリケートすることはできません。具体的には、Replicator バージョン 5.4.0 以降を使用して、Apache Kafka® v0.10.2 以前のクラスターや Confluent Platform v3.2.0 以前のクラスターのデータを Confluent Cloud にレプリケートすることはできません。これらの古いバージョンのクラスターを使用している場合は、アップグレードできるようになるまで、Replicator 5.0.x を使用して Confluent Cloud にレプリケートしてください。次の点に留意したうえで、適切にアップグレードを計画してください。

自己管理型クラスター間でのスキーマの移行

自己管理型(オンプレミス)クラスター間でスキーマの継続移行または 1 回限りのリフトアンドシフト移行をセットアップすることもできます。

自己管理型クラスター間でスキーマを移行するデモについては、「Replicator スキーマ変換のサンプル」を参照してください。

Confluent Cloud へのスキーマの移行

Confluent Cloud は、Confluent Platform をベースとしたフルマネージド型のストリーミングデータサービスです。Kafka アプリケーションは自己管理型の Kafka から Confluent Cloud に "リフトアンドシフト" したり "クラウドに拡張" したりできますが、同じことは Confluent Schema Registry でも行えます。

既に Schema Registry を使用して Kafka アプリケーションのスキーマを管理している場合に、そのデータの一部または全部をスキーマ管理とともにクラウドに移す必要があれば、Replicator を使用して、既存のスキーマを Confluent Cloud Schema Registry に移行できます。(「Replicator」および「Replicator を実行可能ファイルとして実行する」を参照してください)

Schema Registry の継続移行をセットアップしてハイブリッドデプロイ(クラウドへの拡張)を維持する方法と、1 回限りの移行を使用してすべて Confluent Cloud にリフトアンドシフトする方法とがあります。

継続移行

継続移行では、自己管理型の Schema Registry をプライマリとして、また Confluent Cloud Schema Registry をセカンダリとして使用することができます(クラウドへの拡張)。これは "クラウドへのブリッジ" とも呼ばれます。新しいスキーマは自己管理型の Schema Registry (オリジン)に直接登録され、そこから、IMPORT モードに設定された Confluent Cloud Schema Registry (デスティネーション)に、Confluent Replicator が継続的にスキーマをコピーします。

../../_images/sr-extend-to-cloud.ja.png

1 回限りの移行

すべてのデータをフルマネージド型の Confluent Cloud サービスに移す(リフトアンドシフトする)には、1 回限りの移行を選択します。この場合、Confluent Cloud Schema Registry をプライマリとして自己管理型の既存の Schema Registry を移行します。新しいスキーマはすべて Confluent Cloud Schema Registry に登録され、一元管理された Kafka クラスターに格納されます。このシナリオでは、Confluent Cloud Schema Registry から自己管理型の Schema Registry への再移行は行われません。

../../_images/sr-lift-and-shift.ja.png

トピックとスキーマ

スキーマは Kafka のトピックに関連付けられ、サブジェクトの下で整理されて Schema Registry に格納されます(「用語の復習」を参照)。

以下のクイックスタートでは、Schema Registry とそこに格納されているスキーマを移行する方法について説明しています。ただし、Kafka のトピックを移行する方法は取り上げていません。

継続移行(クラウドへの拡張)の場合、トピックはプライマリ(自己管理型クラスター)に存在し続けるため、必要なのはスキーマの移行だけです。

1 回限りの移行(リフトアンドシフト)の場合、スキーマの移行後、トピックの移行を行う必要があります。クイックスタートの後の「次のステップ」で言及されているように、Replicator を使用して、Confluent Cloud クラスターにトピックを移行します。

ちなみに

次のように、スキーマのデフォルトのサブジェクトトランスレーターを使用するように Confluent Replicator を構成したとします。

schema.subject.translator.class=io.confluent.connect.replicator.schemas.DefaultSubjectTranslator

この場合、スキーマの移行中にターゲット Schema Registry 内のサブジェクトの名前を変更するには、topic.rename.format を使用します。

たとえば、topic.rename.format${topic}-replica である場合、移行元の Schema Registry にある mytopic-value というサブジェクトが、移行先の "新しい" レジストリでは、mytopic-replica-value という名前に変更されます。

名前が変更されるのは、サブジェクト命名方法として TopicNameStrategy-key または -value が付いたサブジェクト名)を使用するスキーマだけです。その他のサブジェクト(異なる命名方法を使用するサブジェクト)はすべて移行されますが、名前は変更されません。

topic.rename.format プロパティについては、「Confluent Replicator 構成プロパティ」の「送信先トピック」を参照してください。

クイックスタート

このクイックスタートでは、各種のデプロイ(オンプレミスサーバーまたはデータセンターから Confluent Cloud Schema Registry)に適用できる Schema Registry の移行方法について説明します。また、いくつかの例を通じて、ローカルクラスターから Confluent Cloud にスキーマを移行するにあたっての基本的な知識を身に付けることができます。

前提条件

始める前に

Confluent Platform を初めて利用する方は、まず、次のクイックスタートとチュートリアルに取り組み、プラットフォーム(プロデューサー、コンシューマー、ブローカーの役割など)や Confluent Cloud、Schema Registry の基本的な知識を身に付けることをお勧めします。それらのワークフローを体験することで、スキーマの移行にまつわる背景が理解しやすくなります。

スキーマの移行を開始する前に、次の要件を満たしていることを確認してください。

スキーマを移行するためには、Replicator を構成して実行する必要があります。ここで取り上げる例には含まれない詳しい情報については、Replicator のチュートリアル を参照してください。

スキーマの移行

Schema Registry および関連するスキーマを Confluent Cloud に移行するには、次の手順に従います。

  1. オリジンクラスターを起動します。

    ローカルクラスター(「Confluent Platform のクイックスタート(ローカルインストール)」でダウンロードしたものなど)を実行している場合は、このチュートリアルの目的上、Confluent CLI confluent local コマンドを使用して Schema Registry のみを起動します。

    <path-to-confluent>/bin/confluent local services schema-registry start
    

    ちなみに

    The examples here show how to use a Replicator worker in standalone mode for schema migration. In this mode, you cannot run Kafka Connect and Replicator at the same time, because Replicator also runs Connect. If you run Replicator in distributed mode, the setup is different and you do not have this limitation (you can use ./bin/confluent local services start). For more about configuring and running Connect workers (includig Replicator) in standalone and distributed modes, see Running Workers in the Connect guide.

  2. schema-registrykafkazookeeper が実行されていることを確認します。

    たとえば、<path-to-confluent>/bin/confluent local services status を実行します。

    Schema Registry is [UP]
    Kafka is [UP]
    Zookeeper is [UP]
    
  3. Confluent Cloud のデスティネーション Schema Registry にサブジェクトが存在しないことを確認します。

    curl -u <schema-registry-api-key>:<schema-registry-api-secret> <schema-registry-url>/subjects
    

    サブジェクトが存在しない場合、出力は空([])になります(想定される状態です)。

    サブジェクトが存在する場合は削除してください。その例を次に示します。

    curl -X DELETE -u <schema-registry-api-key>:<schema-registry-api-secret> <schema-registry-url>/subjects/my-existing-subject
    
  4. デスティネーション Schema Registry を IMPORT モードに設定します。その例を次に示します。

    curl -u <schema-registry-api-key>:<schema-registry-api-secret> -X PUT -H "Content-Type: application/json" "https://<destination-schema-registry>/mode" --data '{"mode": "IMPORT"}'
    

    ちなみに

    デスティネーション Schema Registry にサブジェクトが存在すると、{"error_code":42205,"message":"Cannot import since found existing subjects"} のようなメッセージが表示されてインポートに失敗します。

  5. Replicator ワーカーの構成で、送信先クラスターにおけるブローカーのアドレスを指定します。「Replicator の構成および実行」を参照してください。

    ちなみに

    Confluent Platform 5.2.0 以降の Replicator では、Schema Registry の移行がサポートされます。

    ワーカーの構成ファイルは <path-to-confluent>/etc/kafka/connect-standalone.properties にあります。

    # Connect Standalone Worker configuration
    bootstrap.servers=<path-to-cloud-server>:9092
    
  6. Schema Registry と送信先クラスターの情報を使用して Replicator を構成します。

    • スタンドアロンの Connect インスタンスの場合、<path-to-confluent>etc/kafka-connect-replicator/quickstart-replicator.properties で次のプロパティを構成します。

      # basic connector configuration
      name=replicator-source
      connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector
      
      key.converter=io.confluent.connect.replicator.util.ByteArrayConverter
      value.converter=io.confluent.connect.replicator.util.ByteArrayConverter
      header.converter=io.confluent.connect.replicator.util.ByteArrayConverter
      
      tasks.max=4
      
      # source cluster connection info
      src.kafka.bootstrap.servers=localhost:9092
      
      # destination cluster connection info
      dest.kafka.ssl.endpoint.identification.algorithm=https
      dest.kafka.sasl.mechanism=PLAIN
      dest.kafka.request.timeout.ms=20000
      dest.kafka.bootstrap.servers=<path-to-cloud-server>:9092
      retry.backoff.ms=500
      dest.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<encrypted-username>" password="<encrypted-password>";
      dest.kafka.security.protocol=SASL_SSL
      
      # Schema Registry migration topics to replicate from source to destination
      # topic.whitelist indicates which topics are of interest to replicator
      topic.whitelist=_schemas
      # schema.registry.topic indicates which of the topics in the ``whitelist`` contains schemas
      schema.registry.topic=_schemas
      
      # Connection settings for destination Confluent Cloud Schema Registry
      schema.registry.url=https://<path-to-cloud-schema-registry>
      schema.registry.client.basic.auth.credentials.source=USER_INFO
      schema.registry.client.basic.auth.user.info=<schema-registry-api-key>:<schema-registry-api-secret>
      
    • クラスターで SSL が有効になっている場合は、Schema Registry クライアントの SSL 構成を適切に設定する必要があります。

      # SSL configurations for clients to Schema Registry
       schema.registry.client.schema.registry.ssl.truststore.location
       schema.registry.client.schema.registry.ssl.truststore.type
       schema.registry.client.schema.registry.ssl.truststore.password
       schema.registry.client.schema.registry.ssl.keystore.location
       schema.registry.client.schema.registry.ssl.keystore.type
       schema.registry.client.schema.registry.ssl.keystore.password
       schema.registry.client.schema.registry.ssl.key.password
      

      ちなみに

      Schema Registry クライアントの構成には schema.registry.client プレフィックスが必要です。詳細については、「Schema Registry のセキュリティの概要」の「Schema Registry のクライアント」を参照してください。

  7. quickstart-replicator.properties では、ブローカーを 1 つ含んだ開発クラスターでのデモを想定して、レプリケーション係数が 1 に設定されています。このスキーマ移行チュートリアルおよび本稼働環境では、これを 3 以上に変更してください。

    confluent.topic.replication.factor=3
    

    参考

    分散モードの Replicator に使用される JSON 構成の例については、GitHub の examples リポジトリ にある submit_replicator_schema_migration_config.sh を参照してください。

  8. スキーマの移行を実行できるように Replicator を起動します。

    その例を次に示します。

    <path-to-confluent>/bin/connect-standalone <path-to-confluent>/etc/kafka/connect-standalone.properties \
    <path-to-confluent>/etc/kafka-connect-replicator/quickstart-replicator.properties
    

    Replicator の起動に使用するメソッドまたはコマンドは、アプリケーションのセットアップに依存するため、この例とは異なる場合があります。詳細については、「Replicator の構成および実行」および「Replicator の構成および実行」を参照してください。

  9. Kafka に対してメッセージを生成しているプロデューサーをすべて停止します。

  10. レプリケーションラグが 0 になるまで待ちます。

    詳細については、「Replicator ラグのモニタリング」を参照してください。

  11. Replicator を停止します。

  12. 移行元である自己管理型 Schema Registry のプロパティファイルの構成に次の行を追加してモードの変更を有効にし、再起動します。

    mode.mutability=true
    

    重要

    モードがサポートされるのは、Schema Registry のバージョン 5.2 以降のみです。この手順と次の手順(Schema Registry を READYONLY に設定する手順)は念のための措置であり、必須ではありません。バージョン "5.1" 以前の Schema Registry をご使用の方は、この 2 つの手順は省略してかまいません。ただし、以後ソース Schema Registry にスキーマが登録されないよう、すべてのプロデューサーを確実に停止してください。

  13. ソース Schema Registry を READONLY モードに設定します。

    curl -u <schema-registry-api-key>:<schema-registry-api-secret> -X PUT -H "Content-Type: application/json" "https://<source-schema-registry>/mode" --data '{"mode": "READONLY"}'
    
  14. デスティネーション Schema Registry を READWRITE モードに設定します。

    curl -u <schema-registry-api-key>:<schema-registry-api-secret> -X PUT -H "Content-Type: application/json" "https://<destination-schema-registry>/mode" --data '{"mode": "READWRITE"}'
    
  15. コンシューマーをすべて停止します。

  16. クラウド内のデスティネーション Schema Registry を参照するようにすべてのコンシューマーを構成し、それらを再起動します。

    たとえば、Java クライアントで Schema Registry を構成する場合、Schema Registry の URL、認証 USER_INFO のタイプ、資格情報を指定するプロパティファイルで(またはコードから)、Schema Registry の URL をソースからデスティネーションに変更します。

    その他の例については、「Java コンシューマー」を参照してください。

  17. クラウド内の Schema Registry を参照するようにすべてのプロデューサーを構成し、それらを再起動します。

    その他の例については、「Java プロデューサー」を参照してください。

  18. (省略可)ソース Schema Registry を停止します。

次のステップ

おすすめの関連情報

また、「Confluent Replicator 構成プロパティ」で次のセクションを参照してください。

最後に、自己管理型のオンプレミスクラスター間でスキーマを移行するデモを「Replicator スキーマ変換のサンプル」でご覧ください。