Link Schemas for Confluent Platform Using Confluent for Kubernetes¶
Schema Linking is a Confluent feature for keeping schemas in sync between two Schema Registry clusters.
You can use Schema Linking in conjunction with Cluster Linking to keep both schemas and topic data in sync across two Schema Registry and Kafka clusters.
Schema Linking can also be used independently of Cluster Linking for replicating schemas between clusters for purposes of aggregation, backup, staging, and migration of schemas.
Schema Linking is supported using schema exporters that reside in Schema Registry and continuously export schemas from one context to another within the same Schema Registry cluster or across a different Schema Registry cluster.
A schema exporter can sync schemas in groups, referred to as schema contexts.
Each schema context is an independent grouping of schema IDs and subject names.
If schemas are exported without any context (contextType: NONE), those schemas are exported as is and go into the default context.
See Schema Linking for complete details of the Schema Linking feature.
The high-level workflow to run Schema Linking is:
1. Deploy the source and the destination Schema Registry clusters.
2. Define schemas in the source Schema Registry cluster.
   When you register schemas in the source cluster, you can specify a custom context by inserting the context in the schema name. If no context is given, the default context is used.
3. Create a schema exporter in the source Schema Registry cluster.
   Exported schemas are placed in IMPORT mode in the destination Schema Registry. Schemas in IMPORT mode cannot be changed.
4. As needed, edit, reset, pause, resume, or delete the schema exporter.
Confluent for Kubernetes (CFK) provides a declarative API, the SchemaExporter custom resource definition (CRD), to support the entire workflow of creating and managing schema exporters.
Enable schema exporter in Schema Registry¶
Update the source Schema Registry CR to enable schema exporter, and apply the changes with the kubectl apply -f <Schema Registry CR> command:
spec:
  passwordEncoder:        --- [1]
  enableSchemaExporter:   --- [2]
- [1] Optional. Specify the password encoder for the source Schema Registry. See Manage Password Encoder Secrets for Confluent Platform Using Confluent for Kubernetes for details.
- [2] Set to true to enable schema exporter in the Schema Registry.
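As an illustration, a source Schema Registry CR with the exporter enabled might look like the following fragment. This is a sketch, not a complete manifest: the name sr and the namespace operator are assumed values borrowed from the example later on this page, and the other fields your Schema Registry CR already contains (replicas, image, and so on) are omitted and should stay as they are.

```yaml
apiVersion: platform.confluent.io/v1beta1
kind: SchemaRegistry
metadata:
  name: sr              # assumed name of the source Schema Registry
  namespace: operator   # assumed namespace
spec:
  # Keep the rest of your existing spec unchanged; only this field is added.
  enableSchemaExporter: true
```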
Create schema exporter¶
A schema exporter is created and managed in the source Schema Registry cluster.
Note
When RBAC is enabled in this Confluent Platform environment, the super user you configured for Kafka (kafka.spec.authorization.superUsers) does not have access to resources in the Schema Registry cluster. If you want the super user to be able to create schema exporters, grant the super user the required permission on the Schema Registry cluster.
In the source Schema Registry cluster, create a schema exporter CR and apply the configuration with the kubectl apply -f <Schema Exporter CR> command:
apiVersion: platform.confluent.io/v1beta1
kind: SchemaExporter
metadata:
  name:                   --- [1]
  namespace:              --- [2]
spec:
  sourceCluster:          --- [3]
  destinationCluster:     --- [4]
  subjects:               --- [5]
  subjectRenameFormat:    --- [6]
  contextType:            --- [7]
  contextName:            --- [8]
  configs:                --- [9]
[1] Required. The name of the schema exporter. The name must be unique in a source Schema Registry cluster.
[2] The namespace for the schema exporter.
[3] The source Schema Registry cluster. You can specify either the cluster name or the endpoint. If not given, CFK auto-discovers the source Schema Registry in the namespace of this schema exporter. Discovery fails if more than one Schema Registry cluster is found in the namespace.
See Specify the source and destination Schema Registry clusters for configuration details.
[4] The destination Schema Registry cluster to which the schemas are exported. If not defined, the source cluster is used as the destination, and the schema exporter exports schemas across contexts within the source cluster.
See Specify the source and destination Schema Registry clusters for configuration details.
[5] The subjects to export to the destination. The default value is ["*"], which denotes all subjects in the default context.
[6] The rename format that defines how to rename the subject at the destination.
For example, if the value is my-${subject}, subjects at the destination become my-XXX, where XXX is the original subject.
[7] Specifies how the context that receives the subjects at the destination is created.
The default value is AUTO, with which the exporter uses an auto-generated context in the destination cluster. The auto-generated context name is reported in the status.
If set to NONE, the exporter copies the source schemas as is.
[8] The name of the schema context on the destination to which the subjects are exported. If this is defined, spec.contextType is ignored.
[9] Additional configs not supported by the SchemaExporter CRD properties.
An example SchemaExporter CR:
apiVersion: platform.confluent.io/v1beta1
kind: SchemaExporter
metadata:
  name: schemaexporter
  namespace: confluent
spec:
  sourceCluster:
    schemaRegistryClusterRef:
      name: sr
      namespace: operator
  destinationCluster:
    schemaRegistryRest:
      endpoint: https://schemaregistry.operator-dest.svc.cluster.local:8081
      authentication:
        type: basic
        basic:
          secretRef: sr-basic
  subjects:
  - subject1
  - subject2
  contextName: link-source
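For comparison, the following sketch shows an exporter that stays within a single Schema Registry cluster. It relies on the behavior described for destinationCluster above: when that field is omitted, the source cluster is also the destination, so schemas are copied from one context to another. The exporter name, the subject orders-value, and the context name staging are hypothetical values chosen for illustration.

```yaml
apiVersion: platform.confluent.io/v1beta1
kind: SchemaExporter
metadata:
  name: schemaexporter-local   # hypothetical exporter name
  namespace: operator
spec:
  sourceCluster:
    schemaRegistryClusterRef:
      name: sr
  # destinationCluster is intentionally omitted: the source cluster is used
  # as the destination, and schemas move across contexts within it.
  subjects:
  - orders-value               # hypothetical subject
  # With this format, orders-value is exported as my-orders-value.
  subjectRenameFormat: "my-${subject}"
  contextName: staging         # hypothetical destination context
```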
Specify the source and destination Schema Registry clusters¶
Using one of the following methods, a schema exporter can specify the source and the destination Schema Registry clusters:
Specify Schema Registry using Schema Registry cluster name¶
To specify the source or destination Schema Registry for the schema exporter, set the following in the SchemaExporter CR under spec.sourceCluster or spec.destinationCluster:
schemaRegistryClusterRef:
  name:        --- [1]
  namespace:   --- [2]
- [1] Required. The name of the Schema Registry cluster.
- [2] Optional. The namespace where the Schema Registry cluster is running if different from the namespace of the schema exporter.
Specify Schema Registry using Schema Registry endpoint¶
To specify how to connect to the Schema Registry endpoint, specify the connection information in the SchemaExporter CR, in the spec.sourceCluster or spec.destinationCluster section:
Schema Registry endpoint
schemaRegistryRest:
  endpoint:     --- [1]
  authentication:
    type:       --- [2]
- [1] The endpoint where Schema Registry is running.
- [2] Authentication method to use for the Schema Registry cluster. Supported types are basic, mtls, bearer, and oauth. You can use bearer when RBAC is enabled for Schema Registry.
Basic authentication to Schema Registry
schemaRegistryRest:
  authentication:
    type: basic                   --- [1]
    basic:
      secretRef:                  --- [2]
      directoryPathInContainer:   --- [3]
[1] Required for the basic authentication type.
[2] Required. The name of the secret that contains the credentials.
See Basic authentication for the required format.
[3] Set to the directory path in the container where required authentication credentials are injected by Vault.
See Basic authentication for the required format.
See Provide secrets for Confluent Platform application CR for providing the credential and required annotations when using Vault.
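As a sketch of the Vault variant, the fragment below points the exporter at credentials injected into the container rather than at a Kubernetes secret. The endpoint is reused from the example CR earlier on this page; the directory /vault/secrets/sr-basic is a hypothetical path and must match wherever your Vault annotations actually place the credentials.

```yaml
spec:
  destinationCluster:
    schemaRegistryRest:
      endpoint: https://schemaregistry.operator-dest.svc.cluster.local:8081
      authentication:
        type: basic
        basic:
          # Hypothetical path; use the directory where Vault injects
          # the basic authentication credentials in your deployment.
          directoryPathInContainer: /vault/secrets/sr-basic
```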
mTLS authentication to Schema Registry
schemaRegistryRest:
  authentication:
    type: mtls                    --- [1]
  tls:
    secretRef:                    --- [2]
    directoryPathInContainer:     --- [3]
[1] Required for the mTLS authentication type.
[2] The name of the secret that contains the TLS certificates.
See Provide TLS keys and certificates in PEM format for the expected keys in the TLS secret. Only the PEM format is supported for SchemaExporter CRs.
[3] Set to the directory path in the container where the TLS certificates are injected by Vault.
See Provide TLS keys and certificates in PEM format for the expected keys in the TLS secret. Only the PEM format is supported for SchemaExporter CRs.
See Provide secrets for Confluent Platform application CR for providing the credential and required annotations when using Vault.
Bearer authentication to Schema Registry (for RBAC)
When RBAC is enabled for Schema Registry, you can configure bearer authentication as below:
schemaRegistryRest:
  authentication:
    type: bearer                  --- [1]
    bearer:
      secretRef:                  --- [2]
      directoryPathInContainer:   --- [3]
[1] Required for the bearer authentication type.
[2] or [3] is required.
[2] The name of the secret that contains the bearer credentials. See Bearer authentication for the required format.
[3] Set to the directory path in the container where required authentication credentials are injected by Vault.
See Bearer authentication for the required format.
See Provide secrets for Confluent Platform application CR for providing the credential and required annotations when using Vault.
OAuth authorization/authentication to Schema Registry
schemaRegistryRest:
  authentication:
    type: oauth                   --- [1]
    oauth:
      secretRef:                  --- [2]
      directoryPathInContainer:   --- [3]
      configuration:              --- [4]
[1] Required for the oauth authentication type.
[2] or [3] is required.
[2] The name of the secret that contains the OAuth credentials. See OAuth authorization and authentication for the required format.
[3] Set to the directory path in the container where required authentication credentials are injected by Vault.
See OAuth authorization and authentication for the required format.
See Provide secrets for Confluent Platform application CR for providing the credential and required annotations when using Vault.
[4] The client-side OAuth configuration. For details, see Client-side OAuth/OIDC authentication for Confluent components.
TLS encryption for Schema Registry cluster
tls:
  enabled: true                 --- [1]
  secretRef:                    --- [2]
  directoryPathInContainer:     --- [3]
[1] Required.
[2] or [3] is required.
[2] The name of the secret that contains the certificates.
See Provide TLS keys and certificates in PEM format for the expected keys in the TLS secret. Only the PEM format is supported for SchemaExporter CRs.
[3] Set to the directory path in the container where the TLS certificates are injected by Vault.
See Provide TLS keys and certificates in PEM format for the expected keys in the TLS secret. Only the PEM format is supported for SchemaExporter CRs.
See Provide secrets for Confluent Platform application CR for providing the credential and required annotations when using Vault.
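Putting the mTLS and TLS settings together, a destination definition might look like the sketch below. It assumes that the tls block sits directly under schemaRegistryRest, next to authentication, as in the mTLS template above. The secret name sr-dest-tls is hypothetical and would need to contain PEM-format certificates.

```yaml
spec:
  destinationCluster:
    schemaRegistryRest:
      endpoint: https://schemaregistry.operator-dest.svc.cluster.local:8081
      authentication:
        type: mtls
      tls:
        enabled: true
        # Hypothetical secret holding the PEM-format TLS keys and certificates.
        secretRef: sr-dest-tls
```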
Edit schema exporter configuration¶
When you update configuration of an existing exporter, CFK pauses the exporter, updates the config, and resumes the exporter.
The following properties of the configuration cannot be changed for an existing exporter. To change any of them, delete the exporter and re-create it:
- Source Schema Registry
- Destination Schema Registry
- Name of the schema exporter
Edit the schema exporter CR with the desired configs and apply it with the kubectl apply -f <Schema Exporter CR> command.
The context type (contextType) defaults to AUTO only during creation. If you created a schema exporter with a custom context and want to edit it to use an auto-generated context, contextType should be explicitly set to AUTO.
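A minimal sketch of such an edit is shown below. It assumes the exporter was originally created with a contextName; because a defined contextName causes contextType to be ignored, the sketch also assumes that contextName has to be removed for the AUTO setting to take effect.

```yaml
spec:
  # contextName: link-source   # removed (assumption: a defined contextName
  #                            # would override contextType)
  contextType: AUTO            # must be set explicitly when editing
```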
If the context name (contextName) is edited, only new subjects and schemas are exported to the new context. Schemas synced before the update remain in the earlier context. To migrate all the old schemas to the new context, you need to reset the exporter.
Similarly, if the subject rename format (subjectRenameFormat) is edited, only new schemas are migrated with the new name format. You need to reset the exporter to remigrate the already synced schemas with the new name format.
Reset schema exporter¶
A schema exporter is in one of the following states: STARTING, RUNNING, or PAUSED.
Reset a schema exporter to clear its saved offset.
To reset a schema exporter, add the reset exporter annotation to the schema exporter CR with the command:
kubectl annotate schemaexporter schemaexporter platform.confluent.io/reset-schema-exporter="true"
Pause schema exporter¶
To pause a schema exporter, add the pause exporter annotation to the schema exporter CR with the command:
kubectl annotate schemaexporter schemaexporter platform.confluent.io/pause-schema-exporter="true"
Resume schema exporter¶
To resume a schema exporter, add the resume exporter annotation to the schema exporter CR with the command:
kubectl annotate schemaexporter schemaexporter platform.confluent.io/resume-schema-exporter="true"
Delete schema exporter¶
Deleting the schema exporter does not delete the schemas already exported to the destination. The schemas exported to the destination Schema Registry stay in the last synced state.
Once the schema link is broken, exported schemas can be moved out of IMPORT mode using migration as explained in Migrate Schemas.
After the schemas are moved out of IMPORT mode, to manage those schemas on the destination Schema Registry, create Schema CRs for those schemas on the destination cluster.
To delete a schema exporter:
kubectl delete schemaexporter schemaexporter