Confluent Platform の Schema Linking

Looking for Schema Management Confluent Cloud docs? You are currently viewing Confluent Platform documentation. If you are looking for Confluent Cloud docs, check out Schema Management on Confluent Cloud.

Schema Registry で Schema Linking をサポートするようになりました。以下のビデオとドキュメントでは、お使いの Confluent デプロイで Schema Linking の使用を開始するために必要な概念、構成、ツールについて説明します。クイックスタートでは、エクスポーターを作成して、クラスターへの Schema Linking の実装に使用する方法を実践的な例で順を追って説明します。

ちなみに

The video below focuses on Schema Linking on Confluent Cloud, but the feature is also available on Confluent Enterprise, with Confluent Platform specific setup, commands, and workflow, as described in the Confluent Platform Schema Linking Quick Start. Schema Linking on Confluent Cloud and on Confluent Platform share the core concepts and mechanisms of schema contexts, exporters, configuration options, exporter lifecycle, and REST APIs.

Schema Linking とは

Schema Linking は、2 つの Schema Registry クラスターを同期させ続けるための機能です。Schema Linking を Cluster Linking と組み合わせると、2 つの Schema Registry および Kafka クラスターの間でスキーマとトピックデータの両方を同期し続けることができます。

../_images/schema-linking.ja.png

Schema Registry に、Schema Linking をサポートするための 2 つの新しい概念が導入されました。

  • スキーマコンテキスト - スキーマコンテキスト は Schema Registry の独立したスコープを表し、1 つの Schema Registry クラスター内に任意の数の個別の "サブレジストリ" を作成するために使用できます。各スキーマコンテキストはスキーマ ID とサブジェクト名を個別にグループ化したものであり、同じスキーマ ID を異なるコンテキストで使用できるようにすることで、完全に異なるスキーマを表します。明示的なコンテキストがないスキーマ ID またはサブジェクト名はすべてデフォルトのコンテキストに存在し、1 つのドット(.)で示されます。明示的なコンテキストはドットで始まり、さらに別のドットで区切られた部分から構成されます(例: .mycontext.subcontext)。コンテキスト名の構成は Unix の絶対パスと似ていますが、フォワードスラッシュの代わりにドットが使用されます(デフォルトのスキーマは Unix のルートパスと似ています)。ただし、プレフィックスを共有する 2 つのコンテキストの間に関係はありません。
  • スキーマエクスポーター - スキーマエクスポーター は Schema Registry に存在するコンポーネントであり、スキーマを 1 つの Schema Registry クラスターから別のクラスターにエクスポートします。スキーマエクスポーターのライフサイクルは API を介して管理されます。これらの API で、スキーマエクスポーターの作成、一時停止、再開、破棄が可能です。スキーマエクスポーターは、スキーマの変更データキャプチャーを実行できる "小型コネクター" のようなものです。

以下のクイックスタートでは、Schema Linking を実現するためにスキーマエクスポーターとスキーマコンテキストの使用を開始する方法を説明します。

これらの概念の詳細については、「スキーマコンテキスト」および「スキーマエクスポーター」を参照してください。

クイックスタート

今すぐに Schema Linking を試してみたい場合は、以下の手順に従ってください。クイックスタートの後で、コンテキスト、エクスポーター、コマンドオプション、API に関する詳細情報を紹介しています。これらの情報は、いくつかの実践的な例を試した後の方が役立つと考えられます。

ちなみに

すべてのプロパティを、以降の表に記載されているとおりに構成してください。エクスポーターの API または schema-exporter コマンドを使用するときに 404 エラーが発生した場合は、Schema Linking を有効にする Schema Registry プロパティファイルを下記のとおりに構成していないことが考えられます。

送信元環境と送信先環境をセットアップする

ZooKeeper、Confluent Server(ブローカー)、Schema Registry を 2 つずつセットアップします。これらは送信元および送信先として機能します。さらに、両方にアクセスする Confluent Control Center を 1 つセットアップします。以下の既存のプロパティファイルを、これらの新しいバージョンのファイルのベースとして使用します。

テンプレートファイル 新しいファイル
etc/kafka/zookeeper.properties zookeeper0.properties、zookeeper1.properties
etc/kafka/server.properties server0.properties、server1.properties
etc/schema-registry/schema-registry.properties schema-registry0.properties、schema-registry1.properties
etc/confluent-control-center/control-center-dev.properties control-center-multi-sr.properties

これらのファイルに、以下の構成を使用します。

ちなみに

この構成の詳細については、「マルチクラスター Schema Registry の有効化」を参照してください。

ファイル 構成
zookeeper0.properties
  • dataDir=/tmp/zookeeper/zk-0 (未使用のログ用ディレクトリ)
  • clientPort=2181zookeeper プロパティと同じ)
zookeeper1.properties
  • dataDir=/tmp/zookeeper/zk-1 (未使用のログ用ディレクトリ)
  • clientPort=2182zookeeper0 が既に 2181 を使用しているため)
server0.properties
  • broker.id=0
  • listeners=PLAINTEXT://:9092
  • log.dirs=/tmp/kafka-logs-bk-0 (未使用のログ用ディレクトリ)
  • zookeeper.connect=localhost:2181
  • confluent.http.server.listeners=http://localhost:8090
  • confluent.metrics.reporter.bootstrap.servers=localhost:9092

次の構成は、このブローカーのマルチクラスター Schema Registry セットアップに固有です。

  • confluent.http.server.listeners=http://0.0.0.0:8090
  • confluent.schema.registry.url=http://localhost:8081
server1.properties
  • broker.id=1
  • listeners=PLAINTEXT:// :9093
  • log.dirs=/tmp/kafka-logs-bk-1 (未使用のログ用ディレクトリ)
  • zookeeper.connect=localhost:2182
  • confluent.http.server.listeners=http://localhost:8090
  • confluent.metrics.reporter.bootstrap.servers=localhost:9093

次の構成は、このブローカーのマルチクラスター Schema Registry セットアップに固有です。

  • confluent.http.server.listeners=http://0.0.0.0:8091
  • confluent.schema.registry.url=http://localhost:8082
schema-registry1.properties
  • listeners=PLAINTEXT://:8082
  • kafakstore.bootstrap.servers=localhost:9093
  • kafkastore.topic=_schemas1 (レジストリで相互に上書きされないように、server0.properties 内のデフォルトの _schemas とは異なるものにする必要があります)
  • schema.registry.group.id=schema-registry-dest (デフォルト ID の schema-registry を使用する、server0.properties で使用されるグループ ID とは異なるものにする必要があります)

以下は、Schema Linking 固有の構成です。両方の Schema Registry プロパティファイルで同じ構成にする必要があります。

  • resource.extension.class=io.confluent.schema.exporter.SchemaExporterResourceExtension
  • kafkastore.update.handlers=io.confluent.schema.exporter.storage.SchemaExporterUpdateHandler
  • password.encoder.secret=mysecret
control-center-multi-sr.properties
  • bootstrap.servers=localhost:9092
  • zookeeper.connect=localhost:2181
  • confluent.controlcenter.kafka.AK1.bootstrap.servers=localhost:9093
  • confluent.controlcenter.kafka.AK1.zookeeper.connect=localhost:2182
  • confluent.controlcenter.streams.cprest.url=http://0.0.0.0:8090
  • confluent.controlcenter.schema.registry.url=http://localhost:8081
  • confluent.controlcenter.kafka.AK1.cprest.url=http://0.0.0.0:8091
  • confluent.controlcenter.schema.registry.SR-AK1.url=http://localhost:8082

cpcrest.urlconfluent.controlcenter.kafka.AK1.cprest.urlconfluent.controlcenter.schema.registry.SR-AK1.url の構成は、マルチクラスター Schema Registry に固有の新規プロパティです。

クラスターを起動する

  1. ZooKeeper を起動します。
  2. Confluent Server ブローカーを起動する
  3. Schema Registry クラスターを起動します。
  4. Confluent Control Center を起動します。

以下の起動コマンドの例では、プロパティファイルが以下に示す <path-to-confluent>/etc/ ディレクトリにあり、コマンドは <path-to-confluent> から実行していることを前提としています。

ZooKeeper の起動

./bin/zookeeper-server-start etc/kafka/zookeeper0.properties
./bin/zookeeper-server-start etc/kafka/zookeeper1.properties

Kafka ブローカーの起動

./bin/kafka-server-start etc/kafka/server0.properties
./bin/kafka-server-start etc/kafka/server1.properties

スキーマレジストリクラスターの起動

./bin/schema-registry-start etc/schema-registry/schema-registry0.properties
./bin/schema-registry-start etc/schema-registry/schema-registry1.properties

Control Center の起動

./bin/control-center-start etc/confluent-control-center/control-center-multi-sr.properties

送信元でスキーマを作成する

送信元環境では、少なくとも 2 つまたは 3 つのスキーマを作成します。そのうち 1 つ以上に修飾サブジェクト名を指定します。

  1. Confluent Control Center でトピックとそれに関連するスキーマを作成します(Control Center は http://localhost:9021/ で実行されます)。

    これにより、命名スキーム <topic>-value でスキーマサブジェクトを生成します(Control Center から修飾サブジェクトを作成することはできません。このため Schema Registry API を使用します)。

  2. :.<context-name>:<subject-name> という構文を使用して、特定のコンテキストにおいて修飾サブジェクト名と非修飾サブジェクト名の両方でスキーマを作成します。

    • 非修飾サブジェクト名でスキーマを作成するには、単純に coffeedonuts などの名前を指定します。
    • 特定のコンテキストにおいて修飾サブジェクト名でスキーマを作成するには、REST API:.<context-name>:<subject-name> という構文を使用します。たとえば、:.snowcones:sales:.burgers:locations のように指定します。

    Schema Registry API を使用して非修飾サブジェクト名でスキーマを作成する例を以下に示します。

    curl -v -X POST -H "Content-Type: application/json" --data @/path/to/test.avro <source sr url>/subjects/donuts/versions
    

    上記の curl コマンドは、次の Avro スキーマを含むファイルを呼び出します。

    {
          "schema":
            "{
                   \"type\": \"record\",
                   \"connect-name\": \"myname\",
                   \"connect-donuts\": \"mydonut\",
                   \"name\": \"test\",
                   \"doc\": \"some doc info\",
                     \"fields\":
                       [
                         {
                           \"type\": \"string\",
                           \"doc\": \"doc for field1\",
                           \"name\": \"field1\"
                         },
                         {
                           \"type\": \"int\",
                           \"doc\": \"doc for field2\",
                           \"name\": \"field2\"
                         }
                       ]
                   }"
         }
    
  3. Schema Registry API を使用し、プレフィックスを渡して送信元のサブジェクトのリストを表示します。

    curl --silent -X GET <source sr url>/subjects?subjectPrefix=":.<context-name>:<subject-name>" | jq
    

    その例を次に示します。

    curl --silent -X GET http://localhost:8081/subjects?subjectPrefix=":*:" | jq
    

    出力は以下のようになります。

    ":.snowcones:sales",
      "coffee-value",
      "donuts"
    

これで、2 つのクラスターとレジストリの Schema Linking のエクスポーターを作成してテストする準備ができました。以下に示す exporter コマンドを $CONFLUENT_HOME (Confluent Platform の最上位ディレクトリ)から実行します。

エクスポーターの構成ファイルを作成する

スキーマエクスポーターは、"送信元" 環境からスキーマを読み取り、リンクされたコピーを送信先にエクスポートします。

エクスポーターを作成するために使用する ~/config.txt を作成し、エクスポーターが "送信先" クラスターにアクセスするために必要な URL と認証情報を入力します。

schema.registry.url=<destination sr url>

認証情報の使用(任意)

このチュートリアルをローカルインスタンスの Confluent Platform でテストする場合、認証情報は不要です。

認証情報が必要な場合は、まず Schema Registry の HTTP 基本認証を構成 します。さらに、セキュアな通信のために HTTPS を使用 するのが理想です。

上記の指示に従って、Schema Registry で基本認証を使用する構成ができたら、次のように認証情報を ~/config.txt ファイルの最後に追加します。

schema.registry.url=<destination sr url>
basic.auth.credentials.source=USER_INFO
basic.auth.user.info=fred:letmein

上記の例と、それをサポートするための Schema Registry の必須構成については、Schema Registry の HTTP 基本認証を構成する 手順で説明しています。

このファイルの認証情報により、以下に示すコマンドで --config-file ~/config.txt を呼び出すたびに、自動的にこれらの認証情報が Schema Registry の URL とともに渡されます。

HTTP 基本認証のパラメーターに加えて、構成ファイルを使用して、「Schema Registry のクライアント」で説明するクライアント構成を渡すことができます。

ちなみに

HTTP 基本認証が構成された送信元の Schema Registry クラスターと Confluent CLI を使用して通信するには、--basic.auth.credentials.source および --basic.auth.user.info を適切な認証情報とともに渡す必要があります。または、ベアラートークン認証を使用する場合は、--bearer.auth.credentials.source および --bearer.auth.token フラグで認証情報を提供します。

送信元でエクスポーターを作成する

Confluent Platform CLI を使用して、送信元でエクスポーターを作成します。

schema-exporter --create コマンドを使用して新しいエクスポーターを作成します。

./bin/schema-exporter --create --name <name-of-exporter> --subjects ":*:" \
 --config-file ~/config.txt
 --schema.registry.url <source sr url>

たとえば、以下のコマンドでは、特定のコンテキストとデフォルトのコンテキストのものを含む、すべてのスキーマ(":*:")をエクスポートする "my-first-exporter" というエクスポーターが作成されます。

./bin/schema-exporter --create --name my-first-exporter --subjects ":*:" \
 --config-file ~/config.txt \
 --schema.registry.url  http://localhost:8081/

以下のコマンド構文では、カスタムコンテキスト context1 のサブジェクト donuts および coffee のみをエクスポートするエクスポーターが作成されます。

schema-exporter --create --name exporter1 --subjects donuts,coffee \
 --context-type CUSTOM --context-name context1 \
 --config-file ~/config.txt \
 --schema.registry.url <source sr url>

エクスポーターのその他のオプション

上記の最初の例を使用して作成したエクスポーターは、単にすべてをエクスポートするという意味で、比較的ベーシックなものです。次のセクションでわかるように、修飾サブジェクト名と非修飾サブジェクト名でスキーマを整理、エクスポート、操作する方法を理解するには、このようなエクスポーターが効率的です。

ただし、特定のサブジェクトとコンテキストのみをエクスポートするよう指定するエクスポーターも作成できることを覚えておいてください。その場合、以下の構文を使用します。

schema-exporter --create <exporterName> --subjects <subjectName1>,<subjectName2> \
--context-type CUSTOM --context-name <contextName> \
--config-file ~/config.txt
  • <> で囲まれた文字列は任意の名前に置き換えます。
  • subjects はコンマ区切りの文字列としてリストされます("pizzas,sales,customers" など)。
  • subjectscontext-typecontext-name はすべて省略可能です。context-type が CUSTOM の場合のみ context-name を指定します。
  • subjects のデフォルトは * で、context-type のデフォルトは AUTO です。

また、エクスポーターを作成するときにすべてをデフォルトとし、--subjects を指定しない場合、デフォルトのコンテキストのスキーマのみをエクスポートするエクスポーターが作成されます。

./bin/schema-exporter --create --name my-first-exporter \
 --config-file ~/config.txt \
 --schema.registry.url  http://localhost:8081/

このタイプのエクスポーターでは、修飾サブジェクト名を持つ送信元のスキーマが送信先にエクスポートされません。

schema-exporter --create および schema-exporter --update で使用できる省略可能なその他のパラメーターは、--subject-format です。これは、送信先クラスターのサブジェクト名のフォーマットを指定します。デフォルトサブジェクト名と置き換えるプレースホルダーとして、${subject} を含めることができます。たとえば、サブジェクト orders の場合、dc_${subject} は、送信先サブジェクト名 dc_orders にマッピングされます。

複数のエクスポーターを同時に作成して実行できるため、このクイックスタートを最後まで終えたら、次は異なるパラメーターを指定したその他のエクスポーターを作成し、テストしてみてください。

エクスポーターが実行されていることを確認しエクスポーターの情報を表示する

  1. 利用可能なエクスポーターのリストを表示します。

    ./bin/schema-exporter --list \
     --schema.registry.url <source sr url>
    

    その例を次に示します。

    ./bin/schema-exporter --list --schema.registry.url http://localhost:8081
    

    作成したエクスポーターがリストに表示されます。

    [my-first-exporter]
    
  2. エクスポーターに関する情報を表示します。

    ./bin/schema-exporter --describe --name <exporterName> \
     --schema.registry.url <source sr url>
    

    その例を次に示します。

    ./bin/schema-exporter --describe --schema.registry.url http://localhost:8081 --name my-first-exporter
    

    出力は以下のようになります。

    {"name":"my-first-exporter","subjects":[":*:"],"contextType":"AUTO","context":".","config":{"schema.registry.url":"http://localhost:8082"}}
    
  3. エクスポーターの構成を取得します。

    ./bin/schema-exporter --get-config --name <exporterName> \
     --schema.registry.url <source sr url>
    

    その例を次に示します。

    ./bin/schema-exporter --get-config --name my-first-exporter --schema.registry.url http://localhost:8081
    

    出力は以下のようになります。

    {schema.registry.url=http://localhost:8082}
    
  4. エクスポーターのステータスを取得します。

    ./bin/schema-exporter --get-status --name <exporterName> \
     --schema.registry.url <source sr url>
    

    その例を次に示します。

    ./bin/schema-exporter --get-status --name my-first-exporter --schema.registry.url http://localhost:8081
    

    出力は以下のようになります。

    {"name":"my-first-exporter","state":"RUNNING","offset":9,"ts":1635890864106}
    
  5. 最後に確認として、送信元のスキーマサブジェクトのリストを取得します。

    このためには、次のようにサブジェクトのプレフィックスを付けた API 呼び出しを使用します。

    curl --silent -X GET http://localhost:8081/subjects?subjectPrefix=":*:" | jq
    
    ":.snowcones:sales",
      "coffee-value",
      "donuts"
    

スキーマがエクスポートされたことを確認する

ここまでで、エクスポーターが実行中であることを確認しました。自分が送信元で作成したスキーマについては把握できているため、自分のスキーマが送信先にエクスポートされていることを確認します。

  1. 以下の API 呼び出しを実行して、送信先のスキーマサブジェクトを表示します。

    curl --silent -X GET http://localhost:8082/subjects?subjectPrefix=":*:" | jq
    
    ":.QWE7LDvySmeV6Sg81B3jUg-schema-registry.snowcones:sales",
    ":.QWE7LDvySmeV6Sg81B3jUg-schema-registry:coffee-value",
    ":.QWE7LDvySmeV6Sg81B3jUg-schema-registry:donuts"
    
  2. 特定のコンテキストのスキーマのみのリストを表示します。

    curl --silent -X GET '<destination sr url>/subjects?subjectPrefix=:.<context-name>:<subject-name>' | jq
    
    • たとえば、送信元の snowcones コンテキスト以下のすべてのサブジェクトを表示するには、次のコマンドを使用します。

      curl --silent -X GET 'http://localhost:8081/subjects?subjectPrefix=:.snowcones:' | jq
      

      snowcones コンテキストにサブジェクトが 1 つのみの場合は以下のような出力になります。

      ":.snowcones:sales"
      
    • 送信先の snowcones コンテキスト以下のすべてのサブジェクトのリストを表示するには、同じコマンド構文を使用します。なお、送信先のサブジェクト名の最初には長い ID を含める必要があります。これらは、プレフィックスの一部であるためです。

      curl --silent -X GET http://localhost:8082/subjects?subjectPrefix=":.QWE7LDvySmeV6Sg81B3jUg-schema-registry.snowcones:" | jq
      

      出力は以下のようになります。

      ":.QWE7LDvySmeV6Sg81B3jUg-schema-registry.snowcones:sales"
      

ちなみに

送信元でエクスポーターを作成したときに省略可能なパラメーター --subject-format を使用した場合は、送信先にエクスポートされたサブジェクトが、指定したサブジェクト名変更フォーマットにマッピングされていることを確認します。

エクスポーターを一時停止して変更を加える

  1. エクスポーターを一時停止します。

    SOURCE に戻り、以下のコマンドを実行してエクスポーターを一時停止します。

    ./bin/schema-exporter --pause --name <exporterName> \
     --schema.registry.url <source sr url>
    

    その例を次に示します。

    ./bin/schema-exporter --pause --name my-first-exporter --schema.registry.url http://localhost:8081
    

    コマンドが正常に実行されたことを確認するメッセージが出力されます。たとえば、Successfully paused exporter my-first-exporter. のようなメッセージです。

    念のためステータスを確認します。

    ./bin/schema-exporter --get-status --name <exporterName> \
     --schema.registry.url <source sr url>
    

    出力は以下のようになります。

    {"name":"my-first-exporter","state":"PAUSED","offset":9,"ts":1635890864106}
    
  2. スキーマエクスポーターのオフセットをリセットしてからステータスを取得します。

    ./bin/schema-exporter --reset --name <exporterName> \
     --schema.registry.url <source sr url>
    

    その例を次に示します。

    ./bin/schema-exporter --reset --name my-first-exporter --schema.registry.url http://localhost:8081
    

    ステータスに、オフセットがリセットされたことが示されます。その例を次に示します。

    Successfully reset exporter my-first-exporter
    
  3. エクスポーターの構成または情報をアップデートします。

    アップデートの対象を subjectscontext-typecontext-nameconfig-file から選択します。その例を次に示します。

    ./bin/schema-exporter --update --name <exporterName> --context-name <newContextName>
    
  4. スキーマエクスポーターを再開します。

    ./bin/schema-exporter --resume --name <exporterName> --schema.registry.url <source sr url>
    

    その例を次に示します。

    ./bin/schema-exporter --resume --name my-first-exporter --schema.registry.url http://localhost:8081
    

    出力は以下のようになります。

    Successfully resumed exporter my-first-exporter
    

エクスポーターを削除する

テストを終了できる状態になったら、以下のコマンドを使用してエクスポーターを一時停止し、削除します。

  1. エクスポーターを一時停止します。

    ./bin/schema-exporter --pause --name <exporterName> \
     --schema.registry.url <source sr url>
    

    その例を次に示します。

    ./bin/schema-exporter --pause --name my-first-exporter --schema.registry.url http://localhost:8081
    
  2. エクスポーターを削除します。

    ./bin/schema-exporter --delete --name <exporterName> \
    --schema.registry.url <source sr url>
    

    その例を次に示します。

    ./bin/schema-exporter --delete --name my-first-exporter --schema.registry.url http://localhost:8081
    

    出力は以下のようになります。

    Successfully deleted exporter my-first-exporter
    

クイックスタートはこれで終了です。次のセクションでは、Schema Linking の概念と、ここで試したツールの詳細について説明します。

スキーマコンテキスト

スキーマコンテキストとは

スキーマコンテキストは単に「コンテキスト」とも呼ばれ、基本的には、サブジェクト名とスキーマ ID をグループ化したものです。1 つの Schema Registry クラスターで任意の数のコンテキストをホストできます。各コンテキストは個別の "サブレジストリ" とみなすことができます。コンテキストは、スキーマエクスポーター を使用して別の Schema Registry にコピーすることもできます。

コンテキストの仕組み

以下で、コンテキストの主要な側面をいくつか紹介し、それらがスキーマの整理にどのように役立つかについて説明します。

スキーマのスコープはコンテキストごとに指定される

サブジェクト名とスキーマ ID のスコープはコンテキストごとに指定されるため、同じ Schema Registry クラスター内の 2 つのコンテキストにそれぞれ同じ ID(123 など)のスキーマ、または同じ名前(mytopic-value)のサブジェクトがあっても問題ありません。

デフォルトのコンテキスト

明示的なコンテキストがないスキーマ ID またはサブジェクト名はすべてデフォルトのコンテキストに存在し、1 つのドット(.)で表されます。明示的なコンテキストはドットで始まり、さらに別のドットで区切られた部分から構成されます(例: .mycontext.subcontext)。コンテキスト名は Unix の絶対パスと似ていますが、フォワードスラッシュの代わりにドットが使用されます(同様に、デフォルトのスキーマコンテキストは Unix のルートパスと似ています)。ただし、プレフィックスを共有する 2 つのコンテキストの間に関係はありません。

修飾サブジェクト

サブジェクト名がコンテキストで修飾されている場合、そのサブジェクトを "修飾サブジェクト" と呼びます。コンテキストでサブジェクトを修飾する場合、そのコンテキストをコロンで囲む必要があります。たとえば、:.mycontext:mysubject のようになります。修飾されていないサブジェクト名は、デフォルトのコンテキストにあるとみなされます。つまり、mysubject:.:mysubject と同じです(ドットはデフォルトのコンテキストを表します)。

コンテキストを REST API に渡す方法は 2 つあります。

要件を満たしたサブジェクトの使用

修飾サブジェクトは、サブジェクト名が想定される場所であればどこにでも渡すことができます。ほとんどの REST API は、POST /subjects/{subject}/versions のように、サブジェクト名を使用します。

URL パスの一部としてサブジェクト名を使用しない REST API はごくわずかです。その例を以下に示します。

  • /schemas/ids/{id}
  • /schemas/ids/{id}/subjects
  • /schemas/ids/{id}/versions

前述の 3 つの API で "subject"(?subject と記述します)というクエリパラメーターを使用できるようになったため、/schemas/ids/{id}?subject=:.mycontext:mysubject のように修飾サブジェクト名を渡すと、指定したコンテキストを使用したスキーマ ID のルックアップが実行されます。

ベースコンテキストパスの使用

前述したように、非修飾サブジェクトを指定するすべての API はデフォルトのコンテキストで機能します。サブジェクト名が想定される場所に修飾サブジェクトを渡す方法に加えて、ベースコンテキストパスを使用してコンテキストを渡す方法もあります。ベースコンテキストパスは /contexts/{context} という形式で、既存の Schema Registry パスすべての先頭に追加できます。このため、特定のコンテキストでスキーマ ID をルックアップするために、URL /contexts/.mycontext/schemas/ids/{id} を使用することもできます。

ベースコンテキストパスを使用してデフォルトのコンテキストを操作することもできます。この場合、ベースコンテキストパスは "/contexts/:.:/" という形式になります(例: /contexts/:.:/schemas/ids/{id})。一部の URL パーサーで削除されてしまうため、1 つのドットは使用できません。

マルチコンテキスト API

これまで挙げたすべての例は 1 つのコンテキストに該当するものです。複数のコンテキスト向けに結果を返す API は 3 つあります。

  • /contexts
  • /schemas?subjectPrefix=:*:
  • /subjects?subjectPrefix=:*:

最初の API、/contexts は、すべてのコンテキストのリストを返します。他の 2 つの API、/schemas および /subjects は通常、デフォルトのコンテキストのみで動作します。これらを使用して、subjectPrefix を値 :*: (コンテキストワイルドカードと呼ばれます)とともに渡すことで、すべてのコンテキストに対してクエリを実行できます。コンテキストワイルドカードはすべてのコンテキストに一致します。

クライアント向けのコンテキスト名の指定

クライアントを使用して Schema Registry と通信するときに、クライアントで特定のコンテキストを使用することもできます。たとえば、ある Schema Registry との通信から別の Schema Registry との通信にクライアントを移行する場合などです。このために、上で定義したベースコンテキストパスを使用します。この場合、クライアントが使用する Schema Registry の URL を https://<host1> から https://<host2>/contexts/.mycontext に変更するだけです。

Schema Registry の URL でベースコンテキストパスを使用することで、クライアントがすべての Schema Registry リクエストに同じスキーマコンテキストを使用するようになります。しかし、さらに高度な用法では、クライアントがトピックごとに異なるコンテキストを使用するようにできます。このためには、コンテキスト名戦略をシリアライザーまたはデシリアライザーに指定します。

  • context.name.strategy=com.acme.MyContextNameStrategy

コンテキスト名戦略は、以下のインターフェイスを実装する必要があるクラスです。

/**
 * A {@link ContextNameStrategy} is used by a serializer or deserializer to determine
 * the context name used with the schema registry.
 */
public interface ContextNameStrategy extends Configurable {

  /**
   * For a given topic, returns the context name to use.
   *
   * @param topic The Kafka topic name.
   * @return The context name to use
   */
  String contextName(String topic);
}

コンテキスト名戦略の使用は、一般的なものではありません。ベースコンテキストパスを Schema Registry の URL に指定する方法でほとんどのニーズを満たすことができます。

スキーマエクスポーター

スキーマエクスポーターとは

以前は、Confluent Replicator が、ある Schema Registry クラスターから別のクラスターに スキーマを移行 するための主要な手段でした。ただし、送信元の Schema Registry クラスターがオンプレミスである必要がありました。この手法によるスキーマの移行をサポートするには、グローバルで、または特定のサブジェクトについて、送信先の Schema Registry を IMPORT モードにします。

これに代わって新しいスキーマエクスポーター機能が導入され、Replicator のスキーマ移行機能が拡張されました。スキーマエクスポーターは Schema Registry クラスター内に常駐し、Confluent Cloud の 2 つの Schema Registry クラスター間でスキーマをレプリケートするために使用できます。

Schema Linking

スキーマエクスポーターにより、コンテキスト/修飾サブジェクト名を使用してレジストリ間のスキームを同期する Schema Linking に対応できます。スキーマコンテキストが概念的な基盤と名前空間フレームワークを提供する一方、エクスポーターは負担の大きなリンク処理を担います。

送信元のデフォルトのコンテキストから送信先の新しいコンテキストへのスキーマのエクスポート

デフォルトで、スキーマエクスポーターはスキーマを送信元の Schema Registry のデフォルトのコンテキストから送信先の Schema Registry の新しいコンテキストにエクスポートします。送信先のコンテキスト(つまり、送信先のコンテキスト内のサブジェクト)は IMPORT モードになります。これにより、送信先の Schema Registry は、デフォルトのコンテキストのクライアントに一切影響を与えずに、デフォルトのコンテキストを通常どおりに使用できます。

送信先の Schema Registry にデフォルトで作成される新しいコンテキストは .lsrc-xxxxxx という形式になり、送信元の論理名から取得されます。

Schema Registry クラスターは互いにスキーマをエクスポートできる

2 つの Schema Registry クラスターそれぞれでスキーマエクスポーターを使用して、デフォルトのコンテキストからもう一方の Schema Registry にスキーマをエクスポートできます。この構成では、それぞれがデフォルトのコンテキストからの読み取りまたはデフォルトのコンテキストへの書き込みを実行でき、それぞれがエクスポートされたコンテキストからの読み取りを実行できます(ただし、書き込みは実行できません)。これにより、それぞれに送信元トピックと読み取り専用のミラートピックを持つ Cluster Linking の構成に一致させることができます。

エクスポーターは同じ Schema Registry のコンテキスト全体にスキーマをコピーできる

さらに、スキーマエクスポーターは同じ Schema Registry クラスター内のコンテキスト間でスキーマをコピーできます。たとえば、".staging" コンテキストを作成し、本稼働環境で使用する準備ができた段階で、そのスキーマを ".staging" コンテキストからデフォルトのコンテキストにコピーできます。同じ Schema Registry クラスターとの間でスキーマのコピーを実行する場合は、特殊な URL local:/// を使用します。

スキーマのエクスポートのカスタマイズ

送信元の Schema Registry からエクスポートされるコンテキストや送信先の Schema Registry で使用されるコンテキストをカスタマイズするためのさまざまな方法が用意されています。全構成プロパティのリストについては、以下を参照してください。

Schema Registry ごとに使用できるエクスポーターの数

Schema Registry ごとに同時に使用できるエクスポーターの数は 10 に制限されています。

構成オプション

スキーマエクスポーターには、以下のとおり主要な構成プロパティがあります。

name
エクスポーターの一意の名前。
subjects

複数の形式があります。

  • サブジェクト名/コンテキストのリスト(例: [ "subject1", "subject2", ".mycontext1", ".mycontext2" ]
  • サブジェクト名のプレフィックスを含み、ワイルドカードで終わるシングルトンのリスト(例: ["mytopic*"]
  • ワイルドカード 1 つのみを含むシングルトンのリスト["*"]。デフォルトのコンテキストのすべてのサブジェクトを示します。これがデフォルトです。
  • コンテキストワイルドカードを含むシングルトンのリスト、[":*:"]。すべてのコンテキストを示します。
subject-format
これはオプションのパラメーターで、送信先クラスターのサブジェクト名のフォーマットを指定するために使用できます。プレースホルダーとして ${subject} を指定できます。これは、デフォルトのサブジェクト名と置き換えられます。たとえば、サブジェクト orders の場合、dc_${subject} は、送信先サブジェクト名 dc_orders にマッピングされます。
context-type

以下のいずれかです。

  • AUTO - 送信元のコンテキストの先頭に自動生成されたコンテキスト(Confluent Cloud の場合 .lsrc-xxxxxx)を追加します。これがデフォルトです。
  • CUSTOM - 送信元のコンテキストの先頭に、以下の context で指定するカスタムのコンテキスト名を追加します。
  • NONE - 先頭に何も追加せずに、送信元のコンテキストをそのままコピーします。このオプションは、送信元 Schema Registry そのものを送信先にコピーする場合に便利です。
context-name
前述の CUSTOM contextType で使用するコンテキスト名。
config

送信先 Schema Registry と通信するためのクライアントを作成するための構成セット。構成ファイル(--config-file ~/<my-config>.txt など)で渡すことができます。一般に、以下の構成が含まれます。

  • schema.registry.url - 送信先 Schema Registry の URL。送信元と送信先が同じ場合、これを local:/// にすると、さらに効率的にコピーできます。
  • basic.auth.credentials.source - 通常は "USER_INFO"
  • basic.auth.user.info - 通常の形式は <api-key>:<api-secret>

System Topics and Security Configurations

これらのファイルに、以下の構成を使用します。

  • exporter.config.topic - Stores configurations for the exporters. The default name for this topic is _exporter_configs.
  • exporter.state.topic - Stores the status of the exporters. The default name for this topic is _exporter_states.

If you are using role-based access control (RBAC), exporter.config.topic and exporter.state.topic require ResourceOwner on these topics, as does the _schemas internal topic. See also, Use Role-Based Access Control (RBAC) in Confluent Cloud and Configuring Role-Based Access Control for Schema Registry on Confluent Platform.

If you are configuring Schema Registry on Confluent Platform using the Schema Registry Security Plugin, you must activate both the exporter and the Schema Registry security plugin by specifying both extension classes in the $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties files:

resource.extension.class=io.confluent.kafka.schemaregistry.security.SchemaRegistrySecurityResourceExtension,io.confluent.schema.exporter.SchemaExporterResourceExtension

The configuration for the exporter resource extension class in the schema-registry.properties is described in Set up source and destination environments in Schema Linking on Confluent Platform.

ライフサイクルとステート

Schema Registry はスキーマを Kafka トピックに保存します。スキーマエクスポーターは、そのトピックのオフセットを使用して進捗状況を判断します。

スキーマエクスポーターが作成されると、最初は STARTING ステートになります。このステートでは、既にトピックに書き込まれた該当するスキーマすべてを検出し、エクスポートします。登録済みのスキーマをエクスポートした後、エクスポーターは RUNNING ステートに入ります。このステートでは、新しいスキーマがあると通知され、該当するスキーマがあればエクスポートできます。スキーマのエクスポート中、エクスポーターは最新のトピックのオフセットを記録してその進捗状況を保存します。

スキーマエクスポーターに変更を加える場合、まず "一時停止" する必要があります。これによって、スキーマエクスポーターが PAUSED ステートに入ります。適宜変更を加えた後で、エクスポーターを再開できます。再開後、エクスポーターは記録されている最新のオフセットの後に該当するスキーマがあればそれを検出し、エクスポートします。

一時停止中のエクスポーターを "リセット" することもできます。これにより保存されているオフセットがクリアされ、再開時に該当するすべてのスキーマが再エクスポートされます。この場合、リセット後にエクスポーターが再び STARTING ステートから開始され、同じライフサイクルを進みます。

ライフサイクルの各段階におけるスキーマエクスポーターのステートを以下にまとめます。

ステート 説明
STARTING エクスポーターはトピックの該当する登録済みスキーマすべてを検出し、エクスポートします。これは開始時のステートまたはリセット後のステートです。
RUNNING エクスポーターは新しいスキーマを通知され、該当する場合はエクスポートして、最新のトピックのオフセットを記録することで進捗状況を追跡します。
PAUSED 構成の変更などを目的に、エクスポーターを一時停止できます。再開時に、エクスポーターは最新の記録済みオフセット以降のスキーマを検出し、エクスポートします。

REST API

Schema Registry は以下の REST API をサポートしています。詳細については、Schema Registry API のドキュメント の「エクスポーター」を参照してください。

タスク API
テナントのエクスポーターのリストを取得する GET /exporters
新しいエクスポーターを作成する POST /exporters
エクスポーターに関する情報を取得する GET /exporters/{name}
エクスポーターの構成を取得する GET /exporters/{name}/config
エクスポーターのステータスを取得する GET /exporters/{name}/status
エクスポーターの情報をアップデートする PUT /exporters/{name}/config
エクスポーターを一時停止する PUT /exporters/{name}/pause
エクスポーターを再開する PUT /exporters/{name}/resume
エクスポーターをリセットし、オフセットをクリアする PUT /exporters/{name}/reset
エクスポーターを削除する DELETE /exporters/{name}