Schema Registry API の使用例¶
Looking for Schema Management Confluent Cloud docs? You are currently viewing Confluent Platform documentation. If you are looking for Confluent Cloud docs, check out Schema Management on Confluent Cloud.
このセクションでは、curl コマンドを使用して Schema Registry API を呼び出す例を紹介しています。
また、「Schema Registry のチュートリアル」でも、curl を使用してこれらの API と対話する例をいくつか確認できます。
API のテストを行う場合、curl の --silent
フラグに jq を組み合わせてみてください。指定したコマンドから、きれいに書式設定された出力結果が得られます。たとえば、curl -X GET http://localhost:8081/subjects
というコマンドを実行すると、次の結果が返されます。
["my-cool-topic-value","my-other-cool-topic-value"]
一方、サイレントモードの curl を使用し、同じコマンドをパイプで jq に連結した場合(curl --silent -X GET http://localhost:8081/subjects | jq
)は、次の結果が返されます。
"my-cool-topic-value",
"my-other-cool-topic-value"
Schema Registry の起動¶
Schema Registry とその依存サービスである ZooKeeper および Kafka を起動します。各サービスの構成は、etc
下の対応するプロパティファイルから読み取られます。
開発またはテスト環境¶
- 前提条件
- Confluent Platform
- Confluent CLI (個別のインストールが必要)
Schema Registry とその依存サービスを起動するには、次のように Confluent CLI confluent local コマンドを使用します。
confluent local services schema-registry start
重要
confluent local コマンドは、単一ノードの開発環境向けであり、本稼働環境には適していません。生成されるデータは一過性で、一時的なものです。本稼働環境対応のワークフローについては、「Confluent Platform のインストールおよびアップグレード」を参照してください。
本稼働環境¶
各 Confluent Platform サービスを個別のターミナルで起動します。次の操作手順に従ってください。
ZooKeeper を起動します。このコマンドは個別のターミナルで実行します。
bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
Kafka を起動します。このコマンドは個別のターミナルで実行します。
bin/kafka-server-start ./etc/kafka/server.properties
Schema Registry を起動します。このコマンドは個別のターミナルで実行します。
bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
これらのサービスを開始する方法について詳しくは、「オンプレミスのデプロイ」を参照してください。
Schema Registry API の一般的な使用例¶
参考
- Schema Registry のチュートリアル
- フォーマット、シリアライザー、逆シリアライザー
- Schema Registry API リファレンス
- セキュリティが構成された Schema Registry を使用する詳細な例については、「Confluent Platform のデモ」を参照してください。
これらの例では、curl のコマンドを使用して Schema Registry Schema Registry API を操作します。
シェルにコマンドをコピーアンドペーストしやすいよう、コマンドと結果は別々に掲載しています。
Confluent Cloud の Schema Registry では、curl コマンドの --user
(または -u
)フラグで API キーとシークレットを渡します。たとえば、レジストリ内のすべてのサブジェクトを表示するには、次のように入力します。
curl --user <schema-registry-api-key>:<schema-registry-api-secret> \
<schema-registry-url>/subjects
新しいバージョンのスキーマをサブジェクト "Kafka-key" で登録する¶
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://localhost:8081/subjects/Kafka-key/versions
結果の例:
{"id":1}
新しいバージョンのスキーマをサブジェクト "Kafka-value" で登録する¶
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://localhost:8081/subjects/Kafka-value/versions
結果の例:
{"id":1}
既存のスキーマを新しいサブジェクト名に登録する¶
ユースケース: Kafka1
というサブジェクトに登録されたスキーマが既に存在します。これと同じスキーマを他のサブジェクト(Kafka2
)から利用できるようにする必要があります。次に示した 1 行のコマンドは、Kafka1-value
から既存のスキーマを読み取って、Kafka2-value
に登録するものです。ご利用のマシンにツール jq
がインストールされていることを想定しています。
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data "{\"schema\": $(curl -s http://localhost:8081/subjects/Kafka1-value/versions/latest | jq '.schema')}" \
http://localhost:8081/subjects/Kafka2-value/versions
結果の例:
{"id":1}
ちなみに
You do not need to use the AvroConverter for topic replication or
schema management, even if the topic is Avro format. The ByteArrayConverter
retains
the "magic byte", which is the schema ID. When a replicator is created, messages are
replicated with the schema ID. You do not need to create a schema subject. A best practice
is to avoid the use of Avro due to the overhead, as there is no real value to it in this context.
詳細については、「チュートリアル: クラスターの境界を越えたデータのレプリケーション」(「Replicator の構成および実行」のサブセクションにある topic.rename.format=${topic}.replica
の使用など)と「Replicator とクラスター間フェイルオーバー」を参照してください。
すべてのサブジェクトを一覧表示する¶
次の API 呼び出しは、すべてのスキーマサブジェクトのリストを表示します。
curl -X GET http://localhost:8081/subjects
結果の例:
["Kafka-value","Kafka-key"]
リクエストの末尾に deleted
フラグを追加すると、論理的に削除されたサブジェクトを含めてすべてのサブジェクトを表示できます(?deleted=true
)。
curl -X GET http://localhost:8081/subjects?deleted=true
"my-cool-topic-value" というスキーマサブジェクトが以前に論理的に削除されたとすると、結果の例は次のようになります。
["Kafka-value","Kafka-key","my-cool-topic-value"]
特定の ID に関連付けられているすべてのサブジェクトを一覧表示する¶
特定の ID に関連付けられているサブジェクトは、次の 2 段階のプロセスで検索します。
サブジェクトを一覧表示します。
curl -X GET http://localhost:8081/subjects
サブジェクトの一覧出力を次のように反復処理し、結果に含まれる ID を確認します。
curl -X GET http://localhost:8081/subjects/<INSERT SUBJECT NAME>-value/versions/latest
グローバルに一意の ID 1 でスキーマをフェッチする¶
curl -X GET http://localhost:8081/schemas/ids/1
結果の例:
{"schema":"\"string\""}
サブジェクト "Kafka-value" で登録されているすべてのスキーマバージョンを一覧表示する¶
curl -X GET http://localhost:8081/subjects/Kafka-value/versions
結果の例:
[1]
サブジェクト "Kafka-value" で登録されているスキーマのバージョン 1 をフェッチする¶
curl -X GET http://localhost:8081/subjects/Kafka-value/versions/1
結果の例:
{"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}
ちなみに
スキーマタイプが JSON スキーマ、または Protobuf の場合、応答にはスキーマタイプも含まれます。スキーマタイプが Avro(デフォルト)の場合は、上の例に示したように、応答にスキーマタイプは含まれません。
サブジェクト "Kafka-value" で登録されているスキーマのバージョン 1 を削除する¶
curl -X DELETE http://localhost:8081/subjects/Kafka-value/versions/1
結果の例:
1
サブジェクト "Kafka-value" で直近に登録されたスキーマを削除する¶
curl -X DELETE http://localhost:8081/subjects/Kafka-value/versions/latest
結果の例:
2
同じスキーマをサブジェクト "Kafka-value" で登録する¶
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://localhost:8081/subjects/Kafka-value/versions
結果の例:
{"id":1}
グローバルに一意の ID 1 で再度スキーマをフェッチする¶
curl -X GET http://localhost:8081/schemas/ids/1
結果の例:
{"schema":"\"string\""}
スキーマがサブジェクト "Kafka-key" で登録されているかどうかを確認する¶
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://localhost:8081/subjects/Kafka-key
結果の例:
{"subject":"Kafka-key","version":3,"id":1,"schema":"\"string\""}
サブジェクト "Kafka-value" の最新のスキーマとの間でスキーマの互換性をテストする¶
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://localhost:8081/compatibility/subjects/Kafka-value/versions/latest
結果の例:
{"is_compatible":true}
互換性の要件をグローバルにアップデートする¶
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "NONE"}' \
http://localhost:8081/config
結果の例:
{"compatibility":"NONE"}
新しいトピックのスキーマを登録する¶
ちなみに
この例とそれに続くいくつかの例では、my-kafka
という新しいトピックに言及しています。サブジェクトレベルの互換性構成を示す際にこのトピックを使用します。これらの例では、Confluent Control Center または Kafka コマンドラインを使用して、このトピックが既に作成されていることを前提としています。あくまでコマンドラインを使用して、デモに必要なトピックをここで作成する場合は、次のような Kafka コマンドでトピックを作成した後、その存在を確認してください。
kafka-topics --create --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 --topic my-kafka
kafka-topics --list --bootstrap-server localhost:9092
Schema Registry API を使用して、トピック my-kafka
のスキーマを追加します。
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"my.examples\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"}' http://localhost:8081/subjects/my-kafka-value/versions
結果の例:
{"id":1}
サブジェクトの互換性の要件をアップデートする¶
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"compatibility": "FULL"}' http://localhost:8081/config/my-kafka-value
結果の例:
{"compatibility":"FULL"}
サブジェクトの互換性の要件を取得する¶
curl -X GET http://localhost:8081/config/my-kafka-value
結果の例:
{"compatibilityLevel":"FULL"}
ちなみに
問い合わせの対象となるサブジェクトにサブジェクト固有の互換性レベルが設定されていない場合、コマンドからエラーコードが返されます。たとえば、サブジェクト固有の互換性が設定されていない Kafka-value
サブジェクトに対して同じコマンドを実行すると、次の結果が返されます。
{"error_code":40401,"message":"Subject 'Kafka-value' not found."}
サブジェクトに対して有効になっている互換性要件を表示する¶
サブジェクトレベルでどの互換性要件(存在する場合)が設定され、互換性チェックにどの要件が使用されるかは、defaultToGlobal
フラグを使用して調べることができます。サブジェクトレベルの互換性がサブジェクトに設定されている場合、それらはえてして同じですが、必ず同じというわけではありません。
サブジェクト固有の互換性が "FULL" に設定されているサブジェクト
my-kafka-value
の場合、defaultToGlobal=true
とdefaultToGlobal=false
の "両方" で{"compatibilityLevel":"FULL"}
が返されます。curl -X GET http://localhost:8081/config/my-kafka-value/?defaultToGlobal=true
結果の例:
{"compatibilityLevel":"FULL"}
curl -X GET http://localhost:8081/config/my-kafka-value/?defaultToGlobal=false
結果の例:
{"compatibilityLevel":"FULL"}
サブジェクト固有の互換性が設定されていない
Kafka-value
サブジェクトの場合、defaultToGlobal=true
からは現在のグローバルデフォルトが返されます(例:{"compatibilityLevel":"NONE"}
)。curl -X GET http://localhost:8081/config/Kafka-value/?defaultToGlobal=true
結果の例:
{"compatibilityLevel":"NONE"}
一方、サブジェクト
Kafka-value
に対するdefaultToGlobal=false
からは、エラーコードが返されます。curl -X GET http://localhost:8081/config/Kafka-value/?defaultToGlobal=false
結果の例:
{"error_code":40401,"message":"Subject 'Kafka-value' not found."}
サブジェクト "Kafka-value" で登録されているすべてのスキーマバージョンを削除する¶
curl -X DELETE http://localhost:8081/subjects/Kafka-value
結果の例:
[3]
Schema Registry に現在登録されているスキーマタイプを一覧表示する¶
curl -X GET http://localhost:8081/schemas/types
結果の例:
["JSON", "PROTOBUF", "AVRO"]
特定の ID が使用されているサブジェクトとバージョンのペアをすべて一覧表示する¶
curl -X GET http://localhost:8081/schemas/ids/2/versions
結果の例:
[{"subject":"testproto-value","version":1}]
特定のスキーマを参照するスキーマの ID を一覧表示する¶
curl -X GET http://localhost:8081/subjects/other.proto/versions/1/referencedby
結果の例:
[2]
HTTPS での Schema Registry の使用¶
前述の curl コマンド例で紹介したのは、HTTP を使用して Schema Registry とやり取りする方法です。
以下の例では、HTTPS を使用して Schema Registry と通信を行う方法を紹介しています。そのいくつかの例から、さらに別のコマンドを実行する方法も見えてきます。HTTPS が有効な状態で Schema Registry と通信を行い、各 curl コマンドに示したパターン(証明書、キーなどの指定)を適用すれば、既に取り上げた他の使用例にも対応できます。セキュリティが有効な状態で Schema Registry を構成したり使用したりする方法について詳しくは、「Schema Registry のセキュリティの概要」を参照してください。
Schema Registry の HTTPS を確認する¶
openssl s_client -connect schemaregistry:8082/subjects -cert client.certificate.pem -key client.key -tls1
新しいバージョンのスキーマをサブジェクト "Kafka-key" で登録する¶
curl -v -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"type\": \"string\"}"}' --cert /etc/kafka/secrets/client.certificate.pem --key /etc/kafka/secrets/client.key --tlsv1.2 --cacert /etc/kafka/secrets/snakeoil-ca-1.crt https://schemaregistry:8082/subjects/Kafka-key/versions
すべてのサブジェクトを一覧表示する¶
curl -v -X GET --cert /etc/kafka/secrets/client.certificate.pem --key /etc/kafka/secrets/client.key --tlsv1.2 --cacert /etc/kafka/secrets/snakeoil-ca-1.crt https://schemaregistry:8082/subjects/
Confluent Cloud の Schema Registry に curl を使用してアクセスする方法¶
Confluent Cloud 上のスキーマは、curl コマンドを使用して表示したり管理したりすることもできます。
Confluent Cloud 上の Schema Registry では、--user
(または -u
)フラグを使用して API キーとシークレットを渡す必要があります。たとえば、レジストリ内のすべての サブジェクト を表示するには、次のように入力します。
curl --user <schema-registry-api-key>:<schema-registry-api-secret> \
<schema-registry-url>/subjects
Confluent Cloud における Schema Registry の使用について詳しくは、「Confluent Cloud におけるスキーマ管理のクイックスタート」を参照してください。