Kafka Connect SetSchemaMetadata SMT Usage Reference for Confluent Platform
The SetSchemaMetadata SMT (org.apache.kafka.connect.transforms.SetSchemaMetadata) sets the schema name, version, or both on an Apache Kafka® record’s key or value schema.
Description
This transformation allows you to update the metadata associated with your schemas during the data pipeline process. A common use case for SetSchemaMetadata is to update the schema name and namespace. Because the schema name inherently includes the namespace, this transformation is particularly useful when you need to align incoming record schemas with your organization’s naming conventions or target Schema Registry requirements.
Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.SetSchemaMetadata$Key) or value (org.apache.kafka.connect.transforms.SetSchemaMetadata$Value).
Common use cases include:
Source connectors generate records with schemas defined by the connector, often basing key and value schema names on source-specific information. For example, a database source connector might use the name of the table from which rows are read. If the generated schema names don’t adhere to the naming convention you need, use this SMT to override the generated key or value schema names in the source records.
Sink connectors use schema names to map Kafka records into the external system. If the schema names used in the record keys and values don’t produce the mapping you want, use this SMT to change the key or value schema names before the sink connector processes the records.
Properties
Name | Description | Type | Default | Valid values | Importance |
|---|---|---|---|---|---|
| Schema name to set. | string |
| high | |
| Schema version to set. | int |
| high | |
| Specifies whether to replace null fields with their defined default values. When set to | boolean |
|
| medium |
Examples
The following examples show how to configure and use SetSchemaMetadata.
Set a schema name and version
This configuration snippet shows how to set a schema name and version.
"transforms": "SetSchemaMetadata",
"transforms.SetSchemaMetadata.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.SetSchemaMetadata.schema.name": "order-value",
"transforms.SetSchemaMetadata.schema.version": "2"
The transform sets the schema name to order-value and the version to 2.
Set a namespace and schema name
This configuration snippet shows how to set a namespace and schema name.
"transforms": "AddNamespace",
"transforms.AddNamespace.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.AddNamespace.schema.name": "my.namespace.order-value"
The transform sets the namespace to my.namespace and the schema name to order-value. For another example of setting the schema namespace, see Example 3: JDBC source connector with SpecificAvro.
Predicates
Configure transformations with predicates to ensure they process only records that satisfy a particular condition. You can also use predicates in a transformation chain with the Kafka Connect Filter (Kafka) SMT Usage Reference for Confluent Platform to conditionally filter specific records. For more information, see Predicates.