Kafka Connect ValueToKey SMT Usage Reference for Confluent Platform
The ValueToKey SMT (org.apache.kafka.connect.transforms.ValueToKey) replaces an Apache Kafka® record’s key with a new key formed from a subset of fields in the record value.
Description
Use this SMT to copy one or more fields from the record value into the record key.
Properties
Name | Description | Type | Default | Valid values | Importance |
|---|---|---|---|---|---|
| Specifies field names on the record value to extract as the record key. | list | non-empty list | high | |
| Specifies whether to replace null fields with their defined default values. When set to | boolean |
|
| medium |
Examples
These examples show how to use ValueToKey by itself and with a second SMT.
Transform fields to a message key
This configuration snippet shows how to use ValueToKey to transform the userId, city, and state fields into a message key.
"transforms": "ValueToKey",
"transforms.ValueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.ValueToKey.fields": "userId,city,state"
Before: {"userId": 12, "address": "1942 Wilhelm Boulevard", "city": "Topeka", "state": "KS", "country": "US"}
After: {"userId": 12, "city": "Topeka", "state": "KS"}
Chained transformation
You can use SMTs together to perform a more complex transformation.
The following examples show how the ValueToKey and ExtractField SMTs are chained together to set the key for data from a JDBC Connector. During the transform, ValueToKey copies the message c1 field into the message key and then ExtractField extracts just the integer portion of that field.
"transforms": "createKey,extractInt",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "c1",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "c1"
The following shows what the message looked like before the transform.
./bin/kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--property print.key=true \
--from-beginning \
--topic mysql-foobar
null {"c1":{"int":1},"c2":{"string":"foo"},"create_ts":1501796305000,"update_ts":1501796305000}
null {"c1":{"int":2},"c2":{"string":"foo"},"create_ts":1501796665000,"update_ts":1501796665000}
After the connector configuration is applied, new rows are inserted (piped) into the MySQL table:
echo "insert into foobar (c1,c2) values (100,'bar');" | mysql --user=username --password=pw demo
The following is displayed in the Avro console consumer. The key (the first value on the line) matches the value of c1, which the transforms set.
100 {"c1":{"int":100},"c2":{"string":"bar"},"create_ts":1501799535000,"update_ts":1501799535000}
Predicates
Configure transformations with predicates to ensure they process only records that satisfy a particular condition. You can also use predicates in a transformation chain with the Kafka Connect Filter (Kafka) SMT Usage Reference for Confluent Platform to conditionally filter specific records. For more information, see Predicates.