Kafka Connect ExtractTopic SMT Usage Reference for Confluent Platform

The ExtractTopic SMT (io.confluent.connect.transforms.ExtractTopic) extracts data from an Apache Kafka® record key, value, or header and uses it as the topic name.

Description

The ExtractTopic SMT extracts data from an Apache Kafka® record and uses it as the topic name. You can use either the entire key or value (which should be a string) or a single field from a map or struct.

Use the specific transformation type designed for the record key (io.confluent.connect.transforms.ExtractTopic$Key) or value (io.confluent.connect.transforms.ExtractTopic$Value).

You can also use the entire value of a message header (a string) as the topic name by using the header-specific type (io.confluent.connect.transforms.ExtractTopic$Header).

Installation

Confluent develops this transformation and does not ship it by default with Apache Kafka® or Confluent Platform. You can install this transformation using the confluent connect plugin install command:

confluent connect plugin install confluentinc/connect-transforms:latest

Properties

field

Description: The field name or header key to use as the topic name. Required when extracting from a header using ExtractTopic$Header. For ExtractTopic$Key or ExtractTopic$Value, leaving this blank configures the SMT to use the entire key or value as a string for the topic name.

Type: string
Default: ""
Importance: medium

field.format

Description: The format used to parse the field path. The SMT supports two formats: JSON_PATH and PLAIN.

If set to JSON_PATH, the transformer interprets the field with a JSON path interpreter, which supports nested field extraction.

If set to PLAIN (or left blank), the transformer evaluates the field configuration as a non-nested field name. When using ExtractTopic$Header, only the default PLAIN format can be used, which extracts the header value as a string.

Type: string
Default: PLAIN
Valid Values: JSON_PATH, PLAIN
Importance: medium

skip.missing.or.null

Description: Determines how to handle missing fields and null values.

By default, the SMT throws an exception if a field defined in the field configuration is missing or null, or if no field is specified but the message’s key or value is null.

Set to true to silently ignore these conditions and pass the record unaltered.

Type: boolean
Default: false
Importance: low

Examples

The following configuration snippet shows how to use and configure the ExtractTopic SMT.

"transforms": "KeyExample,ValueFieldExample,KeyFieldExample,FieldJsonPathExample,HeaderExample"

Use the key as the topic name

Use the key of the message as the topic name.

"transforms.KeyExample.type": "io.confluent.connect.transforms.ExtractTopic$Key"

Extract a field from the value

Extract a required field named f2 from the value, and use it as the topic name.

"transforms.ValueFieldExample.type": "io.confluent.connect.transforms.ExtractTopic$Value",
"transforms.ValueFieldExample.field": "f2"

Extract a field from the key

Extract a field named f3 from the key, and use it as the topic name. If the field is null or missing, leave the topic name as-is.

"transforms.KeyFieldExample.type": "io.confluent.connect.transforms.ExtractTopic$Key",
"transforms.KeyFieldExample.field": "f3",
"transforms.KeyFieldExample.skip.missing.or.null": "true"

Extract a nested field with JSON Path

Extract the value of a nested field (for example, the field f3 inside the f1 field) and use it as the topic name. The field path uses JSON Path syntax (for example, $["f1"]["f3"]). If the field is null or missing, leave the topic name as-is.

"transforms.FieldJsonPathExample.type": "io.confluent.connect.transforms.ExtractTopic$Value",
"transforms.FieldJsonPathExample.field": "$[\"f1\"][\"f3\"]",
"transforms.FieldJsonPathExample.field.format": "JSON_PATH",
"transforms.FieldJsonPathExample.skip.missing.or.null": "true"
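As a hypothetical illustration of the nested extraction above, assume a record whose value looks like the following (the field contents are invented for this example and do not come from this reference). With the FieldJsonPathExample configuration, the path $["f1"]["f3"] would resolve to the string orders-eu, so the record would be routed to a topic named orders-eu.

```json
{
  "f1": {
    "f2": 42,
    "f3": "orders-eu"
  },
  "f4": "not used by the transform"
}
```

If f1 or f3 were missing or null in this value, the skip.missing.or.null setting of true would pass the record through with its original topic name.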

Extract a header value as the topic name

Extract the value (as a string) of the required message header with key h1, and use it as the topic name.

"transforms.HeaderExample.type": "io.confluent.connect.transforms.ExtractTopic$Header",
"transforms.HeaderExample.field": "h1"
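The snippets in this section are fragments of a connector configuration. A minimal sketch of a complete configuration that applies just one of them (here, the header example) might look like the following. The connector name, the connector.class value, and the topics value are placeholders assumed for illustration, not values taken from this reference; substitute the settings for your own connector.

```json
{
  "name": "extract-topic-example",
  "config": {
    "connector.class": "com.example.ExampleSinkConnector",
    "topics": "incoming-events",
    "transforms": "HeaderExample",
    "transforms.HeaderExample.type": "io.confluent.connect.transforms.ExtractTopic$Header",
    "transforms.HeaderExample.field": "h1"
  }
}
```

Only the aliases listed in the transforms property are applied, so a connector that needs a single extraction does not have to declare all five example aliases.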

Predicates

Configure transformations with predicates so that they process only records that satisfy a particular condition. You can also use predicates in a transformation chain together with the Filter SMT (see Kafka Connect Filter (Kafka) SMT Usage Reference for Confluent Platform) to conditionally filter out specific records. For more information, refer to Predicates.
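As a sketch of how a predicate could guard this SMT, the fragment below applies the header-based transform only to records that actually carry an h1 header. It assumes the HasHeaderKey predicate that ships with Apache Kafka; the alias HasH1 is a name chosen for this example.

```json
{
  "transforms": "HeaderExample",
  "transforms.HeaderExample.type": "io.confluent.connect.transforms.ExtractTopic$Header",
  "transforms.HeaderExample.field": "h1",
  "transforms.HeaderExample.predicate": "HasH1",
  "predicates": "HasH1",
  "predicates.HasH1.type": "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
  "predicates.HasH1.name": "h1"
}
```

With this arrangement, records without the header bypass the transform and keep their original topic. Setting transforms.HeaderExample.negate to true would invert the condition.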