Manage Schemas

A schema defines the structure of the data stored in Kafka, such as the fields of a record and their types. Schemas are registered and managed by Schema Registry.

Confluent for Kubernetes (CFK) provides the Schema custom resource definition (CRD). With the Schema CRD, you can declaratively create, read, and delete schemas as Schema custom resources (CRs) in Kubernetes.

A Schema CR represents the state of a schema subject and is used to manage the versions that belong to the subject. This includes creating and deleting subjects and schema versions.

Schema CRs can manage only new subjects created by CFK. They cannot manage pre-existing schemas.

The status of a Schema CR reflects the latest version of the schema subject.

Each Schema CR is mapped to a schema subject version in Schema Registry.

A schema can be in one of three formats:

  • Protocol Buffers
  • JSON
  • Avro (default)

See Schema Registry Overview for detailed information about Schema Registry and schemas.

Prerequisites

  • Deploy Schema Registry before you create schemas, because schemas are managed in Schema Registry.
  • Get the information about the Schema Registry that you need when configuring Schema CRs:
    • Schema Registry REST API endpoint
    • Authentication configurations
    • Authentication credentials to the Schema Registry

Create and register a schema for a new subject name

Register a schema for a new subject name by creating a new Schema CR.

When creating the schema, you must specify:

  • Subject name
  • Data format: Avro, JSON, or Protobuf
  • Schema content as a file

When you register the first schema for a subject name, that schema is assigned version number 1.

To create a schema:

  1. Create a Kubernetes ConfigMap resource containing the schema. For example:

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: schema-config
      namespace: operator
    data:
      schema: |
        {
          "namespace": "io.confluent.examples.clients.basicavro",
          "type": "record",
          "name": "Payment",
          "fields": [
            {"name": "id", "type": "string"},
            {"name": "amount", "type": "double"},
            {"name": "email", "type": "string"}
          ]
        }
    
  2. Create a Schema CR. The following is the structure of the Schema CR:

    apiVersion: platform.confluent.io/v1beta1
    kind: Schema
    metadata:
      name:
      namespace:
    spec:
      data:
        configRef:                --- [1]
        format:                   --- [2]
    
      schemaReferences:
        subject:                  --- [3]
        version:                  --- [4]
        format:                   --- [5]
    
        avro:
          avro:                   --- [6]
    
        json:
          url:                    --- [7]
    
        protobuf:
          file:                   --- [8]
    
      schemaRegistryClusterRef:   --- [9]
    
      schemaRegistryRest:         --- [10]
    
      name:                       --- [11]
    
    • [1] Required. The name of the ConfigMap you created in the previous step.

    • [2] Required. The format of the encoded schema. Supported values are avro, json, and protobuf.

    • [3] The subject name of the referenced schema through the configRef.

    • [4] The version of the referenced schema subject.

    • [5] The format of the referenced schema. Supported values are avro, json, and protobuf.

    • [6] The fully qualified name of the referenced schema.

    • [7] The referenced JSON schema name.

    • [8] The file name of the referenced Protobuf schema.

    • [9] The name of the Schema Registry cluster this schema belongs to. See Discover Schema Registry using Schema Registry CR name.

    • [10] The REST API connection configuration. See Discover Schema Registry using Schema Registry endpoint.

    • [11] The schema name. If not set, the Schema CR name is used as the schema name.

      Use this property to define a schema with a context.

      You can also use this property to include special characters in the schema name. The following characters are accepted: ^[:a-zA-Z0-9_.-]*$

  3. Apply the Schema CR:

    kubectl apply -f <Schema CR>
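
As an illustration, a minimal filled-in Schema CR for the ConfigMap in step 1 might look like the following sketch. The CR name payment-value and the Schema Registry cluster name schemaregistry are assumed values, not names required by CFK; substitute the names used in your deployment.

```yaml
apiVersion: platform.confluent.io/v1beta1
kind: Schema
metadata:
  name: payment-value          # assumed name; used as the schema name because spec.name is not set
  namespace: operator
spec:
  data:
    configRef: schema-config   # ConfigMap created in step 1
    format: avro
  schemaRegistryClusterRef:
    name: schemaregistry       # assumed name of the Schema Registry CR
```

The sketch omits schemaReferences, so it applies only to a schema that does not reference other schemas.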
    

Discover Schema Registry

Using one of the following methods, a schema can discover which Schema Registry cluster it belongs to:

Discover Schema Registry using Schema Registry CR name

To automatically discover the Schema Registry for the schema, set the following in the Schema CR:

spec:
  schemaRegistryClusterRef:
    name:                        --- [1]
    namespace:                   --- [2]
  • [1] Required. The name of the Schema Registry cluster this schema belongs to.
  • [2] Optional. The namespace where the Schema Registry cluster is running if different from the namespace this schema is being created in.

Discover Schema Registry using Schema Registry endpoint

To connect to Schema Registry through its endpoint, specify the connection information in the Schema CR.

Schema Registry endpoint

spec:
  schemaRegistryRest:
    endpoint:                    --- [1]
    authentication:
      type:                      --- [2]
  • [1] The endpoint where Schema Registry is running.
  • [2] Authentication method to use for the Schema Registry cluster. Supported types are basic, mtls, and bearer. You can use bearer when RBAC is enabled for Schema Registry.

Basic authentication to Schema Registry

spec:
  schemaRegistryRest:
    authentication:
      type: basic                 --- [1]
      basic:
        secretRef:                --- [2]
        directoryPathInContainer: --- [3]
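
For illustration, a filled-in connection block that uses basic authentication might look like the sketch below. The endpoint URL and the secret name sr-basic-credential are assumptions, not values taken from this page. The sketch sets secretRef and leaves out directoryPathInContainer.

```yaml
spec:
  schemaRegistryRest:
    endpoint: https://schemaregistry.operator.svc.cluster.local:8081   # assumed endpoint
    authentication:
      type: basic
      basic:
        secretRef: sr-basic-credential   # assumed name of an existing Secret
```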

mTLS authentication to Schema Registry

spec:
  schemaRegistryRest:
    authentication:
      type: mtls                 --- [1]
    tls:
      secretRef:                 --- [2]
      directoryPathInContainer:  --- [3]

Bearer authentication to Schema Registry (for RBAC)

When RBAC is enabled for Schema Registry, you can configure bearer authentication as follows:

spec:
  schemaRegistryRest:
    authentication:
      type: bearer                --- [1]
      bearer:
        secretRef:                --- [2]
        directoryPathInContainer: --- [3]
  • [1] Required for the bearer authentication type.

  • Either [2] or [3] is required.

  • [2] The name of the secret that contains the bearer credentials. See Bearer authentication for the required format.

  • [3] The directory path in the container where the required bearer credentials are mounted.

    See Bearer authentication for the required format.

    See Provide secrets for Confluent Platform application CR for providing the credential using the Directory Path in Container feature.
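
As a sketch of the directory path variant, a bearer configuration could be filled in as follows. The endpoint URL and the mount path /mnt/secrets/sr-bearer are hypothetical; the path must point to wherever the bearer credentials are actually mounted in the container.

```yaml
spec:
  schemaRegistryRest:
    endpoint: https://schemaregistry.operator.svc.cluster.local:8081   # assumed endpoint
    authentication:
      type: bearer
      bearer:
        directoryPathInContainer: /mnt/secrets/sr-bearer   # assumed mount path
```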

Create and register a new schema version for an existing subject

To create and register a new schema version for a schema subject, update the schema content in the ConfigMap and deploy it under the existing ConfigMap name.

CFK checks that the new schema content is a compatible evolution for the subject. If it is compatible, CFK registers the schema content as a new version of the subject.
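
As a sketch of such an update, the ConfigMap from the earlier example could be redeployed under the same name, schema-config, with one added field. The currency field and its default value are hypothetical. Giving a new Avro field a default is a common way to keep a change backward compatible, but whether a change is accepted depends on the compatibility mode configured for the subject.

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: schema-config        # same name as the existing ConfigMap
  namespace: operator
data:
  schema: |
    {
      "namespace": "io.confluent.examples.clients.basicavro",
      "type": "record",
      "name": "Payment",
      "fields": [
        {"name": "id", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "email", "type": "string"},
        {"name": "currency", "type": "string", "default": "USD"}
      ]
    }
```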

View a list of schemas

To get a list of all schemas registered in the current namespace, run the following command:

kubectl get schema

For each Schema CR, the following information is returned:

  • Subject name
  • Schema ID
  • Schema version number

Delete a schema subject version

You can use the following command to get the latest version of a schema subject:

kubectl get schema <schema-name> -ojsonpath="{.status.version}"

You can delete any version from 1 through (latest version - 1) that has not already been deleted. You cannot delete the latest version of a schema subject.

To delete a schema subject version and unregister it from Schema Registry, annotate the Schema CR with platform.confluent.io/soft-delete-versions or platform.confluent.io/delete-versions.

For example:

To trigger a soft delete on version 2:

kubectl annotate schema <Schema-CR-name> platform.confluent.io/soft-delete-versions="[2]"

To trigger a hard delete on version 1:

kubectl annotate schema <Schema-CR-name> platform.confluent.io/delete-versions="[1]"