スキーマの管理

スキーマとは、Kafka のデータフォーマットの構造を定義し、データを記述するものです。スキーマは、Schema Registry によって登録および管理されます。

Confluent for Kubernetes (CFK)には、スキーマのカスタムリソース定義(CRD)が用意されています。スキーマ CRD を使用して、Kubernetes のスキーマカスタムリソース(CR)としてスキーマを宣言的に作成、読み取り、および削除することができます。

スキーマ CR は、スキーマサブジェクトのステートを表し、そのサブジェクトに属するバージョンの管理に使用されます。これには、サブジェクトおよびスキーマバージョンの作成と削除が含まれます。

スキーマ CR で管理できるのは、CFK によって作成された新規サブジェクトのみです。既存のスキーマはいずれも管理されません。

スキーマ CR 状態のステートは、スキーマサブジェクトの最新バージョンを表します。

スキーマ CR はそれぞれ、Schema Registry 内の 1 つのスキーマサブジェクトバージョンにマップされます。

スキーマは、以下の 3 つの形式のいずれかにすることができます。

  • プロトコルバッファ
  • JSON
  • Avro(デフォルト)

Schema Registry およびスキーマの詳細については、「Schema Registry の概要」を参照してください。

前提条件

  • スキーマを Schema Registry で管理するため、スキーマを作成する前に Schema Registry をデプロイします。
  • スキーマ CR の構成時に必要になる、Schema Registry に関する以下の情報を入手します。
    • Schema Registry REST API エンドポイント
    • 認証構成
    • Schema Registry に対する認証情報

新しいサブジェクト名のスキーマの作成と登録

新しいスキーマ CR を作成することによって、新しいサブジェクト名のスキーマを登録します。

スキーマを作成するときに、以下を指定する必要があります。

  • サブジェクト名
  • データ形式: Avro、JSON、または Protobuf
  • ファイルとしてのスキーマコンテンツ

あるサブジェクト名の最初のスキーマを登録すると、そのスキーマにはバージョン番号 1 が割り当てられます。

スキーマを作成するには、次の手順に従います。

  1. スキーマを格納する Kubernetes ConfigMap リソースを作成します。以下に例を示します。

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: schema-config
      namespace: operator
    data:
      schema: |
        {
          "namespace": "io.confluent.examples.clients.basicavro",
          "type": "record",
          "name": "Payment",
          "fields": [
            {"name": "id", "type": "string"},
            {"name": "amount", "type": "double"},
            {"name": "email", "type": "string"}
          ]
        }
    
  2. スキーマ CR を作成します。以下に、スキーマ CR の構造を示します。

    apiVersion: platform.confluent.io/v1beta1
    kind: Schema
    metadata:
      name:
      namespace:
    spec:
      data:
        configRef:                --- [1]
        format:                   --- [2]
    
      schemaReferences:
        subject:                  --- [3]
        version:                  --- [4]
        format:                   --- [5]
    
        avro:
          avro:                   --- [6]
    
        json:
          url:                    --- [7]
    
        protobuf:
          file:                   --- [8]
    
      schemaRegistryClusterRef:   --- [9]
    
      schemaRegistryRest:         --- [10]
    
      name:                       --- [11]
    
    • [1](必須)前の手順で作成した ConfigMap の名前。

    • [2](必須) エンコードされたスキーマの形式。サポートされる値は、avrojson、および protobuf です。

    • [3] configRef を介して参照されるスキーマのサブジェクト名。

    • [4] 参照されるスキーマサブジェクトのバージョン。

    • [5] 参照されるスキーマの形式。サポートされる値は、avrojson、および protobuf です。

    • [6] 参照されるスキーマの完全修飾名。

    • [7] 参照される JSON スキーマ名。

    • [8] 参照される Protobuf スキーマのファイル名。

    • [9] このスキーマが属する Schema Registry クラスターの名前。「Schema Registry の CR 名を使用した Schema Registry の検出」を参照してください。

    • [10] REST API の接続構成。以下を参照してください。「Schema Registry エンドポイントを使用した Schema Registry の検出」を参照してください。

    • [11] スキーマ名。設定されていない場合、スキーマの CR の名前がスキーマ名として使用されます。

      このプロパティを使用して、スキーマをコンテキストで定義できます。

      また、スキーマ名に特殊文字を含めるためにこのプロパティを使用することもできます。使用できる文字は ^[:a-zA-Z0-9_.-]*$ です。

      To include the / character in the name, replace / with its URL-encoded value, %2F, in the name.

  3. 次のように、スキーマの CR を適用します。

    kubectl apply -f <Schema CR>
    

Schema Registry の検出

以下のいずれかの方法を使用して、スキーマは、どの Schema Registry クラスターに属しているかを検出できます。

Schema Registry の CR 名を使用した Schema Registry の検出

スキーマの Schema Registry を自動検出するには、スキーマ CR で次のように設定します。

spec:
  schemaRegistryClusterRef:
    name:                        --- [1]
    namespace:                   --- [2]
  • [1](必須)このスキーマが属する Schema Registry クラスターの名前。
  • [2](省略可) Schema Registry クラスターが実行されている名前空間。ただし、このスキーマが作成されている名前空間と異なる場合に設定します。

Schema Registry エンドポイントを使用した Schema Registry の検出

Schema Registry エンドポイントへの接続方法を指定するには、スキーマ CR に接続情報を指定します。

Schema Registry エンドポイント

spec:
  schemaRegistryRest:
    endpoint:                    --- [1]
    authentication:
      type:                      --- [2]
  • [1] Schema Registry が実行されているエンドポイント。
  • [2] Schema Registry クラスターに使用する認証方式。サポートされるタイプは、basicmtls、および bearer です。Schema Registry で RBAC が有効になっている場合に bearer を使用できます。

Schema Registry に対する基本認証

spec:
  schemaRegistryRest:
    authentication:
      type: basic                 --- [1]
      basic:
        secretRef:                --- [2]
        directoryPathInContainer: --- [3]
  • [1] 基本認証タイプの場合に必須です。

  • [2] または [3] が必須です。

  • [2] 認証情報を格納するシークレットの名前。必要な形式については、「基本認証」を参照してください。

  • [3] Vault によって必要な認証情報が挿入されるコンテナーのディレクトリパス。

    必要な形式については、「基本認証」を参照してください。

    Vault を使用する際の認証情報と必要なアノテーションを提供する方法については、「Confluent Platform アプリケーションの CR へのシークレットの提供」を参照してください。

Schema Registry に対する mTLS 認証

spec:
  schemaRegistryRest:
    authentication:
      type: mtls                 --- [1]
    tls:
      secretRef:                 --- [2]
      directoryPathInContainer:  --- [3]

Schema Registry に対するベアラー認証(RBAC の場合)

Schema Registry で RBAC が有効になっている場合、ベアラー認証を次のように構成できます。

spec:
  schemaRegistryRest:
    authentication:
      type: bearer                --- [1]
      bearer:
        secretRef:                --- [2]
        directoryPathInContainer: --- [3]
  • [1] ベアラー認証タイプの場合に必須です。

  • [2] または [3] が必須です。

  • [2](必須) ベアラー認証情報を格納するシークレットの名前。必要な形式については、「ベアラー認証」を参照してください。

  • [3] 必要なベアラー認証情報がマウントされるコンテナーのディレクトリパス。

    必要な形式については、「ベアラー認証」を参照してください。

    コンテナー内のディレクトリパス機能を使用して認証情報を提供する方法については、「Confluent Platform アプリケーションの CR へのシークレットの提供」を参照してください。

既存のサブジェクトの新しいスキーマバージョンの作成と登録

スキーマサブジェクトの新しいスキーマバージョンを作成して登録するには、既存の configMap の名前を使用して、新しい configMap を構成してデプロイします。

CFK では、新しいスキーマコンテンツが、サブジェクトに対する互換性のある進化であるかどうかをチェックします。互換性がある場合、CFK はそのスキーマコンテンツをサブジェクトの新しいバージョンとして登録します。

スキーマのリストの表示

現在の名前空間に登録されているすべてのスキーマのリストを取得するには、次のコマンドを実行します。

kubectl get schema

スキーマの CR ごとに、以下の情報が返されます。

  • サブジェクト名
  • スキーマ ID
  • スキーマのバージョン番号

スキーマサブジェクトバージョンの削除

次のコマンドを使用して、スキーマサブジェクトの最新バージョンを取得できます。

kubectl get schema <schema-name> -ojsonpath="{.status.version}"

バージョンが既に削除されている場合を除き、1 から (latest version - 1) までのバージョンを削除できます。スキーマサブジェクトの最新バージョンを削除することはできません。

スキーマサブジェクトバージョンを削除し、それを Schema Registry から登録解除するには、スキーマ CR に platform.confluent.io/soft-delete-versions または platform.confluent.io/delete-versions の注釈を付けます。

以下に例を示します。

バージョン 2 で論理的な削除をトリガーするには、次のようにします。

kubectl annotate <Schema-CR-name> platform.confluent.io/soft-delete-versions="[2]"

バージョン 1 で物理的な削除をトリガーするには、次のようにします。

kubectl annotate <Schema-CR-name> platform.confluent.io/delete-versions="[1]"