Schema Registry API の使用例

Looking for Schema Management Confluent Cloud docs? You are currently viewing Confluent Platform documentation. If you are looking for Confluent Cloud docs, check out Schema Management on Confluent Cloud.

このセクションでは、curl コマンドを使用して Schema Registry API を呼び出す例を紹介しています。

また、「Schema Registry のチュートリアル」でも、curl を使用してこれらの API と対話する例をいくつか確認できます。

API のテストを行う場合、curl--silent フラグに jq を組み合わせてみてください。指定したコマンドから、きれいに書式設定された出力結果が得られます。たとえば、curl -X GET http://localhost:8081/subjects というコマンドを実行すると、次の結果が返されます。

["my-cool-topic-value","my-other-cool-topic-value"]

一方、サイレントモードの curl を使用し、同じコマンドをパイプで jq に連結した場合(curl --silent -X GET http://localhost:8081/subjects | jq)は、次の結果が返されます。

"my-cool-topic-value",
"my-other-cool-topic-value"

Schema Registry の起動

Schema Registry とその依存サービスである ZooKeeper および Kafka を起動します。各サービスの構成は、etc 下の対応するプロパティファイルから読み取られます。

開発またはテスト環境

前提条件

Schema Registry とその依存サービスを起動するには、次のように Confluent CLI confluent local コマンドを使用します。

confluent local services schema-registry start

重要

confluent local コマンドは、単一ノードの開発環境向けであり、本稼働環境には適していません。生成されるデータは一過性で、一時的なものです。本稼働環境対応のワークフローについては、「Confluent Platform のインストールおよびアップグレード」を参照してください。

本稼働環境

各 Confluent Platform サービスを個別のターミナルで起動します。次の操作手順に従ってください。

  1. ZooKeeper を起動します。このコマンドは個別のターミナルで実行します。

    bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
    
  2. Kafka を起動します。このコマンドは個別のターミナルで実行します。

    bin/kafka-server-start ./etc/kafka/server.properties
    
  3. Schema Registry を起動します。このコマンドは個別のターミナルで実行します。

    bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
    

これらのサービスを開始する方法について詳しくは、「オンプレミスのデプロイ」を参照してください。

Schema Registry API の一般的な使用例

参考

これらの例では、curl のコマンドを使用して Schema Registry Schema Registry API を操作します。

シェルにコマンドをコピーアンドペーストしやすいよう、コマンドと結果は別々に掲載しています。

Confluent Cloud の Schema Registry では、curl コマンドの --user (または -u)フラグで API キーとシークレットを渡します。たとえば、レジストリ内のすべてのサブジェクトを表示するには、次のように入力します。

curl --user <schema-registry-api-key>:<schema-registry-api-secret> \
<schema-registry-url>/subjects

新しいバージョンのスキーマをサブジェクト "Kafka-key" で登録する

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"string\"}"}' \
  http://localhost:8081/subjects/Kafka-key/versions

結果の例:

{"id":1}

新しいバージョンのスキーマをサブジェクト "Kafka-value" で登録する

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://localhost:8081/subjects/Kafka-value/versions

結果の例:

{"id":1}

既存のスキーマを新しいサブジェクト名に登録する

ユースケース: Kafka1 というサブジェクトに登録されたスキーマが既に存在します。これと同じスキーマを他のサブジェクト(Kafka2)から利用できるようにする必要があります。次に示した 1 行のコマンドは、Kafka1-value から既存のスキーマを読み取って、Kafka2-value に登録するものです。ご利用のマシンにツール jq がインストールされていることを想定しています。

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data "{\"schema\": $(curl -s http://localhost:8081/subjects/Kafka1-value/versions/latest | jq '.schema')}" \
http://localhost:8081/subjects/Kafka2-value/versions

結果の例:

{"id":1}

ちなみに

You do not need to use the AvroConverter for topic replication or schema management, even if the topic is Avro format. The ByteArrayConverter retains the "magic byte", which is the schema ID. When a replicator is created, messages are replicated with the schema ID. You do not need to create a schema subject. A best practice is to avoid the use of Avro due to the overhead, as there is no real value to it in this context.

詳細については、「チュートリアル: クラスターの境界を越えたデータのレプリケーション」(「Replicator の構成および実行」のサブセクションにある topic.rename.format=${topic}.replica の使用など)と「Replicator とクラスター間フェイルオーバー」を参照してください。

すべてのサブジェクトを一覧表示する

次の API 呼び出しは、すべてのスキーマサブジェクトのリストを表示します。

curl -X GET http://localhost:8081/subjects

結果の例:

["Kafka-value","Kafka-key"]

リクエストの末尾に deleted フラグを追加すると、論理的に削除されたサブジェクトを含めてすべてのサブジェクトを表示できます(?deleted=true)。

curl -X GET http://localhost:8081/subjects?deleted=true

"my-cool-topic-value" というスキーマサブジェクトが以前に論理的に削除されたとすると、結果の例は次のようになります。

["Kafka-value","Kafka-key","my-cool-topic-value"]

特定の ID に関連付けられているすべてのサブジェクトを一覧表示する

特定の ID に関連付けられているサブジェクトは、次の 2 段階のプロセスで検索します。

  1. サブジェクトを一覧表示します。

    curl -X GET http://localhost:8081/subjects
    
  2. サブジェクトの一覧出力を次のように反復処理し、結果に含まれる ID を確認します。

    curl -X GET http://localhost:8081/subjects/<INSERT SUBJECT NAME>-value/versions/latest
    

グローバルに一意の ID 1 でスキーマをフェッチする

curl -X GET http://localhost:8081/schemas/ids/1

結果の例:

{"schema":"\"string\""}

サブジェクト "Kafka-value" で登録されているすべてのスキーマバージョンを一覧表示する

curl -X GET http://localhost:8081/subjects/Kafka-value/versions

結果の例:

[1]

サブジェクト "Kafka-value" で登録されているスキーマのバージョン 1 をフェッチする

curl -X GET http://localhost:8081/subjects/Kafka-value/versions/1

結果の例:

{"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}

ちなみに

スキーマタイプが JSON スキーマ、または Protobuf の場合、応答にはスキーマタイプも含まれます。スキーマタイプが Avro(デフォルト)の場合は、上の例に示したように、応答にスキーマタイプは含まれません。

サブジェクト "Kafka-value" で登録されているスキーマのバージョン 1 を削除する

curl -X DELETE http://localhost:8081/subjects/Kafka-value/versions/1

結果の例:

1

サブジェクト "Kafka-value" で直近に登録されたスキーマを削除する

curl -X DELETE http://localhost:8081/subjects/Kafka-value/versions/latest

結果の例:

2

同じスキーマをサブジェクト "Kafka-value" で登録する

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"string\"}"}' \
   http://localhost:8081/subjects/Kafka-value/versions

結果の例:

{"id":1}

グローバルに一意の ID 1 で再度スキーマをフェッチする

curl -X GET http://localhost:8081/schemas/ids/1

結果の例:

{"schema":"\"string\""}

スキーマがサブジェクト "Kafka-key" で登録されているかどうかを確認する

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"string\"}"}' \
  http://localhost:8081/subjects/Kafka-key

結果の例:

{"subject":"Kafka-key","version":3,"id":1,"schema":"\"string\""}

サブジェクト "Kafka-value" の最新のスキーマとの間でスキーマの互換性をテストする

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"string\"}"}' \
  http://localhost:8081/compatibility/subjects/Kafka-value/versions/latest

結果の例:

{"is_compatible":true}

ちなみに

Confluent Platform 6.1.0 以降では、このリクエストの末尾に ?verbose=true を追加すると、スキーマが互換性テストで不合格になった場合に、その理由が出力されます。詳細については、Schema Registry API リファレンスの「互換性」を参照してください。

トップレベルの構成を取得する

curl -X GET http://localhost:8081/config

結果の例:

{"compatibility":"BACKWARD"}

互換性の要件をグローバルにアップデートする

curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"compatibility": "NONE"}' \
  http://localhost:8081/config

結果の例:

{"compatibility":"NONE"}

新しいトピックのスキーマを登録する

ちなみに

この例とそれに続くいくつかの例では、my-kafka という新しいトピックに言及しています。サブジェクトレベルの互換性構成を示す際にこのトピックを使用します。これらの例では、Confluent Control Center または Kafka コマンドラインを使用して、このトピックが既に作成されていることを前提としています。あくまでコマンドラインを使用して、デモに必要なトピックをここで作成する場合は、次のような Kafka コマンドでトピックを作成した後、その存在を確認してください。

kafka-topics --create --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 --topic my-kafka
kafka-topics --list --bootstrap-server localhost:9092

Schema Registry API を使用して、トピック my-kafka のスキーマを追加します。

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"my.examples\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"}' http://localhost:8081/subjects/my-kafka-value/versions

結果の例:

{"id":1}

サブジェクトの互換性の要件をアップデートする

curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"compatibility": "FULL"}' http://localhost:8081/config/my-kafka-value

結果の例:

{"compatibility":"FULL"}

サブジェクトの互換性の要件を取得する

curl -X GET http://localhost:8081/config/my-kafka-value

結果の例:

{"compatibilityLevel":"FULL"}

ちなみに

問い合わせの対象となるサブジェクトにサブジェクト固有の互換性レベルが設定されていない場合、コマンドからエラーコードが返されます。たとえば、サブジェクト固有の互換性が設定されていない Kafka-value サブジェクトに対して同じコマンドを実行すると、次の結果が返されます。

{"error_code":40401,"message":"Subject 'Kafka-value' not found."}

サブジェクトに対して有効になっている互換性要件を表示する

サブジェクトレベルでどの互換性要件(存在する場合)が設定され、互換性チェックにどの要件が使用されるかは、defaultToGlobal フラグを使用して調べることができます。サブジェクトレベルの互換性がサブジェクトに設定されている場合、それらはえてして同じですが、必ず同じというわけではありません。

  • サブジェクト固有の互換性が "FULL" に設定されているサブジェクト my-kafka-value の場合、defaultToGlobal=truedefaultToGlobal=false の "両方" で {"compatibilityLevel":"FULL"} が返されます。

    curl -X GET http://localhost:8081/config/my-kafka-value/?defaultToGlobal=true
    

    結果の例:

    {"compatibilityLevel":"FULL"}
    
    curl -X GET http://localhost:8081/config/my-kafka-value/?defaultToGlobal=false
    

    結果の例:

    {"compatibilityLevel":"FULL"}
    
  • サブジェクト固有の互換性が設定されていない Kafka-value サブジェクトの場合、defaultToGlobal=true からは現在のグローバルデフォルトが返されます(例: {"compatibilityLevel":"NONE"})。

    curl -X GET http://localhost:8081/config/Kafka-value/?defaultToGlobal=true
    

    結果の例:

    {"compatibilityLevel":"NONE"}
    
  • 一方、サブジェクト Kafka-value に対する defaultToGlobal=false からは、エラーコードが返されます。

    curl -X GET http://localhost:8081/config/Kafka-value/?defaultToGlobal=false
    

    結果の例:

    {"error_code":40401,"message":"Subject 'Kafka-value' not found."}
    

サブジェクト "Kafka-value" で登録されているすべてのスキーマバージョンを削除する

curl -X DELETE http://localhost:8081/subjects/Kafka-value

結果の例:

[3]

Schema Registry に現在登録されているスキーマタイプを一覧表示する

curl -X GET http://localhost:8081/schemas/types

結果の例:

["JSON", "PROTOBUF", "AVRO"]

特定の ID が使用されているサブジェクトとバージョンのペアをすべて一覧表示する

curl -X GET http://localhost:8081/schemas/ids/2/versions

結果の例:

[{"subject":"testproto-value","version":1}]

特定のスキーマを参照するスキーマの ID を一覧表示する

curl -X GET http://localhost:8081/subjects/other.proto/versions/1/referencedby

結果の例:

[2]

HTTPS での Schema Registry の使用

前述の curl コマンド例で紹介したのは、HTTP を使用して Schema Registry とやり取りする方法です。

以下の例では、HTTPS を使用して Schema Registry と通信を行う方法を紹介しています。そのいくつかの例から、さらに別のコマンドを実行する方法も見えてきます。HTTPS が有効な状態で Schema Registry と通信を行い、各 curl コマンドに示したパターン(証明書、キーなどの指定)を適用すれば、既に取り上げた他の使用例にも対応できます。セキュリティが有効な状態で Schema Registry を構成したり使用したりする方法について詳しくは、「Schema Registry のセキュリティの概要」を参照してください。

Schema Registry の HTTPS を確認する

openssl s_client -connect schemaregistry:8082/subjects -cert client.certificate.pem -key client.key -tls1

新しいバージョンのスキーマをサブジェクト "Kafka-key" で登録する

curl -v -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"type\": \"string\"}"}' --cert /etc/kafka/secrets/client.certificate.pem --key /etc/kafka/secrets/client.key --tlsv1.2 --cacert /etc/kafka/secrets/snakeoil-ca-1.crt https://schemaregistry:8082/subjects/Kafka-key/versions

すべてのサブジェクトを一覧表示する

curl -v -X GET --cert /etc/kafka/secrets/client.certificate.pem --key /etc/kafka/secrets/client.key --tlsv1.2 --cacert /etc/kafka/secrets/snakeoil-ca-1.crt https://schemaregistry:8082/subjects/

Confluent Cloud の Schema Registry に curl を使用してアクセスする方法

Confluent Cloud 上のスキーマは、curl コマンドを使用して表示したり管理したりすることもできます。

Confluent Cloud 上の Schema Registry では、--user (または -u)フラグを使用して API キーとシークレットを渡す必要があります。たとえば、レジストリ内のすべての サブジェクト を表示するには、次のように入力します。

curl --user <schema-registry-api-key>:<schema-registry-api-secret> \
<schema-registry-url>/subjects

Confluent Cloud における Schema Registry の使用について詳しくは、「Confluent Cloud におけるスキーマ管理のクイックスタート」を参照してください。