Kafka Connect HeaderFrom SMT Usage Reference for Confluent Platform
The HeaderFrom SMT (org.apache.kafka.connect.transforms.HeaderFrom) moves or copies fields in an Apache Kafka® record key or value into the record’s headers.
Description
The HeaderFrom SMT exposes record fields as header metadata for routing, filtering, or downstream processing. The operation property controls how the source fields are handled after the transfer:
moveremoves the fields from the original key or value.copyretains the fields in the original key or value.
Select the transformation type based on where the target fields reside:
Record key:
org.apache.kafka.connect.transforms.HeaderFrom$KeyRecord value:
org.apache.kafka.connect.transforms.HeaderFrom$Value
For related header operations, see the DropHeaders SMT and the InsertHeader SMT.
Properties
Name | Description | Type | Default | Valid values | Importance |
|---|---|---|---|---|---|
| Specifies a comma-separated list of field names to move or copy into the record header. | List | (required) | Non-empty list | High |
| Specifies a comma-separated list of header names. The list must match the | List | (required) | Non-empty list | High |
| Specifies whether to move or copy fields into the record header. The | String | (required) |
| High |
| Specifies whether to replace null fields with their defined default values. When set to | Boolean |
|
| Medium |
Example: Move fields to record headers
The following configuration moves the fields id and ts from the record’s value into the record’s headers as record_id and event_timestamp, and removes the fields from the record value.
"transforms": "moveFieldsToHeader",
"transforms.moveFieldsToHeader.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
"transforms.moveFieldsToHeader.fields": "id,ts",
"transforms.moveFieldsToHeader.headers": "record_id,event_timestamp",
"transforms.moveFieldsToHeader.operation": "move"
Before:
Record value
{ "id": 24, "ts": 1626102708861, "name": "John Smith", "book": "Kafka: The Definitive Guide" }
Record header
{ "source": "RedditSource" }
After:
Record value
{ "name": "John Smith", "book": "Kafka: The Definitive Guide" }
Record header
{ "source": "RedditSource", "record_id": 24, "event_timestamp": 1626102708861 }
Note
Record headers are an ordered list of key-value entries. The JSON objects above are illustrative representations of the header contents.
Predicates
Configure transformations with predicates to ensure they only process records satisfying a particular condition. You can also use predicates in a transformation chain along with the Kafka Connect Filter (Kafka) SMT Usage Reference for Confluent Platform to conditionally filter specific records. For more information, refer to Predicates.