Kafka Connect SetSchemaMetadata SMT for Confluent Platform

The following provides usage information for the Apache Kafka® SMT org.apache.kafka.connect.transforms.SetSchemaMetadata.

Description

Set the schema name, version or both on the record’s key (org.apache.kafka.connect.transforms.SetSchemaMetadata$Key) or value (org.apache.kafka.connect.transforms.SetSchemaMetadata$Value) schema.

This SMT can be used to set the schema name, version, or both on Connect records. Since the schema name includes the namespace, a common SetSchemaMetadata SMT use case is to change the name and namespace of the schemas used in Apache Kafka® record keys and values.

Use cases:

Source connectors generate records with schemas defined by the connector, and often generate the names and versions of key and value schemas based upon source-specific information. For example, a database source connector might use the name of the table from which rows are read in the schema names. If these generated schema names do not adhere to the naming convention you need, you can use this SMT to override the generated names of the key and/or value schemas in the source records produced by the source connector.

Sink connectors that consume records from Kafka topics may the names of the schemas to indicate how the connector maps the Kafka records into the external system. If the names of the schemas used in the record keys and values don’t result in the desired mapping, you can use this SMT to change the name of the key and/or value schemas in the consumed source records, before those records are processed by the sink connector.

Examples

The following examples show how to configure and use SetSchemaMetadata.

Set a schema name and version

This configuration snippet shows how to set a schema name and version.

"transforms": "SetSchemaMetadata",
"transforms.SetSchemaMetadata.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.SetSchemaMetadata.schema.name": "order-value"
"transforms.SetSchemaMetadata.schema.version": "2"

The transform sets the schema name to order-value version 2

Set a namespace and schema name

This configuration snippet shows how to set a namespace and schema name.

"transforms" : "AddNamespace",
"transforms.AddNamespace.type" : "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.AddNamespace.schema.name" : "my.namespace.order-value"

The transform sets the namespace to my.namespace and schema name to order-value. For another example of setting the schema namespace, see Example 3: JDBC source connector with GenericAvro.

Tip

For additonal examples, see Set Schema Metadata for managed connectors.

Properties

Name Description Type Default Valid Values Importance
schema.name Schema name to set. string null   high
schema.version Schema version to set. int null   high

Predicates

Transformations can be configured with predicates so that the transformation is applied only to records which satisfy a condition. You can use predicates in a transformation chain and, when combined with the Kafka Connect Filter (Kafka) SMT for Confluent Platform, predicates can conditionally filter out specific records. For details and examples, see Predicates.