Kafka Connect ExtractField SMT for Confluent Platform¶
The following provides usage information for the Apache Kafka® SMT org.apache.kafka.connect.transforms.ExtractField.
Description¶
ExtractField pulls a field out of a complex (non-primitive, Map or Struct) key or value and replaces the entire key or value with the extracted field. Any null values are passed through unmodified.
Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.ExtractField$Key) or value (org.apache.kafka.connect.transforms.ExtractField$Value).
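The behavior described above can be modeled in a few lines of plain Python. This is only an illustrative sketch of the semantics for a schemaless (Map) payload, not the Kafka Connect implementation: the function name extract_field is hypothetical, and raising TypeError for a primitive payload is an assumption made for the illustration.

```python
from typing import Any, Optional


def extract_field(payload: Optional[Any], field: str) -> Any:
    """Model ExtractField applied to a single record key or value."""
    if payload is None:
        # Null keys or values pass through unmodified.
        return None
    if not isinstance(payload, dict):
        # Assumption for this sketch: primitives are rejected, because the
        # SMT expects a complex (Map or Struct) key or value.
        raise TypeError(f"expected a map-like payload, got {type(payload).__name__}")
    # The entire payload is replaced by the one extracted field.
    return payload.get(field)


print(extract_field({"id": 42, "cost": 4000}, "id"))
print(extract_field(None, "id"))
```

Choosing ExtractField$Key or ExtractField$Value corresponds to deciding whether a function like this is applied to the record key or to the record value.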
Examples¶
The following examples show how to use ExtractField by itself and in conjunction with a second SMT.
Extract Field Name¶
The configuration snippet below shows how to use ExtractField to extract the field named "id" from the record value.
"transforms": "ExtractField",
"transforms.ExtractField.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.ExtractField.field": "id"
Before: {"id": 42, "cost": 4000}
After: 42
Tip
For additional examples, see ExtractField for managed connectors.
Properties¶
Name | Description | Type | Default | Valid Values | Importance |
---|---|---|---|---|---|
field | Field name to extract. | string | | | medium |
Chained Transformation¶
You can use SMTs together to perform a more complex transformation.
The following example shows how the ValueToKey and ExtractField SMTs are chained together to set the key for data coming from a JDBC connector. During the transform, ValueToKey copies the c1 field of the message value into the message key, and then ExtractField extracts just the integer from that key, so the key becomes a plain integer rather than a structure containing c1.
"transforms": "createKey,extractInt",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "c1",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "c1"
The following shows what the messages looked like before the transform. The key (the first value on each line) is null.
./bin/kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--property print.key=true \
--from-beginning \
--topic mysql-foobar
null {"c1":{"int":1},"c2":{"string":"foo"},"create_ts":1501796305000,"update_ts":1501796305000}
null {"c1":{"int":2},"c2":{"string":"foo"},"create_ts":1501796665000,"update_ts":1501796665000}
After the connector configuration is applied, a new row is inserted into the MySQL table by piping an INSERT statement to the mysql client:
echo "insert into foobar (c1,c2) values (100,'bar');" | mysql --user=username --password=pw demo
The following is displayed in the Avro console consumer. Note that the key (the first value on the line) is no longer null: it now matches the value of c1, as configured by the transforms.
100 {"c1":{"int":100},"c2":{"string":"bar"},"create_ts":1501799535000,"update_ts":1501799535000}
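To make the two-step chain concrete, the following plain-Python sketch models a record as a (key, value) pair and applies simplified stand-ins for the two SMTs in order. The function names value_to_key and extract_field_key are hypothetical, the values are plain integers and strings rather than Avro-encoded unions, and error handling is omitted, so treat it as an illustration of the data flow rather than the actual transform code.

```python
from typing import Any, Dict, List, Optional, Tuple

# Simplified record: (key, value). The real Connect record carries more metadata.
Record = Tuple[Optional[Any], Dict[str, Any]]


def value_to_key(record: Record, fields: List[str]) -> Record:
    """Model ValueToKey: replace the key with a map of the listed value fields."""
    _, value = record
    return ({name: value[name] for name in fields}, value)


def extract_field_key(record: Record, field: str) -> Record:
    """Model ExtractField$Key: replace the key with one field taken from it."""
    key, value = record
    if key is None:
        # Null keys pass through unmodified.
        return record
    return (key[field], value)


record: Record = (None, {"c1": 100, "c2": "bar"})
record = value_to_key(record, ["c1"])      # key is now {"c1": 100}
record = extract_field_key(record, "c1")   # key is now 100
print(record)
```

The order matters in this model: applying extract_field_key first would see a null key and leave the record unchanged, which is why createKey precedes extractInt in the transforms list.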
Predicates¶
Transformations can be configured with predicates so that the transformation is applied only to records which satisfy a condition. You can use predicates in a transformation chain and, when combined with the Kafka Connect Filter (Kafka) SMT for Confluent Platform, predicates can conditionally filter out specific records. For details and examples, see Predicates.
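As a rough illustration of how a predicate gates a transformation, the sketch below wraps a transform so that it runs only when a condition on the record holds. Everything here is a hypothetical plain-Python model: the record shape (a dict with topic, key, and value), the helper names, and the orders-.* topic pattern are assumptions for the example, and Python's regular expression syntax differs in places from the Java syntax that Kafka Connect uses.

```python
import re
from typing import Any, Callable, Dict

Record = Dict[str, Any]  # hypothetical shape: {"topic": ..., "key": ..., "value": ...}


def topic_name_matches(pattern: str) -> Callable[[Record], bool]:
    """Build a predicate that is true when the whole topic name matches."""
    compiled = re.compile(pattern)
    return lambda record: compiled.fullmatch(record["topic"]) is not None


def with_predicate(
    transform: Callable[[Record], Record],
    predicate: Callable[[Record], bool],
    negate: bool = False,
) -> Callable[[Record], Record]:
    """Apply transform only to records that satisfy (or, if negated, fail) the predicate."""
    def guarded(record: Record) -> Record:
        if predicate(record) != negate:
            return transform(record)
        return record  # other records pass through untouched
    return guarded


def extract_value_id(record: Record) -> Record:
    """Model ExtractField$Value with field "id"."""
    value = record["value"]
    if value is None:
        return record
    return {**record, "value": value["id"]}


guarded = with_predicate(extract_value_id, topic_name_matches(r"orders-.*"))
print(guarded({"topic": "orders-eu", "key": None, "value": {"id": 42, "cost": 4000}}))
print(guarded({"topic": "audit", "key": None, "value": {"id": 7}}))
```

In this model only the record from the topic matching the pattern has its value replaced by the extracted field. Passing negate=True inverts the condition.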