Kafka Connect ValueToKey SMT for Confluent Platform

The following provides usage information for the Apache Kafka® SMT org.apache.kafka.connect.transforms.ValueToKey.

Description

Replace the record key with a new key formed from a subset of fields in the record value.

Examples

These examples show how to use ValueToKey by itself and in conjunction with a second SMT.

Transform Fields to Message Key

This configuration snippet shows how to use ValueToKey to transform the UserId, city, and state fields into a message key.

"transforms": "ValueToKey",
"transforms.ValueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.ValueToKey.fields": "userId,city,state"

Before: {"userId": 12, "address": "1942 Wilhelm Boulevard", "city": "Topeka", "state": "KS", "country": "US"}

After: {"userId": 12, "city": "Topeka", "state": "KS"}

Chained Transformation

You can use SMTs together to perform a more complex transformation.

The following examples show how the ValueToKey and ExtractField SMTs are chained together to set the key for data coming from a JDBC Connector. During the transform, ValueToKey copies the message c1 field into the message key and then ExtractField extracts just the integer portion of that field.

"transforms": "createKey,extractInt",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "c1",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "c1"

The following shows what the message looked like before the transform.

"./bin/kafka-avro-console-consumer \
                              --bootstrap-server localhost:9092 \
                              --property schema.registry.url=http://localhost:8081 \
                              --property print.key=true \
                              --from-beginning \
                              --topic mysql-foobar

null {"c1":{"int":1},"c2":{"string":"foo"},"create_ts":1501796305000,"update_ts":1501796305000}
null {"c1":{"int":2},"c2":{"string":"foo"},"create_ts":1501796665000,"update_ts":1501796665000}

After the connector configuration is applied, new rows are inserted (piped) into the MySQL table:

"echo "insert into foobar (c1,c2) values (100,'bar');"|mysql --user=username --password=pw demo

The following is displayed in the Avro console consumer. Note that the key (the first value on the line) matches the value of c1, which was defined with the transforms.

100 {"c1":{"int":100},"c2":{"string":"bar"},"create_ts":1501799535000,"update_ts":1501799535000}

Properties

Name Description Type Default Valid Values Importance
fields Field names on the record value to extract as the record key. list   non-empty list high

Predicates

Transformations can be configured with predicates so that the transformation is applied only to records which satisfy a condition. You can use predicates in a transformation chain and, when combined with the Kafka Connect Filter (Kafka) SMT for Confluent Platform, predicates can conditionally filter out specific records. For details and examples, see Predicates.