Manage Kafka Schemas using Confluent for Kubernetes Blueprints

Schemas define the structure of Kafka data, and they are registered and managed by Schema Registry.

Confluent for Kubernetes (CFK) Blueprints provides the Schema custom resource definition (CRD). With the Schema CRD, you can declaratively create, read, and delete schemas as Schema custom resources (CRs) in Kubernetes.

A Schema CR represents the state of a schema subject and is used to manage the versions that belong to the subject. This includes creating and deleting subjects and schema versions.

Schema CRs can manage only subjects that were newly created by CFK Blueprints. They cannot manage pre-existing schemas.

Similarly, use only Schema CRs to manage the schemas that were created by CFK Blueprints. For example, if you use Control Center to change a CFK Blueprints-created schema, CFK Blueprints is not aware of those changes.

The status of a Schema CR reflects the latest version of the schema subject.

Each Schema CR is mapped to a schema subject version in Schema Registry.

A schema can be in one of three formats:

  • Protocol Buffers
  • JSON
  • Avro (default)

For details about Schema Registry and schemas, see Schema Registry Overview.

You must first deploy the Schema Registry that will manage the schema.

Create and register a schema for a new subject name

Register a schema for a new subject name by creating a new Schema CR.

Note

When RBAC is enabled in this Confluent Platform environment, the super user you configured for Kafka (spec.authorization.superUsers in the ConfluentPlatformBlueprint CR) does not have access to resources in the Schema Registry cluster. If you want the super user to be able to create schemas, grant the super user permission on the Schema Registry cluster.

When creating the schema, you must specify:

  • Subject name
  • Data format: Avro, JSON, or Protobuf
  • Schema content as a file

When you register the first schema for a subject name, that schema is assigned version number 1.

To create a schema:

  1. Create a Kubernetes ConfigMap resource containing the schema. For example:

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: schema-config
    data:
      schema: |
        {
          "namespace": "io.confluent.examples.clients.basicavro",
          "type": "record",
          "name": "Payment",
          "fields": [
            {"name": "id", "type": "string"},
            {"name": "amount", "type": "double"},
            {"name": "email", "type": "string"}
          ]
        }
    
  2. Create a Schema CR:

    apiVersion: apps.cpc.platform.confluent.io/v1beta1
    kind: Schema
    spec:
      name:                       --- [1]
      compatibilityLevel:         --- [2]
      data:                       --- [3]
        configRef:                --- [4]
        format:                   --- [5]
      schemaReferences:
        subject:                  --- [6]
        version:                  --- [7]
        format:                   --- [8]
        avro:
          avro:                   --- [9]
        json:
          url:                    --- [10]
        protobuf:
          file:                   --- [11]
      schemaRegistryClusterRef:
        name:                     --- [12]
        namespace:                --- [13]
      deleteSchemaVersions:
        hardDelete:
          versions:               --- [14]
        softDelete:
          versions:               --- [15]
    
    • [1] The schema name. If not set, the Schema CR name is used as the schema name.

      Use this property to define a schema with a context.

      To include the / character in the name, replace / with its URL-encoded value, %2F, in the name.

    • [2] The compatibility level requirement among the schema versions. The valid options are:

      • BACKWARD: The consumer can process data produced with the current and the last versions of the schema.
      • BACKWARD_TRANSITIVE: The consumer can process data produced with the current and all previous versions of the schema.
      • FORWARD: Data produced using the current version of the schema can be read by consumers with the current and the last version of the schema.
      • FORWARD_TRANSITIVE: Data produced using the current version of the schema can be read by consumers with the current and all previous versions of the schema.
      • FULL: The current and the last versions of the schema are backward and forward compatible.
      • FULL_TRANSITIVE: The current and all the previous versions of the schema are backward and forward compatible.
      • NONE: Schema compatibility checks are disabled.

      For more information, see Schema Compatibility Types.

    • [3] Required. The data required to create the schema.

    • [4] Required. The name of the ConfigMap you created in the previous step.

    • [5] Required. The format of the encoded schema. Supported values are avro, json, and protobuf.

    • [6] Required. The subject name of the schema that the schema in configRef refers to.

    • [7] Required. The version of the referenced schema subject.

    • [8] Required. The format of the referenced schema. Supported values are avro, json, and protobuf.

    • [9] The fully qualified name of the referenced Avro schema.

    • [10] The referenced JSON schema name.

    • [11] The file name of the referenced Protobuf schema.

    • [12] Required. The name of the Schema Registry cluster this schema belongs to.

    • [13] The namespace where the Schema Registry cluster is running if different from the namespace this schema is being created in.

    • [14] A list of versions to trigger a hard delete.

    • [15] A list of versions to trigger a soft delete.
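
As an illustration of the template above, a minimal Schema CR might look like the following sketch. It reuses the schema-config ConfigMap from step 1. The CR name (payment-value), the namespace (confluent), and the Schema Registry cluster name (schemaregistry) are hypothetical placeholders, not values defined by this document. The metadata block is the standard Kubernetes object metadata, which the template omits.

```yaml
# Minimal sketch of a Schema CR. Names marked "assumed" are placeholders.
# Apply with: kubectl apply -f <file-name>.yaml
apiVersion: apps.cpc.platform.confluent.io/v1beta1
kind: Schema
metadata:
  name: payment-value          # assumed CR name; used as the schema name because spec.name is not set
  namespace: confluent         # assumed namespace
spec:
  compatibilityLevel: BACKWARD # one of the options listed under [2]
  data:
    configRef: schema-config   # the ConfigMap created in step 1
    format: avro
  schemaRegistryClusterRef:
    name: schemaregistry       # assumed Schema Registry cluster name
```

Because spec.name is not set here, the CR name would serve as the schema name, as described in [1]. The namespace field under schemaRegistryClusterRef is left out on the assumption that Schema Registry runs in the same namespace as the Schema CR.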

Create and register a new schema version for an existing subject

To create and register a new schema version for an existing schema subject, put the new schema content in a ConfigMap and deploy it under the existing ConfigMap name.

CFK Blueprints checks whether the new schema content is a compatible evolution for the subject. If it is, CFK Blueprints registers the schema content as a new version of the subject.
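
For instance, the Payment schema from the earlier ConfigMap example could be evolved as in the following sketch. The ConfigMap keeps the name schema-config, and only the schema content changes. The added currency field and its default value are hypothetical and chosen for illustration. In Avro, a field added with a default is generally accepted under the BACKWARD compatibility level, but whether a given change passes depends on the compatibility level configured for the subject.

```yaml
# Sketch of an updated ConfigMap: same name as before, new schema content.
# The "currency" field is a hypothetical addition. Its default value lets a
# reader on the new schema fill in the field when reading older records.
# Apply with: kubectl apply -f <file-name>.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: schema-config
data:
  schema: |
    {
      "namespace": "io.confluent.examples.clients.basicavro",
      "type": "record",
      "name": "Payment",
      "fields": [
        {"name": "id", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "email", "type": "string"},
        {"name": "currency", "type": "string", "default": "USD"}
      ]
    }
```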

View a list of schemas

To get a list of all schemas registered in the current namespace, run the following command:

kubectl get schema

Delete a schema subject version

You can use the following command to get the latest version of a schema subject:

kubectl get schema.apps <schema-name> -ojsonpath="{.status.version}"

You can delete any version from 1 through (latest version - 1) that has not already been deleted. You cannot delete the latest version of a schema subject.

To delete a schema subject version and unregister it from Schema Registry, add the versions you want to hard delete or soft delete and reapply the CR:

spec:
  deleteSchemaVersions:
    hardDelete:
      versions:
    softDelete:
      versions:

For example:

To trigger a soft delete on version 2:

spec:
  deleteSchemaVersions:
    softDelete:
      versions:
        - 2

To trigger a hard delete on version 1:

spec:
  deleteSchemaVersions:
    hardDelete:
      versions:
        - 1

Once the delete operation is complete, you can remove the versions you specified in spec.deleteSchemaVersions.
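
To show where the delete settings sit within a full resource, the following sketch adds a soft delete of two versions to a Schema CR. The CR name (payment-value) and the Schema Registry cluster name (schemaregistry) are hypothetical placeholders. The example assumes that the latest version of the subject is 3 or higher, so that versions 1 and 2 are eligible for deletion. Writing versions as a YAML list is an assumption based on the field descriptions, which call it a list of versions.

```yaml
# Sketch: soft-delete versions 1 and 2 of a subject whose latest version is
# assumed to be 3 or higher. Names marked "assumed" are placeholders.
# Reapply with: kubectl apply -f <file-name>.yaml
apiVersion: apps.cpc.platform.confluent.io/v1beta1
kind: Schema
metadata:
  name: payment-value          # assumed CR name
spec:
  data:
    configRef: schema-config   # ConfigMap holding the schema content
    format: avro
  schemaRegistryClusterRef:
    name: schemaregistry       # assumed Schema Registry cluster name
  deleteSchemaVersions:
    softDelete:
      versions:                # list form assumed from the field description
        - 1
        - 2
```

After the delete operation completes, the deleteSchemaVersions block can be removed from the file and the CR reapplied.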