重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

Confluent Cloud でのブローカー側のスキーマ検証の使用

Looking for Confluent Platform Schema Management docs? You are currently viewing Confluent Cloud documentation. If you are looking for Confluent Platform docs, check out Schema Management on Confluent Platform.

Kafka のトピックに対して生成されたデータに、サブジェクト命名方法 に従って スキーマレジストリ に登録された有効なスキーマ ID が使用されていることは、ブローカーが スキーマ検証 を通じて確認します(「スキーマ、サブジェクト、トピック」も参照してください)。

前提条件

  • Confluent Cloud の スキーマ検証 は 専用クラスター でのみ、ホストされている スキーマレジストリ を介して使用できます。Confluent Cloud ブローカーでは スキーマレジストリ のセルフマネージド型のインスタンスを使用することはできません。Confluent Cloud でホストされている スキーマレジストリ のみを使用できます。(/platform/current/スキーマ検証は、 使用できます。)
  • スキーマ検証 を使用する 環境でスキーマレジストリを有効に しておく必要があります。
  • スキーマ検証 は、環境レベルで境界があります。同じ環境内のすべての専用クラスターが 1 つの スキーマレジストリ を共有します。クラスターから、別の環境のスキーマを確認することはできません。

トピックに関する スキーマ検証 の構成オプション

スキーマ検証 は、トピックレベルで、以下のパラメーターを使用して設定します。

プロパティ 説明
confluent.key.schema.validation true に設定すると、メッセージキーに対するスキーマ ID の検証が有効になります。デフォルト値は false です。
confluent.value.schema.validation true に設定すると、メッセージの値に対するスキーマ ID の検証が有効になります。デフォルト値は false です。
confluent.key.subject.name.strategy メッセージキーのサブジェクト命名方法を設定します。デフォルト値は io.confluent.kafka.serializers.subject.TopicNameStrategy です。
confluent.value.subject.name.strategy メッセージの値のサブジェクト命名方法を設定します。デフォルト値は io.confluent.kafka.serializers.subject.TopicNameStrategy です。

ちなみに

  • 値スキーマの検証とキースキーマの検証は互いに独立しています。一方のみを有効にすることも、両方を有効にすることもできます。
  • サブジェクトの命名方法は、スキーマ検証 と紐付けられています。スキーマ検証 が有効になっていない場合、効果はありません。

Confluent CLI の最新バージョンの取得

  • 既に Confluent CLI がある場合は、最新の状態であることを確認します。

    既に Confluent Cloud CLI がある場合は、ccloud update --major を実行すると、利用可能なアップデートと confluent v2.0 への直接アップグレードのプロンプトが表示されます。

    既に新しい統合 CLI がある場合は confluent update を実行します。統合 CLI を使用すると、コマンドで ccloud ではなく confluent を使用できます。アップグレードしたら、参考として 便利な統合 CLI の Confluent コマンドリファレンス をご覧ください。

  • To learn more about the new unified CLI and migration paths, see Install Confluent CLI, Migrate to Confluent CLI v2, and Run multiple CLIs in parallel.

Confluent CLI から スキーマ検証 を有効にする

トピックの スキーマ検証 は、トピックの作成時または既存のトピックの変更時に有効にすることができます。

スキーマ検証 を有効にするコマンド構文は次のとおりです。

confluent kafka topic <create|update> <topic-name> --config confluent.<key|value>.schema.validation=true

たとえば、次のコマンドでは flights というトピックが作成され、値スキーマに対するスキーマ検証が有効になります。

confluent kafka topic create flights --config confluent.value.schema.validation=true

この構成で、メッセージの値に対する有効なスキーマを持たないメッセージがトピック flights に対して生成された場合、プロデューサーにエラーが返され、メッセージは破棄されます。

バッチで送信されたメッセージの中に無効なメッセージが 1 つでもあった場合、そのバッチ全体が破棄されます。

別のサブジェクト命名方法を指定しない場合、デフォルトで io.confluent.kafka.serializers.subject.TopicNameStrategy が使用されます。メッセージキーとメッセージの値のスキーマのいずれか、または両方に使用される命名方法を変更することができます。たとえば、次のコマンドを使用すると、トピック flights に対して io.confluent.kafka.serializers.subject.RecordNameStrategy を使用するようにサブジェクト命名方法が設定されます。

confluent kafka topic update flights --config confluent.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy

次の 命名方法confluent.value.subject.name.strategy の値として使用できます。

命名方法 説明
TopicNameStrategy サブジェクト名はトピック名に由来します。デフォルトではこの方法が使用されます。
RecordNameStrategy サブジェクト名はレコード名に由来します。イベントは、それぞれデータ構造が異なる場合がありますが、この命名方法によって、関連するイベントをサブジェクトの下で論理的にグループ化できます。
TopicRecordNameStrategy サブジェクト名はトピック名とレコード名に由来します。イベントは、それぞれデータ構造が異なる場合がありますが、この命名方法によって、関連するイベントをサブジェクトの下で論理的にグループ化できます。

注釈

このセクションの例にあるように、上記の命名方法の完全なクラス名は、命名方法の名称に、プレフィックスとして io.confluent.kafka.serializers.subject を付けたものになります。

Confluent Cloud Console からトピックの スキーマ検証 を有効にする

Cloud Console からトピックの スキーマ検証 を有効にするには

  1. トピックを開きます。

  2. Configuration タブをクリックします。

  3. Edit settings をクリックします。

  4. Switch to expert mode をクリックします。

  5. エキスパートモードで、confluent.value.schema.validationconfluent.key.schema.validation の設定を false から true に変更します。

    ../_images/cloud-sv-topic-enable.png

    ちなみに

    別の命名方法を指定しない場合、デフォルトで TopicNameStrategy が使用されます。

    メッセージキーとメッセージの値のスキーマのいずれか、または両方に使用される命名方法を変更することができます。これらの設定は、選択したトピックの エキスパートモード でも使用できます。必要に応じて、ここで設定します。

  6. Save changes をクリックします。

スキーマ検証 を無効にするには、同じ構成オプションを false に設定します。

スキーマ検証 のデモ

この短いデモに従って、スキーマ検証 のテストを行うことができます。

  1. ウェブ UI または Confluent CLI で players-maple というテストトピックを作成します。スキーマ検証 の設定は指定しません。これにより、このトピックではデフォルトの false が使用されます。

    Confluent CLI で使用するコマンドは、次のようになります。

    confluent kafka topic create players-maple
    

    これでトピックが作成されます。テストトピックに生成されたレコードに対するブローカー検証は行われません。このデモの最初のパートでは、意図的にそのようにしてあります。

  2. プロデューサー用の新しいコマンドウィンドウで(Confluent Cloud にログインし、同じ環境およびクラスターを使用します)、次のコマンドを実行して、トピック players-maple に対して(デフォルトの文字列シリアライザーを使用して)シリアル化されたレコードを生成します。

    confluent kafka topic produce players-maple --parse-key=true --delimiter=,
    

    現在このトピックは スキーマ検証 が無効になっているためこのコマンドは成功します。先ほどこのトピックに対してブローカーの スキーマ検証 を有効にしていた場合、そのトピックを生成先とする上のコマンドは許可されません。

    プロデューサーのプロンプトで、最初のメッセージを次のように入力します。

    1,Pierre
    

    このプロデューサーセッションは実行したままにしてください。

  3. コンシューマー用の新しいコマンドウィンドウを開き(Confluent Cloud にログインし、同じ環境およびクラスターを使用します)、次のコマンドを入力して、メッセージを読み取ります。

    confluent kafka topic consume players-maple --from-beginning --print-key=true
    

    このコマンドの出力は 1 Pierre です。

    このコンシューマーセッションは実行したままにしてください。

  4. 次に、トピック players-maple の スキーマ検証 を true に設定します。

    confluent kafka topic update players-maple --config confluent.value.schema.validation=true
    

    ちなみに

    players-maple トピックの構成について、Confluent Cloud Console の エキスパートモード でこの設定を更新することもできます。

  5. プロデューサーセッションに戻り、プロンプトに 2 つ目のメッセージを入力します。

    2,Frederik
    

    スキーマ検証 が有効になっており、送信しているメッセージにスキーマ ID が含まれていないため、次のエラーが返されます。Error: producer has detected an INVALID_RECORD error for topic players-maple

    その後、スキーマ検証 を無効にして(同じコマンドを使用して false を設定します)、同じメッセージまたは似たようなフォーマットのメッセージを入力して、改めて送信すると、そのメッセージは生成されます(たとえば、3,Ben を生成します)。

    正常に生成されたメッセージは、ウェブブラウザーで Topics、players-maple、Messages の順に選択すると、その画面にも表示されます。以前に送信されたメッセージを表示するには、パーティションを選択するか、特定のタイムスタンプに移動しなければならない場合があります。

スキーマ検証 でチェックされる内容とその仕組み

トピックに対して スキーマ検証 を有効にすると、各メッセージについて以下のチェックが行われます。

  • トピックに対して生成されたメッセージに関連付けられたスキーマがあること。(メッセージに関連付けられたスキーマ ID が存在する必要があります。それにより、スキーマがあることがわかります)。
  • スキーマがトピックと一致している必要があります。

上記のデモでは、スキーマ検証 が機能していることが Confluent CLI を使用して簡単に示されています。

実際には、通常はクライアントアプリケーションの機能として、Avro オブジェクト、Protobuf オブジェクト、または Jackson でシリアル化可能な POJO を送信します。この場合、スキーマ検証 はオブジェクトをもとにスキーマを派生させます。スキーマは スキーマレジストリ に送信され、そのスキーマが サブジェクト に存在するかどうかがチェックされます。存在する場合、スキーマレジストリ はそのバージョンのスキーマ ID を使用します。存在しない場合、スキーマレジストリ は、クライアントでスキーマの自動登録が false に設定されていれば、エラーをスローし、クライアントでスキーマの自動登録が true に設定されていれば、スキーマを登録します。

スキーマの自動登録は、クライアントアプリケーションで設定されます。クライアントアプリケーションは、デフォルトで新しいスキーマを自動的に登録します。クライアントでスキーマの自動登録を無効にすることができます。本稼働環境では通常、無効にしておくことが推奨されます。詳細については、Confluent Platform ドキュメントの「スキーマの自動登録の無効化」を参照してください。

おすすめの関連情報