Filter (Apache Kafka)
The following provides usage information for the Apache Kafka® SMT
org.apache.kafka.connect.transforms.Filter. This SMT is designed to be used in conjunction with predicates.
The Filter transformation drops all records, removing them from subsequent transformations in the chain. It is intended to be used conditionally, to filter out records that match (or do not match) a predicate.
Transformations can be configured with predicates so that the transformation is applied only to records which satisfy a condition. You can use predicates in a transformation chain and, when combined with the Apache Kafka® Filter, predicates can conditionally filter out specific records.
Predicates are specified in the connector configuration. The following properties are used:
- predicates: A set of aliases (a comma-separated list) for predicates applied to one or more transformations.
- predicates.$alias.type: Fully qualified class name for the predicate.
- predicates.$alias.$predicateSpecificConfig: Configuration properties for the predicate.
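Because every per-predicate key is derived from an alias, the flat key layout above can be generated mechanically. The following Python sketch is illustrative only: predicates_config is a hypothetical helper, not part of Kafka Connect, that flattens a mapping of alias to class name and predicate-specific properties into connector keys, joining the aliases with commas for the predicates list.

```python
import json
from typing import Dict, Tuple

TOPIC_NAME_MATCHES = "org.apache.kafka.connect.transforms.predicates.TopicNameMatches"


def predicates_config(specs: Dict[str, Tuple[str, Dict[str, str]]]) -> Dict[str, str]:
    """Flatten {alias: (class name, predicate-specific config)} into connector keys."""
    config = {"predicates": ",".join(specs)}  # aliases as a comma-separated list
    for alias, (class_name, props) in specs.items():
        config[f"predicates.{alias}.type"] = class_name
        for name, value in props.items():
            config[f"predicates.{alias}.{name}"] = value
    return config


cfg = predicates_config({
    "IsFoo": (TOPIC_NAME_MATCHES, {"pattern": "foo"}),
    "IsBar": (TOPIC_NAME_MATCHES, {"pattern": "bar"}),
})
print(json.dumps(cfg, indent=2))
```

The output contains the same predicate keys used in the example configurations later on this page.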
All transformations have the implicit configuration properties
predicate and negate. A predicate is associated with a transformation by setting the
transformation's predicate configuration to the predicate's alias. The
predicate's value can be reversed using the
negate configuration property.
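Put another way, a transformation that has a predicate runs only when the predicate's result differs from negate. The following Python sketch models that rule with plain functions; it is a simplified simulation, not the Kafka Connect Java API. The names conditional, drop, and is_foo are hypothetical, and a transform that returns None stands for a dropped record, which is how Filter removes records from the chain.

```python
from typing import Callable, Optional

Record = dict  # simplified stand-in for a Connect record
Transform = Callable[[Record], Optional[Record]]
Predicate = Callable[[Record], bool]


def conditional(transform: Transform, predicate: Optional[Predicate] = None,
                negate: bool = False) -> Transform:
    """Run `transform` only when predicate(record) differs from `negate`."""
    def wrapped(record: Record) -> Optional[Record]:
        # No predicate: the transform applies to every record (negate is not consulted).
        if predicate is None or predicate(record) != negate:
            return transform(record)
        return record  # condition not met: pass the record through unchanged
    return wrapped


def drop(record: Record) -> Optional[Record]:
    """Behaves like Filter: returning None removes the record from the chain."""
    return None


def is_foo(record: Record) -> bool:
    return record["topic"] == "foo"


drop_foo = conditional(drop, is_foo)                    # predicate set
keep_only_foo = conditional(drop, is_foo, negate=True)  # predicate set, negate=true
drop_all = conditional(drop)                            # no predicate: drops everything

print(drop_foo({"topic": "foo"}), drop_foo({"topic": "bar"}))
```

Negating the same predicate flips Filter from "drop foo" to "keep only foo", without defining a second predicate.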
Kafka Connect includes the following predicates:
- org.apache.kafka.connect.transforms.predicates.TopicNameMatches: Matches records in a topic with a name matching a particular Java regular expression.
- org.apache.kafka.connect.transforms.predicates.HasHeaderKey: Matches records that have a header with the given key.
- org.apache.kafka.connect.transforms.predicates.RecordIsTombstone: Matches tombstone records (that is, records with a null value).
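To make the matching rules concrete, here is a Python sketch of the three built-in predicates over a hypothetical, simplified Record type. Several assumptions apply. TopicNameMatches is modeled as a whole-name match, as Java's Matcher.matches() does. Python's re syntax stands in for Java regular expressions, and the two agree for simple patterns like the ones on this page. Headers are modeled as a dict, although real Connect headers can repeat a key.

```python
import re
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional


@dataclass
class Record:
    """Hypothetical, simplified record; not the Kafka Connect ConnectRecord class."""
    topic: Optional[str]
    key: Any = None
    value: Any = None
    headers: Dict[str, Any] = field(default_factory=dict)


Predicate = Callable[[Record], bool]


def topic_name_matches(pattern: str) -> Predicate:
    compiled = re.compile(pattern)
    # Assumed whole-name match, mirroring Java's Matcher.matches().
    return lambda r: r.topic is not None and compiled.fullmatch(r.topic) is not None


def has_header_key(name: str) -> Predicate:
    # True if at least one header has this name.
    return lambda r: name in r.headers


def record_is_tombstone() -> Predicate:
    # A tombstone is a record whose value is null (None here).
    return lambda r: r.value is None


orders = Record("my-prefix-orders", key="k1", value={"c1": 1}, headers={"trace-id": "abc"})
deleted = Record("my-prefix-orders", key="k1")  # value defaults to None
print(topic_name_matches("my-prefix-.*")(orders), has_header_key("trace-id")(orders),
      record_is_tombstone()(deleted))
```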
You have a source connector that produces records to many different topics and you want to do the following:
- Filter out the records in the foo topic entirely.
- Apply the ExtractField transformation with the field name other_field to records in all topics, except the topic bar.
To do this, you first need to filter out the records destined for the topic
foo. The Filter transformation removes records from further processing, and it
can use the TopicNameMatches predicate so that it applies only to records in
topics whose names match a certain regular expression. The only configuration
property for TopicNameMatches is pattern, a Java regular expression matched
against the topic name. The following example shows this configuration:
{
  "transforms": "Filter",
  "transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
  "transforms.Filter.predicate": "IsFoo",
  "predicates": "IsFoo",
  "predicates.IsFoo.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.IsFoo.pattern": "foo"
}
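The effect of this configuration on a few sample topics can be traced with a small Python simulation. It assumes that TopicNameMatches must match the whole topic name, so the pattern foo matches the topic foo but not foo-v2; a pattern such as foo.* would be needed to cover both. The record shape and the function name apply_filter are hypothetical.

```python
import re
from typing import Optional

IS_FOO = re.compile("foo")  # predicates.IsFoo.pattern


def apply_filter(record: dict) -> Optional[dict]:
    """transforms.Filter with predicate IsFoo: drop the record when its topic is foo."""
    if IS_FOO.fullmatch(record["topic"]):  # assumed whole-name match
        return None  # Filter drops the record
    return record


records = [{"topic": t, "key": i} for i, t in enumerate(["foo", "bar", "baz", "foo-v2"])]
survivors = [r for r in map(apply_filter, records) if r is not None]
print([r["topic"] for r in survivors])  # ['bar', 'baz', 'foo-v2']
```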
Next, apply ExtractField only when the topic name of the record is not
bar. You can't use TopicNameMatches directly, because it would apply the
transformation to matching topic names, not to topic names that do not match.
The transformation's implicit negate configuration property inverts the set of
records that a predicate matches. Adding this to the previous configuration
gives the following:
{
  "transforms": "Filter,Extract",
  "transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
  "transforms.Filter.predicate": "IsFoo",
  "transforms.Extract.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.Extract.field": "other_field",
  "transforms.Extract.predicate": "IsBar",
  "transforms.Extract.negate": "true",
  "predicates": "IsFoo,IsBar",
  "predicates.IsFoo.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.IsFoo.pattern": "foo",
  "predicates.IsBar.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.IsBar.pattern": "bar"
}
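The complete chain can be sketched in Python, applying the transforms in the order listed in transforms (Filter, then Extract). The record dictionaries and function names are hypothetical, and ExtractField$Key is reduced to looking up other_field in a map-shaped key; the real transformation also handles Struct keys that have schemas. Whole-name topic matching is assumed as before.

```python
import re
from typing import Optional

IS_FOO = re.compile("foo")  # predicates.IsFoo.pattern
IS_BAR = re.compile("bar")  # predicates.IsBar.pattern


def filter_step(record: dict) -> Optional[dict]:
    """transforms.Filter, predicate IsFoo: drop records whose topic is foo."""
    return None if IS_FOO.fullmatch(record["topic"]) else record


def extract_step(record: dict) -> dict:
    """transforms.Extract, predicate IsBar with negate=true: skip topic bar."""
    if IS_BAR.fullmatch(record["topic"]):
        return record  # predicate true, negated to false: pass through
    # Simplified ExtractField$Key: replace a map key with its other_field entry.
    return {**record, "key": record["key"].get("other_field")}


def run_chain(record: dict) -> Optional[dict]:
    """Apply transforms in the order listed in "transforms": Filter, then Extract."""
    record = filter_step(record)
    return None if record is None else extract_step(record)


key = {"other_field": 42, "id": 7}
for topic in ["foo", "bar", "baz"]:
    print(topic, run_chain({"topic": topic, "key": key}))
# foo None
# bar {'topic': 'bar', 'key': {'other_field': 42, 'id': 7}}
# baz {'topic': 'baz', 'key': 42}
```

Records in foo never reach Extract, records in bar keep their full key, and every other topic gets only other_field as its key.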
The following configuration shows how to use a predicate in a transformation
chain with the ExtractField transformation and the negate=true configuration
property:
{
  "transforms": "t2",
  "transforms.t2.predicate": "has-my-prefix",
  "transforms.t2.negate": "true",
  "transforms.t2.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.t2.field": "c1",
  "predicates": "has-my-prefix",
  "predicates.has-my-prefix.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.has-my-prefix.pattern": "my-prefix-.*"
}
In this example, the transformation t2 is applied only when the predicate
has-my-prefix is false (using the negate=true parameter). The predicate is
configured by the keys with the prefix predicates.has-my-prefix. The predicate
class is org.apache.kafka.connect.transforms.predicates.TopicNameMatches, and its
pattern parameter has the value my-prefix-.* (a topic name that starts with
my-prefix-). With this configuration, the transformation is applied only to
records whose topic name does not start with my-prefix-.
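Which topics t2 runs for can be checked with a short Python sketch. It assumes whole-name matching for TopicNameMatches, so a topic that only contains my-prefix- somewhere after its start is not matched, and t2 still runs for it. The function t2_applies is hypothetical.

```python
import re

HAS_MY_PREFIX = re.compile("my-prefix-.*")  # predicates.has-my-prefix.pattern


def t2_applies(topic: str, negate: bool = True) -> bool:
    """True when transforms.t2 runs: the predicate result differs from negate."""
    matched = HAS_MY_PREFIX.fullmatch(topic) is not None  # assumed whole-name match
    return matched != negate


for topic in ["my-prefix-orders", "orders", "other-my-prefix-x"]:
    print(topic, t2_applies(topic))
# my-prefix-orders False
# orders True
# other-my-prefix-x True
```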
- The benefit of defining the predicate separately from the transform is that it makes it easier to apply the same predicate to multiple transforms. For example, one set of transforms can use a predicate as is, while another set uses the same predicate with negate=true.
- For additional examples, see Filter (Apache Kafka) for managed connectors.
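The reuse pattern described above can be sketched as a configuration built in Python. The transform aliases keyOnPrefixed and valueOnOthers and the field c1 are hypothetical. Both transforms reference the single has-my-prefix predicate, one directly and one with negate set to true. The undeclared_predicates helper, also hypothetical, flags any transform whose predicate alias is missing from the predicates list, which is an easy mistake to make when editing these flat keys by hand.

```python
import json
from typing import Dict, Set

PREDICATE = "has-my-prefix"

config: Dict[str, str] = {
    "transforms": "keyOnPrefixed,valueOnOthers",
    # Applied only to topics that match the predicate.
    "transforms.keyOnPrefixed.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.keyOnPrefixed.field": "c1",
    "transforms.keyOnPrefixed.predicate": PREDICATE,
    # Same predicate, negated: applied only to topics that do NOT match.
    "transforms.valueOnOthers.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.valueOnOthers.field": "c1",
    "transforms.valueOnOthers.predicate": PREDICATE,
    "transforms.valueOnOthers.negate": "true",
    # The predicate itself is defined only once.
    "predicates": PREDICATE,
    f"predicates.{PREDICATE}.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    f"predicates.{PREDICATE}.pattern": "my-prefix-.*",
}


def undeclared_predicates(cfg: Dict[str, str]) -> Set[str]:
    """Return predicate aliases referenced by transforms but missing from "predicates"."""
    declared = {a.strip() for a in cfg.get("predicates", "").split(",") if a.strip()}
    used = {v for k, v in cfg.items() if k.startswith("transforms.") and k.endswith(".predicate")}
    return used - declared


print(undeclared_predicates(config))  # set()
print(json.dumps(config, indent=2))
```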
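The built-in predicates have the following configuration properties:

| Predicate | Configuration property | Description | Type | Valid values | Importance |
|---|---|---|---|---|---|
| TopicNameMatches | pattern | A predicate which is true for records with a topic name that matches the configured regular expression. | string | non-empty string, valid regex | medium |
| HasHeaderKey | name | A predicate which is true for records with at least one header with the configured name. | string | non-empty string | medium |
| RecordIsTombstone | (none) | A predicate which is true for records which are tombstones (that is, records with a null value). | | | medium |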