Kafka Connect InsertField SMT Usage Reference for Confluent Platform

The InsertField SMT (org.apache.kafka.connect.transforms.InsertField) adds fields to an Apache Kafka® record key or value, using attributes from the record metadata or a configured static value.

Description

Use InsertField to add metadata fields (such as topic, partition, offset, or timestamp) or a static field with a fixed value to each record. Suffix a field name with ! to make it required, or ? to keep it optional.

Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.InsertField$Key) or value (org.apache.kafka.connect.transforms.InsertField$Value).
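As a rough illustration of choosing between the two types, the fragment below chains one InsertField$Key transform and one InsertField$Value transform. The aliases TagKey and TagValue are hypothetical, as are the field name and static value, which are borrowed from the example later in this reference. The sketch also assumes the record key is a structured type (a Struct or a map) rather than a primitive such as a plain string.

```json
{
  "transforms": "TagKey,TagValue",
  "transforms.TagKey.type": "org.apache.kafka.connect.transforms.InsertField$Key",
  "transforms.TagKey.static.field": "MessageSource",
  "transforms.TagKey.static.value": "Kafka Connect Framework",
  "transforms.TagValue.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.TagValue.static.field": "MessageSource",
  "transforms.TagValue.static.value": "Kafka Connect Framework"
}
```

These properties would be merged into an existing connector configuration. Transformations run in the order their aliases appear in transforms.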

Note

When you use InsertField with a source connector, the SMT inserts null for the Kafka record metadata fields timestamp.field and partition.field. This is because the SMT runs before the record reaches the Kafka topic, where Kafka assigns these metadata values. To insert non-null metadata-based fields, use InsertField with a sink connector. For source connectors, use static.field and static.value to insert fixed values instead.
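A minimal sketch of the sink-connector case described in the note, assuming the properties are merged into a sink connector configuration. The alias InsertMetadata and the field names kafka_topic, kafka_partition, kafka_offset, and kafka_timestamp are hypothetical names chosen for illustration.

```json
{
  "transforms": "InsertMetadata",
  "transforms.InsertMetadata.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.InsertMetadata.topic.field": "kafka_topic",
  "transforms.InsertMetadata.partition.field": "kafka_partition",
  "transforms.InsertMetadata.offset.field": "kafka_offset",
  "transforms.InsertMetadata.timestamp.field": "kafka_timestamp"
}
```

Because a sink connector reads records that Kafka has already stored, each of these fields should carry the metadata of the consumed record rather than null.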

Properties

| Name | Description | Type | Default | Valid Values | Importance |
|---|---|---|---|---|---|
| offset.field | Field name for Kafka offset. This is only applicable to sink connectors. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium |
| partition.field | Field name for Kafka partition. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium |
| replace.null.with.default | Whether to replace fields that have a default value and are null with the default value. When true, the default value is used; otherwise, null is used. | boolean | true | | medium |
| static.field | Field name for static data field. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium |
| static.value | The static value to insert when a field name is configured. | string | null | | medium |
| timestamp.field | Field name for record timestamp. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium |
| topic.field | Field name for Kafka topic. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium |

Example

This configuration snippet shows how to use InsertField to insert a static field named MessageSource with the static value Kafka Connect Framework.

"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field": "MessageSource",
"transforms.InsertField.static.value": "Kafka Connect Framework"

Before: {"author": "Philip K. Dick", "character": "Palmer Eldritch"}

After: {"author": "Philip K. Dick", "character": "Palmer Eldritch", "MessageSource": "Kafka Connect Framework"}
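A variation on the configuration above, sketched to show the ! suffix: the same static field is marked as required instead of being left optional. The alias, field name, and value are reused from the example above. The sketch assumes the record value has a schema, because the required or optional distinction is expressed in the schema of the inserted field.

```json
{
  "transforms": "InsertField",
  "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.InsertField.static.field": "MessageSource!",
  "transforms.InsertField.static.value": "Kafka Connect Framework"
}
```

The suffix is a marker and not part of the field name, so the inserted field should still be named MessageSource.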

Predicates

Configure transformations with predicates so that they process only records that satisfy a particular condition. You can also use predicates in a transformation chain together with the Filter (Kafka) SMT to conditionally filter specific records. For more information, refer to Predicates.
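A minimal sketch of attaching a predicate to InsertField, assuming the built-in TopicNameMatches predicate and a hypothetical topic naming scheme in which the relevant topics start with orders-. The predicate alias IsOrdersTopic and the pattern are illustrative only.

```json
{
  "transforms": "InsertField",
  "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.InsertField.static.field": "MessageSource",
  "transforms.InsertField.static.value": "Kafka Connect Framework",
  "transforms.InsertField.predicate": "IsOrdersTopic",
  "predicates": "IsOrdersTopic",
  "predicates.IsOrdersTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.IsOrdersTopic.pattern": "orders-.*"
}
```

With this fragment, only records from topics matching the pattern should receive the MessageSource field. Setting transforms.InsertField.negate to true would invert the condition.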