Kafka Connect Filter (Confluent) SMT Usage Reference for Confluent Platform
The Confluent Filter SMT (io.confluent.connect.transforms.Filter) conditionally includes or drops Apache Kafka® records based on a JSON Path expression evaluated against the record’s key or value.
Description
The Filter SMT evaluates the filter.condition JSON Path predicate against each record and includes or drops the record based on the result.
With filter.type=include, records that match the predicate pass through and records that do not match are dropped. With filter.type=exclude, records that match the predicate are dropped.
When a record lacks fields used in the filter condition, the SMT’s behavior is controlled by the missing.or.null.behavior property. See Properties for details.
Use the transformation type designed for the record key (io.confluent.connect.transforms.Filter$Key) or value (io.confluent.connect.transforms.Filter$Value).
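The interaction between filter.type and missing.or.null.behavior can be summarized as a small decision function. The following is a minimal Python sketch of the semantics described above, not the SMT's actual implementation; the names decide and MissingFieldError are hypothetical, and the sketch assumes that with filter.type=exclude, records that do not match pass through.

```python
from typing import Optional


class MissingFieldError(Exception):
    """Hypothetical stand-in for the exception that fails the connector task."""


def decide(matched: Optional[bool], filter_type: str, missing_behavior: str) -> bool:
    """Return True to keep the record and False to drop it.

    matched is True or False when filter.condition could be evaluated, and
    None when the record lacks (or has null for) a field the condition uses.
    """
    if matched is None:
        # missing.or.null.behavior decides the outcome, regardless of filter.type
        if missing_behavior == "fail":
            raise MissingFieldError("record lacks a field used in filter.condition")
        if missing_behavior == "include":
            return True
        if missing_behavior == "exclude":
            return False
        raise ValueError(f"unknown missing.or.null.behavior: {missing_behavior}")

    if filter_type == "include":
        return matched        # keep matching records, drop the rest
    if filter_type == "exclude":
        return not matched    # drop matching records, keep the rest
    raise ValueError(f"unknown filter.type: {filter_type}")
```

For instance, decide(True, "exclude", "include") returns False (the record is dropped), while decide(None, "exclude", "include") returns True because the missing-field behavior applies before filter.type is consulted.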
Installation
Confluent develops this transformation and does not ship it by default with Kafka or Confluent Platform. You can install this transformation using the confluent connect plugin install command:
confluent connect plugin install confluentinc/connect-transforms:latest
Properties
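| Name | Description | Type | Default | Valid Values | Importance |
|---|---|---|---|---|---|
| filter.condition | Specifies the criteria used to match records to be included or excluded by this transformation. For JSON Path syntax, see the JSON Path syntax reference. | string | (required) | Valid JSON Path expression | high |
| filter.type | Specifies the action to perform with records that match the filter.condition predicate. Use include to pass through records that match the predicate and drop records that do not, or use exclude to drop records that match the predicate. | string | (required) | [include, exclude] | high |
| missing.or.null.behavior | Specifies the behavior when the record does not have the field(s) used in the filter.condition. Use fail to throw an exception and fail the connector task, include to pass the record through, or exclude to drop the record. Not applicable for null or non-existing fields inside predicates. | string | | [fail, include, exclude] | medium |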
Examples
The following configuration snippets show how to use and configure the SMT.
The filter.condition is a JSON Path predicate. For more information, see the JSON Path syntax reference.
In the first two examples, the filter.condition is satisfied when the nested field 'nestedKey' in field 'key' is equal to either 'value1' or 'value2'.
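To make the condition concrete, the sketch below hand-codes the check for 'nestedKey' in 'key' in plain Python and applies it to a few sample records. It is an illustration under stated assumptions rather than a JSON Path engine: the function name, the sample records, and the use of dictionaries to stand in for the record key or value are all hypothetical. It returns None when the 'key' field is missing or null, which is the case that missing.or.null.behavior governs; a missing 'nestedKey' sits inside the predicate, so it is treated as a plain non-match.

```python
from typing import Any, Mapping, Optional

WANTED = ("value1", "value2")


def matches_example_condition(record: Optional[Mapping[str, Any]]) -> Optional[bool]:
    """Hand-coded stand-in for
    $.key[?(@.nestedKey == 'value1' || @.nestedKey == 'value2')].

    Returns None when 'key' (the field outside the predicate) is missing or
    null, otherwise True or False depending on whether the predicate matches.
    """
    if record is None:
        return None
    key = record.get("key")
    if key is None:
        return None
    if not isinstance(key, Mapping):
        # This sketch only handles 'key' as an object; anything else is a non-match.
        return False
    # nestedKey is inside the predicate: missing or null simply means no match.
    return key.get("nestedKey") in WANTED


# Hypothetical sample records
records = {
    "match": {"key": {"nestedKey": "value1"}},
    "no_match": {"key": {"nestedKey": "other"}},
    "nested_missing": {"key": {}},
    "key_missing": {"id": 7},
}

results = {name: matches_example_condition(rec) for name, rec in records.items()}
for name, outcome in results.items():
    print(name, outcome)
```

With filter.type=include and missing.or.null.behavior=fail, only the "match" record would pass through, "no_match" and "nested_missing" would be dropped, and "key_missing" would fail the connector task. With filter.type=exclude and missing.or.null.behavior=include, only "match" would be dropped.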
The following configuration includes records that satisfy the filter condition. The condition is applied to the record value. Setting missing.or.null.behavior to fail throws an exception and fails the connector task when the record does not have the field(s) used in the filter condition.
"transforms": "filterExample1",
"transforms.filterExample1.type": "io.confluent.connect.transforms.Filter$Value",
"transforms.filterExample1.filter.condition": "$.key[?(@.nestedKey == 'value1' || @.nestedKey == 'value2')]",
"transforms.filterExample1.filter.type": "include",
"transforms.filterExample1.missing.or.null.behavior": "fail"
The following configuration excludes records that satisfy the filter condition. The condition is applied to the record key. Setting missing.or.null.behavior to include passes the record through when the record does not have the field(s) used in the filter condition.
"transforms": "filterExample2",
"transforms.filterExample2.type": "io.confluent.connect.transforms.Filter$Key",
"transforms.filterExample2.filter.condition": "$.key[?(@.nestedKey == 'value1' || @.nestedKey == 'value2')]",
"transforms.filterExample2.filter.type": "exclude",
"transforms.filterExample2.missing.or.null.behavior": "include"
The following configuration filters on a simple schema. It excludes records whose top-level field 'fieldToFilter' in the record value is equal to 'valueToMatch':
"transforms": "filterExample3",
"transforms.filterExample3.type": "io.confluent.connect.transforms.Filter$Value",
"transforms.filterExample3.filter.condition": "$[?(@.fieldToFilter == 'valueToMatch')]",
"transforms.filterExample3.filter.type": "exclude",
"transforms.filterExample3.missing.or.null.behavior": "include"