Manage Schemas for Confluent Platform Using Confluent for Kubernetes

Schemas define Kafka data format and are registered and managed by Schema Registry.

Confluent for Kubernetes (CFK) provides the Schema custom resource definition (CRD). With the Schema CRD, you can declaratively create, read, and delete schemas as Schema custom resources (CRs) in Kubernetes.

A Schema CR represents the state of a schema subject and is used to manage the versions that belong to the subject. This includes creating and deleting subjects and schema versions.

Schema CRs can only manage new subjects created by CFK and not any pre-existing schemas.

Similarly, you should use the Schema CRs to manage the schemas that were created by CFK. For example, if you use Control Center to make changes to a CFK-created schema, CFK is not aware of those changes in the schema.

The state of a Schema CR status represents the latest version of the schema subject.

Each Schema CR is mapped to a schema subject version in Schema Registry.

A schema can be in one of three formats:

  • Protocol Buffers
  • JSON
  • Avro (default)

See Schema Registry Overview for detailed information about Schema Registry and schemas.

Prerequisites

  • Deploy Schema Registry before you create schemas as schemas are managed in Schema Registry.
  • Get the information about the Schema Registry that you need when configuring Schema CRs:
    • Schema Registry REST service endpoint
    • Authentication configurations
    • Authentication credentials to the Schema Registry

Create and register a schema for a new subject name

Register a schema for a new subject name by creating a new Schema CR.

Note

When RABC is enabled in this Confluent Platform environment, the super user you configured for Kafka (kafka.spec.authorization.superUsers) does not have access to resources in the Schema Registry cluster. If you want the super user to be able to create schemas, grant the super user the permission on the Schema Registry cluster.

When creating the schema, you must specify:

  • Subject name
  • Data format: Avro, JSON, or Protobuf
  • Schema content as a file

When you register the first schema for a subject name, that schema is assigned version number 1.

To create a schema:

  1. Create a Kubernetes ConfigMap resource containing the schema. For example:

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: schema-config
      namespace: operator
    data:
      schema: |
        {
          "namespace": "io.confluent.examples.clients.basicavro",
          "type": "record",
          "name": "Payment",
          "fields": [
            {"name": "id", "type": "string"},
            {"name": "amount", "type": "double"},
            {"name": "email", "type": "string"}
          ]
        }
    
  2. Create a Schema CR. The following is the structure of the Schema CR:

    apiVersion: platform.confluent.io/v1beta1
    kind: Schema
    metadata:
      name:
      namespace:
    spec:
      compatibilityLevel:         --- [1]
      data:
        configRef:                --- [2]
        format:                   --- [3]
      mode:                       --- [4]
      name:                       --- [5]
      normalize:                  --- [6]
      schemaReferences:
        subject:                  --- [7]
        version:                  --- [8]
        format:                   --- [9]
    
        avro:
          avro:                   --- [10]
    
        json:
          url:                    --- [11]
    
        protobuf:
          file:                   --- [12]
    
      schemaRegistryClusterRef:   --- [13]
    
      schemaRegistryRest:         --- [14]
    
      name:                       --- [15]
    
    • [1] The compatibility level requirement among the schema versions. The valid options are:

      • BACKWARD: Consumer can process data produced with the current and the last versions of the schema.
      • BACKWARD_TRANSITIVE: Consumer can process data produced with the current and all previous versions of the schema.
      • FORWARD: Data produced using the current version of the schema can be read by consumers with the current and the last version of the schema.
      • FORWARD_TRANSITIVE: Data produced using the current version of the schema can be read by consumers with the current and all previous versions of the schema.
      • FULL: The current and the last versions of the schema are backward and forward compatible.
      • FULL_TRANSITIVE: The current and all the previous versions of the schema are backward and forward compatible.
      • NONE: Schema compatibility checks are disabled.

      For more information, see Schema Compatibility Types.

    • [2] Required. The name of the ConfigMap you create in the previous step.

    • [3] Required. The format of the encoded schema. Supported values are avro, json, and protobuf.

    • [4] Set the Schema Registry mode on a specific subject of a schema. Valid options are IMPORT, READONLY, and READWRITE.

    • [5] The subject name of the schema. If not configured, the Schema CR name is used as the subject name.

    • [6] Set to true to normalize the schema at the time of registering to the Schema Registry. For more information, see Schema Normalization.

    • [7] The subject name of the referenced schema through the configRef.

    • [8] The version of the referenced schema subject.

    • [9] The format of the referenced schema. Supported values are avro, json, and protobuf.

    • [10] The fully qualified name of the referenced schema.

    • [11] The referenced JSON schema name.

    • [12] The file name of the references Protobuf schema.

    • [13] The name of the Schema Registry cluster this schema belongs to. See Discover Schema Registry using Schema Registry CR name.

    • [14] The REST service connection configuration. See below. See Discover Schema Registry using Schema Registry endpoint.

    • [15] The schema name. If not set, the Schema CR name is used as the schema name.

      Use this property to define schema with context.

      You can also use this property to include special characters in the schema name. The following characters are accepted: ^[:a-zA-Z0-9_.-]*$

      To include the / character in the name, replace / with its URL-encoded value, %2F, in the name.

  3. Apply the Schema CR:

    kubectl apply -f <Schema CR>
    

Discover Schema Registry

Using one of the following methods, a schema can discover which Schema Registry cluster it belongs to:

Discover Schema Registry using Schema Registry CR name

To auto discover the Schema Registry for the schema, set the following in the Schema CR:

spec:
  schemaRegistryClusterRef:
    name:                        --- [1]
    namespace:                   --- [2]
  • [1] Required. The name of the Schema Registry cluster this schema belongs to.
  • [2] Optional. The namespace where the Schema Registry cluster is running if different from the namespace this schema is being created in.

Discover Schema Registry using Schema Registry endpoint

To specify how to connect to the Schema Registry endpoint, specify the connection information in the Schema CR.

Schema Registry endpoint

spec:
  schemaRegistryRest:
    endpoint:                    --- [1]
    authentication:
      type:                      --- [2]
  • [1] The endpoint where Schema Registry is running.
  • [2] Authentication method to use for the Schema Registry cluster. Supported types are basic, mtls, bearer, and oauth. You can use bearer when RBAC is enabled for Schema Registry.

Basic authentication to Schema Registry

spec:
  schemaRegistryRest:
    authentication:
      type: basic                 --- [1]
      basic:
        secretRef:                --- [2]
        directoryPathInContainer: --- [3]

mTLS authentication to Schema Registry

spec:
  schemaRegistryRest:
    authentication:
      type: mtls                 --- [1]
    tls:
      secretRef:                 --- [2]
      directoryPathInContainer:  --- [3]

Bearer authentication to Schema Registry (for RBAC)

When RBAC is enabled for Schema Registry, you can configure bearer authentication as below:

spec:
  schemaRegistryRest:
    authentication:
      type: bearer                --- [1]
      bearer:
        secretRef:                --- [2]
        directoryPathInContainer: --- [3]
  • [1] Required for the bearer authentication type.

  • [2] or [3] is required.

  • [2] Required. The name of the secret that contains the bearer credentials. See Bearer authentication for the required format.

  • [3] The directory path in the container where the required the bearer credentials are mounted.

    See Bearer authentication for the required format.

    See Provide secrets for Confluent Platform application CR for providing the credential using the Directory Path in Container feature.

OAuth authorization and authentication to Schema Registry

schemaRegistryRest:
  authentication:
    type: oauth                  --- [1]
    oauth:
      secretRef:                 --- [2]
      directoryPathInContainer:  --- [3]
      configuration:             --- [4]

Create and register a new schema version for an existing subject

To create and register a new schema version for a schema subject, configure and deploy a new configMap, using the existing configMap name.

CFK checks that the new schema content is a compatible evolution for the subject. And if compatible, CFK registers the schema content as a new version of the subject.

View a list of schemas

To get a list of all schemas registered in the current namespace, run the following command:

kubectl get schema

For each schema CR, the following information is returned:

  • Subject name
  • Schema id
  • Schema version number

Delete a schema subject version

You can use the following command to get the latest version of a schema subject:

kubectl get schema <schema-name> -ojsonpath="{.status.version}"

The versions from 1 to (latest version - 1) can be deleted unless a version was already deleted. You cannot delete the latest version of a schema subject.

To delete a schema subject version and unregister it from Schema Registry, annotate the Schema CR with platform.confluent.io/soft-delete-versions or platform.confluent.io/delete-versions.

For example:

To trigger a soft delete on version 2:

kubectl annotate <Schema-CR-name> platform.confluent.io/soft-delete-versions="[2]"

To trigger a hard delete on version 1:

kubectl annotate <Schema-CR-name> platform.confluent.io/delete-versions="[1]"