The following provides usage information for the Confluent SMT io.confluent.connect.transforms.Filter.
Include or drop records that match the filter.condition predicate. The filter.condition is a JsonPath predicate that is applied to each record processed. When the predicate matches, the record is either included (when filter.type=include) or excluded (when filter.type=exclude).
The missing.or.null.behavior property defines how the transform behaves when a record does not have the field(s) used in the filter condition predicate. By default, the behavior is set to fail. However, the behavior can also be set to include or exclude the record that is missing the predicate’s field(s).
Use the transformation type designed for the record key (io.confluent.connect.transforms.Filter$Key) or value (io.confluent.connect.transforms.Filter$Value).
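The interaction between filter.type and missing.or.null.behavior can be sketched as a small decision function. This is an illustrative Python model of the rules described above, not the SMT's actual implementation; the function name decide and the convention of passing None for a missing field are assumptions made for this example.

```python
def decide(matched, filter_type="include", missing_behavior="fail"):
    """Return True to keep a record, False to drop it.

    matched is the predicate result (True or False), or None when the
    record lacks the field(s) the predicate reads. Passing None for
    "missing" is a convention of this sketch only.
    """
    if filter_type not in ("include", "exclude"):
        raise ValueError(f"unknown filter.type: {filter_type}")
    if missing_behavior not in ("fail", "include", "exclude"):
        raise ValueError(f"unknown missing.or.null.behavior: {missing_behavior}")

    if matched is None:
        # The field is missing or null: missing.or.null.behavior decides.
        if missing_behavior == "fail":
            raise ValueError("record is missing a field used in filter.condition")
        return missing_behavior == "include"

    # The field is present: filter.type decides what a match means.
    if filter_type == "include":
        return matched
    return not matched
```

Note that the two properties are independent: a transform configured with filter.type=exclude can still pass records through when the field is missing, if missing.or.null.behavior=include.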
This transformation is developed by Confluent and does not ship by default with Kafka or Confluent Platform. You can install this transformation using the Confluent Hub Client:
confluent-hub install confluentinc/connect-transforms:latest
The configuration snippets below show how to use and configure the Filter SMT. The filter.condition is a predicate written in JsonPath notation. For more information on the JsonPath format, see JsonPath (https://github.com/json-path/JsonPath).
filterExample1 below defines the configuration for including records satisfying the filter condition. The filter.condition predicate is applied to the record value and is satisfied when the nested field 'author' in field 'book' is equal to either "Tolstoy" or "Dostoevsky". The fail behavior is configured to throw an exception and fail the connector task when the record does not have the field(s) used in the filter condition.
transforms=filterExample1
# Use the 'io.confluent.connect.transforms.Filter$Value' as the source (record 'value') on which
# 'filter.condition' shall be applied.
transforms.filterExample1.type=io.confluent.connect.transforms.Filter$Value
transforms.filterExample1.filter.condition=$.book[?(@.author == "Tolstoy" || @.author == "Dostoevsky")]
# Use 'include' to pass through all records that match the predicate.
transforms.filterExample1.filter.type=include
# Use the 'fail' behavior to throw an exception and fail the connector task when the record does
# not have the field(s) used in the 'filter.condition'.
transforms.filterExample1.missing.or.null.behavior=fail
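To make the effect of filterExample1 concrete, the following Python sketch mimics its behavior on a few sample record values. It does not use a JsonPath library or the SMT itself; the function name apply_filter_example1 and the sample records are hypothetical, and the sketch assumes the record value deserializes to a nested dictionary and that a missing or null 'book' or 'author' field counts as "missing".

```python
AUTHORS = {"Tolstoy", "Dostoevsky"}


def apply_filter_example1(value):
    """Return True to keep the record value, False to drop it."""
    book = value.get("book") if isinstance(value, dict) else None
    author = book.get("author") if isinstance(book, dict) else None
    if author is None:
        # Mirrors missing.or.null.behavior=fail (assumed interpretation).
        raise ValueError("record does not have the field book.author")
    # Mirrors filter.type=include: only matching records pass through.
    return author in AUTHORS


# Hypothetical sample record values.
records = [
    {"book": {"author": "Tolstoy", "title": "War and Peace"}},
    {"book": {"author": "Austen", "title": "Emma"}},
]
kept = [r for r in records if apply_filter_example1(r)]
```

Here kept holds only the Tolstoy record. A value such as {"title": "Emma"}, which has no 'book' field, raises an error in this sketch, corresponding to the connector task failing.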
filterExample2 below defines the configuration for excluding records satisfying the filter condition. The filter condition is applied to the record key. The example uses the field 'timestamp' and the nested field 'epoch'. The include behavior is configured to pass the record through when the record does not have the field(s) used in the filter condition.
transforms=filterExample2
# Use the 'io.confluent.connect.transforms.Filter$Key' as the source (record 'key') on which
# 'filter.condition' shall be applied.
transforms.filterExample2.type=io.confluent.connect.transforms.Filter$Key
transforms.filterExample2.filter.condition=$.timestamp[?(@.epoch == "1594152325" || @.epoch == "1594152326")]
# Use 'exclude' to drop all records that match the predicate.
transforms.filterExample2.filter.type=exclude
# Use the 'include' behavior to pass the record through when the record does not have the field(s)
# used in the 'filter.condition'.
transforms.filterExample2.missing.or.null.behavior=include
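The combination of exclude and include in filterExample2 can be easy to misread, so the sketch below models it in plain Python on a few sample record keys. As before, this is only an approximation of the documented behavior: keep_key and the sample keys are hypothetical, and the key is assumed to deserialize to a nested dictionary with 'epoch' stored as a string.

```python
EXCLUDED_EPOCHS = {"1594152325", "1594152326"}


def keep_key(key):
    """Return True to pass the record through, False to drop it."""
    timestamp = key.get("timestamp") if isinstance(key, dict) else None
    epoch = timestamp.get("epoch") if isinstance(timestamp, dict) else None
    if epoch is None:
        # Mirrors missing.or.null.behavior=include: pass the record through.
        return True
    # Mirrors filter.type=exclude: drop records that match the predicate.
    return epoch not in EXCLUDED_EPOCHS


# Hypothetical sample record keys.
keys = [
    {"timestamp": {"epoch": "1594152325"}},  # matches, so it is dropped
    {"timestamp": {"epoch": "1594152400"}},  # does not match, so it passes
    {"id": 7},                               # no timestamp field, so it passes
]
passed = [k for k in keys if keep_key(k)]
```

Only the first key is dropped. Setting missing.or.null.behavior=exclude instead would also drop the third key.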
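|Name|Description|Type|Default|Valid values|Importance|
|---|---|---|---|---|---|
|filter.condition|Specifies the criteria used to match records to be included or excluded by this transformation. Use JSON Path predicate notation defined in: https://github.com/json-path/JsonPath.|string|||high|
|filter.type|Specifies the action to perform with records that match the filter.condition predicate. Use include to pass through all records that match the predicate, or exclude to drop all records that match the predicate.|string||[include, exclude]||
|missing.or.null.behavior|Specifies the behavior when the record does not have the field(s) used in the filter.condition. Use fail to throw an exception and fail the connector task, include to pass the record through, or exclude to drop the record.|string|fail|[fail, include, exclude]|medium|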