スキーマの移行¶
Looking for Schema Management Confluent Cloud docs? You are currently viewing Confluent Platform documentation. If you are looking for Confluent Cloud docs, check out Schema Management on Confluent Cloud.
Confluent Platform 5.2.0 以降では、Confluent Replicator を使用して Confluent Schema Registry に格納されているスキーマを別の(自己管理型または Confluent Cloud 内の)クラスターに移行することができます。
このページでは主に、自己管理型クラスターから Confluent Cloud へのスキーマの移行について取り上げていますが、冒頭の短いセクションには、自己管理型クラスター間でスキーマを移行する詳細なデモへのリンクを掲載しています。最後の「おすすめの関連情報」には、スキーマレジストリ の構成オプションへのリンクを記載しており、それらの情報はどちらのシナリオでも参考にしていただけます。
ちなみに
現在、Confluent Replicator を使用してスキーマを移行する場合、ソースクラスターとして Confluent Cloud はサポートされません。Confluent Cloud は、デスティネーションクラスターとしてのみ使用できます。別の方法として、Schema Registry の REST API を使用してスキーマを移行し、必要なデプロイを行うこともできます。スキーマに関する Confluent Cloud の制限やストレージ領域の管理について詳しくは、「Confluent Cloud の スキーマレジストリ」の API リファレンスを参照してください。
注釈
より新しいバージョンの Replicator を使用して、以前のバージョンの Kafka クラスターから Confluent Cloud にデータをレプリケートすることはできません。具体的に言うと、Replicator バージョン 5.4.0 以降を使用して、Apache Kafka® v0.10.2 以前のクラスターや Confluent Platform v3.2.0 以前から Confluent Cloud にデータをレプリケートすることはできません。こうした以前のバージョンのクラスターが存在する場合、アップグレードするまでは、Replicator 5.0.x を使用して Confluent Cloud にレプリケートしてください。次の点に留意したうえで、適切にアップグレードを計画してください。
- Confluent Platform 3.2 以降に付属する Kafka Connect ワーカーは、Confluent Platform 3.0 以降に付属するすべての Kafka ブローカーと互換性があります。「クロスコンポーネントの互換性」を参照してください。
- Confluent Platform 5.0.x のサポート終了日は 2020 年 7 月 31 日です。「サポートされているバージョンおよび相互運用性」を参照してください。
自己管理型クラスター間でのスキーマの移行¶
自己管理型(オンプレミス)クラスター間でスキーマの継続移行または 1 回限りのリフトアンドシフト移行をセットアップすることもできます。
自己管理型クラスター間でスキーマを移行するデモについては、「Replicator スキーマ変換のサンプル」を参照してください。
Confluent Cloud へのスキーマの移行¶
Confluent Cloud は、Confluent Platform をベースとした完全マネージド型のストリーミングデータサービスです。Kafka アプリケーションは自己管理型の Kafka から Confluent Cloud に "リフトアンドシフト" したり "クラウドに拡張" したりできますが、同じことは Confluent Schema Registry でも行えます。
既に スキーマレジストリ を使用して Kafka アプリケーションのスキーマを管理している場合に、そのデータの一部または全部をスキーマ管理とともにクラウドに移す必要があれば、Confluent Replicator を使用して、既存のスキーマを Confluent Cloud Schema Registry に移行できます。(「Confluent Replicator」および「Replicator を実行可能ファイルとして実行する」を参照してください)
スキーマレジストリ の継続移行をセットアップしてハイブリッドデプロイ(クラウドへの拡張)を維持する方法と、1 回限りの移行を使用してすべて Confluent Cloud にリフトアンドシフトする方法とがあります。
継続移行¶
継続移行では、自己管理型の スキーマレジストリ をプライマリとして、また Confluent Cloud Schema Registry をセカンダリとして使用することができます(クラウドへの拡張)。これは "クラウドへのブリッジ" とも呼ばれます。新しいスキーマは自己管理型の スキーマレジストリ (オリジン)に直接登録され、そこから、IMPORT モードに設定された Confluent Cloud Schema Registry (デスティネーション)に、Confluent Replicator が継続的にスキーマをコピーします。

1 回限りの移行¶
すべてのデータを完全マネージド型の Confluent Cloud サービスに移す(リフトアンドシフトする)には、1 回限りの移行を選択します。この場合、Confluent Cloud Schema Registry をプライマリとして自己管理型の既存の スキーマレジストリ を移行します。新しいスキーマはすべて Confluent Cloud Schema Registry に登録され、一元管理された Kafka クラスターに格納されます。このシナリオでは、Confluent Cloud Schema Registry から自己管理型の スキーマレジストリ への再移行は行われません。

トピックとスキーマ¶
スキーマは Kafka のトピックに関連付けられ、サブジェクトの下で整理されて スキーマレジストリ に格納されます(「用語の復習」を参照)。
以下のクイックスタートでは、スキーマレジストリ とそこに格納されているスキーマを移行する方法について説明しています。ただし、Kafka のトピックを移行する方法は取り上げていません。
継続移行(クラウドへの拡張)の場合、トピックはプライマリ(自己管理型クラスター)に存在し続けるため、必要なのはスキーマの移行だけです。
1 回限りの移行(リフトアンドシフト)の場合、スキーマの移行後、トピックの移行を行う必要があります。クイックスタートの後の「次のステップ」で言及されているように、Replicator を使用して、Confluent Cloud クラスターにトピックを移行します。
ちなみに
次のように、スキーマのデフォルトのサブジェクトトランスレーターを使用するように Confluent Replicator を構成したとします。
schema.subject.translator.class=io.confluent.connect.replicator.schemas.DefaultSubjectTranslator
この場合、スキーマの移行中にターゲット スキーマレジストリ 内のサブジェクトの名前を変更するには、topic.rename.format
を使用します。
たとえば、topic.rename.format
が ${topic}-replica
である場合、移行元の スキーマレジストリ にある mytopic-value
というトピックが、移行先の "新しい" レジストリでは、mytopic-replica-value
という名前に変更されます。
名前が変更されるのは、サブジェクト命名方法として TopicNameStrategy
(-key
または -value
が付いたサブジェクト名)を使用するスキーマだけです。その他のサブジェクト(異なる命名方法を使用するサブジェクト)はすべて移行されますが、名前は変更されません。
topic.rename.format
プロパティについては、「Confluent Replicator 構成プロパティ」の「送信先トピック」を参照してください。
クイックスタート¶
このクイックスタートでは、各種のデプロイ(オンプレミスサーバーまたはデータセンターから Confluent Cloud Schema Registry)に適用できる スキーマレジストリ の移行方法について説明します。また、いくつかの例を通じて、ローカルクラスターから Confluent Cloud にスキーマを移行するにあたっての基本的な知識を身に付けることができます。
- 前提条件
- Confluent Platform
- Confluent CLI (個別のインストールが必要)
始める前に¶
Confluent Platform を初めて利用する方は、まず、次のクイックスタートとチュートリアルに取り組み、プラットフォーム(プロデューサー、コンシューマー、ブローカーの役割など)や Confluent Cloud、スキーマレジストリ の基本的な知識を身に付けることをお勧めします。それらのワークフローを体験することで、スキーマの移行にまつわる背景が理解しやすくなります。
- Confluent Platform を使用した Apache Kafka のクイックスタート(ローカル)
- Confluent Cloud を使用した Apache Kafka のクイックスタート
- スキーマレジストリ のチュートリアル
スキーマの移行を開始する前に、次の要件を満たしていることを確認してください。
- デスティネーション スキーマレジストリ となる Confluent Cloud へのアクセス
- Confluent Platform のローカルインストール(たとえば、「Confluent Platform を使用した Apache Kafka のクイックスタート(ローカル)」のダウンロードまたはオリジン スキーマレジストリ となる他のクラスターから入手したもの)。
スキーマを移行するためには、Replicator を構成して実行する必要があります。ここで取り上げる例には含まれない詳しい情報については、Replicator のチュートリアル を参照してください。
スキーマの移行¶
スキーマレジストリ および関連するスキーマを Confluent Cloud に移行するには、次の手順に従います。
オリジンクラスターを起動します。
ローカルクラスター(「Confluent Platform を使用した Apache Kafka のクイックスタート(ローカル)」でダウンロードしたものなど)を実行している場合は、このチュートリアルの目的上、Confluent CLI confluent local コマンドを使用して スキーマレジストリ のみを起動します。
<path-to-confluent>/bin/confluent local services schema-registry start
ちなみに
ここで取り上げているのは、"スタンドアロンモード" の Replicator ワーカーを使用してスキーマを移行する例です。このモードでは、Kafka Connect と Replicator を同時に実行することはできません。Replicator によって Connect も実行されるためです。Replicator を "分散モード" で実行する場合は、セットアップが異なり、またこの制限もありません(
./bin/confluent local services start
を使用できます)。Connect ワーカー(Replicator を含む)をスタンドアロンモードおよび分散モードで構成して実行する方法の詳細については、Connect ガイドの「ワーカーの実行」を参照してください。schema-registry
、kafka
、zookeeper
が実行されていることを確認します。たとえば、
<path-to-confluent>/bin/confluent local services status
を実行します。Schema Registry is [UP] Kafka is [UP] Zookeeper is [UP]
Confluent Cloud のデスティネーション スキーマレジストリ にサブジェクトが存在しないことを確認します。
curl -u <schema-registry-api-key>:<schema-registry-api-secret> <schema-registry-url>/subjects
サブジェクトが存在しない場合、出力は空(
[]
)になります(想定される状態です)。サブジェクトが存在する場合は削除してください。その例を次に示します。
curl -X DELETE -u <schema-registry-api-key>:<schema-registry-api-secret> <schema-registry-url>/subjects/my-existing-subject
デスティネーション スキーマレジストリ を IMPORT モードに設定します。その例を次に示します。
curl -u <schema-registry-api-key>:<schema-registry-api-secret> -X PUT -H "Content-Type: application/json" "https://<destination-schema-registry>/mode" --data '{"mode": "IMPORT"}'
ちなみに
デスティネーション スキーマレジストリ にサブジェクトが存在すると、
{"error_code":42205,"message":"Cannot import since found existing subjects"}
のようなメッセージが表示されてインポートに失敗します。Replicator ワーカーの構成で、デスティネーションクラスターにおけるブローカーのアドレスを指定します。「Replicator の構成および実行」を参照してください。
ちなみに
Confluent Platform 5.2.0 以降の Replicator では、スキーマレジストリ の移行がサポートされます。
ワーカーの構成ファイルは
<path-to-confluent>/etc/kafka/connect-standalone.properties
にあります。# Connect Standalone Worker configuration bootstrap.servers=<path-to-cloud-server>:9092
スキーマレジストリ とデスティネーションクラスターの情報を使用して Replicator を構成します。
スタンドアロンの Connect インスタンスの場合、
<path-to-confluent>etc/kafka-connect-replicator/quickstart-replicator.properties
で次のプロパティを構成します。# basic connector configuration name=replicator-source connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector key.converter=io.confluent.connect.replicator.util.ByteArrayConverter value.converter=io.confluent.connect.replicator.util.ByteArrayConverter header.converter=io.confluent.connect.replicator.util.ByteArrayConverter tasks.max=4 # source cluster connection info src.kafka.bootstrap.servers=localhost:9092 # destination cluster connection info dest.kafka.ssl.endpoint.identification.algorithm=https dest.kafka.sasl.mechanism=PLAIN dest.kafka.request.timeout.ms=20000 dest.kafka.bootstrap.servers=<path-to-cloud-server>:9092 retry.backoff.ms=500 dest.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<encrypted-username>" password="<encrypted-password>"; dest.kafka.security.protocol=SASL_SSL # Schema Registry migration topics to replicate from source to destination # topic.whitelist indicates which topics are of interest to replicator topic.whitelist=_schemas # schema.registry.topic indicates which of the topics in the ``whitelist`` contains schemas schema.registry.topic=_schemas # Connection settings for destination Confluent Cloud Schema Registry schema.registry.url=https://<path-to-cloud-schema-registry> schema.registry.client.basic.auth.credentials.source=USER_INFO schema.registry.client.basic.auth.user.info=<schema-registry-api-key>:<schema-registry-api-secret>
クラスターで SSL が有効になっている場合は、スキーマレジストリ クライアントの SSL 構成を適切に設定する必要があります。
# SSL configurations for clients to Schema Registry schema.registry.client.schema.registry.ssl.truststore.location schema.registry.client.schema.registry.ssl.truststore.type schema.registry.client.schema.registry.ssl.truststore.password schema.registry.client.schema.registry.ssl.keystore.location schema.registry.client.schema.registry.ssl.keystore.type schema.registry.client.schema.registry.ssl.keystore.password schema.registry.client.schema.registry.ssl.key.password
ちなみに
スキーマレジストリ クライアントの構成には
schema.registry.client
プレフィックスが必要です。詳細については、「スキーマレジストリ のセキュリティの概要」の「スキーマレジストリ のクライアント」を参照してください。
quickstart-replicator.properties
では、ブローカーを 1 つ含んだ開発クラスターでのデモを想定して、レプリケーション係数が1
に設定されています。このスキーマ移行チュートリアルおよび本稼働環境では、これを3
以上に変更してください。confluent.topic.replication.factor=3
参考
分散モードの Replicator に使用される JSON 構成の例については、GitHub の examples リポジトリにある submit_replicator_schema_migration_config.sh を参照してください。
スキーマの移行を実行できるように Replicator を起動します。
その例を次に示します。
<path-to-confluent>/bin/connect-standalone <path-to-confluent>/etc/kafka/connect-standalone.properties \ <path-to-confluent>/etc/kafka-connect-replicator/quickstart-replicator.properties
Replicator の起動に使用するメソッドまたはコマンドは、アプリケーションのセットアップに依存するため、この例とは異なる場合があります。詳細については、「Replicator の構成および実行」および「Replicator の構成および実行」を参照してください。
Kafka に対してメッセージを生成しているプロデューサーをすべて停止します。
レプリケーションラグが 0 になるまで待ちます。
詳細については、「Replicator ラグのモニタリング」を参照してください。
Replicator を停止します。
移行元である自己管理型 スキーマレジストリ のプロパティファイルの構成に次の行を追加してモードの変更を有効にし、再起動します。
mode.mutability=true
重要
モードがサポートされるのは、スキーマレジストリ のバージョン 5.2 以降のみです。この手順と次の手順(スキーマレジストリ を READYONLY に設定する手順)は念のための措置であり、必須ではありません。バージョン "5.1" 以前の スキーマレジストリ をご使用の方は、この 2 つの手順は省略してかまいません。ただし、以後ソース スキーマレジストリ にスキーマが登録されないよう、すべてのプロデューサーを確実に停止してください。
ソース スキーマレジストリ を READONLY モードに設定します。
curl -u <schema-registry-api-key>:<schema-registry-api-secret> -X PUT -H "Content-Type: application/json" "https://<source-schema-registry>/mode" --data '{"mode": "READONLY"}'
デスティネーション スキーマレジストリ を READWRITE モードに設定します。
curl -u <schema-registry-api-key>:<schema-registry-api-secret> -X PUT -H "Content-Type: application/json" "https://<destination-schema-registry>/mode" --data '{"mode": "READWRITE"}'
コンシューマーをすべて停止します。
クラウド内のデスティネーション スキーマレジストリ を参照するようにすべてのコンシューマーを構成し、それらを再起動します。
たとえば、Java クライアントで スキーマレジストリ を構成する場合、スキーマレジストリ の URL、認証 USER_INFO のタイプ、資格情報を指定するプロパティファイルで(またはコードから)、スキーマレジストリ の URL をソースからデスティネーションに変更します。
その他の例については、「Java コンシューマー」を参照してください。
クラウド内の スキーマレジストリ を参照するようにすべてのプロデューサーを構成し、それらを再起動します。
その他の例については、「Java プロデューサー」を参照してください。
(省略可)ソース スキーマレジストリ を停止します。
次のステップ¶
- プライマリ自己管理型クラスターからクラウドへの継続移行でクラウド(ハイブリッド)に拡張する場合、移行は完了です。
- これが Confluent Cloud への 1 回限りの移行である場合、次のステップとして、Replicator を使用してクラウドクラスターにトピックを移行します。
- Confluent Cloud のスキーマとストレージ領域を REST API で管理する方法については、開発者ガイドの「Confluent Cloud の スキーマレジストリ」を参照してください。
- Confluent Cloud における スキーマレジストリ の構成方法と使用方法に関するガイドをお探しの方は、「Confluent Cloud におけるスキーマ管理のクイックスタート」と「Confluent Cloud を使用した Apache Kafka のクイックスタート」を参照してください。
おすすめの関連情報¶
- Confluent Cloud
- Replicator スキーマ変換のサンプル
- Confluent Cloud におけるスキーマ管理のクイックスタート
- スキーマレジストリ の構成オプション
- 概要
- スキーマレジストリ API リファレンス
また、「Confluent Replicator 構成プロパティ」で次のセクションを参照してください。
最後に、自己管理型のオンプレミスクラスター間でスキーマを移行するデモを「Replicator スキーマ変換のサンプル」でご覧ください。