Looking for Schema Management Confluent Cloud docs? You are currently viewing Confluent Platform documentation. If you are looking for Confluent Cloud docs, check out Schema Management on Confluent Cloud.

フォーマット、シリアライザー、逆シリアライザー

Confluent Platform では、Confluent Platform の本来のデフォルトフォーマットである Avro に加えて、プロトコルバッファJSON スキーマ がサポートされています。こうした新しいシリアル化フォーマットのサポートは Schema Registry に限らず、Confluent Platform 全体に及んでいます。さらに、Schema Registry は、カスタムスキーマ形式をスキーマプラグインとして追加できる拡張性を備えています。

Avro とともに Protobuf と JSON Schema にも対応した新しい Kafka シリアライザーとデシリアライザーが提供されています。Protobuf メッセージや JSON シリアル化可能なオブジェクトをシリアル化する際、これらのシリアライザーでスキーマを自動的に登録できます。Protobuf シリアライザーでは、インポートされているすべてのスキーマを再帰的に登録することが可能です。

これらのシリアライザーと逆シリアライザーは、Java、.NET、Python を含む複数の言語で利用できます。

Schema Registry は、同時に複数のフォーマットをサポートします。たとえば、あるサブジェクトには Avro スキーマを使用し、別のサブジェクトには Protobuf スキーマを使用できます。さらに、Protobuf と JSON Schema には、どちらも独自の互換性ルールがあるため、Avro と同様、Protobuf スキーマの進化にも後方互換性または前方互換性を確保することができます。

Confluent Platform の Schema Registry では、import ステートメントをモデル化することにより、Protobuf での スキーマ参照 もサポートされます。

サポートされているフォーマット

Confluent Platform では、特別な設定をしなくても、次のスキーマ形式がサポートされます。それぞれのフォーマットでシリアライザー、デシリアライザー、コマンドラインツールを利用できます。

フォーマット プロデューサー コンシューマー
Avro io.confluent.kafka.serializers.KafkaAvroSerializer io.confluent.kafka.serializers.KafkaAvroDeserializer
ProtoBuf io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
JSON Schema io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer

ご利用のスキーマ形式に合ったシリアライザーとデシリアライザーを使用してください。シリアライザーは、メッセージを送信する Kafka プロデューサーのコードで指定し、デシリアライザーは、メッセージを読み取る Kafka コンシューマーのコードで指定します。

新しい Protobuf および JSON Schema のシリアライザーとデシリアライザーでは、Avro と同じ構成プロパティが数多くサポートされます。キーと値に使用される サブジェクト命名方法 もその 1 つです。RecordNameStrategy (および TopicRecordNameStrategy )の場合、サブジェクト名は次のようになります。

  • (Avro の場合)レコード名
  • (Protobuf の場合)メッセージ名
  • (JSON Schema の場合)タイトル

Protobuf および JSON Schema で RecordNameStrategy を使用する際は、別途必要な構成があります。その点については、以下の詳細解説セクションでコマンドラインのテストユーティリティや例とともに取り上げています。

サポートされているスキーマ形式では、いずれもデフォルトで、シリアライザーおよび Kafka Connect コンバーター によって自動的にスキーマが登録されます。Protobuf シリアライザーでは、参照先のすべてのスキーマが 1 つずつ再帰的に登録されます。

Protobuf と JSON Schema のサポートにより、Confluent Platform で、スキーマプラグインを使用して新しいスキーマ形式を追加できるようになりました(既存の Avro サポートは、Avro スキーマプラグインでラップされています)。

スキーマ参照

Confluent Platform (バージョン 5.5.0 以降)は、"スキーマ参照"(スキーマから別のスキーマを参照する機能)という概念をフルサポートします。スキーマ参照は、標準のスキーマ形式である Avro、JSON Schema、Protobuf で利用することができます。スキーマ参照には、import ステートメント(Protobuf の場合)および $ref フィールド(JSON Schema の場合)が使用されます。Confluent Platform の Avro もアップデートされ、スキーマ参照がサポートされます。

スキーマ参照は次の要素から成ります。

  • 参照の名前(Avro では完全修飾スキーマ名、JSON Schema では URL、Protobuf では他の Protobuf ファイルの名前が参照名)
  • サブジェクト(参照先のスキーマが登録される際のサブジェクト)
  • バージョン(登録されたサブジェクトの下に存在するスキーマの厳密なバージョン)

関連付けられた参照先があれば、スキーマを登録する際に指定する必要があります。通常は、参照先スキーマが先に登録されます。それらを参照するスキーマの登録時に、参照先スキーマのサブジェクトとバージョンを使用できます。

参照先を含んだスキーマを Schema Registry から取得すると、必要に応じて参照先スキーマも取得されます。

フォーマットごとのスキーマ参照の例については、以下のセクションを参照してください。

同じトピック内の複数のイベントタイプ

スキーマ参照 が提供するのは、あるスキーマから他のスキーマを呼び出すしくみだけではありません。スキーマ参照を使用することで、複数のイベントタイプを同じトピックの中で効率よく組み合わせながらも、サブジェクト-トピック制約を維持することができます。その実現に向けてスキーマ参照を使用することは、同じトピックに複数のイベントタイプを含める ための新しいアプローチとなります。

フォーマットごとの例については、以下のセクションを参照してください。

API

Schema Registry API リファレンス」で詳しく説明されているように、Confluent Platform 5.5.0 以降では、利用できるエンドポイントが 2 つ追加されています。

  • このエンドポイントは、指定したサブジェクトとバージョンに該当するスキーマを参照しているスキーマの ID を示します。

    GET /subjects/{subject}/versions/{version}/referencedby
    
  • このエンドポイントは、特定の ID が使用されているサブジェクトとバージョンのすべてのペアを示します。

    GET /schemas/ids/{id}/versions
    

スキーマ形式の拡張性

Schema Registry では、Avro、Protobuf、JSON Schema のスキーマプラグインがデフォルトで読み込まれます。REST API を使用する際に、それぞれ AVRO、PROTOBUF、JSON としてスキーマタイプを指定します。

カスタムスキーマプラグインは、SchemaProvider インターフェイスと ParsedSchema インターフェイスを実装することによって作成できます。カスタムスキーマプラグインを Schema Registry に読み込むには、プラグインの JAR を CLASSPATH に格納したうえで、次の Schema Registry 構成プロパティを使用し、追加で使用するプラグインプロバイダーのクラスをコンマ区切りで指定します。

schema.providers=com.acme.MySchemaProvider

AVRO、PROTOBUF、JSON のスキーマプラグインは、Schema Registry によって常時読み込まれるので追加しないでください。

サブジェクト命名方法

シリアライザーは、Schema Registry における名前空間を定義する "サブジェクト" 名で、スキーマをレジストリに登録します。

  • 互換性チェックはサブジェクト単位で実行されます。
  • バージョンはサブジェクトに紐付けられます。
  • スキーマは進化しても、同じサブジェクトに関連付けられたままですが、新しいスキーマ ID とバージョンが割り当てられます。

概要

サブジェクト名は、サブジェクト命名方法によって決まります。次の 3 つの命名方法がサポートされています。

命名方法 説明
TopicNameStrategy サブジェクト名はトピック名に由来します。デフォルトではこの方法が使用されます。
RecordNameStrategy サブジェクト名はレコード名に由来します。イベントは、それぞれデータ構造が異なる場合がありますが、この命名方法によって、関連するイベントをサブジェクトの下で論理的にグループ化できます。
TopicRecordNameStrategy サブジェクト名はトピック名とレコード名に由来します。イベントは、それぞれデータ構造が異なる場合がありますが、この命名方法によって、関連するイベントをサブジェクトの下で論理的にグループ化できます。

注釈

上に挙げた命名方法の完全なクラス名は、命名方法の名称に、プレフィックスとして io.confluent.kafka.serializers.subject を付けたものになります。

トピックまたはその他の関係に基づくグループ化

デフォルトの命名方法(TopicNameStrategy)では、トピック名に基づく名前がスキーマに使用されます。必然的に、同じトピックのメッセージはすべて、同じスキーマに準拠することが求められます。そうしない場合、未知のレコードタイプは、トピックに対する互換性チェックで違反となる可能性があります。ログに記録されたアクティビティを集約する場合や、ウェブサイトのコメントスレッドをストリーム処理する場合など、トピック名ごとにメッセージをグループ化することが理にかなうシナリオでは、この命名方法が適しています。

1 つのトピック内のレコードで複数のスキーマが使用されているなど、トピックに基づくグループ化が最善ではないユースケースでは、デフォルト以外の命名方法( RecordNameStrategyTopicRecordNameStrategy)がスキーマ管理に役立ちます。時系列で並んだ一連のイベントをデータが表しており、メッセージのデータ構造がそれぞれ異なる際に効果的です。この場合、一連のイベントの構成要素として、順に並んだ関連するメッセージを、トピック名に関係なくひとまとめにしておきたいときに、その利便性が特に発揮されます。たとえば、顧客の口座を追跡する金融サービスであれば、当座預金や貯蓄の開始、預金の作成、その後の引き出し、ローンの申請、承認などが含まれます。

各命名方法の動作

次の表はそれぞれの命名方法を比較したものです。

動作 TopicNameStrategy RecordNameStrategy TopicRecordNameStrategy
サブジェクトのフォーマット + "-key" または "-value"(構成による) <トピック名> <完全修飾レコード名> <トピック名>-<完全修飾レコード名>
トピックごとに一意のサブジェクト ×
トピック内のすべてのスキーマを対象に互換性が Schema Registry によってチェックされる ×(トピックに関係なく、同じレコード名が見つかった場合に互換性がチェックされる) ○(しかも、互換性チェックの範囲は特定のトピック内の特定のレコード名であるため、同じレコード名で、かつ互いに互換性のないバージョンでも、異なるトピック内であれば存在できる)
同じスキーマのレコードが複数のトピックに存在できる
複数のサブジェクトに同じスキーマ ID のスキーマが存在できる(スキーマが同一である場合)
同じレコードタイプの複数のスキーマが 1 つのトピックに存在できる(つまり、スキーマ進化に対応する)
1 つのトピックに複数のレコードタイプが存在できる 一般的ではない(トピックに対する Schema Registry の互換性チェックで、未知のレコードタイプが違反となる可能性があるため)
クライアントアプリケーションの設定変更が必要 ×(すべてのクライアントで既にデフォルトになっているため)
レプリケートされて名前が変更された(つまり topic.rename.format を使用して Replicator が構成されている)トピックに対して同じサブジェクトを再利用できる ×(新しいトピック名でサブジェクトを手動で登録する必要がある) ×(新しいトピック名でサブジェクトを手動で登録する必要がある)

制限

  • Go クライアントには現在、Schema Registry 統合はありません。
  • ksqlDB で使用されるのは、デフォルトの TopicNameStrategy のみです。現時点では、1 つのトピックで複数のスキーマを使用することはできません。
  • ユーザーインターフェイスからスキーマを表示したり編集したりする Confluent Control Center のオプションは、デフォルトの TopicNameStrategy を使用するスキーマでのみ利用できます。

構成の詳細

Kafka のシリアライザーとデシリアライザーでは、スキーマの登録時または取得時に、対応するサブジェクト名として <topicName>-Key<topicName>-value がデフォルトで使用されます。

この動作は、次の構成を使用して変更できます。

key.subject.name.strategy

キースキーマを Schema Registry に登録する際に使用されるサブジェクト名の構築方法を決定します。

io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy の任意の実装を指定できます。デフォルトでは、<topic>-key がサブジェクトとして使用されます。io.confluent.kafka.serializers.subject.SubjectNameStrategy の実装を指定することは 4.1.3 時点で非推奨となっており、使用した場合、パフォーマンスが低下することがあります。

  • 型: class
  • デフォルト: class io.confluent.kafka.serializers.subject.TopicNameStrategy
  • 重要度: 中
value.subject.name.strategy

値スキーマを Schema Registry に登録する際に使用されるサブジェクト名の構築方法を決定します。

io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy の任意の実装を指定できます。デフォルトでは、<topic>-value がサブジェクトとして使用されます。io.confluent.kafka.serializers.subject.SubjectNameStrategy の実装を指定することは 4.1.3 時点で非推奨となっており、使用した場合、パフォーマンスが低下することがあります。

  • 型: class
  • デフォルト: class io.confluent.kafka.serializers.subject.TopicNameStrategy
  • 重要度: 中

その他、標準で構成できるオプションの例を次に示します。

io.confluent.kafka.serializers.subject.RecordNameStrategy
Kafka にパブリッシュされたどのレコードタイプも、(トピックに関係なく)完全修飾レコード名でスキーマがレジストリに登録されます。この命名方法では、トピック間の互換性チェックが実行されないので、複数の異なるレコードタイプを 1 つのトピックに混在させることができます。代わりに、トピックに 関係なく、同じレコード名が見つかった場合に互換性がチェックされます。
io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
Kafka のトピック <topicName> にパブリッシュされたどのレコードタイプも、サブジェクト名 <topicName>-<recordName> でスキーマがレジストリに登録されます(<recordName> は完全修飾レコード名)。この命名方法では、トピック間の互換性チェックが実行されないので、複数の異なるレコードタイプを 1 つのトピックに混在させることができます。しかも、互換性チェックの範囲は特定のトピック内の特定のレコード名であるため、同じレコード名で、かつ互いに互換性のないバージョンでも、異なるトピック内であれば存在できます。

Protobuf は、スキーマ参照を自動登録する唯一のフォーマットであるため、自動登録されたスキーマ参照の命名方法を指定するための特別な構成が Protobuf には用意されています。Avro と JSON Schema の場合、参照は手動で登録されるのが一般的であるため、サブジェクト名はいつでもユーザーが選択できます。この Protobuf の動作は、次の構成を使用して変更できます。

reference.subject.name.strategy
io.confluent.kafka.serializers.subject.strategy.ReferenceSubjectNameStrategy の任意の実装を指定できます。デフォルトは DefaultReferenceSubjectNameStrategy で、参照名がサブジェクトとして使用されます。
  • 型: class
  • デフォルト: class io.confluent.kafka.serializers.subject.strategy.DefaultReferenceSubjectNameStrategy
  • 重要度: 中

互換性チェック

スキーマ進化と互換性 のために、すべてのスキーマ形式について次の互換性レベルを定義できます。

  • BACKWARD
  • FORWARD
  • FULL
  • BACKWARD_TRANSITIVE
  • FORWARD_TRANSITIVE
  • FULL_TRANSITIVE
  • NONE

ParsedSchema のメソッドの 1 つに isBackwardCompatible`(ParsedSchema previousSchema) があります。このメソッドを使用してスキーマタイプの後方互換性が定義されていれば、そこから他のタイプの互換性を導くことができます。

Avro のルールについては、Avro 仕様の「Schema Resolution」に詳しく説明されています。

Protobuf の後方互換性ルールは、Protobuf 言語仕様に基づいています。それらのルールを次に示します。

  • フィールドを追加できます。デフォルトでは、Protobuf におけるすべてのフィールドが省略可能です。デフォルトを指定した場合、後方互換性を確保するためにそれらが使用されます。
  • フィールドは削除できます。フィールド番号は、同じ型の新しいフィールドで再利用できます。フィールド番号は、異なる型の新しいフィールドで再利用できません。
  • int32 型、uint32 型、int64 型、uint64 型、bool 型は互換性があります(同じフィールド内で置き換えることができます)。
  • sint32 型と sint64 型は互換性があります(同じフィールド内で置き換えることができます)。
  • string 型と bytes 型は互換性があります(同じフィールド内で置き換えることができます)。
  • fixed32 型と sfixed32 型は互換性があります(同じフィールド内で置き換えることができます)。
  • fixed64 型と sfixed64 型は互換性があります(同じフィールド内で置き換えることができます)。
  • enum 型は、int32uint32int64uint64 と互換性があります(同じフィールド内で置き換えることができます)。
  • 単一値を新しい oneof のメンバーに変更できます(互換性があります)。

JSON Schema の後方互換性ルールはやや複雑であるため、JSON Schema の詳細解説の最後のセクション「JSON Schema の互換性ルール」で取り上げています。

メッセージの JSON エンコーディングとコマンドラインユーティリティ

Avro と Protobuf はどちらも、各スキーマ形式のメッセージをエンコードする際に、人が判読できる JSON を使用するか、格納効率に優れたバイナリフォーマットを使用するかを選択できるようになっています。この点は、それぞれの仕様に記載されています。

これらの JSON エンコーディングは、コマンドラインユーティリティから利用できます(REST Proxy および Confluent Control Center でも利用可能)。

Protobuf のコマンドラインプロデューサーを起動するには、次のコマンドを実行します。

kafka-protobuf-console-producer --broker-list localhost:9092 --topic t1 --property value.schema='message Foo { required string f1 = 1; }'

Protobuf のコマンドラインコンシューマーを起動するには、(別のターミナルから)次のコマンドを実行します。

kafka-protobuf-console-consumer --topic t1 --bootstrap-server localhost:9092

これで、{ “f1” : “some-value” } 形式の JSON メッセージを送信できます。

同様に、JSON Schema のコマンドラインプロデューサーを起動するには、次のコマンドを実行します。

kafka-json-schema-console-producer --broker-list localhost:9092 --topic t2 --property value.schema='{"type":"object","properties":{"f1":{"type":"string"}}}'

JSON Schema のコマンドラインコンシューマーを起動するには、次のコマンドを実行します。

kafka-json-schema-console-consumer --topic t2 --bootstrap-server localhost:9092

{ “f1” : “some-value” } 形式の JSON メッセージを送信できます。

プロデューサーには、<key.refs> または <value.refs> として参照を渡すこともできます。その例を次に示します。

--property value.refs=’[ { “name”: “myName”, “subject”: “mySubject”, “version”: 1 } ]’.

これらのコマンドラインユーティリティの使用例は、各フォーマットの「使用例」のセクションでもご覧いただけます。

プロデューサーとコンシューマーのための基本認証セキュリティ

Schema Registry には、基本認証ヘッダーを使用してリクエストを認証する機能が備わっています。基本認証ヘッダーは、プロデューサーまたはコンシューマーの例で、次の構成を設定することによって送信できます。

basic.auth.credentials.source

基本認証ヘッダーの認証情報の選択方法を指定します。サポートされる値は URL、USER_INFO、SASL_INHERIT です。

  • 型: string
  • デフォルト: "URL"
  • 重要度: 中

URL: ユーザー情報は、http://<username> :<password>@sr-host:<sr-port> という形式で schema.registry.url に含める形で構成します。

USER_INFO: ユーザー情報の構成には、次の構成を使用します。

basic.auth.user.info

基本認証のユーザー情報は、{username} :{password} という形式で指定します。

  • 型: password
  • デフォルト: ""
  • 重要度: 中

SASL_INHERIT: SASL SCRAM または SASL PLAIN を使用したブローカーとの通信に使用されている Kafka クライアントの設定を継承します。

wire フォーマット

ほとんどの場合、シリアライザーとフォーマッターは、メッセージがバイトにマッピングされるしくみの細部を意識する必要なく、そのまま使用できます。ただし、ご使用の言語のシリアライザーがまだ Confluent によって開発されていない場合や、単純に Confluent Platform の動作を詳しく知りたい場合には、以下の情報をご覧ください。低レベルのバイトに対するデータのマッピングについて詳しく説明しています。

現在、wire フォーマットに含まれるのは、次に示す数個のコンポーネントだけです。

バイト 領域 説明
0 マジックバイト Confluent のシリアル化フォーマットのバージョン番号(現在は常に 0)。
1-4 スキーマ ID Schema Registry から返される 4 バイトのスキーマ ID。
5-... データ 特定のスキーマ形式( Avro または Protocol Buffers のバイナリエンコーディングなど)用にシリアル化されたデータ。唯一の例外は生のバイトで、特別なエンコーディングをせずに直接書き込まれます。

重要

  • すべてのコンポーネントは、ネットワークの標準的なバイトオーダーであるビッグエンディアンの並び順でエンコードされます。
  • wire フォーマットは、Kafka のメッセージキーとメッセージ値の両方に適用されます。

Protobuf のシリアル化フォーマットでは、magic-byteschema-id の後にメッセージインデックスのリストが追加されます。つまり、Protobuf のシリアル化フォーマットは次のようになります。

magic-byte, schema-id, message-indexes, protobuf-payload

message-indexes は、メッセージタイプ(ネスト可能)に対応するインデックスの配列です。[0, 2] という配列は、最上位の最初のメッセージタイプにネストされた 3 番目のメッセージタイプを表します。同様に、[1, 5, 3] は、最上位の 2 番目のメッセージタイプの 6 番目のメッセージタイプの 4 番目のメッセージタイプを表します。

メッセージインデックスは、Avro と同様に(Avro 仕様の「Binary Encoding」を参照)、可変長のジグザグエンコーディングを使用して int としてエンコードされ、先頭に配列の長さ(同じく可変長、『Zigzag encoded』を参照)を表すプレフィックスが追加されます。したがって、上記の配列 [0, 2] は可変長の整数 2,0,2 にエンコードされ、最初の 2 は長さを表します。ほとんどの場合、実際のメッセージタイプは先頭のメッセージタイプだけ(配列 [0])です。これは通常であれば 1,0 にエンコードされますが(1 は長さ)、特例として 0 に最適化されます。このため、先頭のメッセージタイプが使用される最も一般的な状況では、単一の 0 がメッセージインデックスとしてエンコードされます。

Confluent Platform バージョン間における互換性の保証

Confluent Platform のシリアライザーによって使用されるシリアル化フォーマットについては、複数のメジャーリリースにわたる安定性が保証されています。事前の警告なく変更が加えられることはありません。シリアル化フォーマットはパーティション間のキーのマッピング方法に影響するため、このことはきわめて重要です。多くのアプリケーションは、同じ 論理 フォーマットを持つキーが同じ物理パーティションにルーティングされるという事実に依存しているため、シリアル化データの物理 バイト フォーマットが不意に変化しないことが、通常、アプリケーションにとって重要です。パーティションへのメッセージのルーティングはキーのハッシュに基づくため、ごく小さな変更でも、同じ 論理キー のレコードが異なるパーティションにルーティングされてしまう原因になります。

新しいフォーマットでシリアライザーがアップデートされても差違が生じないよう、シリアライザーの出力フォーマットのアップデートは、きわめて慎重に行われます。クライアントに安定性を保証するため、Confluent Platform とそのシリアライザーは以下の点を保証しています。

  • Confluent Platform の複数の メジャーリリース にわたって発出される大々的な警告なしにフォーマット(マジックバイトを含む)が変更されることはありません。新しい機能がデフォルトで採用されるよう、まれにデフォルトが変更されることはありますが、その変更も "非常に" 慎重に行われます。変更は、少なくとも 1 回のメジャーリリースを挟んで行われます。その間に、予期せぬ移行の必要性にユーザーが驚かないよう、関連する変更についてユーザーに向けた警告が発出されます。互換性に影響する重大な変更の場合は少なくとも 1 メジャーリリース分の、また互換性を失うレベルの変更については 2 メジャーリリース分の警告を挟むことが保証されています。
  • マジックバイトで指定されたバージョン内で、後方互換性を損なうような変更がフォーマットに加えられることはありません。すべての変更について完全な後方互換性が確保され、リリースノートに文書化されます。下流での特別な対応を要するシリアル化機能が新たに導入される場合、少なくとも 1 バージョンの期間にわたって警告が発出されます。
  • 逆シリアル化は、複数のメジャーリリースにわたってサポートされます。無限のサポートが保証されるわけではありませんが、互換性が失われることに対する特筆すべき理由がない限り、以前のフォーマットを逆シリアル化する機能は無期限にサポートされます。

互換性またはサポートの詳細については、コミュニティのメーリングリスト にお問い合わせください。

次のステップ