Kafka Connect Filter (Confluent) SMT Usage Reference for Confluent Platform

The Confluent Filter SMT (io.confluent.connect.transforms.Filter) conditionally includes or drops Apache Kafka® records based on a JSON Path expression evaluated against the record’s key or value.

Description

The Filter SMT evaluates filter.condition, a JSON Path predicate, against each record and includes or drops the record depending on the result.

When filter.type is include, records that match the predicate pass through and all other records are dropped. When filter.type is exclude, records that match the predicate are dropped and all other records pass through.

When a record lacks fields used in the filter condition, the SMT’s behavior is controlled by the missing.or.null.behavior property. See Properties for details.
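
The interaction between filter.type and missing.or.null.behavior can be summarized in a few lines of Python. This is a minimal sketch of the behavior described above, not the SMT's implementation. The function name keep_record and its boolean inputs are hypothetical and stand in for the JSON Path evaluation.

```python
def keep_record(fields_present, predicate_matches, filter_type,
                missing_behavior="fail"):
    """Return True to pass the record through, False to drop it.

    fields_present:    whether the record has the field(s) used by the condition.
    predicate_matches: result of evaluating filter.condition
                       (ignored when the fields are missing).
    """
    if filter_type not in ("include", "exclude"):
        raise ValueError(f"invalid filter.type: {filter_type!r}")
    if missing_behavior not in ("fail", "include", "exclude"):
        raise ValueError(f"invalid missing.or.null.behavior: {missing_behavior!r}")

    if not fields_present:
        if missing_behavior == "fail":
            # In Kafka Connect this surfaces as an exception that fails the task.
            raise RuntimeError("record lacks a field used in filter.condition")
        return missing_behavior == "include"

    if filter_type == "include":
        return predicate_matches      # keep only matching records
    return not predicate_matches      # exclude: drop matching records


print(keep_record(True, True, "include"))              # True
print(keep_record(True, True, "exclude"))              # False
print(keep_record(False, False, "exclude", "include")) # True
```

Note that the missing-field branch is checked first: when the fields are absent, the outcome depends only on missing.or.null.behavior and not on filter.type.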

Use the transformation type designed for the record key (io.confluent.connect.transforms.Filter$Key) or value (io.confluent.connect.transforms.Filter$Value).

Installation

Confluent develops this transformation and does not ship it by default with Kafka or Confluent Platform. You can install this transformation using the confluent connect plugin install command:

confluent connect plugin install confluentinc/connect-transforms:latest

Properties

Name: filter.condition
Description: Specifies the criteria used to match records to be included or excluded by this transformation. For JSON Path syntax, see the JSON Path syntax reference.
Type: string
Default: (required)
Valid Values: Valid JSON Path expression
Importance: high

Name: filter.type
Description: Specifies the action to perform on records that match the filter.condition predicate. Use include to pass through all records that match the predicate and drop all records that do not, or use exclude to drop all records that match the predicate.
Type: string
Default: (required)
Valid Values: [include, exclude]
Importance: high

Name: missing.or.null.behavior
Description: Specifies the behavior when the record does not have the field(s) used in the filter.condition. Use fail to throw an exception and fail the connector task, include to pass the record through, or exclude to drop the record. This property does not apply to null or non-existent fields referenced inside a predicate ([?()]). Such fields are evaluated as undefined rather than missing, and they do not match any condition.
Type: string
Default: fail
Valid Values: [fail, include, exclude]
Importance: medium
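
The property names and valid values listed above can be checked before a configuration is submitted. The following Python sketch shows one hypothetical way to do that. The helper build_filter_config and its parameters are illustrative names, not part of any Confluent tooling, and the check on filter.condition only rejects empty strings rather than validating JSON Path syntax.

```python
import json

VALID_FILTER_TYPES = ("include", "exclude")
VALID_MISSING_BEHAVIORS = ("fail", "include", "exclude")


def build_filter_config(alias, target, condition, filter_type,
                        missing_behavior="fail"):
    """Build the transforms.* properties for one Filter SMT instance.

    alias:  transform alias chosen by the user, for example "filterExample1".
    target: "Key" or "Value", selecting Filter$Key or Filter$Value.
    """
    if target not in ("Key", "Value"):
        raise ValueError("target must be 'Key' or 'Value'")
    if not condition:
        raise ValueError("filter.condition is required")
    if filter_type not in VALID_FILTER_TYPES:
        raise ValueError(f"filter.type must be one of {VALID_FILTER_TYPES}")
    if missing_behavior not in VALID_MISSING_BEHAVIORS:
        raise ValueError(
            f"missing.or.null.behavior must be one of {VALID_MISSING_BEHAVIORS}")

    prefix = f"transforms.{alias}."
    return {
        "transforms": alias,
        prefix + "type": f"io.confluent.connect.transforms.Filter${target}",
        prefix + "filter.condition": condition,
        prefix + "filter.type": filter_type,
        prefix + "missing.or.null.behavior": missing_behavior,
    }


config = build_filter_config(
    "filterExample3", "Value",
    "$[?(@.fieldToFilter == 'valueToMatch')]", "exclude", "include")
print(json.dumps(config, indent=2))
```

The returned dictionary contains only the transform-related properties; it would still need to be merged into a full connector configuration.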

Examples

The following configuration snippets show how to use and configure the SMT.

The filter.condition property takes a JSON Path predicate. For more information, see the JSON Path syntax reference.

The filter.condition in the first two examples is satisfied when the nested field nestedKey in the field key is equal to either value1 or value2.
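
To make the condition concrete, the following Python sketch hand-codes the same check against a few sample records. It does not use a JSON Path library. The function matches_condition, the sample records, and the assumption that key is a struct (represented here as a dict) are illustrative only.

```python
def matches_condition(record):
    """Hand-written stand-in for the JSON Path predicate
    $.key[?(@.nestedKey == 'value1' || @.nestedKey == 'value2')].
    """
    if record.get("key") is None:
        # The path itself cannot be resolved (assumed here for an absent or
        # null "key"). The SMT handles this case through
        # missing.or.null.behavior, which this sketch does not model.
        raise LookupError("record has no 'key' field")
    nested = record["key"].get("nestedKey")
    # A missing nestedKey yields None, which simply does not match.
    return nested == "value1" or nested == "value2"


samples = [
    {"key": {"nestedKey": "value1"}, "payload": 1},
    {"key": {"nestedKey": "value2"}, "payload": 2},
    {"key": {"nestedKey": "other"}, "payload": 3},
    {"key": {}, "payload": 4},
]

for record in samples:
    print(record["payload"], matches_condition(record))
```

With filter.type set to include, only the first two sample records would pass through; with exclude, only the last two would.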

The following configuration includes records that satisfy the filter condition. The condition is applied to the record value. Because missing.or.null.behavior is set to fail, the SMT throws an exception and fails the connector task when a record does not have the field(s) used in the filter condition.

"transforms": "filterExample1",
"transforms.filterExample1.type": "io.confluent.connect.transforms.Filter$Value",
"transforms.filterExample1.filter.condition": "$.key[?(@.nestedKey == 'value1' || @.nestedKey == 'value2')]",
"transforms.filterExample1.filter.type": "include",
"transforms.filterExample1.missing.or.null.behavior": "fail"

The following configuration excludes records that satisfy the filter condition. The condition is applied to the record key. Because missing.or.null.behavior is set to include, the SMT passes a record through when it does not have the field(s) used in the filter condition.

"transforms": "filterExample2",
"transforms.filterExample2.type": "io.confluent.connect.transforms.Filter$Key",
"transforms.filterExample2.filter.condition": "$.key[?(@.nestedKey == 'value1' || @.nestedKey == 'value2')]",
"transforms.filterExample2.filter.type": "exclude",
"transforms.filterExample2.missing.or.null.behavior": "include"

The following example shows the syntax for filtering on a top-level field of a record with a simple (flat) schema. It drops records whose value has fieldToFilter equal to valueToMatch:

"transforms": "filterExample3",
"transforms.filterExample3.type": "io.confluent.connect.transforms.Filter$Value",
"transforms.filterExample3.filter.condition": "$[?(@.fieldToFilter == 'valueToMatch')]",
"transforms.filterExample3.filter.type": "exclude",
"transforms.filterExample3.missing.or.null.behavior": "include"