Confluent Cloud Schema Registry のチュートリアル

Looking for Schema Management Confluent Cloud docs? You are currently viewing Confluent Platform documentation. If you are looking for Confluent Cloud docs, check out Schema Management on Confluent Cloud.

概要

このチュートリアルでは、Confluent Cloud Schema Registry を使用するためのステップバイステップのワークフローを紹介しています。Avro データの読み取りと書き込み、進化するスキーマの互換性チェックができるようにクライアントアプリケーションを設定する方法について見ていきましょう。

このチュートリアルを完了したら、Confluent Cloud でのスキーマの操作に関する基本情報を確認することをお勧めします。

このチュートリアルは、Confluent Cloud Schema Registry での実行を意図しています。ローカルの Confluent Platform 環境をご利用の方は、オンプレミスデプロイ向けの Confluent Schema Registry チュートリアルを「オンプレミス Schema Registry のチュートリアル」でご覧ください。

セットアップ

前提条件

このチュートリアルに取り組む前に、「Schema Registry のチュートリアル」で Schema Registry の概念を簡単に押さえておきましょう。

ローカルマシンに次のソフトウェアがインストールされていることを確認します。

  • 初期化された Confluent Cloud クラスター
    • Confluent Cloud Console の Billing & payment セクションでプロモーションコード C50INTEG を入力すると、Confluent Cloud で $50 相当を無料で使用できます(詳細)。このプロモーションコードで、この Confluent Cloud サンプルの 1 日分の実行費用が補填されます。これを超えてサービスを利用すると、このサンプルで作成した Confluent Cloud リソースを破棄するまで、時間単位で課金されることがあります。
  • Confluent Cloud CLI v1.7.0 以降のローカルインストール
  • Java クライアントを実行するための Java 1.8 または 1.11
  • クライアント Java コードをコンパイルするための Maven
  • Confluent Cloud Schema Registry の REST エンドポイントに対するクエリから返された結果の形式を合わせるための jq ツール

環境のセットアップ

  1. このチュートリアルは新しい Confluent Cloud 環境で実行するため、他の作業の妨げになることはありません。Confluent Cloud 向け ccloud-stack ユーティリティ を使用して、新しい Confluent Cloud スタックに新しい環境、新しいサービスアカウント、新しい Kafka クラスターおよび関連認証情報をプロビジョニングして、Confluent Cloud Schema Registry および関連認証情報であるワイルドカードを使用するサービスアカウント用 ACL を有効にできます。ccloud-stack の使用手順 に従って Confluent Cloud にログオンし、ccloud-stack を作成してください。

    ../_images/ccloud-stack-resources.png
  2. ccloud-stack ユーティリティを実行すると、Confluent Cloud と Confluent Cloud Schema Registry の接続情報をすべて含んだ構成ファイルも生成されます。自動的に生成された examples/ccloud/ccloud-stack/stack-configs/java-service-account-<account>.config ファイルが次のようになっていることを確認します。

    # ------------------------------
    # ENVIRONMENT ID: <ENVIRONMENT ID>
    # SERVICE ACCOUNT ID: <SERVICE ACCOUNT ID>
    # KAFKA CLUSTER ID: <KAFKA CLUSTER ID>
    # SCHEMA REGISTRY CLUSTER ID: <SCHEMA REGISTRY CLUSTER ID>
    # ------------------------------
    ssl.endpoint.identification.algorithm=https
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    bootstrap.servers=<BROKER ENDPOINT>
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<API KEY>' password='<API SECRET>';
    basic.auth.credentials.source=USER_INFO
    schema.registry.basic.auth.user.info=<SR API KEY>:<SR API SECRET>
    schema.registry.url=https://<SR ENDPOINT>
    
  3. ccloud-stack によって生成された構成ファイルを $HOME/.confluent/java.config にコピーします。

  4. 変数をシェルにエクスポートし、<SR API KEY><SR API SECRET><SR ENDPOINT> の各値を置き換えます。そうすると、残りのチュートリアルのコマンドをコピーして貼り付けるだけで済みます。

    export SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=<SR API KEY>:<SR API SECRET>
    export SCHEMA_REGISTRY_URL=<SR ENDPOINT>
    
  5. Confluent example リポジトリのクローンを GitHub から作成し、 clients/avro/ サブディレクトリで作業します。ここにはこのチュートリアルでコンパイルし実行するサンプルコードが含まれます。

    git clone https://github.com/confluentinc/examples.git
    
    cd examples/clients/avro
    
    git checkout 6.2.4-post
    

transactions トピックの作成

このチュートリアルのエクササイズでは、transactions というトピックとの間でメッセージの生成と消費を行います。Confluent Cloud の UI でこのトピックを作成してください。

  1. Confluent Cloud の UI(https://confluent.cloud)に移動し、ご利用の環境と Kafka クラスターをクリックします。

  2. Topics を選択して Create topic をクリックします。

  3. トピックに transactions という名前を付けて、Create with defaults をクリックします。

    ../_images/ccloud-create-topic-name-sr.png

    新しいトピックが表示されます。

    ../_images/ccloud-create-topic-new-sr.png

スキーマの定義

開発者がまず行うべきことは、データの基本的なスキーマへの合意です。クライアントアプリケーションは次の "契約" を結ぶことになります。

  • プロデューサーがスキーマを使ってデータを書き込みます。
  • そのデータをコンシューマーが読み取ることができるようになります。

最初の状態の Payment スキーマ(Payment.avsc)を考えてみます。このスキーマを表示するには、次のコマンドを実行します。

cat src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment.avsc

スキーマの定義に注目してください。

{
 "namespace": "io.confluent.examples.clients.basicavro",
 "type": "record",
 "name": "Payment",
 "fields": [
     {"name": "id", "type": "string"},
     {"name": "amount", "type": "double"}
 ]
}

このスキーマで定義されている内容は次のとおりです。

  • namespace: スキーマ名の競合を防止するための完全修飾名
  • type: Avro のデータ型recordenumunionarraymapfixed など)
  • name: この名前空間における一意のスキーマ名
  • fields: record の単純データ型または複合データ型。このレコードの先頭にあるのは id というフィールドで、そのデータ型は string です。このレコードの 2 つ目にあるのは amount というフィールドで、そのデータ型は double です。

Avro でデータを書き込むクライアントアプリケーション

Maven

このチュートリアルでは、Maven を使用してプロジェクトと依存関係を構成します。Avro を使用した Kafka プロデューサーまたはコンシューマーを含んだ Java アプリケーションでは、pom.xml ファイルに、特に次のものを追加する必要があります。

  • Confluent Maven リポジトリ
  • Confluent Maven プラグインリポジトリ
  • データを Avro としてシリアル化するための依存関係(org.apache.avro.avro および io.confluent.kafka-avro-serializer
  • ソーススキーマから Java クラスファイルを生成するための avro-maven-plugin プラグイン

pom.xml ファイルには、次のものを含めることもできます。

  • 進化するスキーマの互換性をチェックするためのプラグイン(kafka-schema-registry-maven-plugin

完全な pom.xml の例については、こちらの pom.xml を参照してください。

Avro の構成

Avro データと Schema Registry を使用する Kafka アプリケーションは、少なくとも次の 2 つの構成パラメーターを指定する必要があります。

  • Avro のシリアライザーまたはデシリアライザー
  • Schema Registry に接続するためのプロパティ

アプリケーションで使用できる Avro レコードの種類は、基本的に次の 2 つです。

  • コードによって生成された特定のクラス
  • 汎用的なレコード

このチュートリアルの例では、特定の Payment クラスの使用方法を紹介しています。コードによって生成された特定のクラスを使用するためには、実際のスキーマに対応した Java クラスを定義してコンパイルする必要がありますが、コード内での扱いやすさの点では、こちらの方が有利です。

ただし、それ以外のシナリオで、随時任意の型のデータを扱う必要があり、なおかつ実際のレコードタイプに使用する Java クラスがない場合は、GenericRecord <streams-data-avro> を使用してください。

Confluent Platform には、"reflection Avro" フォーマットのデータを書き込んだり読み取ったりするためのシリアライザーとデシリアライザーも用意されています。詳細については、「リフレクションベースの Avro シリアライザーと逆シリアライザー」を参照してください。

Java プロデューサー

このクライアントアプリケーション内の Java プロデューサーは、Kafka の値(または Kafka のキー)に使用する Avro シリアライザーと Schema Registry の URL を構成する必要があります。これで、Kafka の値が Payment クラスであるレコードを、このプロデューサーで書き込むことができます。

プロデューサーコードの例

プロデューサーを作成する際、アプリケーションのコードによって生成された Payment クラスを使用するようにメッセージの値クラスを構成します。以下に例を示します。

...
import io.confluent.kafka.serializers.KafkaAvroSerializer;
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
...
KafkaProducer<String, Payment> producer = new KafkaProducer<String, Payment>(props));
final Payment payment = new Payment(orderId, 1000.00d);
final ProducerRecord<String, Payment> record = new ProducerRecord<String, Payment>(TOPIC, payment.getId().toString(), payment);
producer.send(record);
...

pom.xmlavro-maven-plugin が含まれているため、Payment クラスはコンパイルの過程で自動的に生成されます。

この例では、Kafka ブローカーと Schema Registry への接続情報を、コードに渡された構成ファイルから取得しています。接続情報をクライアントアプリケーションで直接指定する場合は、こちらの java テンプレート を参照してください。

Java プロデューサーの完全な例については、プロデューサーの例 を参照してください。

プロデューサーの実行

シェルで、examples/clients/avro から次のコマンドを実行します。

  1. このプロデューサーを実行するには、まずプロジェクトをコンパイルします。

    mvn clean compile package
    
  2. Confluent Cloud の UI から、クラスターが選択されていることを確認して、Topics をクリックします。

    次に、transactions トピックをクリックし、Messages タブに移動します。

    このトピックにはまだメッセージが生成されていないので、メッセージは表示されません。

  3. Avro フォーマットのメッセージを transactions トピックに生成する ProducerExample を実行します。先ほど作成したファイルのパス $HOME/.confluent/java.config を渡してください。

    mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ProducerExample \
      -Dexec.args="$HOME/.confluent/java.config"
    

    このコマンドの実行には、少し時間がかかります。完了すると、次のように表示されます。

    ...
    Successfully produced 10 messages to a topic called transactions
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    ...
    
  4. これで、Confluent Cloud の UI からメッセージを確認できるはずです。 transactions トピックをチェックしてみましょう。新しく到着したデータが動的に表示されます。

    Confluent Cloud の UI の左側で、対象のクラスターにクリックして移動し、TopicstransactionsMessages の順に移動します。

    ちなみに

    データが表示されない場合は、プロデューサーを再度実行し、正常に完了したことを確認して、もう一度 Confluent Cloud の UI を見てください。メッセージはコンソールに保持されないので、プロデューサーを実行したらすぐに確認する必要があります。

    ../_images/ccloud-inspect-transactions.png

Java コンシューマー

このクライアントアプリケーション内の Java コンシューマーは、Kafka の値(または Kafka のキー)に使用する Avro 逆シリアライザーと Schema Registry の URL を構成する必要があります。これで、Kafka の値が Payment クラスであるレコードを、このコンシューマーで読み取ることができます。

コンシューマー コードの例

デフォルトでは、各レコードが Avro GenericRecord に逆シリアル化されます。ただし、このチュートリアルでは、アプリケーションのコードによって生成された Payment クラスを使用してレコードを逆シリアル化する必要があります。そのため、Avro SpecificRecord を使用するようにデシリアライザーを構成します。つまり、SPECIFIC_AVRO_READER_CONFIGtrue に設定します。以下に例を示します。

...
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
...
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
...
KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props));
consumer.subscribe(Collections.singletonList(TOPIC));
while (true) {
  ConsumerRecords<String, Payment> records = consumer.poll(100);
  for (ConsumerRecord<String, Payment> record : records) {
    String key = record.key();
    Payment value = record.value();
  }
}
...

pom.xmlavro-maven-plugin が含まれているため、Payment クラスはコンパイルの過程で自動的に生成されます。

この例では、Kafka ブローカーと Schema Registry への接続情報を、コードに渡された構成ファイルから取得しています。接続情報をクライアントアプリケーションで直接指定する場合は、こちらの java テンプレート を参照してください。

Java コンシューマーの完全な例については、コンシューマーの例 を参照してください。

コンシューマーの実行

  1. このコンシューマーを実行するには、まずプロジェクトをコンパイルします。

    mvn clean compile package
    

    BUILD SUCCESS メッセージは、プロジェクトがビルドされ、コマンドプロンプトが再び利用可能な状態になったことを意味します。

  2. 次に、ConsumerExample を実行します(前述の ProducerExample は既に実行済みであるものとします)。先ほど作成したファイルのパス $HOME/.confluent/java.config を渡してください。

    mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ConsumerExample \
      -Dexec.args="$HOME/.confluent/java.config"
    

    次のように表示されます。

    ...
    key = id0, value = {"id": "id0", "amount": 1000.0}
    key = id1, value = {"id": "id1", "amount": 1000.0}
    key = id2, value = {"id": "id2", "amount": 1000.0}
    key = id3, value = {"id": "id3", "amount": 1000.0}
    key = id4, value = {"id": "id4", "amount": 1000.0}
    key = id5, value = {"id": "id5", "amount": 1000.0}
    key = id6, value = {"id": "id6", "amount": 1000.0}
    key = id7, value = {"id": "id7", "amount": 1000.0}
    key = id8, value = {"id": "id8", "amount": 1000.0}
    key = id9, value = {"id": "id9", "amount": 1000.0}
    ...
    
  3. Ctrl + C を押して停止します。

その他の Kafka クライアント

このチュートリアルは、Avro および Schema Registry の一元化されたスキーマ管理と互換性チェックについて説明することを目的としています。例を単純化するために、このチュートリアルでは、Java のプロデューサーとコンシューマーを中心に取り上げていますが、他の Kafka クライアントも同じように動作します。Avro や Schema Registry と連携する他の Kafka クライアントの例については、次の記事を参照してください。

一元化されたスキーマ管理

Schema Registry 内のスキーマの表示

ここまでで、Avro データをシリアル化するプロデューサーと Avro データを逆シリアル化するコンシューマーが完成しました。プロデューサーは、Confluent Cloud Schema Registry にスキーマを登録し、コンシューマーは、Confluent Cloud Schema Registry からスキーマを取得します。

  1. Confluent Cloud の UI の左側で、クラスターが選択されていることを確認して、Topics をクリックします。

  2. transactions トピックをクリックして Schema タブに移動し、このトピックの最新のスキーマを Confluent Cloud Schema Registry から取得します。

    ../_images/ccloud-schema-transactions.png

    このスキーマは、Java クライアントアプリケーション用に定義されているスキーマファイル と同じです。

curl を使用した Schema Registry との対話

curl コマンドを使用して Confluent Cloud Schema Registry の REST エンドポイントに直接接続し、サブジェクトや関連するスキーマを表示することもできます。

  1. Confluent Cloud Schema Registry に登録されているすべてのサブジェクトを表示するには、次のコマンドを使用します。

    curl --silent -X GET -u $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO https://$SCHEMA_REGISTRY_URL/subjects | jq.
    

    上のコマンドを実行すると、次のような結果が出力されます。

    [
      "transactions-value"
    ]
    

    この例では、Kafka のトピックである transactions に、値(つまり "ペイロード")が Avro であるメッセージが含まれています。また、Confluent Cloud Schema Registry のサブジェクト名は、デフォルトにより transactions-value となっています。

  2. このサブジェクトの最新のスキーマをさらに詳しく表示するには、次のコマンドを使用します。

    curl --silent -X GET -u $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO https://$SCHEMA_REGISTRY_URL/subjects/transactions-value/versions/latest | jq .
    

    上のコマンドを実行すると、次のような結果が出力されます。

    {
      "subject": "transactions-value",
      "version": 1,
      "id": 100001,
      "schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"
    }
    

    このバージョンのスキーマで定義されている内容は次のとおりです。

    • subject: トピック transactions 内のメッセージに使用されるスキーマが進化できる範囲
    • version: このサブジェクトのスキーマのバージョン。サブジェクトごとに 1 から始まります。
    • id: グローバルに一意のスキーマバージョン ID(すべてのサブジェクトのすべてのスキーマで一意)
    • schema: スキーマのフォーマットを定義する構造

    上に示した curl コマンドの出力では、スキーマが、エスケープされた JSON になっていることに注目してください。二重引用符の前にバックスラッシュが記述されています。

  3. スキーマ ID に基づいて、関連するスキーマを取得することもできます。Confluent Cloud Schema Registry の REST エンドポイントに対し、次のようにクエリーを実行します。

    curl --silent -X GET -u $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO https://$SCHEMA_REGISTRY_URL/schemas/ids/100001 | jq .
    

    次のような結果が出力されます。

    {
      "schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"
    }
    

メッセージ内のスキーマ ID

Schema Registry と連携するということは、Kafka メッセージを書き込む際に、Avro スキーマ全体を含める必要がないということです。代わりに、Kafka メッセージはスキーマ ID を付けて書き込まれます。スキーマとスキーマ ID との間で同じマッピングを得るためには、メッセージを書き込むプロデューサーと、そのメッセージを読み取るコンシューマーとが、同じ Schema Registry を使用している必要があります。

この例では、プロデューサーが Payments 用の新しいスキーマを Confluent Cloud Schema Registry に送信します。Confluent Cloud Schema Registry は、この Payments スキーマを transactions-value サブジェクトに登録し、100001 というスキーマ ID をプロデューサーに返します。スキーマとスキーマ ID との間のこのマッピングは、以降、プロデューサーがメッセージを書き込むときのためにキャッシュされます。したがって、プロデューサーが Confluent Cloud Schema Registry に問い合わせを行うのは、スキーマの初回書き込み時のみです。

コンシューマーは、このデータを読み取る際、100001 という Avro スキーマ ID を参照して、Confluent Cloud Schema Registry にスキーマリクエストを送信します。Confluent Cloud Schema Registry は、スキーマ ID 100001 に関連付けられているスキーマを取得して、それをコンシューマーに返します。スキーマとスキーマ ID との間のこのマッピングは、以降、コンシューマーがメッセージを読み取るときのためにキャッシュされます。したがって、コンシューマーが Confluent Cloud Schema Registry に問い合わせを行うのは、スキーマ ID の初回読み取り時のみです。

スキーマの自動登録

クライアントアプリケーションは、デフォルトで新しいスキーマを自動的に登録します。新しいトピックに新しいメッセージを生成する場合、新しいスキーマを自動的に登録しようと試みます。これは開発環境では非常に便利ですが、本稼働環境では、クライアントアプリケーションで新しいスキーマを自動的に登録することはお勧めしません。クライアントアプリケーションの外部でスキーマを登録し、スキーマがいつ Schema Registry に登録され、どのように進化するかを制御することがベストプラクティスです。

アプリケーション内でスキーマの自動登録を無効にするには、次の例のように構成パラメーター auto.register.schemas=false を設定します。

props.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, false);

ちなみに

  • use.latest.version を有効にする場合は、auto.register.schemas を false に設定してスキーマの自動登録を無効にしたうえで、use.latest.version を true に設定する必要があります(デフォルトとは逆の設定)。use.latest.version を正しく機能させるには、オプション auto.register.schemas を false に設定する必要があります。

    auto.register.schemas を false に設定すると、イベントタイプの自動登録が無効になるので、サブジェクト内の最新のスキーマがオーバーライドされることはありません。use.latest.version が true に設定されている場合、シリアライザーは、サブジェクトから最新のスキーマバージョンを検索し、それをシリアル化に使用します。use.latest.version が false(デフォルト)に設定されている場合、シリアライザーは、サブジェクトからイベントタイプを探しますが、その検出に失敗します。

  • Kafka Connect の Schema Registry の「構成オプション」も参照してください。

  • 構成オプション auto.register.schemas は Confluent Platform の機能であり、Apache Kafka® では利用できません。

アプリケーションの外部からスキーマを手動で登録するには、Confluent Cloud の UI を使用します。

まず、test という新しいトピックを作成します。その方法は、このチュートリアルの中で先ほど transactions という名前の新しいトピックを作成したときと同じです。次に、Schema タブから、Set a schema をクリックして新しいスキーマを定義します。次の値を指定してください。

  • namespace: スキーマ名の競合を防止するための完全修飾名
  • type: Avro のデータ型recordenumunionarraymapfixed のいずれか)
  • name: この名前空間における一意のスキーマ名
  • fields: record の単純データ型または複合データ型。このレコードの先頭にあるのは id というフィールドで、そのデータ型は string です。このレコードの 2 つ目にあるのは amount というフィールドで、そのデータ型は double です。

仮に、先ほど使用したものと同じスキーマを定義する場合、スキーマエディターに次のように入力することになります。

{
  "type": "record",
  "name": "Payment",
  "namespace": "io.confluent.examples.clients.basicavro",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "amount",
      "type": "double"
    }
  ]
}

Schema Registry の REST エンドポイントに直接接続して、test トピックに使用する新しいサブジェクトのスキーマを定義する場合は、次のコマンドを実行してください。

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"}' \
  -u $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO \
  https://$SCHEMA_REGISTRY_URL/subjects/test-value/versions

このサンプル出力では、100001 という ID のスキーマが作成されます。

{"id":100001}

スキーマ進化と互換性

スキーマの進化

このチュートリアルではこれまで、グローバルに一意のスキーマ ID をクライアントアプリケーションが登録したり取得したりできる一元化されたスキーマ管理こそ、Schema Registry の利点であると捉えてきました。しかし、Schema Registry の最大の価値は、スキーマの進化への対応です。API は進化しても、新旧のバージョンの API に依存するすべてのアプリケーションとの互換性が確保されている必要があります。同様に、スキーマも進化します。そしてやはり、新旧のバージョンのスキーマに依存するすべてのアプリケーションとの互換性が確保されていなければなりません。このスキーマ進化は、時間の経過に伴ってアプリケーションやデータに現れる自然な性質です。

Schema Registry にはスキーマの進化が考慮されており、プロデューサーとコンシューマーとの間の契約違反を防ぐための互換性チェック機構が備わっています。そのため、プロデューサーとコンシューマーが別々にアップデートされ、それぞれのスキーマが個別に進化したとしても、プロデューサーとコンシューマーは新旧のデータを確実に読み取ることができます。プロデューサーとコンシューマーは分離されたアプリケーションであり、異なるチームによって開発されることもあるため、Kafka ではこのことが特に重要となります。

推移的な互換性のチェックは、特定のサブジェクトに複数のバージョンのスキーマがある場合に重要です。互換性が推移的である場合、新しいスキーマの互換性が登録済みの全スキーマに対してチェックされます。互換性が推移的でない場合、新しいスキーマの互換性は最新のスキーマに対してのみチェックされます。

たとえば、あるサブジェクトに X-2X-1X の順序で変化する 3 つのスキーマがある場合:

  • 推移的 : X-2 <==> X-1X-1 <==> XX-2 <==> X のいずれについても互換性が確保されます。
  • 非推移的 : X-2 <==> X-1X-1 <==> X では互換性が確保されますが、X-2 <==> X については必ずしも互換性が確保されません。

漸進的には互換性を持ち、推移的には互換性を持たない スキーマ変更の例 を参照してください。

Confluent Schema Registry のデフォルトの互換性タイプ BACKWARD は非推移的です。つまり、BACKWARD_TRANSITIVE ではありません。このため、新しいスキーマの互換性は最新のスキーマに対してのみチェックされます。

次の互換性タイプがあります。

  • BACKWARD: (デフォルト)新しいスキーマを使用するコンシューマーは、一番最後に登録されたスキーマを使用するプロデューサーからの出力データを読み取ることができます。
  • BACKWARD_TRANSITIVE: 新しいスキーマを使用するコンシューマーは、既に登録されている任意のスキーマを使用するプロデューサーからの出力データを読み取ることができます。
  • FORWARD: 一番最後に登録されたスキーマを使用するコンシューマーが、新しいスキーマを使用するプロデューサーからの出力データを読み取ることができます。
  • FORWARD_TRANSITIVE: 既に登録されている任意のスキーマを使用するコンシューマーが、新しいスキーマを使用するプロデューサーからの出力データを読み取ることができます。
  • FULL: 新しいスキーマには、一番最後に登録されたスキーマに対する前方互換性と後方互換性があります。
  • FULL_TRANSITIVE: 新しいスキーマには、既に登録されている任意のスキーマに対する前方互換性と後方互換性があります。
  • NONE: スキーマの互換性チェックを無効にします。

互換性タイプの詳細については、「スキーマ進化と互換性」を参照してください。

互換性チェックでの不合格

Schema Registry はプロデューサーとコンシューマーとの間の契約を維持するために、進化するスキーマの互換性をチェックします。Schema Registry による互換性チェックがなければ、スキーマの変更によってアプリケーションの動作に支障が生じる可能性があります。

Payment スキーマの例で、企業が個々の支払いについて、新しい情報、たとえば売上の発生場所を表す region フィールドを追跡することになったとしましょう。この新しい region フィールドを含んだ Payment2a スキーマ を考えてみます。

cat src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment2a.avsc
{
 "namespace": "io.confluent.examples.clients.basicavro",
 "type": "record",
 "name": "Payment",
 "fields": [
     {"name": "id", "type": "string"},
     {"name": "amount", "type": "double"},
     {"name": "region", "type": "string"}
 ]
}

Schema Registry のデフォルトの互換性は BACKWARD です。まずは、この新しいスキーマに後方互換性があるかどうかを考えてみましょう。region フィールドのない従来のスキーマを使ってプロデューサーが書き込んだデータを、コンシューマーは、この新しいスキーマを使用して読み取ることができるでしょうか。答えは No です。従来のスキーマが使用されたデータを、コンシューマーは読み取ることができません。従来のデータには region フィールドがないためです。したがって、このスキーマには後方互換性がありません。

Confluent では Schema Registry Maven プラグイン を提供しています。このプラグインを使用して、互換性チェックを開発過程で行ったり、CI/CD パイプラインに統合したりすることができます。

サンプルの pom.xml にはこのプラグインが含まれ、互換性チェックが有効になっています。

...
<properties>
  <schemaRegistryUrl>http://localhost:8081</schemaRegistryUrl>
  <schemaRegistryBasicAuthUserInfo></schemaRegistryBasicAuthUserInfo>
</properties>
...
<build>
  <plugins>
  ...
    <plugin>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-schema-registry-maven-plugin</artifactId>
        <version>${confluent.version}</version>
        <configuration>
            <schemaRegistryUrls>
                <param>${schemaRegistryUrl}</param>
            </schemaRegistryUrls>
            <userInfoConfig>${schemaRegistryBasicAuthUserInfo}</userInfoConfig>
            <subjects>
                <transactions-value>src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment2a.avsc</transactions-value>
            </subjects>
        </configuration>
        <goals>
            <goal>test-compatibility</goal>
        </goals>
    </plugin>
...
  </plugins>
</build>

現在は、Schema Registry 内の transactions-value サブジェクトに対して、新しい Payment2a スキーマの互換性をチェックするように構成されています。

  1. 互換性チェックを実行します。

    mvn io.confluent:kafka-schema-registry-maven-plugin:test-compatibility \
        "-DschemaRegistryUrl=https://$SCHEMA_REGISTRY_URL" \
        "-DschemaRegistryBasicAuthUserInfo=$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO" \
        "-DschemaLocal=src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment2a.avsc"
    
  2. 互換性チェックで不合格になることを確認します。返されるエラーメッセージは次のとおりです。

    ...
    [ERROR] Schema examples/clients/avro/src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment2a.avsc is not compatible with subject(transactions-value)
    ...
    
  3. 新しいスキーマである Payment2a を Schema Registry に手動で登録してみましょう。これは、クライアントが Java 以外である場合に、コマンドラインから互換性をチェックする際に便利な方法です。

    curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      --data '{"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"region\",\"type\":\"string\"}]}"}' \
      -u $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO \
      https://$SCHEMA_REGISTRY_URL/subjects/transactions-value/versions
    
  4. スキーマに互換性がないというエラーメッセージが Confluent Cloud Schema Registry から返され、スキーマが拒否されることを確認します。

    {"error_code":409,"message":"Schema being registered is incompatible with an earlier schema"}
    

互換性チェックでの合格

後方互換性 を確保するためには、新しいフィールドが存在しなかった場合のデフォルト値を新しいスキーマで想定しておく必要があります。

  1. region にデフォルト値を与えたアップデート後の Payment2b スキーマ を考えてみます。このスキーマを表示するには、次のコマンドを実行します。

    cat src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment2b.avsc
    

    次のように出力されます。

    {
     "namespace": "io.confluent.examples.clients.basicavro",
     "type": "record",
     "name": "Payment",
     "fields": [
         {"name": "id", "type": "string"},
         {"name": "amount", "type": "double"},
         {"name": "region", "type": "string", "default": ""}
     ]
    }
    
  2. UI から transactions トピックをクリックして Schema タブに移動し、transactions トピックの最新のスキーマを Schema Registry から取得します。

  3. Edit Schema をクリックします。

    ../_images/tutorial-c3-edit-schema.png
  4. もう一度新しいフィールド region を追加しますが、今回は、次のようにデフォルト値を追加します。その後、Save をクリックします。

    {
     "name": "region",
     "type": "string",
     "default": ""
    }
    
  5. 新しいスキーマが受け入れられたことを確認します。

    ../_images/tutorial-c3-edit-schema-pass.png

    注釈

    無効な Avro についてのエラーメッセージが表示された場合は、構文をチェックしてください(引用符やコロン、閉じかっこ、直前のフィールドとのコンマ区切りなど)。

  6. 登録されているスキーマバージョンについて考えてみます。transactions トピックの transactions-value という Schema Registry サブジェクトには次の 2 つのスキーマが存在します。

    • バージョン 1: Payment.avsc
    • バージョン 2: Payment2b.avscregion フィールドがデフォルト値(空の値)とともに追加されています。
  7. 引き続き UI の transactions トピックの Schema タブで、Version history をクリックし、Turn on version diff を選択して 2 つのバージョンを比較します。

    ../_images/tutorial-c3-schema-compare.png
  8. コマンドラインで Schema Registry Maven プラグイン に戻り、Payment2a.avsc ではなく Payment2b.avsc を参照するように pom.xml を更新します。

  9. 互換性チェックを再実行し、合格になることを確認します。

    mvn io.confluent:kafka-schema-registry-maven-plugin:test-compatibility
    
  10. 次のメッセージが返されたことを確認します。これはスキーマが互換性チェックに合格したことを示します。

    ...
    [INFO] Schema examples/clients/avro/src/main/resources/avro/io/confluent/examples/clients/basicavro/Payment2b.avsc is compatible with subject(transactions-value)
    ...
    
  11. Schema Registry の REST エンドポイントに直接接続して、新しいスキーマ Payment2b を登録する場合は、次のコマンドを実行してください。問題なく実行されるはずです。

    curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      --data '{"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"region\",\"type\":\"string\",\"default\":\"\"}]}"}' \
      -u $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO \
      https://$SCHEMA_REGISTRY_URL//subjects/transactions-value/versions
    

    上の curl コマンドは、成功した場合、新しいスキーマのバージョン id を返します。

    {"id":100002}
    
  12. Confluent Cloud Schema Registry にある transactions-value の最新のサブジェクトを表示します。

    curl --silent -X GET -u $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO https://$SCHEMA_REGISTRY_URL/subjects/transactions-value/versions/latest | jq .
    

    このコマンドからは、transactions-value トピックの最新の Confluent Cloud Schema Registry サブジェクトが返されます。JSON 形式で表記されたスキーマのバージョン番号、ID、説明などが含まれます。

    {
      "subject": "transactions-value",
      "version": 100002,
      "id": 100002,
      "schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"region\",\"type\":\"string\",\"default\":\"\"}]}"
    }
    

    次の変更点に注目してください。

    • version: 100001 から 100002 に変わっています。
    • id: 100001 から 100002 に変わっています。
    • schema: デフォルト値を含む新しい region フィールドでアップデートされています。

互換性タイプの変更

デフォルトの互換性タイプは backward ですが、グローバルに、またはサブジェクト単位で変更することができます。

UI からサブジェクトごとに互換性タイプを変更するには、transactions トピックをクリックして Schema タブに移動し、transactions トピックの最新のスキーマを Schema Registry から取得します。Edit Schema をクリックし、Compatibility Mode をクリックします。

../_images/c3-edit-compatibility.png

このトピックの互換性はデフォルトの backward に設定されていますが、必要に応じて変更することができます。

Confluent Cloud Schema Registry の REST エンドポイントに直接接続して、transactions トピックである transactions-value サブジェクトの互換性タイプを変更する場合は、次のサンプルコマンドを実行してください。

curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
       --data '{"compatibility": "BACKWARD_TRANSITIVE"}' \
       -u $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO https://$SCHEMA_REGISTRY_URL/config/transactions-value

ccloud-stack の破棄

チュートリアルが終了したら、Confluent Cloud で作成したリソースを破棄します。

  1. チュートリアルの最初に作成した cloud-stack を破棄するには、bash スクリプト ccloud_stack_destroy.sh を呼び出して、ccloud-stack の作成時に自動生成されたプロパティファイルを渡します。

    # Change directory if needed
    cd <path to examples>/ccloud/ccloud-stack/
    
    ./ccloud_stack_destroy.sh stack-configs/java-service-account-<account>.config
    
  2. Confluent Cloud 内のリソースが破棄されたことを必ず確認してください。

次のステップ

スキーマの操作の詳細