Confluent Server 上の Schema Validation

Looking for Schema Management Confluent Cloud docs? You are currently viewing Confluent Platform documentation. If you are looking for Confluent Cloud docs, check out Schema Management on Confluent Cloud.

Kafka のトピックに対して生成されたデータに、サブジェクト命名方法 に従って Schema Registry に登録された有効なスキーマ ID が使用されていることは、ブローカーが Schema Validation を通じて確認します。

ちなみに

Schema Validation は、環境ごとにホストされている Schema Registry を使用することで、Confluent Cloud クラスターでも利用できます。詳細については、「Confluent Cloud でのブローカー側スキーマ検証の使用」を参照してください。

前提条件とブローカーでの Schema Registry URL の設定

以下の例を実行するための基本的な要件は、Schema Registry のチュートリアル に記載されている条件とほぼ同じです。ただし、ここでは Maven は必要ありません。また、Confluent Platform バージョン 5.4.0 以降が必要となります。

ブローカーの Schema Validation を有効にするためのもう 1 つの前提条件として、Confluent Platform を起動する前に、Kafka の server.properties ファイル($CONFLUENT_HOME/etc/kafka/server.properties)で confluent.schema.registry.url を指定する必要があります。これにより、Schema Registry への接続方法がブローカーに通知されます。

以下に例を示します。

confluent.schema.registry.url=http://schema-registry:8081

この構成には、Schema Registry インスタンスの URL をコンマ区切りのリストで指定することができます。Confluent CLIConfluent Control Center の両方から Schema Validation を利用するために、この設定が必要となります。

Confluent CLI からトピックの Schema Validation を有効にする

トピックの Schema Validation は、トピックの作成時または既存のトピックの変更時に有効にすることができます。

Schema Validation を使用してトピックを作成する

トピックの作成時にその Schema Validation を設定するには、confluent.value.schema.validation=trueconfluent.key.schema.validation=true を設定します。

値スキーマの検証とキースキーマの検証は互いに独立しています。一方のみを有効にすることも、両方を有効にすることもできます。(デフォルトでは、スキーマ検証は有効になっていません。キースキーマと値スキーマの検証はどちらもデフォルトで false になっています。)

たとえば、次のコマンドでは my-topic-sv というトピックが作成され、値スキーマに対するスキーマ検証が有効になります。

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 \
--partitions 1 --topic my-topic-sv \
--config confluent.value.schema.validation=true

このコマンドの出力は次のとおりです。

Created topic my-topic-sv.

この構成で、メッセージの値の有効なスキーマを持たないメッセージがトピック my-topic-sv に生成された場合、プロデューサーにエラーが返されて、メッセージが破棄されます。

バッチで送信されたメッセージの中に無効なメッセージが 1 つでもあった場合、そのバッチ全体が破棄されます。

ちなみに

値スキーマとキースキーマの詳細については、Schema Registry のチュートリアルの「用語の復習」を参照してください。

既存のトピックに Schema Validation を追加する

my-first-topic という新しいトピックを作成します。

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic my-first-topic

このコマンドの出力は次のとおりです。

Created topic my-first-topic.

既存のトピックに対する検証構成を(このケースでは、false から true に)変更するには、alter フラグと --add-config フラグを使用して検証を指定します。その例を次に示します。

kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my-first-topic --add-config confluent.value.schema.validation=true

次のような確認メッセージが表示されます。

Completed updating config for topic 'my-first-topic'.

このトピックに対する検証を無効(true から false)にするには、上記のコマンドに --add-config confluent.value.schema.validation=false を指定して再度実行します。

トピックのサブジェクト命名方法を変更する

Schema Registry 内のスキーマにトピックをマッピングするための 命名方法 として、デフォルトでは TopicNameStrategy が Confluent Server によって使用されます。

Confluent Platform 5.5.0 未満では、サブジェクト命名方法がブローカーの server.properties で構成されており、ブローカー上のすべてのトピックに同じ命名方法を使用する必要がありました。

Confluent Platform 5.5.0 以降では、命名方法がトピックに関連付けられます。そのため、スキーマのサブジェクトのキーと値の両方に、デフォルトの "トピックごと" 以外の命名方法を選択できるようになりました。その構成には、confluent.key.subject.name.strategyconfluent.value.subject.name.strategy を使用します。

Confluent CLI から、--config オプションに命名方法を指定してトピックを作成または変更します。以下に例を示します。

RecordNameStrategy を値に使用するトピックを作成するには、次のようにします。

./bin/kafka-topics --create --bootstrap-server localhost:9092 \
--replication-factor 1 --partitions 1 --topic my-other-cool-topic \
--config confluent.value.schema.validation=true --config confluent.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy

RecordNameStrategy をキーに使用するようにトピックを変更するには、次のようにします。

kafka-configs --bootstrap-server localhost:9092 \
--alter --entity-type topics --entity-name my-other-cool-topic \
--add-config confluent.key.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy

次の構成は、confluent.value.subject.name.strategyconfluent.key.subject.name.strategy のどちらにでも使用できます。

  • io.confluent.kafka.serializers.subject.TopicNameStrategy (デフォルト)
  • io.confluent.kafka.serializers.subject.RecordNameStrategy
  • io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

Confluent Control Center でトピックの Schema Validation を有効にする

Control Center では、トピックに対する Schema Validation を確認したり、有効または無効にしたりできます(Control Center ガイドの「トピックのスキーマを管理する」を参照してください)。加えて、トピックごとのサブジェクト命名方法を構成することができます。

トピックの Schema Validation 設定を表示または変更する

Control Center (http://localhost:9021/)から現在の構成を表示したりトピックの Schema Validation を有効にしたりするには、次の手順に従います。

  1. 既存のトピックの Configuration タブをクリックし、Edit settings をクリックします。

  2. Switch to expert mode をクリックします。

    ../_images/sv-c3-topic-expert-settings.png
  3. エキスパートモードで、confluent.value.schema.validationconfluent.key.schema.validation の設定を false から true に変更します。

    confluent.key.schema.validation を見つけるには、下にスクロールすることが必要な場合があります。

    ../_images/sv-c3-topic-enable.png
  4. Save changes をクリックします。

トピックのサブジェクト命名方法を変更する

Control Center を使用して サブジェクト命名方法 を変更するには、次の手順に従います。

  1. Control Center で、アップデートするトピックを選択し、ConfigurationSwitch to expert mode を順にクリックします。

    confluent.value.subject.name.strategyconfluent.key.subject.name.strategy を見つけます。

    ../_images/c3-schema-subject-name-strategy.png
  2. 設定を変更し、Save changes をクリックします。

次の構成は、confluent.value.subject.name.strategyconfluent.key.subject.name.strategy のどちらにでも使用できます。

  • io.confluent.kafka.serializers.subject.TopicNameStrategy (デフォルト)
  • io.confluent.kafka.serializers.subject.RecordNameStrategy
  • io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

詳細については、「トピックのサブジェクト命名方法を変更する」を参照してください。同じタスクを Confluent CLI で行う方法について説明しているほか、この機能の背景情報についても詳しく説明しています。

デモ: コマンドラインからトピックの Schema Validation を有効にする

この短いデモでは、トピックのスキーマ検証を有効または無効にした場合の効果を紹介しています。

Confluent Platform と Schema Registry を使い始めて間もない方は、最初に「Schema Registry のチュートリアル」を読んでから、このデモをご覧になることをお勧めします。

以下の例では、$CONFLUENT_HOME/bin にある kafka-console-producerkafka-console-consumer を利用しています。

  1. Confluent Platform バージョン 5.4.0 以降のローカルインストールでは、$CONFLUENT_HOME/etc/kafka/server.properties に、次の Schema Registry URL の構成を追加します。

    ############################## My Schema Validation Demo Settings ################
    # Schema Registry URL
    confluent.schema.registry.url=http://localhost:8081
    

    上の例では、ファイルの構成を追跡するために 2 行のコメントが含まれていますが、これらは省略してもかまいません。

  2. 次のコマンドを使用して Confluent Platform を起動します。

    confluent local services start
    

    ちなみに

    • 別の方法として、単に confluent local services schema-registry start` を実行してもかまいません。その場合は、依存関係として kafkazookeeper も起動されます。このデモでは、Connect や Control Center など、他のサービスを直接参照することはしていません。もっとも、さらに理解を深めるために(トピックやメッセージが Control Center にどのように表示されるかなど)、フルスタックを実行したい場合もあるでしょう。confluent local の詳細については、「Confluent Platform のクイックスタート」および Confluent CLI コマンドリファレンスの「confluent local」を参照してください。
    • confluent local のコマンドはバックグラウンドで実行されるので、そのコマンドウィンドウを再利用することができます。プロデューサーとコンシューマーのセッションは独立している必要があります。
  3. test-schemas というテストトピックを作成します。デフォルトの false に設定されるよう Schema Validation 設定は指定しません。

    kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic test-schemas
    

    これでトピックが作成されます。テストトピックに生成されたレコードに対するブローカー検証は行われません。このデモの最初のパートでは、意図的にそのようにしてあります。トピックが作成されたことは、kafka-topics --bootstrap-server localhost:9092 --list を使用して確認できます。

  4. プロデューサーの新しいコマンドウィンドウで次のコマンドを実行し、(デフォルトの文字列シリアライザーを使用して)シリアル化されたレコードをトピック test-schemas に生成します。

    kafka-console-producer --broker-list localhost:9092 --topic test-schemas --property parse.key=true --property key.separator=,
    

    現在このトピックは Schema Validation が無効になっているためこのコマンドは成功します。先ほどこのトピックに対してブローカーの Schema Validation を有効にしていた場合、そのトピックを生成先とする上のコマンドは許可されません。

    このコマンドを実行すると、プロデューサーのコマンドプロンプト(> )が出力として表示されます。ここで、生成するメッセージを入力することができます。

    > プロンプトに、初めてのメッセージを次のように入力します。

    1,my first record
    

    このプロデューサーセッションは実行したままにしてください。

  5. コンシューマー用に新しいコマンドウィンドウを開き、次のコマンドを入力してメッセージを読み取ります。

    kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic test-schemas --property print.key=true
    

    このコマンドからは my first record が出力されます。

    このコンシューマーセッションは実行したままにしてください。

  6. 今度は、トピック test-schemas の Schema Validation を true に設定します。

    kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name test-schemas --add-config confluent.value.schema.validation=true
    

    Completed updating config for topic test-schemas. という確認のメッセージが表示されます。

  7. プロデューサーセッションに戻り、> プロンプトに 2 つ目のメッセージを入力します。

    2,my second record
    

    Schema Validation が有効になっており送信しているメッセージにはスキーマ ID が含まれていないので、This record has failed the validation on broker というエラーが返されます。

    次に、Schema Validation を無効にし(同じコマンドを使用して false に設定)、プロデューサーを再起動した後、同じメッセージまたは同様のフォーマットのメッセージを入力して再送信すると、今度はメッセージが承認されます。(たとえば、3,my third record を生成してみてください。)

    ちなみに

    この最後の手順で(スキーマ検証を false に切り替えた後に)プロデューサーを再起動する代わりに、エラーの後の空白行に 3,my third record というメッセージを入力またはコピーアンドペーストして、Enter キーを押すこともできます。コンシューマーがこれを取り込み、プロデューサーのプロンプトに戻ります。ただし、これはわかりやすいワークフローではありません。エラーの後にはプロンプトが表示されないため、まず Enter キーを押そうとしがちですが、そうするとプロデューサーがシャットダウンしてしまいます。

    正常に生成されたメッセージは、Control Center (ウェブブラウザーで http://localhost:9021/ にアクセス)で、 Topics **、 **test-schemas **、 **messages の順に選択して表示される画面にも表示されます。以前に送信されたメッセージを表示するには、パーティションを選択するか、タイムスタンプに移動しなければならない場合があります。

    ../_images/sv-topics.png
  8. シャットダウンとクリーンアップのタスクを実行します。

    • コンシューマーとプロデューサーを停止するには、それぞれのコマンドウィンドウで Ctrl キーを押しながら C キーを押します。
    • Confluent Platform を停止するには、「confluent local stop」と入力します。
    • 別のテストを最初から実行する前に既存のデータ(トピック、スキーマ、メッセージ)を削除する場合は、「confluent local destroy」と入力します。

Schema Validation のセキュリティの構成

一般には、Schema Registry がブローカーへの接続を開始します。Schema Validation は、ブローカーが Schema Registry への接続を開始するという点で独特です。このようになっているのは、レジストリからスキーマを取得し、プロデューサーから受信したメッセージが、特定のトピックに関連付けられているスキーマと一致することを検証するためです。Schema Validation が有効な場合、タスクの流れは次のようになります。

  1. ブローカーがプロデューサーからメッセージを受信し、その宛先が、スキーマが関連付けられているトピックであることを認識します。
  2. ブローカーが Schema Registry への接続を開始します。
  3. ブローカーは、トピックに関連付けられているスキーマを(スキーマ ID によって)要求します。
  4. Schema Registry がリクエストを受信し、要求されたスキーマをスキーマストレージから見つけてブローカーに返します。
  5. ブローカーは、メッセージをスキーマに照らして検証します。

したがって、トピックに対してブローカー側の Schema Validation が有効になっているクラスターでセキュリティをセットアップするには、ブローカーから開始される Schema Registry への接続をサポートするために、Kafka ブローカーで設定を構成する必要があります。複数のブローカーがある場合は、それぞれのブローカーを構成する必要があります。たとえば、mTLS では、ブローカーごとに異なる証明書を用意することが理想的です。

Schema Registry の内部 Kafka クライアントから Kafka ブローカーへの接続は、ブローカー側の Schema Validation と Schema Registry の HTTP リスナー間の接続とはまったく関係がありません。以降のセキュリティ設定は、Schema Registry 内部のクライアントからブローカーへの接続を反映するものではありません。

以下のブローカー構成には confluent.schema.registry.url が含まれ、これによって Schema Registry への接続方法がブローカーに通知されます。これは既に、スキーマ検証を使用するための前提条件 としてブローカーに構成されている可能性があります。示されている残りの設定は、それぞれのセキュリティ構成に固有のものです。

ちなみに

以下のセクションでは、ブローカーのセキュリティ構成に焦点を当てています。Schema Registry のセキュリティを設定する方法の詳細については、「Schema Registry のセキュリティの概要」および関連するセクションを参照してください。たとえば、クライアント(ブローカーなど)にユーザー名とパスワードを使用した認証を要求するように Schema Registry をセットアップするには、「HTTP 基本認証」の「Schema Registry」を参照してください。

mTLS(相互 TLS)認証

SSL を使用した双方向認証mTLS)を構成するには、認証(証明書)と 暗号化 に SSL を使用して、ブローカーを Schema Registry に認証します。

各ブローカーのブローカープロパティファイル($CONFLUENT_HOME/etc/kafka/server.properties)で、次の設定を定義します。

confluent.schema.registry.url=https://<host>:<port>
confluent.ssl.truststore.location=<path to truststore file>
confluent.ssl.truststore.password=<password for the truststore if encrypted>
confluent.ssl.keystore.location=<path to file with private key for the broker>
confluent.ssl.keystore.password=<password to keystore file>
confluent.ssl.key.password=<password>

基本認証

このセットアップでは、ブローカーが 基本認証 を使用して Schema Registry に認証するように構成されます。

各ブローカー($CONFLUENT_HOME/etc/kafka/server.properties)で、次の設定を定義します。

confluent.schema.registry.url=http://<host>:<port>
confluent.basic.auth.credentials.source=<USER_INFO, URL, or SASL_INHERIT>
confluent.basic.auth.user.info=<username>:<password> #required only if credentials source is set to USER_INFO
  • プロパティ confluent.basic.auth.credentials.source は、使用する認証情報のタイプ(ユーザー名とパスワード)を定義します。これらは変数ではなくリテラルです。
  • confluent.basic.auth.credentialsUSER_INFO に設定した場合は、confluent.basic.auth.user.info も指定する必要があります。

SSL を使用する基本認証

各ブローカー($CONFLUENT_HOME/etc/kafka/server.properties)で、次の設定を定義します。

confluent.schema.registry.url=https://<host>:<port>
confluent.basic.auth.credentials.source=USER_INFO
confluent.basic.auth.user.info=<username>:<password>
confluent.ssl.truststore.location=<path to the truststore file>
confluent.ssl.truststore.password=<password for the truststore if encrypted>

ロールベースアクセス制御(RBAC)と ACL

Schema Registry への認可のために、ロールベースアクセス制御 (および ACL またはそのどちらか)を行って、ブローカーで他のタイプのセキュリティとともに使用することができます。

Schema Registry セキュリティプラグイン では、RBAC と ACL の両方の認可がサポートされます。

ブローカーの構成

セキュリティのタイプに応じて、前のセクションと同様の設定をブローカーに適切に定義します。たとえば、基本認証では、ブローカーのプロパティファイルに次のような設定を含めます。

confluent.schema.registry.url=http://<host>:<port>
confluent.basic.auth.credentials.source=USER_INFO
confluent.basic.auth.user.info=<username>:<password> #required only if credentials source is set to USER_INFO

Schema Registry へのアクセスを許可するための RBAC ロールバインディングの設定

RBAC ロールバインディングを定義するには、Schema Registry に少なくとも次の IAM 割り当てが必要です。

confluent iam rbac role-binding list --principal User:<user-id> \
--role DeveloperRead --kafka-cluster-id <kafka-cluster-id> \
--resource Subject:* --schema-registry-cluster-id <schema-registry-group-id>

Schema Registry クラスター ID は、schema-registry-group-id (デフォルトでは schema-registry)と同じです。

ライセンス

ブローカー側の Schema Validation には Confluent Enterprise ライセンスが必要です。

詳細については、「Schema Registry」ドキュメントの「ライセンス」を参照してください。

おすすめのリソース