Confluent Server 上の スキーマ検証¶
Looking for Schema Management Confluent Cloud docs? You are currently viewing Confluent Platform documentation. If you are looking for Confluent Cloud docs, check out Schema Management on Confluent Cloud.
Kafka のトピックに対して生成されたデータに、 サブジェクト命名方法 に従って スキーマレジストリ に登録された有効なスキーマ ID が使用されていることは、ブローカーが スキーマ検証 を通じて確認します。
ちなみに
スキーマ検証 は、環境ごとにホストされている スキーマレジストリ を使用することで、Confluent Cloud クラスターでも利用できます。詳細については、「Confluent Cloud でのブローカー側スキーマ検証の使用」を参照してください。
前提条件とブローカーでの スキーマレジストリ URL の設定¶
以下の例を実行するための基本的な要件は、 Schema Registry のチュートリアル に記載されている条件とほぼ同じです。ただし、ここでは Maven は必要ありません。また、Confluent Platform バージョン 5.4.0 以降が必要となります。
ブローカーの スキーマ検証 を有効にするためのもう 1 つの前提条件として、Confluent Platform を起動する前に、Kafka の server.properties
ファイル($CONFLUENT_HOME/etc/kafka/server.properties
)で confluent.schema.registry.url
を指定する必要があります。これにより、スキーマレジストリ への接続方法がブローカーに通知されます。
以下に例を示します。
confluent.schema.registry.url=http://schema-registry:8081
この構成には、スキーマレジストリ インスタンスの URL をコンマ区切りのリストで指定することができます。Confluent CLI と Confluent Control Center の両方から スキーマ検証 を利用するために、この設定が必要となります。
Confluent CLI からトピックの スキーマ検証 を有効にする¶
トピックの スキーマ検証 は、トピックの作成時または既存のトピックの変更時に有効にすることができます。
スキーマ検証 を使用してトピックを作成する¶
トピックの作成時にその スキーマ検証 を設定するには、confluent.value.schema.validation=true
と confluent.key.schema.validation=true
を設定します。
値スキーマの検証とキースキーマの検証は互いに独立しています。一方のみを有効にすることも、両方を有効にすることもできます。(デフォルトでは、スキーマ検証は有効になっていません。キースキーマと値スキーマの検証はどちらもデフォルトで false
になっています。)
たとえば、次のコマンドでは my-topic-sv
というトピックが作成され、値スキーマに対するスキーマ検証が有効になります。
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 \
--partitions 1 --topic my-topic-sv \
--config confluent.value.schema.validation=true
このコマンドの出力は次のとおりです。
Created topic my-topic-sv.
この構成で、メッセージの値の有効なスキーマを持たないメッセージがトピック my-topic-sv
に生成された場合、プロデューサーにエラーが返されて、メッセージが破棄されます。
バッチで送信されたメッセージの中に無効なメッセージが 1 つでもあった場合、そのバッチ全体が破棄されます。
ちなみに
値スキーマとキースキーマの詳細については、スキーマレジストリ のチュートリアルの「 用語の復習」を参照してください。
既存のトピックに スキーマ検証 を追加する¶
my-first-topic
という新しいトピックを作成します。
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic my-first-topic
このコマンドの出力は次のとおりです。
Created topic my-first-topic.
既存のトピックに対する検証構成を(このケースでは、false
から true
に)変更するには、alter
フラグと --add-config
フラグを使用して検証を指定します。その例を次に示します。
kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my-first-topic --add-config confluent.value.schema.validation=true
次のような確認メッセージが表示されます。
Completed updating config for topic 'my-first-topic'.
このトピックに対する検証を無効(true
から false
)にするには、上記のコマンドに --add-config confluent.value.schema.validation=false
を指定して再度実行します。
トピックのサブジェクト命名方法を変更する¶
スキーマレジストリ 内のスキーマにトピックをマップするための 命名方法 として、デフォルトでは TopicNameStrategy
が Confluent Server によって使用されます。
Confluent Platform 5.5.0 未満では、サブジェクト命名方法がブローカーの server.properties
で構成されており、ブローカー上のすべてのトピックに同じ命名方法を使用する必要がありました。
Confluent Platform 5.5.0 以降では、命名方法がトピックに関連付けられます。そのため、スキーマのサブジェクトのキーと値の両方に、デフォルトの "トピックごと" 以外の命名方法を選択できるようになりました。その構成には、confluent.key.subject.name.strategy
と confluent.value.subject.name.strategy
を使用します。
Confluent CLI から、--config
オプションに命名方法を指定してトピックを作成または変更します。以下に例を示します。
RecordNameStrategy
を値に使用するトピックを作成するには、次のようにします。
./bin/kafka-topics --create --bootstrap-server localhost:9092 \
--replication-factor 1 --partitions 1 --topic my-other-cool-topic \
--config confluent.value.schema.validation=true --config confluent.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
RecordNameStrategy
をキーに使用するようにトピックを変更するには、次のようにします。
kafka-configs --bootstrap-server localhost:9092 \
--alter --entity-type topics --entity-name my-other-cool-topic \
--add-config confluent.key.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
次の構成は、confluent.value.subject.name.strategy
と confluent.key.subject.name.strategy
のどちらにでも使用できます。
io.confluent.kafka.serializers.subject.TopicNameStrategy
(デフォルト)io.confluent.kafka.serializers.subject.RecordNameStrategy
io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
Confluent Control Center でトピックの スキーマ検証 を有効にする¶
Control Center では、トピックに対する スキーマ検証 を確認したり、有効または無効にしたりできます(Control Center ガイドの「トピックのスキーマを管理する」を参照してください)。加えて、トピックごとのサブジェクト命名方法を構成することができます。
トピックの スキーマ検証 設定を表示または変更する¶
Control Center ( http://localhost:9021/ )から現在の構成を表示したりトピックの スキーマ検証 を有効にしたりするには、次の手順に従います。
既存のトピックの Configuration タブをクリックし、Edit settings をクリックします。
Switch to expert mode をクリックします。
エキスパートモードで、
confluent.value.schema.validation
とconfluent.key.schema.validation
の設定を false から true に変更します。Save changes をクリックします。
トピックのサブジェクト命名方法を変更する¶
Control Center を使用して サブジェクト命名方法 を変更するには、次の手順に従います。
Control Center で、アップデートするトピックを選択し、Configuration、Switch to expert mode を順にクリックします。
設定を変更し、Save changes をクリックします。
次の構成は、confluent.value.subject.name.strategy
と confluent.key.subject.name.strategy
のどちらにでも使用できます。
io.confluent.kafka.serializers.subject.TopicNameStrategy
(デフォルト)io.confluent.kafka.serializers.subject.RecordNameStrategy
io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
詳細については、「トピックのサブジェクト命名方法を変更する」を参照してください。同じタスクを Confluent CLI で行う方法について説明しているほか、この機能の背景情報についても詳しく説明しています。
デモ : コマンドラインからトピックの スキーマ検証 を有効にする¶
この短いデモでは、トピックのスキーマ検証を有効または無効にした場合の効果を紹介しています。
Confluent Platform と スキーマレジストリ を使い始めて間もない方は、最初に「 スキーマレジストリ のチュートリアル」を読んでから、このデモをご覧になることをお勧めします。
以下の例では、$CONFLUENT_HOME/bin
にある kafka-console-producer
と kafka-console-consumer
を利用しています。
Confluent Platform バージョン 5.4.0 以降のローカルインストールでは、
$CONFLUENT_HOME/etc/kafka/server.properties
に、次の スキーマレジストリ URL の構成を追加します。############################## My Schema Validation Demo Settings ################ # Schema Registry URL confluent.schema.registry.url=http://localhost:8081
上の例では、ファイルの構成を追跡するために 2 行のコメントが含まれていますが、これらは省略してもかまいません。
次のコマンドを使用して Confluent Platform を起動します。
confluent local services start
ちなみに
- 単に
confluent local schema-registry
を実行してもかまいません。その場合は、依存関係としてkafka
とzookeeper
も起動されます。このデモでは、Connect や Control Center など、他のサービスを直接参照することはしていません。ただし、さらに理解を深めるために(トピックやメッセージが Control Center にどのように表示されるかなど)、フルスタックを実行する場合もあるでしょう。confluent local
の詳細については、「Confluent Platform を使用した Apache Kafka のクイックスタート(ローカル)」および Confluent CLI コマンドリファレンスの「confluent local」を参照してください。 confluent local
のコマンドはバックグラウンドで実行されるので、そのコマンドウィンドウを再利用することができます。プロデューサーとコンシューマーのセッションは独立している必要があります。
- 単に
test-schemas
というテストトピックを作成します。デフォルトのfalse
に設定されるよう スキーマ検証 設定は指定しません。kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic test-schemas
これでトピックが作成されます。テストトピックに生成されたレコードに対するブローカー検証は行われません。このデモの最初のパートでは、意図的にそのようにしてあります。トピックが作成されたことは、
kafka-topics --bootstrap-server localhost:9092 --list
を使用して確認できます。プロデューサーの新しいコマンドウィンドウで次のコマンドを実行し、(デフォルトの文字列シリアライザーを使用して)シリアル化されたレコードをトピック
test-schemas
に生成します。kafka-console-producer --broker-list localhost:9092 --topic test-schemas --property parse.key=true --property key.separator=,
現在このトピックは スキーマ検証 が無効になっているためこのコマンドは成功します。先ほどこのトピックに対してブローカーの スキーマ検証 を有効にしていた場合、そのトピックを生成先とする上のコマンドは許可されません。
このコマンドを実行すると、プロデューサーのコマンドプロンプト(
>
)が出力として表示されます。ここで、生成するメッセージを入力することができます。>
プロンプトに、初めてのメッセージを次のように入力します。1,my first record
このプロデューサーセッションは実行したままにしてください。
コンシューマー用に新しいコマンドウィンドウを開き、次のコマンドを入力してメッセージを読み取ります。
kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic test-schemas --property print.key=true
このコマンドからは
my first record
が出力されます。このコンシューマーセッションは実行したままにしてください。
今度は、トピック
test-schemas
の スキーマ検証 をtrue
に設定します。kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name test-schemas --add-config confluent.value.schema.validation=true
Completed updating config for topic test-schemas.
という確認のメッセージが表示されます。プロデューサーセッションに戻り、
>
プロンプトに 2 つ目のメッセージを入力します。2,my second record
スキーマ検証 が有効になっており送信しているメッセージにはスキーマ ID が含まれていないので、
This record has failed the validation on broker
というエラーが返されます。次に、スキーマ検証 を無効にし(同じコマンドを使用して
false
に設定)、プロデューサーを再起動した後、同じメッセージまたは同様のフォーマットのメッセージを入力して再送信すると、今度はメッセージが承認されます。(たとえば、3,my third record
を生成してみてください。)ちなみに
この最後の手順で(スキーマ検証を
false
に切り替えた後に)プロデューサーを再起動する代わりに、エラーの後の空白行に3,my third record
というメッセージを入力またはコピーアンドペーストして、Enter キーを押すこともできます。コンシューマーがこれを取り込み、プロデューサーのプロンプトに戻ります。ただし、これはわかりやすいワークフローではありません。エラーの後にはプロンプトが表示されないため、まず Enter キーを押そうとしがちですが、そうするとプロデューサーがシャットダウンしてしまいます。正常に生成されたメッセージは、Control Center (ウェブブラウザーで http://localhost:9021/ にアクセス)で、Topics、test-schemas、messages の順に選択して表示される画面にも表示されます。以前に送信されたメッセージを表示するには、パーティションを選択するか、タイムスタンプに移動しなければならない場合があります。
シャットダウンとクリーンアップのタスクを実行します。
- コンシューマーとプロデューサーを停止するには、それぞれのコマンドウィンドウで Ctl キーを押しながら C キーを押します。
- Confluent Platform を停止するには、「
confluent local stop
」と入力します。 - 別のテストを最初から実行する前に既存のデータ(トピック、スキーマ、メッセージ)を削除する場合は、「
confluent local destroy
」と入力します。
スキーマ検証 のセキュリティの構成¶
一般には、スキーマレジストリ がブローカーへの接続を開始します。スキーマ検証 は、ブローカーが スキーマレジストリ への接続を開始するという点で独特です。このようになっているのは、レジストリからスキーマを取得し、プロデューサーから受信したメッセージが、特定のトピックに関連付けられているスキーマと一致することを検証するためです。スキーマ検証 が有効な場合、タスクの流れは次のようになります。
- ブローカーがプロデューサーからメッセージを受信し、その宛先が、スキーマが関連付けられているトピックであることを認識します。
- ブローカーが スキーマレジストリ への接続を開始します。
- ブローカーは、トピックに関連付けられているスキーマを(スキーマ ID によって)要求します。
- スキーマレジストリ がリクエストを受信し、要求されたスキーマをスキーマストレージから見つけてブローカーに返します。
- ブローカーは、メッセージをスキーマに照らして検証します。
したがって、トピックに対してブローカー側の スキーマ検証 が有効になっているクラスターでセキュリティをセットアップするには、ブローカーから開始される スキーマレジストリ への接続をサポートするために、Kafka ブローカーで設定を構成する必要があります。複数のブローカーがある場合は、それぞれのブローカーを構成する必要があります。たとえば、mTLS では、ブローカーごとに異なる証明書を用意することが理想的です。
Schema Registry の内部 Kafka クライアントから Kafka ブローカーへの接続は、ブローカー側の スキーマ検証 と Schema Registry の HTTP リスナー間の接続とはまったく関係がありません。以降のセキュリティ設定は、スキーマレジストリ 内部のクライアントからブローカーへの接続を反映するものではありません。
以下のブローカー構成には confluent.schema.registry.url
が含まれ、これによって スキーマレジストリ への接続方法がブローカーに通知されます。これは既に、スキーマ検証を使用するための前提条件 としてブローカーに構成されている可能性があります。示されている残りの設定は、それぞれのセキュリティ構成に固有のものです。
ちなみに
以下のセクションでは、ブローカーのセキュリティ構成に焦点を当てています。スキーマレジストリ のセキュリティを設定する方法の詳細については、「スキーマレジストリ のセキュリティの概要」および関連するセクションを参照してください。たとえば、クライアント(ブローカーなど)にユーザー名とパスワードを使用した認証を要求するように スキーマレジストリ をセットアップするには、「HTTP 基本認証」の「スキーマレジストリ」を参照してください。
mTLS(相互 TLS)認証¶
SSL を使用した双方向認証 (mTLS)を構成するには、認証(証明書)と 暗号化 に SSL を使用して、ブローカーを スキーマレジストリ に認証します。
各ブローカーのブローカープロパティファイル($CONFLUENT_HOME/etc/kafka/server.properties
)で、次の設定を定義します。
confluent.schema.registry.url=https://<host>:<port>
confluent.ssl.truststore.location=<path to truststore file>
confluent.ssl.truststore.password=<password for the truststore if encrypted>
confluent.ssl.keystore.location=<path to file with private key for the broker>
confluent.ssl.keystore.password=<password to keystore file>
confluent.ssl.key.password=<password>
基本認証¶
このセットアップでは、ブローカーが 基本認証 を使用して スキーマレジストリ に認証するように構成されます。
各ブローカー($CONFLUENT_HOME/etc/kafka/server.properties
)で、次の設定を定義します。
confluent.schema.registry.url=http://<host>:<port>
confluent.basic.auth.credentials.source=<USER_INFO, URL, or SASL_INHERIT>
confluent.basic.auth.user.info=<username>:<password> #required only if credentials source is set to USER_INFO
- プロパティ
confluent.basic.auth.credentials.source
は、使用する認証情報のタイプ(ユーザー名とパスワード)を定義します。これらは変数ではなくリテラルです。 confluent.basic.auth.credentials
をUSER_INFO
に設定した場合は、confluent.basic.auth.user.info
も指定する必要があります。
SSL を使用する基本認証¶
各ブローカー($CONFLUENT_HOME/etc/kafka/server.properties
)で、次の設定を定義します。
confluent.schema.registry.url=https://<host>:<port>
confluent.basic.auth.credentials.source=USER_INFO
confluent.basic.auth.user.info=<username>:<password>
confluent.ssl.truststore.location=<path to the truststore file>
confluent.ssl.truststore.password=<password for the truststore if encrypted>
ロールベースアクセス制御(RBAC)と ACL¶
スキーマレジストリ への認可のために、ロールベースアクセス制御 (および ACL またはそのどちらか)を行って、ブローカーで他のタイプのセキュリティとともに使用することができます。
スキーマレジストリ セキュリティプラグイン では、RBAC と ACL の両方の認可がサポートされます。
ブローカーの構成¶
セキュリティのタイプに応じて、前のセクションと同様の設定をブローカーに適切に定義します。たとえば、基本認証では、ブローカーのプロパティファイルに次のような設定を含めます。
confluent.schema.registry.url=http://<host>:<port>
confluent.basic.auth.credentials.source=USER_INFO
confluent.basic.auth.user.info=<username>:<password> #required only if credentials source is set to USER_INFO
スキーマレジストリ へのアクセスを許可するための RBAC ロールバインディングの設定¶
RBAC ロールバインディングを定義するには、スキーマレジストリ に少なくとも次の IAM 割り当てが必要です。
confluent iam rolebinding list --principal User:<user-id> \
--role DeveloperRead --kafka-cluster-id <kafka-cluster-id> \
--resource Subject:* --schema-registry-cluster-id <schema-registry-group-id>
スキーマレジストリ クラスター ID は、schema-registry-group-id
(デフォルトでは schema-registry)と同じです。
おすすめのリソース¶
- ブログの投稿記事 : Confluent Platform 5.4 でのスキーマ検証
- スキーマ検証 および Replicator: スキーマ検証 と、1 つのクラスターから別のクラスターへのデータのレプリケーションに使用される Replicator 構成との関係について説明