Kafka Connect HeaderToValue (Debezium) SMT Usage Reference for Confluent Cloud
The following provides usage information for the SMT io.debezium.transforms.HeaderToValue.
Description
The HeaderToValue SMT extracts specified header fields from event records and then copies or moves them to values in the event record. If configured for move, the SMT removes these fields from the header when it adds them to the value payload. The SMT can process multiple headers from the source message. It supports dot notation to direct a header field into a specific nested position within the record value.
Example
This configuration snippet shows how to use HeaderToValue to move the event_timestamp and key headers from the event message into the record value, mapping them to the timestamp and source.id fields, respectively. The transform removes the original headers. To keep the headers, set the operation to copy instead of move.
"transforms": "moveHeadersToValue",
"transforms.moveHeadersToValue.type": "io.debezium.transforms.HeaderToValue",
"transforms.moveHeadersToValue.headers": "event_timestamp,key",
"transforms.moveHeadersToValue.fields": "timestamp,source.id",
"transforms.moveHeadersToValue.operation": "move"
Before:
Record header
{ "header_x": 0, "event_timestamp": 1962352708742, "key": 90 }
Record value
{ "before": null, "after": { "id": 99, "first_name": "Maria", "last_name": "Cena", "email": "maria.c@mail.com" }, "source": { "version": "5.1.0-RC1", "connector": "mongodb-atlas", "name": "OrderProcessingService", "ts_ms": 1752069400000, "ts_us": 1752069400000876, "ts_ns": 1752069400000876543, "snapshot": true, "db": "ecommerce", "sequence": "[\"78901234\",\"78901243\"]", "schema": "public", "table": "inventory_updates", "txId": 1234, "lsn": 78901243, "xmin": null }, "op": "c", "ts_ms": 1752069400500, "ts_us": 1752069400500987, "ts_ns": 1752069400500987654 }
After:
Record header
{ "header_x": 0 }
Record value
{ "before": null, "after": { "id": 99, "first_name": "Maria", "last_name": "Cena", "email": "maria.c@mail.com" }, "source": { "version": "5.1.0-RC1", "connector": "mongodb-atlas", "name": "OrderProcessingService", "ts_ms": 1752069400000, "ts_us": 1752069400000876, "ts_ns": 1752069400000876543, "snapshot": true, "db": "ecommerce", "sequence": "[\"78901234\",\"78901243\"]", "schema": "public", "table": "inventory_updates", "txId": 1234, "lsn": 78901243, "xmin": null, "id": 90 }, "op": "c", "ts_ms": 1752069400500, "ts_us": 1752069400500987, "ts_ns": 1752069400500987654, "timestamp": 1962352708742 }
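For comparison, a copy variant of the same mapping might look like the following sketch. The alias copyHeadersToValue is a hypothetical name chosen for illustration, and the braces only indicate that these properties sit inside the connector configuration object. Based on the description of the copy operation, the record value should gain the same timestamp and source.id fields as in the move example, while event_timestamp and key remain in the record header.

```json
{
  "transforms": "copyHeadersToValue",
  "transforms.copyHeadersToValue.type": "io.debezium.transforms.HeaderToValue",
  "transforms.copyHeadersToValue.headers": "event_timestamp,key",
  "transforms.copyHeadersToValue.fields": "timestamp,source.id",
  "transforms.copyHeadersToValue.operation": "copy"
}
```

The headers and fields lists are matched by position, so the first header maps to the first field and the second header to the second field.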
Properties
Name | Description | Type | Default | Valid Values | Importance |
---|---|---|---|---|---|
headers | A comma-separated list of header names in the record whose values are to be copied or moved to the record value. | list | None | non-empty list | high |
fields | A comma-separated list of field names, in the same order as the header names listed in the headers configuration property. Use dot notation to instruct the SMT to nest fields within specific nodes of the message payload. | list | None | non-empty list | high |
operation | Specifies the operation to perform. move: The SMT moves header fields to values in the event record and removes the fields from the header. copy: The SMT copies header fields to values in the event record and retains the original header fields. | string | None | move, copy | high |
Predicates
Transformations can be configured with predicates so that the transformation is applied only to records that satisfy a condition. You can use predicates in a transformation chain and, when combined with the Filter (Kafka) SMT (see Kafka Connect Filter (Kafka) SMT Usage Reference for Confluent Cloud), predicates can conditionally filter out specific records. For details and examples, see Predicates.
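As a minimal sketch of pairing this SMT with a predicate, the following configuration applies HeaderToValue only to records that carry an event_timestamp header. It assumes the standard Apache Kafka HasHeaderKey predicate is available in your environment; the predicate alias hasEventTimestamp is a hypothetical name chosen for illustration, and the braces only indicate that these properties sit inside the connector configuration object.

```json
{
  "transforms": "moveHeadersToValue",
  "transforms.moveHeadersToValue.type": "io.debezium.transforms.HeaderToValue",
  "transforms.moveHeadersToValue.headers": "event_timestamp",
  "transforms.moveHeadersToValue.fields": "timestamp",
  "transforms.moveHeadersToValue.operation": "move",
  "transforms.moveHeadersToValue.predicate": "hasEventTimestamp",
  "predicates": "hasEventTimestamp",
  "predicates.hasEventTimestamp.type": "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
  "predicates.hasEventTimestamp.name": "event_timestamp"
}
```

Records without the event_timestamp header would pass through this transform unchanged. To invert the condition, Kafka Connect also accepts a negate property on the transform (for example, "transforms.moveHeadersToValue.negate": "true").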