Kafka Connect HeaderToValue (Debezium) SMT Usage Reference for Confluent Cloud

The following provides usage information for the SMT io.debezium.transforms.HeaderToValue.

Description

The HeaderToValue SMT extracts specified header fields from event records, and then copies or moves these header fields to values in the event record. If configured for move, the SMT ensures these fields are fully eliminated from the header before becoming part of the value payload. It allows for the processing of multiple headers from the source message. This SMT supports dot notation to direct a header field into a specific nested position within the record value.

Example

This configuration snippet shows how to use HeaderToValue to move the event_timestamp and key headers from the event message into the record value, and then mapping them to timestamp and source.id fields respectively. The transform removes the original headers. If you want to keep the headers, use the operation copy instead of move.

"transforms": "moveHeadersToValue",
"transforms.moveHeadersToValue.type": "io.debezium.transforms.HeaderToValue",
"transforms.moveHeadersToValue.headers": "event_timestamp,key",
"transforms.moveHeadersToValue.fields": "timestamp,source.id",
"transforms.moveHeadersToValue.operation": "move"

Before:

  • Record header

    {
        "header_x": 0,
        "event_timestamp": 1962352708742,
        "key": 90
    }
    
  • Record value

    {
        "before": null,
        "after": {
            "id": 99,
            "first_name": "Maria",
            "last_name": "Cena",
            "email": "maria.c@mail.com"
        },
        "source": {
            "version": "5.1.0-RC1",
            "connector": "mongodb-atlas",
            "name": "OrderProcessingService",
            "ts_ms": 1752069400000,
            "ts_us": 1752069400000876,
            "ts_ns": 1752069400000876543,
            "snapshot": true,
            "db": "ecommerce",
            "sequence": "[\"78901234\",\"78901243\"]",
            "schema": "public",
            "table": "inventory_updates",
            "txId": 1234,
            "lsn": 78901243,
            "xmin": null
        },
        "op": "c",
        "ts_ms": 1752069400500,
        "ts_us": 1752069400500987,
        "ts_ns": 1752069400500987654
    }
    

After:

  • Record header

    {
        "header_x": 0
    }
    
  • Record value

    {
        "before": null,
        "after": {
            "id": 99,
            "first_name": "Maria",
            "last_name": "Cena",
            "email": "maria.c@mail.com"
        },
        "source": {
            "version": "5.1.0-RC1",
            "connector": "postgresql",
            "name": "mongodb-atlas",
            "ts_ms": 1752069400000,
            "ts_us": 1752069400000876,
            "ts_ns": 1752069400000876543,
            "snapshot": true,
            "db": "ecommerce",
            "sequence": "[\"78901234\",\"78901243\"]",
            "schema": "public",
            "table": "inventory_updates",
            "txId": 1234,
            "lsn": 78901243,
            "xmin": null,
            "id": 90
        },
        "op": "c",
        "ts_ms": 1752069400500,
        "ts_us": 1752069400500987,
        "ts_ns": 1752069400500987654,
        "timestamp": 1962352708742
    }
    

Properties

Name Description Type Default Valid Values Importance
headers A comma-separated list of header names in the record whose values are to be copied or moved to the record value. string None non-empty list high
fields A comma-separated list of field names, in the same order as the header names listed in the headers configuration property. Use dot notation to instruct the SMT to nest fields within specific nodes of the message payload. list None non-empty list high
operation Specifies one of the following options: move- The SMT moves header fields to values in the event record, and removes the fields from the header. copy: The SMT copies header fields to values in the event record, and retains the original header fields. string None move, copy high

Predicates

Transformations can be configured with predicates so that the transformation is applied only to records which satisfy a condition. You can use predicates in a transformation chain and, when combined with the Kafka Connect Filter (Kafka) SMT Usage Reference for Confluent Cloud, predicates can conditionally filter out specific records. For details and examples, see Predicates.