Kafka Connect ExtractTopic SMT for Confluent Cloud
The following provides usage information for the Confluent SMT io.confluent.connect.transforms.ExtractTopic.
Description
Extract data from a message and use it as the topic name. You can either use the entire key/value (which should be a string), or use a field from a map or struct.
Use the concrete transformation type designed for the record key (io.confluent.connect.transforms.ExtractTopic$Key) or value (io.confluent.connect.transforms.ExtractTopic$Value).
You can also use the entire value of a message header (as a string) as the topic name by using the concrete type io.confluent.connect.transforms.ExtractTopic$Header.
Installation
This transformation is developed by Confluent and does not ship by default with Apache Kafka® or Confluent Cloud. You can install this transformation using the confluent connect plugin install command:
confluent connect plugin install confluentinc/connect-transforms:latest
Examples
The configuration snippet below shows how to use and configure the ExtractTopic SMT.
"transforms": "KeyExample,ValueFieldExample,KeyFieldExample,FieldJsonPathExample,HeaderExample",
Use the key of the message as the topic name.
"transforms.KeyExample.type": "io.confluent.connect.transforms.ExtractTopic$Key"
Extract a required field named f2 from the value, and use it as the topic name.
"transforms.ValueFieldExample.type": "io.confluent.connect.transforms.ExtractTopic$Value",
"transforms.ValueFieldExample.field": "f2"
Extract a field named f3 from the key, and use it as the topic name. If the field is null or missing, leave the topic name as-is.
"transforms.KeyFieldExample.type": "io.confluent.connect.transforms.ExtractTopic$Key",
"transforms.KeyFieldExample.field": "f3",
"transforms.KeyFieldExample.skip.missing.or.null": "true"
Extract the value of a field named f3 in the f1 field in the key, and use it as the topic name. Here the format of the field is defined with JSON Path (e.g., $["f1"]["f3"]). If the field is null or missing, leave the topic name as-is.
"transforms.FieldJsonPathExample.type": "io.confluent.connect.transforms.ExtractTopic$Key",
"transforms.FieldJsonPathExample.field": "$[\"f1\"][\"f3\"]",
"transforms.FieldJsonPathExample.field.format": "JSON_PATH",
"transforms.FieldJsonPathExample.skip.missing.or.null": "true"
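To make the nested path concrete, here is a hypothetical record key (the field values are invented for illustration). Assuming the field and field.format settings shown in this example, the JSON Path $["f1"]["f3"] would resolve to the string orders-eu, so the record would be routed to a topic with that name.

```json
{
  "f1": {
    "f3": "orders-eu"
  },
  "f2": "not-read-by-this-path"
}
```

A key that lacks f1 or f3 would keep its original topic, because skip.missing.or.null is set to true in this example.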
Extract the value of a message header (as a string) with key h1 (required) and use it as the topic name.
"transforms.HeaderExample.type": "io.confluent.connect.transforms.ExtractTopic$Header",
"transforms.HeaderExample.field": "h1",
"transforms.HeaderExample.skip.missing.or.null": "false"
Tip
For additional examples, see ExtractTopic for managed connectors.
Properties
Name | Description | Type | Default | Valid Values | Importance |
---|---|---|---|---|---|
field | Field name to use as the topic name. If left blank, the entire key or value is used (and assumed to be a string). | string | "" | | medium |
field.format | Specifies the field path format. Two formats are currently supported: JSON_PATH and PLAIN. If set to JSON_PATH, the transformer interprets the field with a JSON Path interpreter, which supports nested field extraction. If left blank or set to PLAIN, the transformer evaluates the field config as a non-nested field name. When using ExtractTopic$Header, only the default PLAIN format can be used, which extracts the header value as a string. | string | "PLAIN" | "JSON_PATH", "PLAIN" | medium |
skip.missing.or.null | How to handle missing fields and null fields, keys, and values. By default, this transformation throws an exception if a field defined in the field configuration is missing or null, or if no field is specified but the message's key or value is null. If this configuration is set to true, the transformation instead silently ignores these conditions and allows the record to pass through unaltered. | boolean | false | | low |
Predicates
Transformations can be configured with predicates so that the transformation is applied only to records which satisfy a condition. You can use predicates in a transformation chain and, when combined with the Kafka Connect Filter (Kafka) SMT for Confluent Cloud, predicates can conditionally filter out specific records. For details and examples, see Predicates.
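As a sketch of how a predicate might be attached to this SMT, the fragment below applies ExtractTopic$Header only to records that carry an h1 header. It relies on the HasHeaderKey predicate that ships with Apache Kafka; the alias names HeaderExample and HasH1 are arbitrary choices made for this illustration, and the fragment omits the rest of the connector configuration.

```json
{
  "transforms": "HeaderExample",
  "transforms.HeaderExample.type": "io.confluent.connect.transforms.ExtractTopic$Header",
  "transforms.HeaderExample.field": "h1",
  "transforms.HeaderExample.predicate": "HasH1",
  "predicates": "HasH1",
  "predicates.HasH1.type": "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
  "predicates.HasH1.name": "h1"
}
```

Records that lack the header bypass the transformation entirely and keep their original topic.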