Confluent Cloud スキーマ管理のドキュメントをお探しですか? これらのページでは、Schema Registry の全体的な概念、スキーマフォーマット、ハイブリッドのユースケース、チュートリアル といったポイントに触れていますが、最も重点的に取り上げているのは Confluent Platform です。Confluent Cloud のドキュメントについては、Confluent Cloud でのスキーマの管理」をご覧ください。
Confluent Cloud Schema Registry のチュートリアル¶
概要¶
このチュートリアルでは、Confluent Cloud Schema Registry を使用するためのステップバイステップのワークフローを紹介しています。Avro データの読み取りと書き込み、進化するスキーマの互換性チェックができるようにクライアントアプリケーションを設定する方法について見ていきましょう。
このチュートリアルを完了したら、Confluent Cloud でのスキーマの操作に関する基本情報を確認することをお勧めします。
このチュートリアルは、Confluent Cloud Schema Registry での実行を意図しています。ローカルの Confluent Platform 環境をご利用の方は、オンプレミスデプロイ向けの Confluent Schema Registry チュートリアルを「オンプレミス Schema Registry のチュートリアル」でご覧ください。
セットアップ¶
前提条件¶
このチュートリアルに取り組む前に、「Schema Registry のチュートリアル」で Schema Registry の概念を簡単に押さえておきましょう。
ローカルマシンに次のソフトウェアがインストールされていることを確認します。
- 初期化された Confluent Cloud クラスター
- Confluent Cloud Console の Billing & payment セクションでプロモーションコード
C50INTEG
を入力すると、Confluent Cloud で $50 相当を無料で使用できます(詳細)。このプロモーションコードで、この Confluent Cloud サンプルの 1 日分の実行費用が補填されます。これを超えてサービスを利用すると、このサンプルで作成した Confluent Cloud リソースを破棄するまで、時間単位で課金されることがあります。
- Confluent Cloud Console の Billing & payment セクションでプロモーションコード
- Confluent Cloud CLI v1.7.0 以降のローカルインストール
- Java クライアントを実行するための Java 1.8 または 1.11
- クライアント Java コードをコンパイルするための Maven
- Confluent Cloud Schema Registry の REST エンドポイントに対するクエリから返された結果の形式を合わせるための
jq
ツール
環境のセットアップ¶
このチュートリアルは新しい Confluent Cloud 環境で実行するため、他の作業の妨げになることはありません。Confluent Cloud 向け ccloud-stack ユーティリティ を使用して、新しい Confluent Cloud スタックに新しい環境、新しいサービスアカウント、新しい Kafka クラスターおよび関連認証情報をプロビジョニングして、Confluent Cloud Schema Registry および関連認証情報であるワイルドカードを使用するサービスアカウント用 ACL を有効にできます。ccloud-stack の使用手順 に従って Confluent Cloud にログオンし、ccloud-stack を作成してください。
ccloud-stack
ユーティリティを実行すると、Confluent Cloud と Confluent Cloud Schema Registry の接続情報をすべて含んだ構成ファイルも生成されます。自動的に生成されたexamples/ccloud/ccloud-stack/stack-configs/java-service-account-<account>.config
ファイルが次のようになっていることを確認します。# ------------------------------ # ENVIRONMENT ID: <ENVIRONMENT ID> # SERVICE ACCOUNT ID: <SERVICE ACCOUNT ID> # KAFKA CLUSTER ID: <KAFKA CLUSTER ID> # SCHEMA REGISTRY CLUSTER ID: <SCHEMA REGISTRY CLUSTER ID> # ------------------------------ ssl.endpoint.identification.algorithm=https security.protocol=SASL_SSL sasl.mechanism=PLAIN bootstrap.servers=<BROKER ENDPOINT> sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<API KEY>' password='<API SECRET>'; basic.auth.credentials.source=USER_INFO schema.registry.basic.auth.user.info=<SR API KEY>:<SR API SECRET> schema.registry.url=https://<SR ENDPOINT>
ccloud-stack
によって生成された構成ファイルを$HOME/.confluent/java.config
にコピーします。変数をシェルにエクスポートし、
<SR API KEY>
、<SR API SECRET>
、<SR ENDPOINT>
の各値を置き換えます。そうすると、残りのチュートリアルのコマンドをコピーして貼り付けるだけで済みます。export SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=<SR API KEY>:<SR API SECRET> export SCHEMA_REGISTRY_URL=<SR ENDPOINT>
Confluent example リポジトリのクローンを GitHub から作成し、
clients/avro/
サブディレクトリで作業します。ここにはこのチュートリアルでコンパイルし実行するサンプルコードが含まれます。git clone https://github.com/confluentinc/examples.git
cd examples/clients/avro
git checkout 6.2.0-post
transactions トピックの作成¶
このチュートリアルのエクササイズでは、transactions
というトピックとの間でメッセージの生成と消費を行います。Confluent Cloud の UI でこのトピックを作成してください。
Confluent Cloud の UI(https://confluent.cloud)に移動し、ご利用の環境と Kafka クラスターをクリックします。
Topics を選択して Create topic をクリックします。
トピックに
transactions
という名前を付けて、Create with defaults をクリックします。新しいトピックが表示されます。
スキーマの定義¶
開発者がまず行うべきことは、データの基本的なスキーマへの合意です。クライアントアプリケーションは次の "契約" を結ぶことになります。
- プロデューサーがスキーマを使ってデータを書き込みます。
- そのデータをコンシューマーが読み取ることができるようになります。
最初の状態の Payment スキーマ(Payment.avsc)を考えてみます。このスキーマを表示するには、次のコマンドを実行します。
cat src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment.avsc
スキーマの定義に注目してください。
{
"namespace": "io.confluent.examples.clients.basicavro",
"type": "record",
"name": "Payment",
"fields": [
{"name": "id", "type": "string"},
{"name": "amount", "type": "double"}
]
}
このスキーマで定義されている内容は次のとおりです。
namespace
: スキーマ名の競合を防止するための完全修飾名type
: Avro のデータ型 (record
、enum
、union
、array
、map
、fixed
など)name
: この名前空間における一意のスキーマ名fields
:record
の単純データ型または複合データ型。このレコードの先頭にあるのは id というフィールドで、そのデータ型は string です。このレコードの 2 つ目にあるのは amount というフィールドで、そのデータ型は double です。
Avro でデータを書き込むクライアントアプリケーション¶
Maven¶
このチュートリアルでは、Maven を使用してプロジェクトと依存関係を構成します。Avro を使用した Kafka プロデューサーまたはコンシューマーを含んだ Java アプリケーションでは、pom.xml
ファイルに、特に次のものを追加する必要があります。
- Confluent Maven リポジトリ
- Confluent Maven プラグインリポジトリ
- データを Avro としてシリアル化するための依存関係(
org.apache.avro.avro
およびio.confluent.kafka-avro-serializer
) - ソーススキーマから Java クラスファイルを生成するための
avro-maven-plugin
プラグイン
pom.xml
ファイルには、次のものを含めることもできます。
- 進化するスキーマの互換性をチェックするためのプラグイン(
kafka-schema-registry-maven-plugin
)
完全な pom.xml
の例については、こちらの pom.xml を参照してください。
Avro の構成¶
Avro データと Schema Registry を使用する Kafka アプリケーションは、少なくとも次の 2 つの構成パラメーターを指定する必要があります。
- Avro のシリアライザーまたはデシリアライザー
- Schema Registry に接続するためのプロパティ
アプリケーションで使用できる Avro レコードの種類は、基本的に次の 2 つです。
- コードによって生成された特定のクラス
- 汎用的なレコード
このチュートリアルの例では、特定の Payment
クラスの使用方法を紹介しています。コードによって生成された特定のクラスを使用するためには、実際のスキーマに対応した Java クラスを定義してコンパイルする必要がありますが、コード内での扱いやすさの点では、こちらの方が有利です。
ただし、それ以外のシナリオで、随時任意の型のデータを扱う必要があり、なおかつ実際のレコードタイプに使用する Java クラスがない場合は、GenericRecord <streams-data-avro> を使用してください。
Confluent Platform には、"reflection Avro" フォーマットのデータを書き込んだり読み取ったりするためのシリアライザーとデシリアライザーも用意されています。詳細については、「リフレクションベースの Avro シリアライザーとデシリアライザー」を参照してください。
Java プロデューサー¶
このクライアントアプリケーション内の Java プロデューサーは、Kafka の値(または Kafka のキー)に使用する Avro シリアライザーと Schema Registry の URL を構成する必要があります。これで、Kafka の値が Payment
クラスであるレコードを、このプロデューサーで書き込むことができます。
プロデューサーコードの例¶
プロデューサーを作成する際、アプリケーションのコードによって生成された Payment
クラスを使用するようにメッセージの値クラスを構成します。以下に例を示します。
...
import io.confluent.kafka.serializers.KafkaAvroSerializer;
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
...
KafkaProducer<String, Payment> producer = new KafkaProducer<String, Payment>(props));
final Payment payment = new Payment(orderId, 1000.00d);
final ProducerRecord<String, Payment> record = new ProducerRecord<String, Payment>(TOPIC, payment.getId().toString(), payment);
producer.send(record);
...
pom.xml
に avro-maven-plugin
が含まれているため、Payment
クラスはコンパイルの過程で自動的に生成されます。
この例では、Kafka ブローカーと Schema Registry への接続情報を、コードに渡された構成ファイルから取得しています。接続情報をクライアントアプリケーションで直接指定する場合は、こちらの java テンプレート を参照してください。
Java プロデューサーの完全な例については、プロデューサーの例 を参照してください。
プロデューサーの実行¶
シェルで、examples/clients/avro
から次のコマンドを実行します。
このプロデューサーを実行するには、まずプロジェクトをコンパイルします。
mvn clean compile package
Confluent Cloud の UI から、クラスターが選択されていることを確認して、Topics をクリックします。
次に、
transactions
トピックをクリックし、Messages タブに移動します。このトピックにはまだメッセージが生成されていないので、メッセージは表示されません。
Avro フォーマットのメッセージを
transactions
トピックに生成するProducerExample
を実行します。先ほど作成したファイルのパス$HOME/.confluent/java.config
を渡してください。mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ProducerExample \ -Dexec.args="$HOME/.confluent/java.config"
このコマンドの実行には、少し時間がかかります。完了すると、次のように表示されます。
... Successfully produced 10 messages to a topic called transactions [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ ...
これで、Confluent Cloud の UI からメッセージを確認できるはずです。
transactions
トピックをチェックしてみましょう。新しく到着したデータが動的に表示されます。Confluent Cloud の UI の左側で、対象のクラスターにクリックして移動し、Topics、
transactions
、Messages の順に移動します。ちなみに
データが表示されない場合は、プロデューサーを再度実行し、正常に完了したことを確認して、もう一度 Confluent Cloud の UI を見てください。メッセージはコンソールに保持されないので、プロデューサーを実行したらすぐに確認する必要があります。
Java コンシューマー¶
このクライアントアプリケーション内の Java コンシューマーは、Kafka の値(または Kafka のキー)に使用する Avro デシリアライザーと Schema Registry の URL を構成する必要があります。これで、Kafka の値が Payment
クラスであるレコードを、このコンシューマーで読み取ることができます。
コンシューマー コードの例¶
デフォルトでは、各レコードが Avro GenericRecord
に逆シリアル化されます。ただし、このチュートリアルでは、アプリケーションのコードによって生成された Payment
クラスを使用してレコードを逆シリアル化する必要があります。そのため、Avro SpecificRecord
を使用するようにデシリアライザーを構成します。つまり、SPECIFIC_AVRO_READER_CONFIG
を true
に設定します。以下に例を示します。
...
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
...
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
...
KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props));
consumer.subscribe(Collections.singletonList(TOPIC));
while (true) {
ConsumerRecords<String, Payment> records = consumer.poll(100);
for (ConsumerRecord<String, Payment> record : records) {
String key = record.key();
Payment value = record.value();
}
}
...
pom.xml
に avro-maven-plugin
が含まれているため、Payment
クラスはコンパイルの過程で自動的に生成されます。
この例では、Kafka ブローカーと Schema Registry への接続情報を、コードに渡された構成ファイルから取得しています。接続情報をクライアントアプリケーションで直接指定する場合は、こちらの java テンプレート を参照してください。
Java コンシューマーの完全な例については、コンシューマーの例 を参照してください。
コンシューマーの実行¶
このコンシューマーを実行するには、まずプロジェクトをコンパイルします。
mvn clean compile package
BUILD SUCCESS
メッセージは、プロジェクトがビルドされ、コマンドプロンプトが再び利用可能な状態になったことを意味します。次に、
ConsumerExample
を実行します(前述のProducerExample
は既に実行済みであるものとします)。先ほど作成したファイルのパス$HOME/.confluent/java.config
を渡してください。mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ConsumerExample \ -Dexec.args="$HOME/.confluent/java.config"
次のように表示されます。
... key = id0, value = {"id": "id0", "amount": 1000.0} key = id1, value = {"id": "id1", "amount": 1000.0} key = id2, value = {"id": "id2", "amount": 1000.0} key = id3, value = {"id": "id3", "amount": 1000.0} key = id4, value = {"id": "id4", "amount": 1000.0} key = id5, value = {"id": "id5", "amount": 1000.0} key = id6, value = {"id": "id6", "amount": 1000.0} key = id7, value = {"id": "id7", "amount": 1000.0} key = id8, value = {"id": "id8", "amount": 1000.0} key = id9, value = {"id": "id9", "amount": 1000.0} ...
Ctrl + C
を押して停止します。
その他の Kafka クライアント¶
このチュートリアルは、Avro および Schema Registry の一元化されたスキーマ管理と互換性チェックについて説明することを目的としています。例を単純化するために、このチュートリアルでは、Java のプロデューサーとコンシューマーを中心に取り上げていますが、他の Kafka クライアントも同じように動作します。Avro や Schema Registry と連携する他の Kafka クライアントの例については、次の記事を参照してください。
一元化されたスキーマ管理¶
Schema Registry 内のスキーマの表示¶
ここまでで、Avro データをシリアル化するプロデューサーと Avro データを逆シリアル化するコンシューマーが完成しました。プロデューサーは、Confluent Cloud Schema Registry にスキーマを登録し、コンシューマーは、Confluent Cloud Schema Registry からスキーマを取得します。
Confluent Cloud の UI の左側で、クラスターが選択されていることを確認して、Topics をクリックします。
transactions
トピックをクリックして Schema タブに移動し、このトピックの最新のスキーマを Confluent Cloud Schema Registry から取得します。このスキーマは、Java クライアントアプリケーション用に定義されているスキーマファイル と同じです。
curl を使用した Schema Registry との対話¶
curl コマンドを使用して Confluent Cloud Schema Registry の REST エンドポイントに直接接続し、サブジェクトや関連するスキーマを表示することもできます。
Confluent Cloud Schema Registry に登録されているすべてのサブジェクトを表示するには、次のコマンドを使用します。
curl --silent -X GET -u $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO https://$SCHEMA_REGISTRY_URL/subjects | jq.
上のコマンドを実行すると、次のような結果が出力されます。
[ "transactions-value" ]
この例では、Kafka のトピックである
transactions
に、値(つまり "ペイロード")が Avro であるメッセージが含まれています。また、Confluent Cloud Schema Registry のサブジェクト名は、デフォルトによりtransactions-value
となっています。このサブジェクトの最新のスキーマをさらに詳しく表示するには、次のコマンドを使用します。
curl --silent -X GET -u $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO https://$SCHEMA_REGISTRY_URL/subjects/transactions-value/versions/latest | jq .
上のコマンドを実行すると、次のような結果が出力されます。
{ "subject": "transactions-value", "version": 1, "id": 100001, "schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}" }
このバージョンのスキーマで定義されている内容は次のとおりです。
subject
: トピックtransactions
内のメッセージに使用されるスキーマが進化できる範囲version
: このサブジェクトのスキーマのバージョン。サブジェクトごとに 1 から始まります。id
: グローバルに一意のスキーマバージョン ID(すべてのサブジェクトのすべてのスキーマで一意)schema
: スキーマのフォーマットを定義する構造
上に示した
curl
コマンドの出力では、スキーマが、エスケープされた JSON になっていることに注目してください。二重引用符の前にバックスラッシュが記述されています。スキーマ ID に基づいて、関連するスキーマを取得することもできます。Confluent Cloud Schema Registry の REST エンドポイントに対し、次のようにクエリーを実行します。
curl --silent -X GET -u $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO https://$SCHEMA_REGISTRY_URL/schemas/ids/100001 | jq .
次のような結果が出力されます。
{ "schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}" }
メッセージ内のスキーマ ID¶
Schema Registry と連携するということは、Kafka メッセージを書き込む際に、Avro スキーマ全体を含める必要がないということです。代わりに、Kafka メッセージはスキーマ ID を付けて書き込まれます。スキーマとスキーマ ID との間で同じマッピングを得るためには、メッセージを書き込むプロデューサーと、そのメッセージを読み取るコンシューマーとが、同じ Schema Registry を使用している必要があります。
この例では、プロデューサーが Payments
用の新しいスキーマを Confluent Cloud Schema Registry に送信します。Confluent Cloud Schema Registry は、この Payments
スキーマを transactions-value
サブジェクトに登録し、100001
というスキーマ ID をプロデューサーに返します。スキーマとスキーマ ID との間のこのマッピングは、以降、プロデューサーがメッセージを書き込むときのためにキャッシュされます。したがって、プロデューサーが Confluent Cloud Schema Registry に問い合わせを行うのは、スキーマの初回書き込み時のみです。
コンシューマーは、このデータを読み取る際、100001
という Avro スキーマ ID を参照して、Confluent Cloud Schema Registry にスキーマリクエストを送信します。Confluent Cloud Schema Registry は、スキーマ ID 100001
に関連付けられているスキーマを取得して、それをコンシューマーに返します。スキーマとスキーマ ID との間のこのマッピングは、以降、コンシューマーがメッセージを読み取るときのためにキャッシュされます。したがって、コンシューマーが Confluent Cloud Schema Registry に問い合わせを行うのは、スキーマ ID の初回読み取り時のみです。
スキーマの自動登録¶
クライアントアプリケーションは、デフォルトで新しいスキーマを自動的に登録します。新しいトピックに新しいメッセージを生成する場合、新しいスキーマを自動的に登録しようと試みます。これは開発環境では非常に便利ですが、本稼働環境では、クライアントアプリケーションで新しいスキーマを自動的に登録することはお勧めしません。クライアントアプリケーションの外部でスキーマを登録し、スキーマがいつ Schema Registry に登録され、どのように進化するかを制御することがベストプラクティスです。
アプリケーション内でスキーマの自動登録を無効にするには、次の例のように構成パラメーター auto.register.schemas=false
を設定します。
props.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
ちなみに
use.latest.version
を有効にする場合は、auto.register.schemas
を false に設定してスキーマの自動登録を無効にしたうえで、use.latest.version
を true に設定する必要があります(デフォルトとは逆の設定)。use.latest.version
を正しく機能させるには、オプションauto.register.schemas
を false に設定する必要があります。auto.register.schemas
を false に設定すると、イベントタイプの自動登録が無効になるので、サブジェクト内の最新のスキーマがオーバーライドされることはありません。use.latest.version
が true に設定されている場合、シリアライザーは、サブジェクトから最新のスキーマバージョンを検索し、それをシリアル化に使用します。use.latest.version
が false(デフォルト)に設定されている場合、シリアライザーは、サブジェクトからイベントタイプを探しますが、その検出に失敗します。Kafka Connect の Schema Registry の「構成オプション」も参照してください。
構成オプション
auto.register.schemas
は Confluent Platform の機能であり、Apache Kafka® では利用できません。
アプリケーションの外部からスキーマを手動で登録するには、Confluent Cloud の UI を使用します。
まず、test
という新しいトピックを作成します。その方法は、このチュートリアルの中で先ほど transactions
という名前の新しいトピックを作成したときと同じです。次に、Schema タブから、Set a schema をクリックして新しいスキーマを定義します。次の値を指定してください。
namespace
: スキーマ名の競合を防止するための完全修飾名type
: Avro のデータ型 (record
、enum
、union
、array
、map
、fixed
のいずれか)name
: この名前空間における一意のスキーマ名fields
:record
の単純データ型または複合データ型。このレコードの先頭にあるのはid
というフィールドで、そのデータ型はstring
です。このレコードの 2 つ目にあるのはamount
というフィールドで、そのデータ型はdouble
です。
仮に、先ほど使用したものと同じスキーマを定義する場合、スキーマエディターに次のように入力することになります。
{
"type": "record",
"name": "Payment",
"namespace": "io.confluent.examples.clients.basicavro",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "amount",
"type": "double"
}
]
}
Schema Registry の REST エンドポイントに直接接続して、test
トピックに使用する新しいサブジェクトのスキーマを定義する場合は、次のコマンドを実行してください。
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"}' \
-u $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO \
https://$SCHEMA_REGISTRY_URL/subjects/test-value/versions
このサンプル出力では、100001
という ID のスキーマが作成されます。
{"id":100001}
スキーマ進化と互換性¶
スキーマの進化¶
このチュートリアルではこれまで、グローバルに一意のスキーマ ID をクライアントアプリケーションが登録したり取得したりできる一元化されたスキーマ管理こそ、Schema Registry の利点であると捉えてきました。しかし、Schema Registry の最大の価値は、スキーマの進化への対応です。API は進化しても、新旧のバージョンの API に依存するすべてのアプリケーションとの互換性が確保されている必要があります。同様に、スキーマも進化します。そしてやはり、新旧のバージョンのスキーマに依存するすべてのアプリケーションとの互換性が確保されていなければなりません。このスキーマ進化は、時間の経過に伴ってアプリケーションやデータに現れる自然な性質です。
Schema Registry にはスキーマの進化が考慮されており、プロデューサーとコンシューマーとの間の契約違反を防ぐための互換性チェック機構が備わっています。そのため、プロデューサーとコンシューマーが別々にアップデートされ、それぞれのスキーマが個別に進化したとしても、プロデューサーとコンシューマーは新旧のデータを確実に読み取ることができます。プロデューサーとコンシューマーは分離されたアプリケーションであり、異なるチームによって開発されることもあるため、Kafka ではこのことが特に重要となります。
推移的な互換性のチェックは、特定のサブジェクトに複数のバージョンのスキーマがある場合に重要です。互換性が推移的である場合、新しいスキーマの互換性が登録済みの全スキーマに対してチェックされます。互換性が推移的でない場合、新しいスキーマの互換性は最新のスキーマに対してのみチェックされます。
たとえば、あるサブジェクトに X-2、X-1、X の順序で変化する 3 つのスキーマがある場合:
- 推移的 : X-2 <==> X-1、X-1 <==> X、X-2 <==> X のいずれについても互換性が確保されます。
- 非推移的 : X-2 <==> X-1 と X-1 <==> X では互換性が確保されますが、X-2 <==> X については必ずしも互換性が確保されません。
漸進的には互換性を持ち、推移的には互換性を持たない スキーマ変更の例 を参照してください。
Confluent Schema Registry のデフォルトの互換性タイプ BACKWARD
は非推移的です。つまり、BACKWARD_TRANSITIVE
ではありません。このため、新しいスキーマの互換性は最新のスキーマに対してのみチェックされます。
次の互換性タイプがあります。
BACKWARD
:("デフォルト")新しいスキーマを使用するコンシューマーは、一番最後に登録されたスキーマを使用するプロデューサーからの出力データを読み取ることができます。BACKWARD_TRANSITIVE
: 新しいスキーマを使用するコンシューマーは、既に登録されている任意のスキーマを使用するプロデューサーからの出力データを読み取ることができます。FORWARD
: 一番最後に登録されたスキーマを使用するコンシューマーが、新しいスキーマを使用するプロデューサーからの出力データを読み取ることができます。FORWARD_TRANSITIVE:
既に登録されている任意のスキーマを使用するコンシューマーが、新しいスキーマを使用するプロデューサーからの出力データを読み取ることができます。FULL
: 新しいスキーマには、一番最後に登録されたスキーマに対する前方互換性と後方互換性があります。FULL_TRANSITIVE
: 新しいスキーマには、既に登録されている任意のスキーマに対する前方互換性と後方互換性があります。NONE
: スキーマの互換性チェックは無効です。
互換性タイプの詳細については、「スキーマ進化と互換性」を参照してください。
互換性チェックでの不合格¶
Schema Registry はプロデューサーとコンシューマーとの間の契約を維持するために、進化するスキーマの互換性をチェックします。Schema Registry による互換性チェックがなければ、スキーマの変更によってアプリケーションの動作に支障が生じる可能性があります。
Payment スキーマの例で、企業が個々の支払いについて、新しい情報、たとえば売上の発生場所を表す region
フィールドを追跡することになったとしましょう。この新しい region
フィールドを含んだ Payment2a スキーマ を考えてみます。
cat src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment2a.avsc
{
"namespace": "io.confluent.examples.clients.basicavro",
"type": "record",
"name": "Payment",
"fields": [
{"name": "id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "region", "type": "string"}
]
}
Schema Registry のデフォルトの互換性は BACKWARD です。まずは、この新しいスキーマに後方互換性があるかどうかを考えてみましょう。region フィールドのない従来のスキーマを使ってプロデューサーが書き込んだデータを、コンシューマーは、この新しいスキーマを使用して読み取ることができるでしょうか。答えは No です。従来のスキーマが使用されたデータを、コンシューマーは読み取ることができません。従来のデータには region フィールドがないためです。したがって、このスキーマには後方互換性がありません。
Confluent では Schema Registry Maven プラグイン を提供しています。このプラグインを使用して、互換性チェックを開発過程で行ったり、CI/CD パイプラインに統合したりすることができます。
サンプルの pom.xml にはこのプラグインが含まれ、互換性チェックが有効になっています。
...
<properties>
<schemaRegistryUrl>http://localhost:8081</schemaRegistryUrl>
<schemaRegistryBasicAuthUserInfo></schemaRegistryBasicAuthUserInfo>
</properties>
...
<build>
<plugins>
...
<plugin>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-maven-plugin</artifactId>
<version>${confluent.version}</version>
<configuration>
<schemaRegistryUrls>
<param>${schemaRegistryUrl}</param>
</schemaRegistryUrls>
<userInfoConfig>${schemaRegistryBasicAuthUserInfo}</userInfoConfig>
<subjects>
<transactions-value>src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment2a.avsc</transactions-value>
</subjects>
</configuration>
<goals>
<goal>test-compatibility</goal>
</goals>
</plugin>
...
</plugins>
</build>
現在は、Schema Registry 内の transactions-value
サブジェクトに対して、新しい Payment2a
スキーマの互換性をチェックするように構成されています。
互換性チェックを実行します。
mvn io.confluent:kafka-schema-registry-maven-plugin:test-compatibility \ "-DschemaRegistryUrl=https://$SCHEMA_REGISTRY_URL" \ "-DschemaRegistryBasicAuthUserInfo=$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO" \ "-DschemaLocal=src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment2a.avsc"
互換性チェックで不合格になることを確認します。返されるエラーメッセージは次のとおりです。
... [ERROR] Schema examples/clients/avro/src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment2a.avsc is not compatible with subject(transactions-value) ...
新しいスキーマである
Payment2a
を Schema Registry に手動で登録してみましょう。これは、クライアントが Java 以外である場合に、コマンドラインから互換性をチェックする際に便利な方法です。curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \ --data '{"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"region\",\"type\":\"string\"}]}"}' \ -u $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO \ https://$SCHEMA_REGISTRY_URL/subjects/transactions-value/versions
スキーマに互換性がないというエラーメッセージが Confluent Cloud Schema Registry から返され、スキーマが拒否されることを確認します。
{"error_code":409,"message":"Schema being registered is incompatible with an earlier schema"}
互換性チェックでの合格¶
後方互換性 を確保するためには、新しいフィールドが存在しなかった場合のデフォルト値を新しいスキーマで想定しておく必要があります。
region
にデフォルト値を与えたアップデート後の Payment2b スキーマ を考えてみます。このスキーマを表示するには、次のコマンドを実行します。cat src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment2b.avsc
次のように出力されます。
{ "namespace": "io.confluent.examples.clients.basicavro", "type": "record", "name": "Payment", "fields": [ {"name": "id", "type": "string"}, {"name": "amount", "type": "double"}, {"name": "region", "type": "string", "default": ""} ] }
UI から
transactions
トピックをクリックして Schema タブに移動し、transactions
トピックの最新のスキーマを Schema Registry から取得します。Edit Schema をクリックします。
もう一度新しいフィールド
region
を追加しますが、今回は、次のようにデフォルト値を追加します。その後、Save をクリックします。{ "name": "region", "type": "string", "default": "" }
新しいスキーマが受け入れられたことを確認します。
注釈
無効な Avro についてのエラーメッセージが表示された場合は、構文をチェックしてください(引用符やコロン、閉じかっこ、直前のフィールドとのコンマ区切りなど)。
登録されているスキーマバージョンについて考えてみます。
transactions
トピックのtransactions-value
という Schema Registry サブジェクトには次の 2 つのスキーマが存在します。- バージョン 1:
Payment.avsc
- バージョン 2:
Payment2b.avsc
。region
フィールドがデフォルト値(空の値)とともに追加されています。
- バージョン 1:
引き続き UI の
transactions
トピックの Schema タブで、Version history をクリックし、Turn on version diff を選択して 2 つのバージョンを比較します。コマンドラインで Schema Registry Maven プラグイン に戻り、
Payment2a.avsc
ではなくPayment2b.avsc
を参照するように pom.xml を更新します。互換性チェックを再実行し、合格になることを確認します。
mvn io.confluent:kafka-schema-registry-maven-plugin:test-compatibility
次のメッセージが返されたことを確認します。これはスキーマが互換性チェックに合格したことを示します。
... [INFO] Schema examples/clients/avro/src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment2b.avsc is compatible with subject(transactions-value) ...
Schema Registry の REST エンドポイントに直接接続して、新しいスキーマ
Payment2b
を登録する場合は、次のコマンドを実行してください。問題なく実行されるはずです。curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \ --data '{"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"region\",\"type\":\"string\",\"default\":\"\"}]}"}' \ -u $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO \ https://$SCHEMA_REGISTRY_URL//subjects/transactions-value/versions
上の
curl
コマンドは、成功した場合、新しいスキーマのバージョンid
を返します。{"id":100002}
Confluent Cloud Schema Registry にある
transactions-value
の最新のサブジェクトを表示します。curl --silent -X GET -u $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO https://$SCHEMA_REGISTRY_URL/subjects/transactions-value/versions/latest | jq .
このコマンドからは、
transactions-value
トピックの最新の Confluent Cloud Schema Registry サブジェクトが返されます。JSON 形式で表記されたスキーマのバージョン番号、ID、説明などが含まれます。{ "subject": "transactions-value", "version": 100002, "id": 100002, "schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"region\",\"type\":\"string\",\"default\":\"\"}]}" }
次の変更点に注目してください。
version
:100001
から100002
に変わっています。id
:100001
から100002
に変わっています。schema
: デフォルト値を含む新しいregion
フィールドでアップデートされています。
互換性タイプの変更¶
デフォルトの互換性タイプは backward ですが、グローバルに、またはサブジェクト単位で変更することができます。
UI からサブジェクトごとに互換性タイプを変更するには、transactions
トピックをクリックして Schema タブに移動し、transactions
トピックの最新のスキーマを Schema Registry から取得します。Edit Schema をクリックし、Compatibility Mode をクリックします。

このトピックの互換性はデフォルトの backward に設定されていますが、必要に応じて変更することができます。
Confluent Cloud Schema Registry の REST エンドポイントに直接接続して、transactions
トピックである transactions-value
サブジェクトの互換性タイプを変更する場合は、次のサンプルコマンドを実行してください。
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "BACKWARD_TRANSITIVE"}' \
-u $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO https://$SCHEMA_REGISTRY_URL/config/transactions-value
ccloud-stack の破棄¶
チュートリアルが終了したら、Confluent Cloud で作成したリソースを破棄します。
チュートリアルの最初に作成した
cloud-stack
を破棄するには、bash スクリプト ccloud_stack_destroy.sh を呼び出して、ccloud-stack
の作成時に自動生成されたプロパティファイルを渡します。# Change directory if needed cd <path to examples>/ccloud/ccloud-stack/ ./ccloud_stack_destroy.sh stack-configs/java-service-account-<account>.config
Confluent Cloud 内のリソースが破棄されたことを必ず確認してください。
次のステップ¶
Confluent Cloud における Schema Registry の基礎¶
スキーマの操作の詳細¶
- ブログの投稿記事: Why Avro For Kafka Data
- ブログの投稿記事: Yes, Virginia, You Really Do Need a Schema Registry
- ブログの投稿記事: 17 Ways to Mess Up Self-Managed Schema Registry
- Apache Avro® 公式サイト: How to get started with Apache Avro using Java Clients
- Confluent でサポートされるスキーマフォーマット、および Avro、Protobuf、JSON スキーマを使用してクライアントを構成する方法: フォーマット、シリアライザー、逆シリアライザー
- 実践: 「Schema Registry API の使用例」では、HTTP および HTTPS を使用するさまざまな curl コマンドを紹介