Kafka Connect ExtractTopic SMT Usage Reference for Confluent Platform
The ExtractTopic SMT (io.confluent.connect.transforms.ExtractTopic) extracts data from a Apache Kafka® record key, value, or header and uses it as the topic name.
Description
The ExtractTopic SMT extracts data from a Apache Kafka® record and uses it as the topic name. You can either use the entire key or value (which should be a string), or use a field from a map or struct.
Use the specific transformation type designed for the record key (io.confluent.connect.transforms.ExtractTopic$Key) or value (io.confluent.connect.transforms.ExtractTopic$Value).
You can also extract the entire value from a message header value (string) by using the specific type (io.confluent.connect.transforms.ExtractTopic$Header).
Installation
Confluent develops this transformation and does not ship it by default with Apache Kafka® or Confluent Platform. You can install this transformation using the confluent connect plugin install command:
confluent connect plugin install confluentinc/connect-transforms:latest
Properties
Name | Description | Type | Default | Valid Values | Importance |
|---|---|---|---|---|---|
| The field name or header key to use as the topic name. Required when extracting from a header using | string | “” | medium | |
| The format used to parse the field path. The SMT supports two formats: If set to If set to | string |
|
| medium |
| Determines how to handle missing fields and null values. By default, the SMT throws an exception if a field defined in the Set to | boolean |
| low |
Examples
The following configuration snippet shows how to use and configure the ExtractTopic SMT.
"transforms": "KeyExample,ValueFieldExample,KeyFieldExample,FieldJsonPathExample,HeaderExample"
Use the key as the topic name
Use the key of the message as the topic name.
"transforms.KeyExample.type": "io.confluent.connect.transforms.ExtractTopic$Key"
Extract a field from the value
Extract a required field named f2 from the value, and use it as the topic name.
"transforms.ValueFieldExample.type": "io.confluent.connect.transforms.ExtractTopic$Value",
"transforms.ValueFieldExample.field": "f2"
Extract a field from the key
Extract a field named f3 from the key, and use it as the topic name. If the field is null or missing, leave the topic name as-is.
"transforms.KeyFieldExample.type": "io.confluent.connect.transforms.ExtractTopic$Key",
"transforms.KeyFieldExample.field": "f3",
"transforms.KeyFieldExample.skip.missing.or.null": "true"
Extract a nested field with JSON Path
Extract the value of a nested field — for example, the field f3 inside the f1 field — and use it as the topic name. The field path uses JSON Path syntax (for example, $["f1"]["f3"]). If the field is null or missing, leave the topic name as-is.
"transforms.FieldJsonPathExample.type": "io.confluent.connect.transforms.ExtractTopic$Value",
"transforms.FieldJsonPathExample.field": "$["f1"]["f3"]",
"transforms.FieldJsonPathExample.field.format": "JSON_PATH",
"transforms.FieldJsonPathExample.skip.missing.or.null": "true"
Extract a header value as the topic name
Extract the value of a message header (as a string) with key h1 (required) and use it as the topic name.
"transforms.HeaderExample.type": "io.confluent.connect.transforms.ExtractTopic$Header",
"transforms.HeaderExample.field": "h1"
Predicates
Configure transformations with predicates to ensure they only process records satisfying a particular condition. You can also use predicates in a transformation chain along with the Kafka Connect Filter (Kafka) SMT Usage Reference for Confluent Platform to conditionally filter specific records. For more information, refer to Predicates.