Kafka Connect HeaderFrom SMT Usage Reference for Confluent Platform

The HeaderFrom SMT (org.apache.kafka.connect.transforms.HeaderFrom) moves or copies fields in an Apache Kafka® record key or value into the record’s headers.

Description

The HeaderFrom SMT exposes record fields as header metadata for routing, filtering, or downstream processing. The operation property controls how the source fields are handled after the transfer:

  • move removes the fields from the original key or value.

  • copy retains the fields in the original key or value.

Select the transformation type based on where the source fields reside:

  • Record key: org.apache.kafka.connect.transforms.HeaderFrom$Key

  • Record value: org.apache.kafka.connect.transforms.HeaderFrom$Value

For related header operations, see the DropHeaders SMT and the InsertHeader SMT.
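As an illustration of the two choices above, the following sketch pairs the HeaderFrom$Key transformation type with the copy operation. The transform alias copyKeyFieldsToHeader, the field name tenant_id, and the header name tenant are hypothetical, chosen only for this example. The sketch assumes the record key is a structured value (a struct or map) that contains a tenant_id field.

```json
"transforms": "copyKeyFieldsToHeader",
"transforms.copyKeyFieldsToHeader.type": "org.apache.kafka.connect.transforms.HeaderFrom$Key",
"transforms.copyKeyFieldsToHeader.fields": "tenant_id",
"transforms.copyKeyFieldsToHeader.headers": "tenant",
"transforms.copyKeyFieldsToHeader.operation": "copy"
```

Because the operation is copy, the record key should still contain tenant_id after the transformation, and the record headers should gain a tenant entry carrying the same value.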

Properties

HeaderFrom SMT configuration properties

fields

  • Description: Specifies a comma-separated list of field names to move or copy into the record header.

  • Type: List

  • Default: (required)

  • Valid values: Non-empty list

  • Importance: High

headers

  • Description: Specifies a comma-separated list of header names. The list must match the fields list in count and order. Headers and fields are paired by position: the first header receives the first field, and the second header receives the second field.

  • Type: List

  • Default: (required)

  • Valid values: Non-empty list

  • Importance: High

operation

  • Description: Specifies whether to move or copy fields into the record header. The move operation removes the fields from the source record key or value. The copy operation retains them.

  • Type: String

  • Default: (required)

  • Valid values: move, copy

  • Importance: High

replace.null.with.default

  • Description: Specifies whether to replace null fields with their defined default values. When set to true, the default value is applied. Otherwise, the field remains null.

  • Type: Boolean

  • Default: true

  • Valid values: true, false

  • Importance: Medium

Example: Move fields to record headers

The following configuration moves the id and ts fields from the record value into the record headers and removes them from the value. Because fields and headers are paired by position, id becomes the record_id header and ts becomes the event_timestamp header.

"transforms": "moveFieldsToHeader",
"transforms.moveFieldsToHeader.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
"transforms.moveFieldsToHeader.fields": "id,ts",
"transforms.moveFieldsToHeader.headers": "record_id,event_timestamp",
"transforms.moveFieldsToHeader.operation": "move"

Before:

  • Record value

    {
        "id": 24,
        "ts": 1626102708861,
        "name": "John Smith",
        "book": "Kafka: The Definitive Guide"
    }
    
  • Record header

    {
        "source": "RedditSource"
    }
    

After:

  • Record value

    {
        "name": "John Smith",
        "book": "Kafka: The Definitive Guide"
    }
    
  • Record header

    {
        "source": "RedditSource",
        "record_id": 24,
        "event_timestamp": 1626102708861
    }
    

Note

Record headers are an ordered list of key-value entries. The JSON objects above are illustrative representations of the header contents.

Predicates

Configure a transformation with a predicate so that it processes only the records that satisfy a particular condition. You can also use predicates in a transformation chain along with the Filter (Kafka) SMT to conditionally filter specific records. For more information, refer to Predicates.
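As a minimal sketch of a predicate applied to this SMT, the following configuration reuses the move example and restricts it to records whose topic name matches a pattern. It uses the TopicNameMatches predicate that ships with Apache Kafka. The predicate alias isOrdersTopic and the pattern orders-.* are hypothetical, chosen only for this example.

```json
"transforms": "moveFieldsToHeader",
"transforms.moveFieldsToHeader.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
"transforms.moveFieldsToHeader.fields": "id,ts",
"transforms.moveFieldsToHeader.headers": "record_id,event_timestamp",
"transforms.moveFieldsToHeader.operation": "move",
"transforms.moveFieldsToHeader.predicate": "isOrdersTopic",
"predicates": "isOrdersTopic",
"predicates.isOrdersTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isOrdersTopic.pattern": "orders-.*"
```

With this configuration, records from topics that do not match the pattern should pass through the transformation unchanged.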