Kafka Connect ExtractField SMT Usage Reference for Confluent Platform

The ExtractField SMT (org.apache.kafka.connect.transforms.ExtractField) extracts a specified field from a complex Apache Kafka® record key or value and replaces the entire key or value with that field.

Description

The ExtractField SMT extracts a field from a complex (non-primitive, Map or Struct) key or value and replaces the entire key or value with the extracted field. ExtractField passes through any null values unmodified.

Use the specific transformation type designed for the record key (org.apache.kafka.connect.transforms.ExtractField$Key) or value (org.apache.kafka.connect.transforms.ExtractField$Value).

Properties

Name

Description

Type

Default

Valid Values

Importance

field

The name of the field to extract from the record key or value.

string

(required)

medium

replace.null.with.default

Specifies whether to replace null fields with their defined default values. When set to true, the default value is applied. Otherwise, the field remains null.

boolean

true

true or false

medium

field.syntax.version

Defines the syntax for accessing fields. V1 limits field paths to elements at the root level of the struct or map. V2 supports accessing nested elements using dotted notation.

string

V1

V1, V2

high

Examples

The following examples show how to use ExtractField by itself and along with the ValueToKey SMT.

Extract a field by name

The following configuration snippet shows how to use ExtractField to extract the field name "id".

"transforms": "ExtractField",
"transforms.ExtractField.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.ExtractField.field": "id"

Before: {"id": 42, "cost": 4000}

After: 42

Chained Transformation

You can use SMTs together to perform a more complex transformation.

The following examples show how the ValueToKey and ExtractField SMTs are chained together to set the key for data coming from a JDBC Connector. During the transform, ValueToKey copies the message c1 field into the message key and then ExtractField extracts just the integer portion of that field.

"transforms": "createKey,extractInt",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "c1",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "c1"

The following shows what the message looked like before the transform.

"./bin/kafka-avro-console-consumer \
                              --bootstrap-server localhost:9092 \
                              --property schema.registry.url=http://localhost:8081 \
                              --property print.key=true \
                              --from-beginning \
                              --topic mysql-foobar

null {"c1":{"int":1},"c2":{"string":"foo"},"create_ts":1501796305000,"update_ts":1501796305000}
null {"c1":{"int":2},"c2":{"string":"foo"},"create_ts":1501796665000,"update_ts":1501796665000}

After the connector configuration is applied, new rows are inserted (piped) into the MySQL table:

"echo "insert into foobar (c1,c2) values (100,'bar');"|mysql --user=username --password=pw demo

The following is displayed in the Avro console consumer. Note that the key (the first value on the line) matches the value of c1, which was defined with the transforms.

100 {"c1":{"int":100},"c2":{"string":"bar"},"create_ts":1501799535000,"update_ts":1501799535000}

Predicates

Configure transformations with predicates to ensure they only process records satisfying a particular condition. You can also use predicates in a transformation chain along with the Kafka Connect Filter (Kafka) SMT Usage Reference for Confluent Platform to conditionally filter specific records. For more information, refer to Predicates.