Kafka Connect InsertField SMT Usage Reference for Confluent Platform

The following provides usage information for the Apache Kafka® SMT org.apache.kafka.connect.transforms.InsertField.

Description

Insert fields using attributes from the record metadata or a configured static value.

Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.InsertField$Key) or value (org.apache.kafka.connect.transforms.InsertField$Value).

Note

When you use InsertField with a source connector, the SMT inserts null for the Kafka record metadata fields timestamp.field and partition.field. This is because the SMT runs before the record reaches the Kafka topic, where Kafka assigns these metadata values. To insert non-null metadata-based fields, use InsertField with a sink connector. For source connectors, use static.field and static.value to insert fixed values instead.
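As a rough illustration of the sink-connector case described in the note, the fragment below sketches how the metadata properties might be set together. The transform alias InsertMetadata and the target field names (kafka_topic, kafka_partition, kafka_offset, kafka_timestamp) are hypothetical names chosen for this sketch; only the property keys and the transformation class come from this reference.

```json
"transforms": "InsertMetadata",
"transforms.InsertMetadata.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertMetadata.topic.field": "kafka_topic",
"transforms.InsertMetadata.partition.field": "kafka_partition",
"transforms.InsertMetadata.offset.field": "kafka_offset",
"transforms.InsertMetadata.timestamp.field": "kafka_timestamp"
```

With a sink connector, each record value would be expected to gain four additional fields populated from the consumed record's topic, partition, offset, and timestamp.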

Example

This configuration snippet shows how to use InsertField to insert a static field named MessageSource with the static value Kafka Connect Framework.

"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field": "MessageSource",
"transforms.InsertField.static.value": "Kafka Connect Framework"

Before: {"author": "Philip K. Dick", "character": "Palmer Eldritch"}

After: {"author": "Philip K. Dick", "character": "Palmer Eldritch", "MessageSource": "Kafka Connect Framework"}

Properties

| Name | Description | Type | Default | Valid Values | Importance |
|---|---|---|---|---|---|
| offset.field | Field name for Kafka offset. This is only applicable to sink connectors. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium |
| partition.field | Field name for Kafka partition. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium |
| static.field | Field name for static data field. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium |
| static.value | If field name is configured, the static field value. | string | null | | medium |
| timestamp.field | Field name for record timestamp. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium |
| topic.field | Field name for Kafka topic. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium |
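The ! and ? suffixes described above are written as part of the configured field name. The following is a minimal sketch, reusing the static field from the earlier example, that marks the inserted field as required. The assumption here is that the suffix is stripped from the resulting field name and, for records that carry a schema, only affects whether the inserted field is declared optional.

```json
"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field": "MessageSource!",
"transforms.InsertField.static.value": "Kafka Connect Framework"
```

Under that assumption, the inserted field is still named MessageSource. Writing MessageSource? or plain MessageSource would keep it optional.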

Predicates

Transformations can be configured with predicates so that the transformation is applied only to records that satisfy a condition. You can use predicates in a transformation chain and, when combined with the Filter SMT (see Kafka Connect Filter (Kafka) SMT Usage Reference for Confluent Platform), predicates can conditionally filter out specific records. For details and examples, see Predicates.
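As a sketch of how a predicate might be attached to InsertField, the fragment below applies the static-field insertion only to records whose topic name matches a pattern. The predicate alias IsOrdersTopic and the pattern orders-.* are hypothetical values chosen for illustration. TopicNameMatches is one of the predicates that ships with Apache Kafka.

```json
"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field": "MessageSource",
"transforms.InsertField.static.value": "Kafka Connect Framework",
"transforms.InsertField.predicate": "IsOrdersTopic",
"predicates": "IsOrdersTopic",
"predicates.IsOrdersTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsOrdersTopic.pattern": "orders-.*"
```

Records from topics that do not match the pattern would pass through unchanged. Setting transforms.InsertField.negate to true would invert the condition.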