Confluent Server 上の Schema Validation¶
Looking for Schema Management Confluent Cloud docs? You are currently viewing Confluent Platform documentation. If you are looking for Confluent Cloud docs, check out Schema Management on Confluent Cloud.
Kafka のトピックに対して生成されたデータに、サブジェクト命名方法 に従って Schema Registry に登録された有効なスキーマ ID が使用されていることは、ブローカーが Schema Validation を通じて確認します。
ちなみに
Schema Validation は、環境ごとにホストされている Schema Registry を使用することで、Confluent Cloud クラスターでも利用できます。詳細については、「Confluent Cloud でのブローカー側スキーマ検証の使用」を参照してください。
前提条件とブローカーでの Schema Registry URL の設定¶
以下の例を実行するための基本的な要件は、Schema Registry のチュートリアル に記載されている条件とほぼ同じです。ただし、ここでは Maven は必要ありません。また、Confluent Platform バージョン 5.4.0 以降が必要となります。
ブローカーの Schema Validation を有効にするためのもう 1 つの前提条件として、Confluent Platform を起動する前に、Kafka の server.properties
ファイル($CONFLUENT_HOME/etc/kafka/server.properties
)で confluent.schema.registry.url
を指定する必要があります。これにより、Schema Registry への接続方法がブローカーに通知されます。
以下に例を示します。
confluent.schema.registry.url=http://schema-registry:8081
この構成には、Schema Registry インスタンスの URL をコンマ区切りのリストで指定することができます。Confluent CLI と Confluent Control Center の両方から Schema Validation を利用するために、この設定が必要となります。
Confluent CLI からトピックの Schema Validation を有効にする¶
トピックの Schema Validation は、トピックの作成時または既存のトピックの変更時に有効にすることができます。
Schema Validation を使用してトピックを作成する¶
トピックの作成時にその Schema Validation を設定するには、confluent.value.schema.validation=true
と confluent.key.schema.validation=true
を設定します。
値スキーマの検証とキースキーマの検証は互いに独立しています。一方のみを有効にすることも、両方を有効にすることもできます。(デフォルトでは、スキーマ検証は有効になっていません。キースキーマと値スキーマの検証はどちらもデフォルトで false
になっています。)
たとえば、次のコマンドでは my-topic-sv
というトピックが作成され、値スキーマに対するスキーマ検証が有効になります。
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 \
--partitions 1 --topic my-topic-sv \
--config confluent.value.schema.validation=true
このコマンドの出力は次のとおりです。
Created topic my-topic-sv.
この構成で、メッセージの値の有効なスキーマを持たないメッセージがトピック my-topic-sv
に生成された場合、プロデューサーにエラーが返されて、メッセージが破棄されます。
バッチで送信されたメッセージの中に無効なメッセージが 1 つでもあった場合、そのバッチ全体が破棄されます。
ちなみに
値スキーマとキースキーマの詳細については、Schema Registry のチュートリアルの「用語の復習」を参照してください。
既存のトピックに Schema Validation を追加する¶
my-first-topic
という新しいトピックを作成します。
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic my-first-topic
このコマンドの出力は次のとおりです。
Created topic my-first-topic.
既存のトピックに対する検証構成を(このケースでは、false
から true
に)変更するには、alter
フラグと --add-config
フラグを使用して検証を指定します。その例を次に示します。
kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my-first-topic --add-config confluent.value.schema.validation=true
次のような確認メッセージが表示されます。
Completed updating config for topic 'my-first-topic'.
このトピックに対する検証を無効(true
から false
)にするには、上記のコマンドに --add-config confluent.value.schema.validation=false
を指定して再度実行します。
トピックのサブジェクト命名方法を変更する¶
Schema Registry 内のスキーマにトピックをマッピングするための 命名方法 として、デフォルトでは TopicNameStrategy
が Confluent Server によって使用されます。
Confluent Platform 5.5.0 未満では、サブジェクト命名方法がブローカーの server.properties
で構成されており、ブローカー上のすべてのトピックに同じ命名方法を使用する必要がありました。
Confluent Platform 5.5.0 以降では、命名方法がトピックに関連付けられます。そのため、スキーマのサブジェクトのキーと値の両方に、デフォルトの "トピックごと" 以外の命名方法を選択できるようになりました。その構成には、confluent.key.subject.name.strategy
と confluent.value.subject.name.strategy
を使用します。
Confluent CLI から、--config
オプションに命名方法を指定してトピックを作成または変更します。以下に例を示します。
RecordNameStrategy
を値に使用するトピックを作成するには、次のようにします。
./bin/kafka-topics --create --bootstrap-server localhost:9092 \
--replication-factor 1 --partitions 1 --topic my-other-cool-topic \
--config confluent.value.schema.validation=true --config confluent.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
RecordNameStrategy
をキーに使用するようにトピックを変更するには、次のようにします。
kafka-configs --bootstrap-server localhost:9092 \
--alter --entity-type topics --entity-name my-other-cool-topic \
--add-config confluent.key.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
次の構成は、confluent.value.subject.name.strategy
と confluent.key.subject.name.strategy
のどちらにでも使用できます。
io.confluent.kafka.serializers.subject.TopicNameStrategy
(デフォルト)io.confluent.kafka.serializers.subject.RecordNameStrategy
io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
Confluent Control Center でトピックの Schema Validation を有効にする¶
Control Center では、トピックに対する Schema Validation を確認したり、有効または無効にしたりできます(Control Center ガイドの「トピックのスキーマを管理する」を参照してください)。加えて、トピックごとのサブジェクト命名方法を構成することができます。
トピックの Schema Validation 設定を表示または変更する¶
Control Center (http://localhost:9021/)から現在の構成を表示したりトピックの Schema Validation を有効にしたりするには、次の手順に従います。
既存のトピックの Configuration タブをクリックし、Edit settings をクリックします。
Switch to expert mode をクリックします。
エキスパートモードで、
confluent.value.schema.validation
とconfluent.key.schema.validation
の設定を false から true に変更します。confluent.key.schema.validation
を見つけるには、下にスクロールすることが必要な場合があります。Save changes をクリックします。
トピックのサブジェクト命名方法を変更する¶
Control Center を使用して サブジェクト命名方法 を変更するには、次の手順に従います。
Control Center で、アップデートするトピックを選択し、Configuration、Switch to expert mode を順にクリックします。
confluent.value.subject.name.strategy
とconfluent.key.subject.name.strategy
を見つけます。設定を変更し、Save changes をクリックします。
次の構成は、confluent.value.subject.name.strategy
と confluent.key.subject.name.strategy
のどちらにでも使用できます。
io.confluent.kafka.serializers.subject.TopicNameStrategy
(デフォルト)io.confluent.kafka.serializers.subject.RecordNameStrategy
io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
詳細については、「トピックのサブジェクト命名方法を変更する」を参照してください。同じタスクを Confluent CLI で行う方法について説明しているほか、この機能の背景情報についても詳しく説明しています。
デモ: コマンドラインからトピックの Schema Validation を有効にする¶
この短いデモでは、トピックのスキーマ検証を有効または無効にした場合の効果を紹介しています。
Confluent Platform と Schema Registry を使い始めて間もない方は、最初に「Schema Registry のチュートリアル」を読んでから、このデモをご覧になることをお勧めします。
以下の例では、$CONFLUENT_HOME/bin
にある kafka-console-producer
と kafka-console-consumer
を利用しています。
Confluent Platform バージョン 5.4.0 以降のローカルインストールでは、
$CONFLUENT_HOME/etc/kafka/server.properties
に、次の Schema Registry URL の構成を追加します。############################## My Schema Validation Demo Settings ################ # Schema Registry URL confluent.schema.registry.url=http://localhost:8081
上の例では、ファイルの構成を追跡するために 2 行のコメントが含まれていますが、これらは省略してもかまいません。
次のコマンドを使用して Confluent Platform を起動します。
confluent local services start
ちなみに
- 単に
confluent local schema-registry
を実行してもかまいません。その場合は、依存関係としてkafka
とzookeeper
も起動されます。このデモでは、Connect や Control Center など、他のサービスを直接参照することはしていません。もっとも、さらに理解を深めるために(トピックやメッセージが Control Center にどのように表示されるかなど)、フルスタックを実行したい場合もあるでしょう。confluent local
の詳細については、「Confluent Platform を使用した Apache Kafka のクイックスタート(ローカル)」および Confluent CLI コマンドリファレンスの「confluent local」を参照してください。 confluent local
のコマンドはバックグラウンドで実行されるので、そのコマンドウィンドウを再利用することができます。プロデューサーとコンシューマーのセッションは独立している必要があります。
- 単に
test-schemas
というテストトピックを作成します。デフォルトのfalse
に設定されるよう Schema Validation 設定は指定しません。kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic test-schemas
これでトピックが作成されます。テストトピックに生成されたレコードに対するブローカー検証は行われません。このデモの最初のパートでは、意図的にそのようにしてあります。トピックが作成されたことは、
kafka-topics --bootstrap-server localhost:9092 --list
を使用して確認できます。プロデューサーの新しいコマンドウィンドウで次のコマンドを実行し、(デフォルトの文字列シリアライザーを使用して)シリアル化されたレコードをトピック
test-schemas
に生成します。kafka-console-producer --broker-list localhost:9092 --topic test-schemas --property parse.key=true --property key.separator=,
現在このトピックは Schema Validation が無効になっているためこのコマンドは成功します。先ほどこのトピックに対してブローカーの Schema Validation を有効にしていた場合、そのトピックを生成先とする上のコマンドは許可されません。
このコマンドを実行すると、プロデューサーのコマンドプロンプト(
>
)が出力として表示されます。ここで、生成するメッセージを入力することができます。>
プロンプトに、初めてのメッセージを次のように入力します。1,my first record
このプロデューサーセッションは実行したままにしてください。
コンシューマー用に新しいコマンドウィンドウを開き、次のコマンドを入力してメッセージを読み取ります。
kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic test-schemas --property print.key=true
このコマンドからは
my first record
が出力されます。このコンシューマーセッションは実行したままにしてください。
今度は、トピック
test-schemas
の Schema Validation をtrue
に設定します。kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name test-schemas --add-config confluent.value.schema.validation=true
Completed updating config for topic test-schemas.
という確認のメッセージが表示されます。プロデューサーセッションに戻り、
>
プロンプトに 2 つ目のメッセージを入力します。2,my second record
Schema Validation が有効になっており送信しているメッセージにはスキーマ ID が含まれていないので、
This record has failed the validation on broker
というエラーが返されます。次に、Schema Validation を無効にし(同じコマンドを使用して
false
に設定)、プロデューサーを再起動した後、同じメッセージまたは同様のフォーマットのメッセージを入力して再送信すると、今度はメッセージが承認されます。(たとえば、3,my third record
を生成してみてください。)ちなみに
この最後の手順で(スキーマ検証を
false
に切り替えた後に)プロデューサーを再起動する代わりに、エラーの後の空白行に3,my third record
というメッセージを入力またはコピーアンドペーストして、Enter キーを押すこともできます。コンシューマーがこれを取り込み、プロデューサーのプロンプトに戻ります。ただし、これはわかりやすいワークフローではありません。エラーの後にはプロンプトが表示されないため、まず Enter キーを押そうとしがちですが、そうするとプロデューサーがシャットダウンしてしまいます。正常に生成されたメッセージは、Control Center (ウェブブラウザーで http://localhost:9021/ にアクセス)で、 Topics **、 **test-schemas **、 **messages の順に選択して表示される画面にも表示されます。以前に送信されたメッセージを表示するには、パーティションを選択するか、タイムスタンプに移動しなければならない場合があります。
シャットダウンとクリーンアップのタスクを実行します。
- コンシューマーとプロデューサーを停止するには、それぞれのコマンドウィンドウで Ctrl キーを押しながら C キーを押します。
- Confluent Platform を停止するには、「
confluent local stop
」と入力します。 - 別のテストを最初から実行する前に既存のデータ(トピック、スキーマ、メッセージ)を削除する場合は、「
confluent local destroy
」と入力します。
Schema Validation のセキュリティの構成¶
一般には、Schema Registry がブローカーへの接続を開始します。Schema Validation は、ブローカーが Schema Registry への接続を開始するという点で独特です。このようになっているのは、レジストリからスキーマを取得し、プロデューサーから受信したメッセージが、特定のトピックに関連付けられているスキーマと一致することを検証するためです。Schema Validation が有効な場合、タスクの流れは次のようになります。
- ブローカーがプロデューサーからメッセージを受信し、その宛先が、スキーマが関連付けられているトピックであることを認識します。
- ブローカーが Schema Registry への接続を開始します。
- ブローカーは、トピックに関連付けられているスキーマを(スキーマ ID によって)要求します。
- Schema Registry がリクエストを受信し、要求されたスキーマをスキーマストレージから見つけてブローカーに返します。
- ブローカーは、メッセージをスキーマに照らして検証します。
したがって、トピックに対してブローカー側の Schema Validation が有効になっているクラスターでセキュリティをセットアップするには、ブローカーから開始される Schema Registry への接続をサポートするために、Kafka ブローカーで設定を構成する必要があります。複数のブローカーがある場合は、それぞれのブローカーを構成する必要があります。たとえば、mTLS では、ブローカーごとに異なる証明書を用意することが理想的です。
Schema Registry の内部 Kafka クライアントから Kafka ブローカーへの接続は、ブローカー側の Schema Validation と Schema Registry の HTTP リスナー間の接続とはまったく関係がありません。以降のセキュリティ設定は、Schema Registry 内部のクライアントからブローカーへの接続を反映するものではありません。
以下のブローカー構成には confluent.schema.registry.url
が含まれ、これによって Schema Registry への接続方法がブローカーに通知されます。これは既に、スキーマ検証を使用するための前提条件 としてブローカーに構成されている可能性があります。示されている残りの設定は、それぞれのセキュリティ構成に固有のものです。
ちなみに
以下のセクションでは、ブローカーのセキュリティ構成に焦点を当てています。Schema Registry のセキュリティを設定する方法の詳細については、「Schema Registry のセキュリティの概要」および関連するセクションを参照してください。たとえば、クライアント(ブローカーなど)にユーザー名とパスワードを使用した認証を要求するように Schema Registry をセットアップするには、「HTTP 基本認証」の「Schema Registry」を参照してください。
mTLS(相互 TLS)認証¶
SSL を使用した双方向認証 (mTLS)を構成するには、認証(証明書)と 暗号化 に SSL を使用して、ブローカーを Schema Registry に認証します。
各ブローカーのブローカープロパティファイル($CONFLUENT_HOME/etc/kafka/server.properties
)で、次の設定を定義します。
confluent.schema.registry.url=https://<host>:<port>
confluent.ssl.truststore.location=<path to truststore file>
confluent.ssl.truststore.password=<password for the truststore if encrypted>
confluent.ssl.keystore.location=<path to file with private key for the broker>
confluent.ssl.keystore.password=<password to keystore file>
confluent.ssl.key.password=<password>
基本認証¶
このセットアップでは、ブローカーが 基本認証 を使用して Schema Registry に認証するように構成されます。
各ブローカー($CONFLUENT_HOME/etc/kafka/server.properties
)で、次の設定を定義します。
confluent.schema.registry.url=http://<host>:<port>
confluent.basic.auth.credentials.source=<USER_INFO, URL, or SASL_INHERIT>
confluent.basic.auth.user.info=<username>:<password> #required only if credentials source is set to USER_INFO
- プロパティ
confluent.basic.auth.credentials.source
は、使用する認証情報のタイプ(ユーザー名とパスワード)を定義します。これらは変数ではなくリテラルです。 confluent.basic.auth.credentials
をUSER_INFO
に設定した場合は、confluent.basic.auth.user.info
も指定する必要があります。
SSL を使用する基本認証¶
各ブローカー($CONFLUENT_HOME/etc/kafka/server.properties
)で、次の設定を定義します。
confluent.schema.registry.url=https://<host>:<port>
confluent.basic.auth.credentials.source=USER_INFO
confluent.basic.auth.user.info=<username>:<password>
confluent.ssl.truststore.location=<path to the truststore file>
confluent.ssl.truststore.password=<password for the truststore if encrypted>
ロールベースアクセス制御(RBAC)と ACL¶
Schema Registry への認可のために、ロールベースアクセス制御 (および ACL またはそのどちらか)を行って、ブローカーで他のタイプのセキュリティとともに使用することができます。
Schema Registry セキュリティプラグイン では、RBAC と ACL の両方の認可がサポートされます。
ブローカーの構成¶
セキュリティのタイプに応じて、前のセクションと同様の設定をブローカーに適切に定義します。たとえば、基本認証では、ブローカーのプロパティファイルに次のような設定を含めます。
confluent.schema.registry.url=http://<host>:<port>
confluent.basic.auth.credentials.source=USER_INFO
confluent.basic.auth.user.info=<username>:<password> #required only if credentials source is set to USER_INFO
Schema Registry へのアクセスを許可するための RBAC ロールバインディングの設定¶
RBAC ロールバインディングを定義するには、Schema Registry に少なくとも次の IAM 割り当てが必要です。
confluent iam rolebinding list --principal User:<user-id> \
--role DeveloperRead --kafka-cluster-id <kafka-cluster-id> \
--resource Subject:* --schema-registry-cluster-id <schema-registry-group-id>
Schema Registry クラスター ID は、schema-registry-group-id
(デフォルトでは schema-registry)と同じです。
ライセンス¶
ブローカー側の Schema Validation には Confluent Enterprise ライセンスが必要です。
詳細については、「Schema Registry」ドキュメントの「ライセンス」を参照してください。
おすすめのリソース¶
- ブログの投稿記事: Confluent Platform 5.4 でのスキーマ検証
- Schema Validation および Replicator: Schema Validation と、1 つのクラスターから別のクラスターへのデータのレプリケーションに使用される Replicator 構成との関係について説明