Kafka Connect Filter (Confluent) SMT Usage Reference for Confluent Cloud

The io.confluent.connect.transforms.Filter Confluent Single Message Transform (SMT) includes or drops records based on a filter.condition JSON Path predicate.

Description

The filter.condition is a JSON Path predicate evaluated against each record. When a record matches, filter.type determines the outcome. Set filter.type to include to keep matching records, or to exclude to drop them.

The missing.or.null.behavior property controls what happens when a record lacks a field used in the predicate. By default, it is set to fail, which throws an error and fails the connector task. You can also set it to include or exclude to keep or drop those records. This setting does not apply to null or nonexistent fields inside predicates ([?()]), which are evaluated as undefined rather than missing and never match.
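
The rules above can be modeled in a few lines. The following Python sketch only illustrates the documented behavior; it is not the transform's implementation. The function name decide and the "keep"/"drop" labels are hypothetical, and the caller is assumed to already know whether the predicate matched (True or False) or whether the record lacked a field used in the condition (None).

```python
from typing import Optional


def decide(matched: Optional[bool], filter_type: str,
           missing_behavior: str = "fail") -> str:
    """Return "keep" or "drop" for one record.

    matched is True/False when the predicate could be evaluated, and None
    when the record lacks a field used in the filter condition.
    """
    if filter_type not in ("include", "exclude"):
        raise ValueError(f"unsupported filter.type: {filter_type!r}")

    if matched is None:
        # missing.or.null.behavior decides the outcome for these records.
        if missing_behavior == "fail":
            raise RuntimeError("record lacks a field used in filter.condition")
        if missing_behavior == "include":
            return "keep"
        if missing_behavior == "exclude":
            return "drop"
        raise ValueError(f"unsupported missing.or.null.behavior: {missing_behavior!r}")

    if filter_type == "include":
        # include keeps matching records and drops the rest.
        return "keep" if matched else "drop"
    # exclude drops matching records and keeps the rest.
    return "drop" if matched else "keep"
```

For instance, decide(True, "exclude") drops the record, while decide(None, "exclude", "include") keeps it because the missing-field setting takes over.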

Use the transformation type designed for the record key (io.confluent.connect.transforms.Filter$Key) or value (io.confluent.connect.transforms.Filter$Value).

Installation

Confluent develops this transformation, and it does not ship by default with Kafka or Confluent Cloud. You can install this transformation using the confluent connect plugin install command:

confluent connect plugin install confluentinc/connect-transforms:latest

Examples

The following configuration snippets show how to use and configure the Confluent Filter SMT.

The filter.condition in the first two examples is satisfied when the nested field 'nestedKey' in field 'key' is equal to either 'value1' or 'value2'.
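
As a rough illustration of what that condition tests, the Python sketch below imitates it on plain dictionaries. It is a hand-written stand-in, not a JSON Path evaluator, and the function name condition_result is hypothetical. It assumes, following the Description section, that a record without the 'key' field counts as missing (so missing.or.null.behavior would apply), while a null or nonexistent 'nestedKey' inside the predicate simply does not match.

```python
from typing import Optional


def condition_result(record: dict) -> Optional[bool]:
    """Imitate $.key[?(@.nestedKey == "value1" || @.nestedKey == "value2")].

    Returns None when the record has no usable 'key' field (assumed to be
    the "missing" case), otherwise True or False for match or no match.
    """
    key = record.get("key")
    if not isinstance(key, dict):
        return None
    # A missing or null nestedKey yields None here, which never matches.
    return key.get("nestedKey") in ("value1", "value2")


samples = [
    {"key": {"nestedKey": "value1"}},   # matches
    {"key": {"nestedKey": "value3"}},   # does not match
    {"key": {}},                        # nestedKey absent: does not match
    {"other": 1},                       # 'key' absent: treated as missing
]
results = [condition_result(r) for r in samples]
```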

The first example includes records that satisfy the filter condition. The condition is applied to the record value. With missing.or.null.behavior set to fail, the transformation throws an exception and fails the connector task when a record does not have the fields used in the filter condition.

"transforms": "filterExample1",
"transforms.filterExample1.type": "io.confluent.connect.transforms.Filter$Value",
"transforms.filterExample1.filter.condition": "$.key[?(@.nestedKey == \"value1\" || @.nestedKey == \"value2\")]",
"transforms.filterExample1.filter.type": "include",
"transforms.filterExample1.missing.or.null.behavior": "fail"

The second example excludes records that satisfy the filter condition. The condition is applied to the record key. With missing.or.null.behavior set to include, a record that does not have the fields used in the filter condition passes through.

"transforms": "filterExample2",
"transforms.filterExample2.type": "io.confluent.connect.transforms.Filter$Key",
"transforms.filterExample2.filter.condition": "$.key[?(@.nestedKey == \"value1\" || @.nestedKey == \"value2\")]",
"transforms.filterExample2.filter.type": "exclude",
"transforms.filterExample2.missing.or.null.behavior": "include"

This example shows the syntax for filtering on a simple schema:

"transforms": "filterExample3",
"transforms.filterExample3.type": "io.confluent.connect.transforms.Filter$Value",
"transforms.filterExample3.filter.condition": "$[?(@.fieldToFilter == 'valueToMatch')]",
"transforms.filterExample3.filter.type": "exclude",
"transforms.filterExample3.missing.or.null.behavior": "include"

For additional examples, see Filter (Confluent) for fully-managed connectors.
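
When a condition contains double-quoted string literals, those quotation marks must be escaped inside a JSON string (or written as single quotes, as in the simple-schema example). One way to avoid hand-escaping is to build the configuration programmatically and let a JSON serializer do it. The sketch below uses Python's standard json module; the variable names are illustrative, and the property names and values are those of the filterExample1 configuration.

```python
import json

# The condition written naturally, with unescaped double quotes.
condition = '$.key[?(@.nestedKey == "value1" || @.nestedKey == "value2")]'

config = {
    "transforms": "filterExample1",
    "transforms.filterExample1.type": "io.confluent.connect.transforms.Filter$Value",
    "transforms.filterExample1.filter.condition": condition,
    "transforms.filterExample1.filter.type": "include",
    "transforms.filterExample1.missing.or.null.behavior": "fail",
}

# json.dumps escapes the inner double quotes as \" so the output is valid JSON.
serialized = json.dumps(config, indent=2)
print(serialized)
```

Parsing the serialized text back with json.loads returns the original condition unchanged, which is a quick way to confirm the escaping is correct.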

Properties

filter.condition

Description: Specifies the criteria used to match records to be included or excluded by this transformation. Use JSON Path predicate notation defined in: https://github.com/json-path/JsonPath.
Type: String
Importance: High

filter.type

Description: Specifies the action to perform with records that match the filter.condition predicate. Use include to pass through all records that match the predicate and drop all records that do not satisfy the predicate, or use exclude to drop all records that match the predicate.
Type: String
Valid Values: [include, exclude]
Importance: High

missing.or.null.behavior

Description: Specifies the behavior when the record does not have the fields used in the filter.condition. Use fail to throw an exception and fail the connector task, include to pass the record through, or exclude to drop the record. Not applicable for null or nonexistent fields inside predicates ([?()]), as they are evaluated as undefined rather than missing, and do not match any condition.
Type: String
Default: fail
Valid Values: [fail, include, exclude]
Importance: Medium
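
The valid values listed above lend themselves to a quick check before a configuration is submitted. The following Python sketch is a hypothetical helper, not part of any Confluent tooling. It assumes that filter.condition and filter.type must be set explicitly because no default is listed for them, and it takes the settings without the transforms.<alias>. prefix.

```python
# Valid values and the one documented default, taken from the property list.
VALID_VALUES = {
    "filter.type": ("include", "exclude"),
    "missing.or.null.behavior": ("fail", "include", "exclude"),
}
DEFAULTS = {"missing.or.null.behavior": "fail"}


def check_filter_settings(settings: dict) -> list:
    """Return a list of problems; an empty list means nothing was flagged."""
    problems = []

    # Assumption: filter.condition must be a non-empty string.
    condition = settings.get("filter.condition")
    if not isinstance(condition, str) or not condition.strip():
        problems.append("filter.condition must be a non-empty string")

    for name, allowed in VALID_VALUES.items():
        # Fall back to the documented default when the property is unset.
        value = settings.get(name, DEFAULTS.get(name))
        if value not in allowed:
            problems.append(f"{name} must be one of {list(allowed)}, got {value!r}")

    return problems
```

The check covers only the listed values; it does not verify that the condition is a well-formed JSON Path expression.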