Kafka Connect Filter (Confluent) SMT Usage Reference for Confluent Cloud
This page provides usage information for the Confluent SMT io.confluent.connect.transforms.Filter.
Description
Include or drop records that match the filter.condition predicate.
The filter.condition property is a JSON Path predicate that is applied to each record processed. When the predicate matches, the record is either included (filter.type=include) or excluded (filter.type=exclude); records that do not match receive the opposite treatment.
The missing.or.null.behavior property defines how the transform behaves when a record lacks the field(s) used in the filter condition predicate. By default, the transform fails. The property can also be set to include or exclude records that are missing those fields. It does not apply to null or non-existent fields referenced inside a filter expression ([?()]): these are evaluated as undefined rather than missing, and therefore do not match any condition.
Use the transformation type designed for the record key (io.confluent.connect.transforms.Filter$Key) or value (io.confluent.connect.transforms.Filter$Value).
Installation
This transformation is developed by Confluent and does not ship by default with Kafka or Confluent Cloud. You can install this transformation using the confluent connect plugin install command:
confluent connect plugin install confluentinc/connect-transforms:latest
Examples
The configuration snippets below show how to use and configure the Confluent Filter SMT.
The filter.condition is a JSON Path predicate. For more information on the JSON Path format, see https://github.com/json-path/JsonPath.
The filter.condition specified in the first two examples is satisfied when the nested field 'nestedKey' in field 'key' is equal to either 'value1' or 'value2'.
The first example includes records that satisfy the filter condition. The filter condition is applied to the record value. Setting missing.or.null.behavior to fail makes the transform throw an exception and fail the connector task when a record does not have the field(s) used in the filter condition.
"transforms": "filterExample1",
"transforms.filterExample1.type": "io.confluent.connect.transforms.Filter$Value",
"transforms.filterExample1.filter.condition": "$.key[?(@.nestedKey == \"value1\" || @.nestedKey == \"value2\")]",
"transforms.filterExample1.filter.type": "include",
"transforms.filterExample1.missing.or.null.behavior": "fail"
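To make the behavior of the filterExample1 configuration concrete, here is a small sketch of four hypothetical record values. The payload and otherField fields are invented for illustration, and the outcomes described below follow from the rules stated in the Description section rather than from a captured run.

```json
[
  { "key": { "nestedKey": "value1" }, "payload": "a" },
  { "key": { "nestedKey": "other" }, "payload": "b" },
  { "key": { "otherField": 1 }, "payload": "c" },
  { "payload": "d" }
]
```

Under these assumptions, the first record matches the condition and is included. The second does not match and is dropped. In the third, nestedKey is absent inside the [?()] expression, so it is evaluated as undefined, does not match, and the record is dropped as well. The fourth lacks the key field altogether, so missing.or.null.behavior applies and, with the fail setting, the connector task would be expected to fail.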
The second example excludes records that satisfy the filter condition. The filter condition is applied to the record key. Setting missing.or.null.behavior to include passes the record through when it does not have the field(s) used in the filter condition.
"transforms": "filterExample2",
"transforms.filterExample2.type": "io.confluent.connect.transforms.Filter$Key",
"transforms.filterExample2.filter.condition": "$.key[?(@.nestedKey == \"value1\" || @.nestedKey == \"value2\")]",
"transforms.filterExample2.filter.type": "exclude",
"transforms.filterExample2.missing.or.null.behavior": "include"
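As a sketch of how the filterExample2 configuration treats different inputs, consider three hypothetical structured record keys. The id field is invented for illustration, and the outcomes are inferred from the documented rules, not taken from a real run.

```json
[
  { "key": { "nestedKey": "value2" } },
  { "key": { "nestedKey": "somethingElse" } },
  { "id": 42 }
]
```

Assuming these keys, the first matches the condition and the record is dropped, because filter.type is exclude. The second does not match, so the record passes through. The third has no key field, so missing.or.null.behavior applies and the include setting passes the record through.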
This example includes syntax to filter on a simple schema:
"transforms": "filterExample3",
"transforms.filterExample3.type": "io.confluent.connect.transforms.Filter$Value",
"transforms.filterExample3.filter.condition": "$[?(@.fieldToFilter == 'valueToMatch')]",
"transforms.filterExample3.filter.type": "exclude",
"transforms.filterExample3.missing.or.null.behavior": "include"
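The fragments above are not complete on their own. As a minimal sketch, this is how the filterExample3 properties might sit inside a connector definition submitted to the Kafka Connect REST API of a self-managed worker. The connector name, the connector class placeholder, and the topic name are hypothetical and must be replaced; topics is shown on the assumption that this is a sink connector.

```json
{
  "name": "filter-demo-connector",
  "config": {
    "connector.class": "<your.connector.Class>",
    "topics": "demo-topic",
    "transforms": "filterExample3",
    "transforms.filterExample3.type": "io.confluent.connect.transforms.Filter$Value",
    "transforms.filterExample3.filter.condition": "$[?(@.fieldToFilter == 'valueToMatch')]",
    "transforms.filterExample3.filter.type": "exclude",
    "transforms.filterExample3.missing.or.null.behavior": "include"
  }
}
```

With this configuration, a record whose value is {"fieldToFilter": "valueToMatch"} would be dropped, while records with any other value for fieldToFilter pass through. Using single quotes inside the JSON Path expression avoids having to escape double quotes in the JSON string.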
For additional examples, see Filter (Confluent) for fully-managed connectors.
Properties
| Name | Description | Type | Default | Valid Values | Importance |
|---|---|---|---|---|---|
| filter.condition | Specifies the criteria used to match records to be included or excluded by this transformation. Use JSON Path predicate notation defined in: https://github.com/json-path/JsonPath. | string | | | high |
| filter.type | Specifies the action to perform with records that match the filter.condition predicate. | string | | [include, exclude] | high |
| missing.or.null.behavior | Specifies the behavior when the record does not have the field(s) used in the filter.condition. | string | fail | [fail, include, exclude] | medium |