Kafka Connect ExtractTopic SMT Usage Reference for Confluent Cloud
The following provides usage information for the Confluent SMT io.confluent.connect.transforms.ExtractTopic.
Description
Extract data from a message and use it as the topic name. You can either use the entire key/value (which should be a string), or use a field from a map or struct. Use the concrete transformation type designed for the record key (io.confluent.connect.transforms.ExtractTopic$Key) or value (io.confluent.connect.transforms.ExtractTopic$Value). You can also extract the entire value from a message header value (string) by using the concrete type (io.confluent.connect.transforms.ExtractTopic$Header).
Installation
This transformation is developed by Confluent and does not ship by default with Apache Kafka® or Confluent Cloud. You can install this transformation using the confluent connect plugin install command:
confluent connect plugin install confluentinc/connect-transforms:latest
Examples
The configuration snippet below shows how to use and configure the ExtractTopic SMT.
"transforms": "KeyExample", "ValueFieldExample", "KeyFieldExample", "FieldJsonPathExample", "HeaderExample",
Use the key of the message as the topic name.
"transforms.KeyExample.type": "io.confluent.connect.transforms.ExtractTopic$Key"
Extract a required field named f2 from the value, and use it as the topic name.
"transforms.ValueFieldExample.type": "io.confluent.connect.transforms.ExtractTopic$Value",
"transforms.ValueFieldExample.field": "f2"
Extract a field named f3 from the key, and use it as the topic name. If the field is null or missing, leave the topic name as-is.
"transforms.KeyFieldExample.type": "io.confluent.connect.transforms.ExtractTopic$Value",
"transforms.KeyFieldExample.field": "f3",
"transforms.KeyFieldExample.skip.missing.or.null": "true"
Extract the value of a field named f3 in the f1 field in the key, and use it as the topic name. Here the format of the field is defined with JSON Path (e.g., ["f1"]["f3"]). If the field is null or missing, leave the topic name as-is.
"transforms.FieldJsonPathExample.type": "io.confluent.connect.transforms.ExtractTopic$Value",
"transforms.FieldJsonPathExample.field": "$["f1"]["f3"]",
"transforms.FieldJsonPathExample.field.format": "JSON_PATH",
"transforms.FieldJsonPathExample.skip.missing.or.null": "true"
Extract the value of a message header (as a string) with key h1 (required) and use it as the topic name.
"transforms.HeaderExample.type": "io.confluent.connect.transforms.ExtractTopic$Header",
"transforms.HeaderExample.field": "h1",
"transforms.FieldJsonPathExample.skip.missing.or.null=true"
Tip
For additional examples, see ExtractTopic for managed connectors.
Properties
Name | Description | Type | Default | Valid Values | Importance |
|---|---|---|---|---|---|
| Field name to use as the topic name. If left blank, the entire key or value is used (and assumed to be a string). | string | “” | medium | |
| Specify field path format. Currently two formats are supported: JSON_PATH and PLAIN. If set to JSON_PATH, the transformer will interpret the field with JSON path interpreter, which supports nested field extraction. If left blank or set to PLAIN, the transformer will evaluate the field config as a non-nested field name. When using | string | “PLAIN” | “JSON_PATH”, “PLAIN” | medium |
| How to handle missing fields and null fields, keys, and values. By default, this transformation will throw an exception if a field defined in the | boolean |
| low |
Predicates
Transformations can be configured with predicates so that the transformation is applied only to records which satisfy a condition. You can use predicates in a transformation chain and, when combined with the Kafka Connect Filter (Kafka) SMT Usage Reference for Confluent Cloud, predicates can conditionally filter out specific records. For details and examples, see Predicates.