Kafka Connect ExtractField SMT Usage Reference for Confluent Platform
The ExtractField SMT (org.apache.kafka.connect.transforms.ExtractField) extracts a specified field from a complex Apache Kafka® record key or value and replaces the entire key or value with that field.
Description
The ExtractField SMT extracts a field from a complex (non-primitive, Map or Struct) key or value and replaces the entire key or value with the extracted field. ExtractField passes through any null values unmodified.
Use the specific transformation type designed for the record key (org.apache.kafka.connect.transforms.ExtractField$Key) or value (org.apache.kafka.connect.transforms.ExtractField$Value).
Properties
Name | Description | Type | Default | Valid Values | Importance |
|---|---|---|---|---|---|
| The name of the field to extract from the record key or value. | string | (required) | medium | |
| Specifies whether to replace null fields with their defined default values. When set to | boolean |
|
| medium |
| Defines the syntax for accessing fields. | string |
|
| high |
Examples
The following examples show how to use ExtractField by itself and along with the ValueToKey SMT.
Extract a field by name
The following configuration snippet shows how to use ExtractField to extract the field name "id".
"transforms": "ExtractField",
"transforms.ExtractField.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.ExtractField.field": "id"
Before: {"id": 42, "cost": 4000}
After: 42
Chained Transformation
You can use SMTs together to perform a more complex transformation.
The following examples show how the ValueToKey and ExtractField SMTs are chained together to set the key for data coming from a JDBC Connector. During the transform, ValueToKey copies the message c1 field into the message key and then ExtractField extracts just the integer portion of that field.
"transforms": "createKey,extractInt",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "c1",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "c1"
The following shows what the message looked like before the transform.
"./bin/kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--property print.key=true \
--from-beginning \
--topic mysql-foobar
null {"c1":{"int":1},"c2":{"string":"foo"},"create_ts":1501796305000,"update_ts":1501796305000}
null {"c1":{"int":2},"c2":{"string":"foo"},"create_ts":1501796665000,"update_ts":1501796665000}
After the connector configuration is applied, new rows are inserted (piped) into the MySQL table:
"echo "insert into foobar (c1,c2) values (100,'bar');"|mysql --user=username --password=pw demo
The following is displayed in the Avro console consumer. Note that the key (the first value on the line) matches the value of c1, which was defined with the transforms.
100 {"c1":{"int":100},"c2":{"string":"bar"},"create_ts":1501799535000,"update_ts":1501799535000}
Predicates
Configure transformations with predicates to ensure they only process records satisfying a particular condition. You can also use predicates in a transformation chain along with the Kafka Connect Filter (Kafka) SMT Usage Reference for Confluent Platform to conditionally filter specific records. For more information, refer to Predicates.