Kafka Connect ValueToKey SMT for Confluent Platform
The following provides usage information for the Apache Kafka® SMT org.apache.kafka.connect.transforms.ValueToKey.
Description
Replace the record key with a new key formed from a subset of fields in the record value.
Examples
These examples show how to use ValueToKey by itself and in conjunction with a second SMT.
Transform Fields to Message Key
This configuration snippet shows how to use ValueToKey to build the message key from the userId, city, and state fields of the record value.
"transforms": "ValueToKey",
"transforms.ValueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.ValueToKey.fields": "userId,city,state"
Before (record value): {"userId": 12, "address": "1942 Wilhelm Boulevard", "city": "Topeka", "state": "KS", "country": "US"}
After (record key): {"userId": 12, "city": "Topeka", "state": "KS"}
The record value itself is not modified; only the key is replaced.
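To make the before/after relationship concrete, the following is a minimal Python sketch of what the transform does to a schemaless record. It is not the Kafka Connect implementation: the value_to_key function and the dict-based record layout are hypothetical stand-ins, and raising an error on a missing field is an assumption of the sketch.

```python
def value_to_key(record, fields):
    """Return a new record whose key is built from the named value fields."""
    value = record["value"]
    # Assumption of this sketch: a missing field raises KeyError.
    new_key = {name: value[name] for name in fields}
    # The value is passed through untouched; only the key changes.
    return {"key": new_key, "value": value}


record = {
    "key": None,
    "value": {
        "userId": 12,
        "address": "1942 Wilhelm Boulevard",
        "city": "Topeka",
        "state": "KS",
        "country": "US",
    },
}

result = value_to_key(record, ["userId", "city", "state"])
print(result["key"])  # {'userId': 12, 'city': 'Topeka', 'state': 'KS'}
```

The order of the names in the fields list determines the order of the fields in the resulting key.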
Chained Transformation
You can use SMTs together to perform a more complex transformation.
The following example shows how the ValueToKey and ExtractField SMTs are chained together to set the key for data coming from a JDBC connector. During the transform, ValueToKey copies the c1 field of the message value into the message key, and then ExtractField extracts just the integer value of that field, so the key becomes a bare integer rather than a structure containing c1.
"transforms": "createKey,extractInt",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "c1",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "c1"
The following shows what the messages looked like before the transform.
./bin/kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--property print.key=true \
--from-beginning \
--topic mysql-foobar
null {"c1":{"int":1},"c2":{"string":"foo"},"create_ts":1501796305000,"update_ts":1501796305000}
null {"c1":{"int":2},"c2":{"string":"foo"},"create_ts":1501796665000,"update_ts":1501796665000}
After the connector configuration is applied, a new row is inserted into the MySQL table by piping an INSERT statement to the mysql client:
echo "insert into foobar (c1,c2) values (100,'bar');" | mysql --user=username --password=pw demo
The following is displayed in the Avro console consumer. Note that the key (the first value on the line) now matches the value of c1, as configured by the two transforms.
100 {"c1":{"int":100},"c2":{"string":"bar"},"create_ts":1501799535000,"update_ts":1501799535000}
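The two-step chain above can be sketched in Python to show why both transforms are needed. This is an illustrative model only: value_to_key, extract_field_key, and apply_chain are hypothetical helpers, records are plain dicts, and the Avro union wrapping shown in the console output is omitted.

```python
def value_to_key(record, fields):
    """Step 1: build a structured key from the named value fields."""
    value = record["value"]
    return {"key": {name: value[name] for name in fields}, "value": value}


def extract_field_key(record, field):
    """Step 2: replace the structured key with one of its fields."""
    return {"key": record["key"][field], "value": record["value"]}


def apply_chain(record, transforms):
    """Apply transforms in the order they are listed, as a chain does."""
    for transform in transforms:
        record = transform(record)
    return record


row = {"key": None, "value": {"c1": 100, "c2": "bar"}}

after_create_key = value_to_key(row, ["c1"])
print(after_create_key["key"])  # {'c1': 100}

chain = [
    lambda r: value_to_key(r, ["c1"]),       # createKey
    lambda r: extract_field_key(r, "c1"),    # extractInt
]
out = apply_chain(row, chain)
print(out["key"])  # 100
```

After the first step alone the key is still a structure with a single c1 field; the second step is what reduces it to the bare value.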
Properties
Name | Description | Type | Default | Valid Values | Importance |
---|---|---|---|---|---|
fields | Field names on the record value to extract as the record key. | list | | non-empty list | high |
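As a rough illustration of the list type and the non-empty constraint on fields, the following Python sketch parses a comma-separated property value and rejects an empty one. The parse_fields helper is hypothetical, and the exact trimming behavior is an assumption rather than a description of how Kafka Connect validates its configuration.

```python
def parse_fields(raw):
    """Split a comma-separated property value into a list of field names."""
    trimmed = raw.strip()
    # Assumption of this sketch: whitespace around each name is ignored.
    names = [part.strip() for part in trimmed.split(",")] if trimmed else []
    if not names:
        raise ValueError("fields must be a non-empty list")
    return names


print(parse_fields("userId, city,state"))  # ['userId', 'city', 'state']
```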
Predicates
Transformations can be configured with predicates so that the transformation is applied only to records which satisfy a condition. You can use predicates in a transformation chain and, when combined with the Kafka Connect Filter (Kafka) SMT for Confluent Platform, predicates can conditionally filter out specific records. For details and examples, see Predicates.
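As a sketch of how a predicate might be attached to this SMT, the following Python snippet assembles a configuration fragment and prints it as JSON. The createKey and isMysqlTopic aliases and the mysql-.* pattern are hypothetical choices for illustration; TopicNameMatches is one of the predicate classes shipped with Apache Kafka.

```python
import json

# Hypothetical aliases: "createKey" for the transform, "isMysqlTopic" for the predicate.
config = {
    "transforms": "createKey",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "c1",
    # Apply createKey only to records for which the predicate is true.
    "transforms.createKey.predicate": "isMysqlTopic",
    "predicates": "isMysqlTopic",
    "predicates.isMysqlTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    # Hypothetical pattern: topics whose names start with "mysql-".
    "predicates.isMysqlTopic.pattern": "mysql-.*",
}

print(json.dumps(config, indent=2))
```

With a fragment like this, records from topics that do not match the pattern would pass through the chain with their keys unchanged.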