Manage Schemas¶
A schema defines the structure of the Kafka data format, and it describes the data. Schemas are registered and managed by Schema Registry.
Confluent for Kubernetes (CFK) provides the Schema custom resource definition (CRD). With the Schema CRD, you can declaratively create, read, and delete schemas as Schema custom resources (CRs) in Kubernetes.
A Schema CR represents the state of a schema subject and is used to manage the versions that belong to the subject. This includes creating and deleting subjects and schema versions.
Schema CRs can only manage new subjects created by CFK and not any pre-existing schemas.
The state of a Schema CR status represents the latest version of the schema subject.
Each Schema CR is mapped to a schema subject version in Schema Registry.
A schema can be in one of three formats:
- Protocol Buffers
- JSON
- Avro (default)
See Schema Registry Overview for detailed information about Schema Registry and schemas.
Prerequisites¶
- Deploy Schema Registry before you create schemas as schemas are managed in Schema Registry.
- Get the information about the Schema Registry that you need when configuring Schema CRs:
- Schema Registry REST API endpoint
- Authentication configurations
- Authentication credentials to the Schema Registry
Create and register a schema for a new subject name¶
Register a schema for a new subject name by creating a new Schema CR.
When creating the schema, you must specify:
- Subject name
- Data format: Avro, JSON, or Protobuf
- Schema content as a file
When you register the first schema for a subject name, that schema is assigned version number 1.
To create a schema:
Create a Kubernetes ConfigMap resource containing the schema. For example:
apiVersion: v1 kind: ConfigMap metadata: name: schema-config namespace: operator data: schema: | { "namespace": "io.confluent.examples.clients.basicavro", "type": "record", "name": "Payment", "fields": [ {"name": "id", "type": "string"}, {"name": "amount", "type": "double"}, {"name": "email", "type": "string"} ] }
Create a Schema CR. The following is the structure of the Schema CR:
apiVersion: platform.confluent.io/v1beta1 kind: Schema metadata: name: namespace: spec: data: configRef: --- [1] format: --- [2] schemaReferences: subject: --- [3] version: --- [4] format: --- [5] avro: avro: --- [6] json: url: --- [7] protobuf: file: --- [8] schemaRegistryClusterRef: --- [9] schemaRegistryRest: --- [10] name: --- [11]
[1] Required. The name of the ConfigMap you create in the previous step.
[2] Required. The format of the encoded schema. Supported values are
avro
,json
, andprotobuf
.[3] The subject name of the referenced schema through the
configRef
.[4] The version of the referenced schema subject.
[5] The format of the referenced schema. Supported values are
avro
,json
, andprotobuf
.[6] The fully qualified name of the referenced schema.
[7] The referenced JSON schema name.
[8] The file name of the references Protobuf schema.
[9] The name of the Schema Registry cluster this schema belongs to. See Discover Schema Registry using Schema Registry CR name.
[10] The REST API connection configuration. See below. See Discover Schema Registry using Schema Registry endpoint.
[11] The schema name. If not set, the Schema CR name is used as the schema name.
Use this property to define schema with context.
You can also use this property to include special characters in the schema name. The following characters are accepted:
^[:a-zA-Z0-9_.-]*$
To include the
/
character in the name, replace/
with its URL-encoded value,%2F
, in the name.
Apply the Schema CR:
kubectl apply -f <Schema CR>
Discover Schema Registry¶
Using one of the following methods, a schema can discover which Schema Registry cluster it belongs to:
Provide the CR name and namespace that specifies the Schema Registry deployment
Use the Schema Registry deployed in the same namespace as the schema namespace
If neither of the above options is configured, the schema looks for a CFK-managed Schema Registry in the namespace of the schema.
Discover Schema Registry using Schema Registry CR name¶
To auto discover the Schema Registry for the schema, set the following in the Schema CR:
spec:
schemaRegistryClusterRef:
name: --- [1]
namespace: --- [2]
- [1] Required. The name of the Schema Registry cluster this schema belongs to.
- [2] Optional. The namespace where the Schema Registry cluster is running if different from the namespace this schema is being created in.
Discover Schema Registry using Schema Registry endpoint¶
To specify how to connect to the Schema Registry endpoint, specify the connection information in the Schema CR.
Schema Registry endpoint
spec:
schemaRegistryRest:
endpoint: --- [1]
authentication:
type: --- [2]
- [1] The endpoint where Schema Registry is running.
- [2] Authentication method to use for the Schema Registry cluster. Supported types are
basic
,mtls
, andbearer
. You can usebearer
when RBAC is enabled for Schema Registry.
Basic authentication to Schema Registry
spec:
schemaRegistryRest:
authentication:
type: basic --- [1]
basic:
secretRef: --- [2]
directoryPathInContainer: --- [3]
[1] Required for the basic authentication type.
[2] or [3] is required.
[2] The name of the secret that contains the credentials. See Basic authentication for the required format.
[3] The directory path in the container where the required credentials are injected by Vault.
See Basic authentication for the required format.
See Provide secrets for Confluent Platform application CR for providing the credentials and required annotations when using Vault.
mTLS authentication to Schema Registry
spec:
schemaRegistryRest:
authentication:
type: mtls --- [1]
tls:
secretRef: --- [2]
directoryPathInContainer: --- [3]
[1] Required for the mTLS authentication type.
[2] The name of the secret that contains the TLS certificates.
See Configure Network Encryption with Confluent for Kubernetes for the expected keys in the TLS secret.
[3] The directory path in the container where the expected keys and certificates are mounted.
See Configure Network Encryption with Confluent for Kubernetes for the expected TLS keys and certificates.
See Provide secrets for Confluent Platform application CR for providing the keys and certificates using the Directory Path in Container feature.
Bearer authentication to Schema Registry (for RBAC)
When RBAC is enabled for Schema Registry, you can configure bearer authentication as below:
spec:
schemaRegistryRest:
authentication:
type: bearer --- [1]
bearer:
secretRef: --- [2]
directoryPathInContainer: --- [3]
[1] Required for the bearer authentication type.
[2] or [3] is required.
[2] Required. The name of the secret that contains the bearer credentials. See Bearer authentication for the required format.
[3] The directory path in the container where the required the bearer credentials are mounted.
See Bearer authentication for the required format.
See Provide secrets for Confluent Platform application CR for providing the credential using the Directory Path in Container feature.
Create and register a new schema version for an existing subject¶
To create and register a new schema version for a schema subject, configure and deploy a new configMap, using the existing configMap name.
CFK checks that the new schema content is a compatible evolution for the subject. And if compatible, CFK registers the schema content as a new version of the subject.
View a list of schemas¶
To get a list of all schemas registered in the current namespace, run the following command:
kubectl get schema
For each schema CR, the following information is returned:
- Subject name
- Schema id
- Schema version number
Delete a schema subject version¶
You can use the following command to get the latest version of a schema subject:
kubectl get schema <schema-name> -ojsonpath="{.status.version}"
The versions from 1
to (latest version - 1)
can be deleted unless a
version was already deleted. You cannot delete the latest version of a schema
subject.
To delete a schema subject version and unregister it from Schema Registry, annotate the
Schema CR with platform.confluent.io/soft-delete-versions
or
platform.confluent.io/delete-versions
.
For example:
To trigger a soft delete on version 2:
kubectl annotate <Schema-CR-name> platform.confluent.io/soft-delete-versions="[2]"
To trigger a hard delete on version 1:
kubectl annotate <Schema-CR-name> platform.confluent.io/delete-versions="[1]"