Kafka Connect InsertField SMT Usage Reference for Confluent Platform
The InsertField SMT (org.apache.kafka.connect.transforms.InsertField) adds fields to an Apache Kafka® record key or value, using attributes from the record metadata or a configured static value.
Description
Use InsertField to add metadata fields (such as topic, partition, offset, or timestamp) or a static field with a fixed value to each record. Suffix a field name with ! to make it required, or ? to keep it optional.
Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.InsertField$Key) or value (org.apache.kafka.connect.transforms.InsertField$Value).
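As a minimal sketch of the two concrete types and the suffix convention, the following fragment chains one InsertField instance on the key and one on the value. The aliases TagKey and TagValue, the field names region and ingest_source, and the static values are hypothetical and chosen only for illustration. The sketch also assumes the record key is structured (a Struct or a map), because a primitive key has no fields to add to.

```json
{
  "transforms": "TagKey,TagValue",
  "transforms.TagKey.type": "org.apache.kafka.connect.transforms.InsertField$Key",
  "transforms.TagKey.static.field": "region!",
  "transforms.TagKey.static.value": "emea",
  "transforms.TagValue.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.TagValue.static.field": "ingest_source?",
  "transforms.TagValue.static.value": "connect"
}
```

Here the trailing ! marks region as a required field in the key, and the trailing ? keeps ingest_source optional in the value. The suffix is not part of the inserted field name.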
Note
When you use InsertField with a source connector, the SMT inserts null for the Kafka record metadata fields timestamp.field and partition.field. This is because the SMT runs before the record reaches the Kafka topic, where Kafka assigns these metadata values. To insert non-null metadata-based fields, use InsertField with a sink connector. For source connectors, use static.field and static.value to insert fixed values instead.
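For a sink connector, a configuration along the following lines could insert the record metadata into the value. This is a sketch only: the alias InsertMetadata and the target field names (kafka_topic, kafka_partition, kafka_offset, kafka_timestamp) are hypothetical, and the rest of the connector configuration is omitted.

```json
{
  "transforms": "InsertMetadata",
  "transforms.InsertMetadata.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.InsertMetadata.topic.field": "kafka_topic",
  "transforms.InsertMetadata.partition.field": "kafka_partition",
  "transforms.InsertMetadata.offset.field": "kafka_offset",
  "transforms.InsertMetadata.timestamp.field": "kafka_timestamp"
}
```

Because the sink connector reads records that Kafka has already stored, all four metadata values are available when the SMT runs.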
Properties
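| Name | Description | Type | Default | Valid Values | Importance |
|---|---|---|---|---|---|
| offset.field | Field name for Kafka offset. This is only applicable to sink connectors. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium |
| partition.field | Field name for Kafka partition. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium |
| replace.null.with.default | Whether to replace fields that have a default value and are null with the default value. When set to true, the default value is used; otherwise, null is used. | boolean | true | | medium |
| static.field | Field name for static data field. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium |
| static.value | The static value to insert when a field name is configured. | string | null | | medium |
| timestamp.field | Field name for record timestamp. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium |
| topic.field | Field name for Kafka topic. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium |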
Example
This configuration snippet shows how to use InsertField to insert a static field named MessageSource with the static value Kafka Connect Framework.
"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field": "MessageSource",
"transforms.InsertField.static.value": "Kafka Connect Framework"
Before: {"author": "Philip K. Dick", "character": "Palmer Eldritch"}
After: {"author": "Philip K. Dick", "character": "Palmer Eldritch", "MessageSource": "Kafka Connect Framework"}
Predicates
Configure transformations with predicates so that they process only records that satisfy a particular condition. You can also use predicates in a transformation chain together with the Filter (Kafka) SMT (see Kafka Connect Filter (Kafka) SMT Usage Reference for Confluent Platform) to conditionally filter specific records. For more information, refer to Predicates.
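As an illustration, the following sketch applies InsertField only to records whose topic name matches a pattern, using the TopicNameMatches predicate that ships with Apache Kafka. The predicate alias IsOrders and the pattern orders-.* are hypothetical; the static field and value reuse the earlier example.

```json
{
  "transforms": "InsertField",
  "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.InsertField.static.field": "MessageSource",
  "transforms.InsertField.static.value": "Kafka Connect Framework",
  "transforms.InsertField.predicate": "IsOrders",
  "predicates": "IsOrders",
  "predicates.IsOrders.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.IsOrders.pattern": "orders-.*"
}
```

Adding "transforms.InsertField.negate": "true" would invert the condition, so that the field is inserted only into records from topics that do not match the pattern.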