Kafka Connect InsertField SMT Usage Reference for Confluent Platform

The following provides usage information for the Apache Kafka® SMT org.apache.kafka.connect.transforms.InsertField.

Description

Insert fields using attributes from the record metadata or a configured static value.

Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.InsertField$Key) or value (org.apache.kafka.connect.transforms.InsertField$Value).

Note

When you use InsertField with a source connector, the SMT inserts null for the Kafka record metadata fields timestamp.field and partition.field. This is because the SMT runs before the record reaches the Kafka topic, where Kafka assigns these metadata values. To insert non-null metadata-based fields, use InsertField with a sink connector. For source connectors, use static.field and static.value to insert fixed values instead.

Example

This configuration snippet shows how to use InsertField to insert a static field labeled MessageSource using a static value of Kafka ConnectFramework.

"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field": "MessageSource",
"transforms.InsertField.static.value": "Kafka Connect Framework"

Before: {"author": "Philip K. Dick", "character": "Palmer Eldritch"}

After: {"author": "Philip K. Dick", "character": "Palmer Eldritch", "MessageSource": "Kafka Connect Framework"}

Properties

Name

Description

Type

Default

Valid Values

Importance

offset.field

Field name for Kafka offset. This is only applicable to sink connectors. Suffix with ! to make this a required field, or ? to keep it optional (the default).

string

null

medium

partition.field

Field name for Kafka partition. Suffix with ! to make this a required field, or ? to keep it optional (the default).

string

null

medium

static.field

Field name for static data field. Suffix with ! to make this a required field, or ? to keep it optional (the default).

string

null

medium

static.value

If field name is configured, the static field value.

string

null

medium

timestamp.field

Field name for record timestamp. Suffix with ! to make this a required field, or ? to keep it optional (the default).

string

null

medium

topic.field

Field name for Kafka topic. Suffix with ! to make this a required field, or ? to keep it optional (the default).

string

null

medium

Predicates

Configure transformations with predicates to ensure they only process records satisfying a particular condition. You can also use predicates in a transformation chain along with the Kafka Connect Filter (Kafka) SMT Usage Reference for Confluent Platform to conditionally filter specific records. For more information, refer to Predicates.