Kafka Connect ExtractField SMT for Confluent Platform

The following provides usage information for the Apache Kafka® Single Message Transform (SMT) org.apache.kafka.connect.transforms.ExtractField.

Description

ExtractField pulls a field out of a complex (non-primitive, Map or Struct) key or value and replaces the entire key or value with the extracted field. Any null values are passed through unmodified.

Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.ExtractField$Key) or value (org.apache.kafka.connect.transforms.ExtractField$Value).
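As a minimal sketch of the key variant, the following fragment assumes the record key is a Struct or Map that contains a field named "id". The alias extractKeyId and the field name are hypothetical and chosen only for illustration.

```json
"transforms": "extractKeyId",
"transforms.extractKeyId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKeyId.field": "id"
```

Under that assumption, a key of {"id": 42} would become 42, and the record value would be left unchanged.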

Examples

The following examples show how to use ExtractField by itself and in conjunction with a second SMT.

Extract Field Name

The configuration snippet below shows how to use ExtractField to extract the field named "id" from the record value.

"transforms": "ExtractField",
"transforms.ExtractField.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.ExtractField.field": "id"

Before: {"id": 42, "cost": 4000}

After: 42

Tip

For additional examples, see ExtractField for managed connectors.

Properties

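| Name  | Description            | Type   | Default | Valid Values | Importance |
|-------|------------------------|--------|---------|--------------|------------|
| field | Field name to extract. | string |         |              | medium     |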

Chained Transformation

You can use SMTs together to perform a more complex transformation.

The following example shows how the ValueToKey and ExtractField SMTs are chained together to set the key for data coming from a JDBC connector. During the transform, ValueToKey copies the c1 field from the message value into the message key as a single-field struct, and then ExtractField replaces that struct with just the integer value of c1.

"transforms": "createKey,extractInt",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "c1",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "c1"

The following command shows what the messages looked like before the transform was applied. Each record key is null.

./bin/kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081 \
  --property print.key=true \
  --from-beginning \
  --topic mysql-foobar

null {"c1":{"int":1},"c2":{"string":"foo"},"create_ts":1501796305000,"update_ts":1501796305000}
null {"c1":{"int":2},"c2":{"string":"foo"},"create_ts":1501796665000,"update_ts":1501796665000}

After the connector configuration is applied, a new row is inserted into the MySQL table by piping an INSERT statement to the mysql client:

echo "insert into foobar (c1,c2) values (100,'bar');" | mysql --user=username --password=pw demo

The following is displayed in the Avro console consumer. Note that the key (the first value on the line) now matches the value of c1, as configured by the transformation chain.

100 {"c1":{"int":100},"c2":{"string":"bar"},"create_ts":1501799535000,"update_ts":1501799535000}

Predicates

Transformations can be configured with predicates so that the transformation is applied only to records which satisfy a condition. You can use predicates in a transformation chain and, when combined with the Kafka Connect Filter (Kafka) SMT for Confluent Platform, predicates can conditionally filter out specific records. For details and examples, see Predicates.
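As a rough sketch of how a predicate might be attached to ExtractField, the following fragment applies the transformation only to records whose topic name matches a pattern. The aliases extractId and isOrdersTopic and the pattern orders-.* are hypothetical. The fragment uses the TopicNameMatches predicate that ships with Apache Kafka.

```json
"transforms": "extractId",
"transforms.extractId.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.extractId.field": "id",
"transforms.extractId.predicate": "isOrdersTopic",
"predicates": "isOrdersTopic",
"predicates.isOrdersTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isOrdersTopic.pattern": "orders-.*"
```

Records from topics that do not match the pattern pass through this transformation unchanged. Setting transforms.extractId.negate to true would invert the condition.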