スキーマの移行¶
Looking for Schema Management Confluent Cloud docs? You are currently viewing Confluent Platform documentation. If you are looking for Confluent Cloud docs, check out Schema Management on Confluent Cloud.
Confluent Platform 5.2.0 以降では、Replicator を使用して、Confluent Schema Registry に格納されているスキーマを別のクラスターに移行することができます。移行先は、セルフマネージド型クラスターでも Confluent Cloud 内のクラスターでもかまいません。
このページでは主に、自己管理型クラスターから Confluent Cloud へのスキーマの移行について取り上げていますが、冒頭の短いセクションには、自己管理型クラスター間でスキーマを移行する詳細なデモへのリンクを掲載しています。最後の「おすすめの関連情報」には、Schema Registry の構成オプションへのリンクを記載しており、それらの情報はどちらのシナリオでも参考にしていただけます。
ちなみに
現在、Confluent Replicator を使用してスキーマを移行する場合、送信元クラスターとして Confluent Cloud はサポートされません。Confluent Cloud は、送信先クラスターとしてのみ使用できます。別の方法として、Schema Registry の REST API を使用してスキーマを移行し、必要なデプロイを行うこともできます。スキーマに関する Confluent Cloud の制限やストレージ領域の管理について詳しくは、「Confluent Cloud の Schema Registry」の API リファレンスを参照してください。
注釈
新しいバージョンの Replicator を使用して、古いバージョンの Kafka クラスターのデータを Confluent Cloud にレプリケートすることはできません。具体的には、Replicator バージョン 5.4.0 以降を使用して、Apache Kafka® v0.10.2 以前のクラスターや Confluent Platform v3.2.0 以前のクラスターのデータを Confluent Cloud にレプリケートすることはできません。これらの古いバージョンのクラスターを使用している場合は、アップグレードできるようになるまで、Replicator 5.0.x を使用して Confluent Cloud にレプリケートしてください。次の点に留意したうえで、適切にアップグレードを計画してください。
- Confluent Platform 3.2 以降に含まれている Kafka Connect ワーカーは、Confluent Platform 3.0 以降に含まれている Kafka ブローカーに対応しています(「クロスコンポーネントの互換性」を参照)。
- Confluent Platform 5.0.x のサポート終了日は 2020 年 7 月 31 日です(サポートされているバージョンおよび相互運用性」を参照)。
自己管理型クラスター間でのスキーマの移行¶
自己管理型(オンプレミス)クラスター間でスキーマの継続移行または 1 回限りのリフトアンドシフト移行をセットアップすることもできます。
自己管理型クラスター間でスキーマを移行するデモについては、「Replicator スキーマ変換のサンプル」を参照してください。
Confluent Cloud へのスキーマの移行¶
Confluent Cloud は、Confluent Platform をベースとしたフルマネージド型のストリーミングデータサービスです。Kafka アプリケーションは自己管理型の Kafka から Confluent Cloud に "リフトアンドシフト" したり "クラウドに拡張" したりできますが、同じことは Confluent Schema Registry でも行えます。
既に Schema Registry を使用して Kafka アプリケーションのスキーマを管理している場合に、そのデータの一部または全部をスキーマ管理とともにクラウドに移す必要があれば、Replicator を使用して、既存のスキーマを Confluent Cloud Schema Registry に移行できます。(「Replicator」および「Replicator を実行可能ファイルとして実行する」を参照してください)
Schema Registry の継続移行をセットアップしてハイブリッドデプロイ(クラウドへの拡張)を維持する方法と、1 回限りの移行を使用してすべて Confluent Cloud にリフトアンドシフトする方法とがあります。
継続移行¶
継続移行では、自己管理型の Schema Registry をプライマリとして、また Confluent Cloud Schema Registry をセカンダリとして使用することができます(クラウドへの拡張)。これは "クラウドへのブリッジ" とも呼ばれます。新しいスキーマは自己管理型の Schema Registry (オリジン)に直接登録され、そこから、IMPORT モードに設定された Confluent Cloud Schema Registry (デスティネーション)に、Confluent Replicator が継続的にスキーマをコピーします。

1 回限りの移行¶
すべてのデータをフルマネージド型の Confluent Cloud サービスに移す(リフトアンドシフトする)には、1 回限りの移行を選択します。この場合、Confluent Cloud Schema Registry をプライマリとして自己管理型の既存の Schema Registry を移行します。新しいスキーマはすべて Confluent Cloud Schema Registry に登録され、一元管理された Kafka クラスターに格納されます。このシナリオでは、Confluent Cloud Schema Registry から自己管理型の Schema Registry への再移行は行われません。

トピックとスキーマ¶
スキーマは Kafka のトピックに関連付けられ、サブジェクトの下で整理されて Schema Registry に格納されます(「用語の復習」を参照)。
以下のクイックスタートでは、Schema Registry とそこに格納されているスキーマを移行する方法について説明しています。ただし、Kafka のトピックを移行する方法は取り上げていません。
継続移行(クラウドへの拡張)の場合、トピックはプライマリ(自己管理型クラスター)に存在し続けるため、必要なのはスキーマの移行だけです。
1 回限りの移行(リフトアンドシフト)の場合、スキーマの移行後、トピックの移行を行う必要があります。クイックスタートの後の「次のステップ」で言及されているように、Replicator を使用して、Confluent Cloud クラスターにトピックを移行します。
ちなみに
次のように、スキーマのデフォルトのサブジェクトトランスレーターを使用するように Confluent Replicator を構成したとします。
schema.subject.translator.class=io.confluent.connect.replicator.schemas.DefaultSubjectTranslator
この場合、スキーマの移行中にターゲット Schema Registry 内のサブジェクトの名前を変更するには、topic.rename.format
を使用します。
たとえば、topic.rename.format
が ${topic}-replica
である場合、移行元の Schema Registry にある mytopic-value
というサブジェクトが、移行先の "新しい" レジストリでは、mytopic-replica-value
という名前に変更されます。
名前が変更されるのは、サブジェクト命名方法として TopicNameStrategy
(-key
または -value
が付いたサブジェクト名)を使用するスキーマだけです。その他のサブジェクト(異なる命名方法を使用するサブジェクト)はすべて移行されますが、名前は変更されません。
topic.rename.format
プロパティについては、「Confluent Replicator 構成プロパティ」の「送信先トピック」を参照してください。
クイックスタート¶
このクイックスタートでは、各種のデプロイ(オンプレミスサーバーまたはデータセンターから Confluent Cloud Schema Registry)に適用できる Schema Registry の移行方法について説明します。また、いくつかの例を通じて、ローカルクラスターから Confluent Cloud にスキーマを移行するにあたっての基本的な知識を身に付けることができます。
- 前提条件
- Confluent Platform
- Confluent CLI (個別のインストールが必要)
始める前に¶
Confluent Platform を初めて利用する方は、まず、次のクイックスタートとチュートリアルに取り組み、プラットフォーム(プロデューサー、コンシューマー、ブローカーの役割など)や Confluent Cloud、Schema Registry の基本的な知識を身に付けることをお勧めします。それらのワークフローを体験することで、スキーマの移行にまつわる背景が理解しやすくなります。
- Confluent Platform のクイックスタート(ローカルインストール)
- Confluent Cloud を使用した Apache Kafka のクイックスタート
- Schema Registry のチュートリアル
スキーマの移行を開始する前に、次の要件を満たしていることを確認してください。
- デスティネーション Schema Registry となる Confluent Cloud へのアクセス
- Confluent Platform のローカルインストール(たとえば、「Confluent Platform のクイックスタート(ローカルインストール)」のダウンロードまたはオリジン Schema Registry となる他のクラスターから入手したもの)。
スキーマを移行するためには、Replicator を構成して実行する必要があります。ここで取り上げる例には含まれない詳しい情報については、Replicator のチュートリアル を参照してください。
スキーマの移行¶
Schema Registry および関連するスキーマを Confluent Cloud に移行するには、次の手順に従います。
オリジンクラスターを起動します。
ローカルクラスター(「Confluent Platform のクイックスタート(ローカルインストール)」でダウンロードしたものなど)を実行している場合は、このチュートリアルの目的上、Confluent CLI confluent local コマンドを使用して Schema Registry のみを起動します。
<path-to-confluent>/bin/confluent local services schema-registry start
ちなみに
The examples here show how to use a Replicator worker in standalone mode for schema migration. In this mode, you cannot run Kafka Connect and Replicator at the same time, because Replicator also runs Connect. If you run Replicator in distributed mode, the setup is different and you do not have this limitation (you can use
./bin/confluent local services start
). For more about configuring and running Connect workers (includig Replicator) in standalone and distributed modes, see Running Workers in the Connect guide.schema-registry
、kafka
、zookeeper
が実行されていることを確認します。たとえば、
<path-to-confluent>/bin/confluent local services status
を実行します。Schema Registry is [UP] Kafka is [UP] Zookeeper is [UP]
Confluent Cloud のデスティネーション Schema Registry にサブジェクトが存在しないことを確認します。
curl -u <schema-registry-api-key>:<schema-registry-api-secret> <schema-registry-url>/subjects
サブジェクトが存在しない場合、出力は空(
[]
)になります(想定される状態です)。サブジェクトが存在する場合は削除してください。その例を次に示します。
curl -X DELETE -u <schema-registry-api-key>:<schema-registry-api-secret> <schema-registry-url>/subjects/my-existing-subject
デスティネーション Schema Registry を IMPORT モードに設定します。その例を次に示します。
curl -u <schema-registry-api-key>:<schema-registry-api-secret> -X PUT -H "Content-Type: application/json" "https://<destination-schema-registry>/mode" --data '{"mode": "IMPORT"}'
ちなみに
デスティネーション Schema Registry にサブジェクトが存在すると、
{"error_code":42205,"message":"Cannot import since found existing subjects"}
のようなメッセージが表示されてインポートに失敗します。Replicator ワーカーの構成で、送信先クラスターにおけるブローカーのアドレスを指定します。「Replicator の構成および実行」を参照してください。
ちなみに
Confluent Platform 5.2.0 以降の Replicator では、Schema Registry の移行がサポートされます。
ワーカーの構成ファイルは
<path-to-confluent>/etc/kafka/connect-standalone.properties
にあります。# Connect Standalone Worker configuration bootstrap.servers=<path-to-cloud-server>:9092
Schema Registry と送信先クラスターの情報を使用して Replicator を構成します。
スタンドアロンの Connect インスタンスの場合、
<path-to-confluent>etc/kafka-connect-replicator/quickstart-replicator.properties
で次のプロパティを構成します。# basic connector configuration name=replicator-source connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector key.converter=io.confluent.connect.replicator.util.ByteArrayConverter value.converter=io.confluent.connect.replicator.util.ByteArrayConverter header.converter=io.confluent.connect.replicator.util.ByteArrayConverter tasks.max=4 # source cluster connection info src.kafka.bootstrap.servers=localhost:9092 # destination cluster connection info dest.kafka.ssl.endpoint.identification.algorithm=https dest.kafka.sasl.mechanism=PLAIN dest.kafka.request.timeout.ms=20000 dest.kafka.bootstrap.servers=<path-to-cloud-server>:9092 retry.backoff.ms=500 dest.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<encrypted-username>" password="<encrypted-password>"; dest.kafka.security.protocol=SASL_SSL # Schema Registry migration topics to replicate from source to destination # topic.whitelist indicates which topics are of interest to replicator topic.whitelist=_schemas # schema.registry.topic indicates which of the topics in the ``whitelist`` contains schemas schema.registry.topic=_schemas # Connection settings for destination Confluent Cloud Schema Registry schema.registry.url=https://<path-to-cloud-schema-registry> schema.registry.client.basic.auth.credentials.source=USER_INFO schema.registry.client.basic.auth.user.info=<schema-registry-api-key>:<schema-registry-api-secret>
クラスターで SSL が有効になっている場合は、Schema Registry クライアントの SSL 構成を適切に設定する必要があります。
# SSL configurations for clients to Schema Registry schema.registry.client.schema.registry.ssl.truststore.location schema.registry.client.schema.registry.ssl.truststore.type schema.registry.client.schema.registry.ssl.truststore.password schema.registry.client.schema.registry.ssl.keystore.location schema.registry.client.schema.registry.ssl.keystore.type schema.registry.client.schema.registry.ssl.keystore.password schema.registry.client.schema.registry.ssl.key.password
ちなみに
Schema Registry クライアントの構成には
schema.registry.client
プレフィックスが必要です。詳細については、「Schema Registry のセキュリティの概要」の「Schema Registry のクライアント」を参照してください。
quickstart-replicator.properties
では、ブローカーを 1 つ含んだ開発クラスターでのデモを想定して、レプリケーション係数が1
に設定されています。このスキーマ移行チュートリアルおよび本稼働環境では、これを3
以上に変更してください。confluent.topic.replication.factor=3
参考
分散モードの Replicator に使用される JSON 構成の例については、GitHub の examples リポジトリ にある submit_replicator_schema_migration_config.sh を参照してください。
スキーマの移行を実行できるように Replicator を起動します。
その例を次に示します。
<path-to-confluent>/bin/connect-standalone <path-to-confluent>/etc/kafka/connect-standalone.properties \ <path-to-confluent>/etc/kafka-connect-replicator/quickstart-replicator.properties
Replicator の起動に使用するメソッドまたはコマンドは、アプリケーションのセットアップに依存するため、この例とは異なる場合があります。詳細については、「Replicator の構成および実行」および「Replicator の構成および実行」を参照してください。
Kafka に対してメッセージを生成しているプロデューサーをすべて停止します。
レプリケーションラグが 0 になるまで待ちます。
詳細については、「Replicator ラグのモニタリング」を参照してください。
Replicator を停止します。
移行元である自己管理型 Schema Registry のプロパティファイルの構成に次の行を追加してモードの変更を有効にし、再起動します。
mode.mutability=true
重要
モードがサポートされるのは、Schema Registry のバージョン 5.2 以降のみです。この手順と次の手順(Schema Registry を READYONLY に設定する手順)は念のための措置であり、必須ではありません。バージョン "5.1" 以前の Schema Registry をご使用の方は、この 2 つの手順は省略してかまいません。ただし、以後ソース Schema Registry にスキーマが登録されないよう、すべてのプロデューサーを確実に停止してください。
ソース Schema Registry を READONLY モードに設定します。
curl -u <schema-registry-api-key>:<schema-registry-api-secret> -X PUT -H "Content-Type: application/json" "https://<source-schema-registry>/mode" --data '{"mode": "READONLY"}'
デスティネーション Schema Registry を READWRITE モードに設定します。
curl -u <schema-registry-api-key>:<schema-registry-api-secret> -X PUT -H "Content-Type: application/json" "https://<destination-schema-registry>/mode" --data '{"mode": "READWRITE"}'
コンシューマーをすべて停止します。
クラウド内のデスティネーション Schema Registry を参照するようにすべてのコンシューマーを構成し、それらを再起動します。
たとえば、Java クライアントで Schema Registry を構成する場合、Schema Registry の URL、認証 USER_INFO のタイプ、資格情報を指定するプロパティファイルで(またはコードから)、Schema Registry の URL をソースからデスティネーションに変更します。
その他の例については、「Java コンシューマー」を参照してください。
クラウド内の Schema Registry を参照するようにすべてのプロデューサーを構成し、それらを再起動します。
その他の例については、「Java プロデューサー」を参照してください。
(省略可)ソース Schema Registry を停止します。
次のステップ¶
- プライマリ自己管理型クラスターからクラウドへの継続移行でクラウド(ハイブリッド)に拡張する場合、移行は完了です。
- これが Confluent Cloud への 1 回限りの移行である場合、次のステップとして、Replicator を使用してクラウドクラスターにトピックを移行します。
- Confluent Cloud のスキーマとストレージ領域を REST API で管理する方法については、開発者ガイドの「Confluent Cloud の Schema Registry」を参照してください。
- Confluent Cloud における Schema Registry の構成方法と使用方法に関するガイドをお探しの方は、「Confluent Cloud におけるスキーマ管理のクイックスタート」と「Confluent Cloud を使用した Apache Kafka のクイックスタート」を参照してください。
おすすめの関連情報¶
- Confluent Cloud
- Replicator スキーマ変換のサンプル
- Confluent Cloud におけるスキーマ管理のクイックスタート
- Schema Registry の構成オプション
- 概要
- Schema Registry API リファレンス
また、「Confluent Replicator 構成プロパティ」で次のセクションを参照してください。
最後に、自己管理型のオンプレミスクラスター間でスキーマを移行するデモを「Replicator スキーマ変換のサンプル」でご覧ください。