Manage Schemas using Confluent for Kubernetes¶
Schemas define Kafka data format and are registered and managed by Schema Registry.
Confluent for Kubernetes (CFK) provides the Schema custom resource definition (CRD). With the Schema CRD, you can declaratively create, read, and delete schemas as Schema custom resources (CRs) in Kubernetes.
A Schema CR represents the state of a schema subject and is used to manage the versions that belong to the subject. This includes creating and deleting subjects and schema versions.
Schema CRs can only manage new subjects created by CFK and not any pre-existing schemas.
Similarly, you should use the Schema CRs to manage the schemas that were created by CFK. For example, if you use Control Center to make changes to a CFK-created schema, CFK is not aware of those changes in the schema.
The state of a Schema CR status represents the latest version of the schema subject.
Each Schema CR is mapped to a schema subject version in Schema Registry.
A schema can be in one of three formats:
- Protocol Buffers
- JSON
- Avro (default)
See Schema Registry Overview for detailed information about Schema Registry and schemas.
Prerequisites¶
- Deploy Schema Registry before you create schemas as schemas are managed in Schema Registry.
- Get the information about the Schema Registry that you need when configuring Schema CRs:
- Schema Registry REST service endpoint
- Authentication configurations
- Authentication credentials to the Schema Registry
Create and register a schema for a new subject name¶
Register a schema for a new subject name by creating a new Schema CR.
Note
When RABC is enabled in this Confluent Platform environment, the super user you configured
for Kafka (kafka.spec.authorization.superUsers
) does not have access to
resources in the Schema Registry cluster. If you want the super user to be able to
create schemas, grant the super user the permission on the Schema Registry cluster.
When creating the schema, you must specify:
- Subject name
- Data format: Avro, JSON, or Protobuf
- Schema content as a file
When you register the first schema for a subject name, that schema is assigned
version number 1
.
To create a schema:
Create a Kubernetes ConfigMap resource containing the schema. For example:
apiVersion: v1 kind: ConfigMap metadata: name: schema-config namespace: operator data: schema: | { "namespace": "io.confluent.examples.clients.basicavro", "type": "record", "name": "Payment", "fields": [ {"name": "id", "type": "string"}, {"name": "amount", "type": "double"}, {"name": "email", "type": "string"} ] }
Create a Schema CR. The following is the structure of the Schema CR:
apiVersion: platform.confluent.io/v1beta1 kind: Schema metadata: name: namespace: spec: compatibilityLevel: --- [1] data: configRef: --- [2] format: --- [3] mode: --- [4] name: --- [5] normalize: --- [6] schemaReferences: subject: --- [7] version: --- [8] format: --- [9] avro: avro: --- [10] json: url: --- [11] protobuf: file: --- [12] schemaRegistryClusterRef: --- [13] schemaRegistryRest: --- [14] name: --- [15]
[1] The compatibility level requirement among the schema versions. The valid options are:
BACKWARD
: Consumer can process data produced with the current and the last versions of the schema.BACKWARD_TRANSITIVE
: Consumer can process data produced with the current and all previous versions of the schema.FORWARD
: Data produced using the current version of the schema can be read by consumers with the current and the last version of the schema.FORWARD_TRANSITIVE
: Data produced using the current version of the schema can be read by consumers with the current and all previous versions of the schema.FULL
: The current and the last versions of the schema are backward and forward compatible.FULL_TRANSITIVE
: The current and all the previous versions of the schema are backward and forward compatible.NONE
: Schema compatibility checks are disabled.
For more information, see Schema Compatibility Types.
[2] Required. The name of the ConfigMap you create in the previous step.
[3] Required. The format of the encoded schema. Supported values are
avro
,json
, andprotobuf
.[4] Set the Schema Registry mode on a specific subject of a schema. Valid options are
IMPORT
,READONLY
, andREADWRITE
.[5] The subject name of the schema. If not configured, the Schema CR name is used as the subject name.
[6] Set to
true
to normalize the schema at the time of registering to the Schema Registry. For more information, see Schema Normalization.[7] The subject name of the referenced schema through the
configRef
.[8] The version of the referenced schema subject.
[9] The format of the referenced schema. Supported values are
avro
,json
, andprotobuf
.[10] The fully qualified name of the referenced schema.
[11] The referenced JSON schema name.
[12] The file name of the references Protobuf schema.
[13] The name of the Schema Registry cluster this schema belongs to. See Discover Schema Registry using Schema Registry CR name.
[14] The REST service connection configuration. See below. See Discover Schema Registry using Schema Registry endpoint.
[15] The schema name. If not set, the Schema CR name is used as the schema name.
Use this property to define schema with context.
You can also use this property to include special characters in the schema name. The following characters are accepted:
^[:a-zA-Z0-9_.-]*$
To include the
/
character in the name, replace/
with its URL-encoded value,%2F
, in the name.
Apply the Schema CR:
kubectl apply -f <Schema CR>
Discover Schema Registry¶
Using one of the following methods, a schema can discover which Schema Registry cluster it belongs to:
Provide the CR name and namespace that specifies the Schema Registry deployment
Use the Schema Registry deployed in the same namespace as the schema namespace
If neither of the above options is configured, the schema looks for a CFK-managed Schema Registry in the namespace of the schema.
Discover Schema Registry using Schema Registry CR name¶
To auto discover the Schema Registry for the schema, set the following in the Schema CR:
spec:
schemaRegistryClusterRef:
name: --- [1]
namespace: --- [2]
- [1] Required. The name of the Schema Registry cluster this schema belongs to.
- [2] Optional. The namespace where the Schema Registry cluster is running if different from the namespace this schema is being created in.
Discover Schema Registry using Schema Registry endpoint¶
To specify how to connect to the Schema Registry endpoint, specify the connection information in the Schema CR.
Schema Registry endpoint
spec:
schemaRegistryRest:
endpoint: --- [1]
authentication:
type: --- [2]
- [1] The endpoint where Schema Registry is running.
- [2] Authentication method to use for the Schema Registry cluster. Supported types are
basic
,mtls
, andbearer
. You can usebearer
when RBAC is enabled for Schema Registry.
Basic authentication to Schema Registry
spec:
schemaRegistryRest:
authentication:
type: basic --- [1]
basic:
secretRef: --- [2]
directoryPathInContainer: --- [3]
[1] Required for the basic authentication type.
[2] or [3] is required.
[2] The name of the secret that contains the credentials. See Basic authentication for the required format.
[3] The directory path in the container where the required credentials are injected by Vault.
See Basic authentication for the required format.
See Provide secrets for Confluent Platform application CR for providing the credentials and required annotations when using Vault.
mTLS authentication to Schema Registry
spec:
schemaRegistryRest:
authentication:
type: mtls --- [1]
tls:
secretRef: --- [2]
directoryPathInContainer: --- [3]
[1] Required for the mTLS authentication type.
[2] The name of the secret that contains the TLS certificates.
See Provide TLS keys and certificates in PEM format for the expected keys in the TLS secret. Only the PEM format is supported for Schema CRs.
[3] The directory path in the container where the expected keys and certificates are mounted.
See Provide TLS keys and certificates in PEM format for the expected keys in the TLS secret. Only the PEM format is supported for Schema CRs.
See Provide secrets for Confluent Platform application CR for providing the keys and certificates using the Directory Path in Container feature.
Bearer authentication to Schema Registry (for RBAC)
When RBAC is enabled for Schema Registry, you can configure bearer authentication as below:
spec:
schemaRegistryRest:
authentication:
type: bearer --- [1]
bearer:
secretRef: --- [2]
directoryPathInContainer: --- [3]
[1] Required for the bearer authentication type.
[2] or [3] is required.
[2] Required. The name of the secret that contains the bearer credentials. See Bearer authentication for the required format.
[3] The directory path in the container where the required the bearer credentials are mounted.
See Bearer authentication for the required format.
See Provide secrets for Confluent Platform application CR for providing the credential using the Directory Path in Container feature.
Create and register a new schema version for an existing subject¶
To create and register a new schema version for a schema subject, configure and deploy a new configMap, using the existing configMap name.
CFK checks that the new schema content is a compatible evolution for the subject. And if compatible, CFK registers the schema content as a new version of the subject.
View a list of schemas¶
To get a list of all schemas registered in the current namespace, run the following command:
kubectl get schema
For each schema CR, the following information is returned:
- Subject name
- Schema id
- Schema version number
Delete a schema subject version¶
You can use the following command to get the latest version of a schema subject:
kubectl get schema <schema-name> -ojsonpath="{.status.version}"
The versions from 1
to (latest version - 1)
can be deleted unless a
version was already deleted. You cannot delete the latest version of a schema
subject.
To delete a schema subject version and unregister it from Schema Registry, annotate the
Schema CR with platform.confluent.io/soft-delete-versions
or
platform.confluent.io/delete-versions
.
For example:
To trigger a soft delete on version 2:
kubectl annotate <Schema-CR-name> platform.confluent.io/soft-delete-versions="[2]"
To trigger a hard delete on version 1:
kubectl annotate <Schema-CR-name> platform.confluent.io/delete-versions="[1]"