Kafka Connect Filter (Confluent) SMT Usage Reference for Confluent Cloud
The io.confluent.connect.transforms.Filter Confluent Single Message Transform (SMT) includes or drops records based on a filter.condition JSON Path predicate.
Description
The filter.condition is a JSON Path predicate evaluated against each record. When a record matches, filter.type determines the outcome. Set filter.type to include to keep matching records, or to exclude to drop them.
The missing.or.null.behavior property controls what happens when a record lacks a field used in the predicate. By default, it is set to fail, which throws an error and fails the connector task. You can also set it to include or exclude to keep or drop those records. This setting does not apply to null or nonexistent fields inside predicates ([?()]), which are evaluated as undefined rather than missing and never match.
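The interaction between filter.type and missing.or.null.behavior described above can be modeled as a small decision function. The following is a minimal sketch in plain Python, not the SMT's actual implementation: the names `keep_record` and `MissingFieldError` are hypothetical, and the sketch assumes that records which do not match are dropped under include and passed through under exclude.

```python
class MissingFieldError(Exception):
    """Hypothetical stand-in for the error that fails the connector task."""


def keep_record(filter_type, missing_or_null_behavior,
                path_field_missing, predicate_matches):
    """Return True to pass the record through, False to drop it.

    path_field_missing: the record lacks a field used in filter.condition
        (outside a [?()] predicate).
    predicate_matches: the record satisfies filter.condition.
    """
    if filter_type not in ("include", "exclude"):
        raise ValueError(f"invalid filter.type: {filter_type}")
    if missing_or_null_behavior not in ("fail", "include", "exclude"):
        raise ValueError(
            f"invalid missing.or.null.behavior: {missing_or_null_behavior}")

    if path_field_missing:
        # missing.or.null.behavior decides; filter.type is not consulted.
        if missing_or_null_behavior == "fail":
            raise MissingFieldError(
                "record lacks a field used in filter.condition")
        return missing_or_null_behavior == "include"

    # Assumption: non-matching records get the opposite outcome.
    if filter_type == "include":
        return predicate_matches
    return not predicate_matches
```

For example, `keep_record("exclude", "include", True, False)` returns True: the record lacks a required field, so it passes through regardless of filter.type.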
Use the transformation type designed for the record key (io.confluent.connect.transforms.Filter$Key) or value (io.confluent.connect.transforms.Filter$Value).
Installation
Confluent develops this transformation, and it does not ship by default with Kafka or Confluent Cloud. You can install this transformation using the confluent connect plugin install command:
confluent connect plugin install confluentinc/connect-transforms:latest
Examples
The following configuration snippets show how to use and configure the Confluent Filter SMT.
The filter.condition in the first two examples is satisfied when the nested field nestedKey in the field key equals either value1 or value2.
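To make the condition concrete, the sketch below approximates it in plain Python on dictionary data. It does not use a JSON Path library, and `classify` is a hypothetical helper. It assumes that an absent or null key field counts as missing (and is therefore handled by missing.or.null.behavior), while an absent nestedKey inside the predicate simply does not match.

```python
ALLOWED_VALUES = ("value1", "value2")


def classify(data):
    """Return 'missing', 'match', or 'no match' for the example condition.

    Approximates: $.key[?(@.nestedKey == "value1" || @.nestedKey == "value2")]
    """
    # The path segment `key` lies outside the predicate, so its absence
    # (or a null value) is treated as a missing field.
    if not isinstance(data, dict) or data.get("key") is None:
        return "missing"

    key = data["key"]
    # Inside the predicate, an absent or null nestedKey never matches.
    if isinstance(key, dict) and key.get("nestedKey") in ALLOWED_VALUES:
        return "match"
    return "no match"


print(classify({"key": {"nestedKey": "value1"}}))
print(classify({"key": {"nestedKey": "other"}}))
print(classify({"key": {}}))
print(classify({"unrelated": 1}))
```

Under these assumptions the four calls classify the data as a match, no match, no match, and missing, in that order.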
The following configuration includes records that satisfy the filter condition. The condition is applied to the record value. Setting missing.or.null.behavior to fail throws an exception and fails the connector task when a record does not have the fields used in the filter condition.
"transforms": "filterExample1",
"transforms.filterExample1.type": "io.confluent.connect.transforms.Filter$Value",
"transforms.filterExample1.filter.condition": "$.key[?(@.nestedKey == \"value1\" || @.nestedKey == \"value2\")]",
"transforms.filterExample1.filter.type": "include",
"transforms.filterExample1.missing.or.null.behavior": "fail"
The following configuration excludes records that satisfy the filter condition. The condition is applied to the record key. Setting missing.or.null.behavior to include passes the record through when it does not have the fields used in the filter condition.
"transforms": "filterExample2",
"transforms.filterExample2.type": "io.confluent.connect.transforms.Filter$Key",
"transforms.filterExample2.filter.condition": "$.key[?(@.nestedKey == \"value1\" || @.nestedKey == \"value2\")]",
"transforms.filterExample2.filter.type": "exclude",
"transforms.filterExample2.missing.or.null.behavior": "include"
This example includes syntax to filter on a simple schema:
"transforms": "filterExample3",
"transforms.filterExample3.type": "io.confluent.connect.transforms.Filter$Value",
"transforms.filterExample3.filter.condition": "$[?(@.fieldToFilter == 'valueToMatch')]",
"transforms.filterExample3.filter.type": "exclude",
"transforms.filterExample3.missing.or.null.behavior": "include"
For additional examples, see Filter (Confluent) for fully-managed connectors.
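As a worked illustration of the simple-schema example (filterExample3), the sketch below applies an equivalent check to a few sample record values in plain Python. It is an approximation, not the SMT itself: `matches` and the sample records are hypothetical. It follows the description above in assuming that a field absent from inside a [?()] predicate never matches, so such a record is not excluded.

```python
def matches(value):
    """Approximates: $[?(@.fieldToFilter == 'valueToMatch')]"""
    return (isinstance(value, dict)
            and value.get("fieldToFilter") == "valueToMatch")


# Hypothetical record values for illustration.
records = [
    {"fieldToFilter": "valueToMatch"},  # matches, so exclude drops it
    {"fieldToFilter": "other"},         # does not match, passes through
    {"unrelated": 1},                   # field absent inside the predicate
]

# filter.type = exclude: drop the records that match the condition.
kept = [r for r in records if not matches(r)]
print(kept)
```

Only the first record is dropped. The third record passes through because the predicate does not match it, not because of missing.or.null.behavior.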
Properties
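| Name | Description | Type | Default | Valid Values | Importance |
|---|---|---|---|---|---|
| filter.condition | Specifies the criteria used to match records to be included or excluded by this transformation. Use JSON Path predicate notation defined in: https://github.com/json-path/JsonPath. | String | | | High |
| filter.type | Specifies the action to perform with records that match the filter.condition. Use include to keep matching records, or exclude to drop them. | String | | [include, exclude] | High |
| missing.or.null.behavior | Specifies the behavior when the record does not have the fields used in the filter.condition. Use fail to throw an exception and fail the connector task, include to pass the record through, or exclude to drop the record. | String | fail | [fail, include, exclude] | Medium |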