Kafka Connect Filter (Kafka) SMT Usage Reference for Confluent Platform
The Apache Kafka® Filter SMT (org.apache.kafka.connect.transforms.Filter) drops Kafka records from subsequent transformations in the chain. Use it along with a predicate to conditionally filter out specific records.
Description
The Filter SMT removes records that match a configured predicate. Without a predicate, the SMT drops all records. To use Filter conditionally, configure a predicate on the transformation and optionally set negate: true to invert the match.
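As a minimal sketch of conditional use, the following properties drop tombstone records and leave all other records untouched. The aliases dropTombstones and isTombstone are illustrative names, not required values; the outer braces are only there to make the fragment valid JSON, and the properties themselves belong in the connector configuration.

```json
{
  "transforms": "dropTombstones",
  "transforms.dropTombstones.type": "org.apache.kafka.connect.transforms.Filter",
  "transforms.dropTombstones.predicate": "isTombstone",
  "predicates": "isTombstone",
  "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
}
```

Removing the transforms.dropTombstones.predicate line from this sketch would leave Filter without a predicate, so it would drop every record.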
Predicates
A predicate is a condition that controls whether a transformation is applied to a given record.
Transformations can be configured with predicates so that the transformation is applied only to records that satisfy a condition. You can use predicates anywhere in a transformation chain. When combined with the Filter SMT, a predicate selects the records to drop.
Predicates are specified in the connector configuration. The following properties are used:
- predicates: A set of aliases for predicates applied to one or more transformations.
- predicates.$alias.type: Fully qualified class name for the predicate.
- predicates.$alias.$predicateSpecificConfig: Configuration properties for the predicate.
All transformations have the implicit configuration properties predicate and negate. A predicate is associated with a transformation by setting the transformation’s predicate configuration to the predicate’s alias. Setting the transformation’s negate configuration property to true inverts the predicate’s result, so the transformation is applied only to records that the predicate does not match.
Kafka Connect includes the following predicates:
- org.apache.kafka.connect.transforms.predicates.TopicNameMatches: Matches records in a topic with a name matching a particular Java regular expression.
- org.apache.kafka.connect.transforms.predicates.HasHeaderKey: Matches records which have a header with the given key.
- org.apache.kafka.connect.transforms.predicates.RecordIsTombstone: Matches tombstone records (that is, records with a null value).
Tip
Defining the predicate separately from the transform makes it easier to apply the same predicate to multiple transforms. For example, one set of transforms can use a predicate as is, while another set of transforms uses the same predicate with negate set to true.
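The following sketch illustrates this reuse under assumed names: a single predicate, isLegacy, is referenced by two ExtractField transformations, one applied to matching topics and one (through negate) applied to all other topics. The aliases, the topic pattern legacy-.*, and the field names legacy_id and id are hypothetical.

```json
{
  "transforms": "extractLegacy,extractCurrent",
  "transforms.extractLegacy.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractLegacy.field": "legacy_id",
  "transforms.extractLegacy.predicate": "isLegacy",
  "transforms.extractCurrent.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractCurrent.field": "id",
  "transforms.extractCurrent.predicate": "isLegacy",
  "transforms.extractCurrent.negate": "true",
  "predicates": "isLegacy",
  "predicates.isLegacy.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.isLegacy.pattern": "legacy-.*"
}
```

Because the predicate is defined once, changing the pattern updates both transformations at the same time.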
Predicate properties
| Predicate class | Description | Config property | Default | Valid values | Importance |
|---|---|---|---|---|---|
| TopicNameMatches | A predicate that is true for records with a topic name matching the configured Java regular expression. | pattern | (required) | Non-empty string; valid Java regex | medium |
| HasHeaderKey | A predicate that is true for records with at least one header with the configured name. | name | (required) | Non-empty string | medium |
| RecordIsTombstone | A predicate that is true for tombstone records (records with a null value). | None (no configuration required) | n/a | n/a | n/a |
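As an illustration of the HasHeaderKey name property, the following sketch drops every record that lacks a given header by combining Filter with negate. The header key trace-id and the aliases dropUntraced and hasTraceId are hypothetical names chosen for this example.

```json
{
  "transforms": "dropUntraced",
  "transforms.dropUntraced.type": "org.apache.kafka.connect.transforms.Filter",
  "transforms.dropUntraced.predicate": "hasTraceId",
  "transforms.dropUntraced.negate": "true",
  "predicates": "hasTraceId",
  "predicates.hasTraceId.type": "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
  "predicates.hasTraceId.name": "trace-id"
}
```

Without the negate line, the same configuration would do the opposite and drop the records that do carry the header.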
Predicate examples
Example 1: Filter records from a specific topic
You have a source connector that produces records to many different topics and you want to do the following:
- Filter out the records in the foo topic entirely.
- Apply the ExtractField transformation with the field name other_field to records in all topics, except the topic bar.
To do this, first filter out the records destined for the topic foo. The Filter transformation removes records from further processing, so it must be restricted with a predicate.
Use the TopicNameMatches predicate to apply the transformation only to records in topics whose names match a regular expression. The only configuration property for TopicNameMatches is pattern, a Java regular expression that is matched against the entire topic name. The following example shows this configuration:
"transforms": "Filter",
"transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.Filter.predicate": "IsFoo",
"predicates": "IsFoo",
"predicates.IsFoo.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsFoo.pattern": "foo"
Next, apply ExtractField only when the topic name of the record is not bar. You can’t use TopicNameMatches on its own for this, because it would apply the transformation to topics whose names match, not to topics whose names do not match.
The transformation’s implicit negate configuration property inverts the set of records that a predicate matches. Adding an IsBar predicate and setting negate to true on the Extract transformation gives the following configuration:
"transforms": "Filter,Extract",
"transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.Filter.predicate": "IsFoo",
"transforms.Extract.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.Extract.field": "other_field",
"transforms.Extract.predicate": "IsBar",
"transforms.Extract.negate": "true",
"predicates": "IsFoo,IsBar",
"predicates.IsFoo.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsFoo.pattern": "foo",
"predicates.IsBar.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsBar.pattern": "bar"
Example 2: Apply a transformation with negate
The following configuration shows how to use a predicate in a transformation chain with the ExtractField transformation and the negate=true configuration property:
"transforms": "t2",
"transforms.t2.predicate": "has-my-prefix",
"transforms.t2.negate": "true",
"transforms.t2.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.t2.field": "c1",
"predicates": "has-my-prefix",
"predicates.has-my-prefix.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.has-my-prefix.pattern": "my-prefix-.*"
The transform t2 is only applied when the predicate has-my-prefix is false (using the negate=true parameter). The predicate is configured by the keys with prefix predicates.has-my-prefix. The predicate class is org.apache.kafka.connect.transforms.predicates.TopicNameMatches and its pattern parameter has the value my-prefix-.*.
With this configuration, the transformation is applied only to records where the topic name does not start with my-prefix-.
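The same predicate can also be paired with Filter. As a sketch, assuming you want to keep only records from topics that start with my-prefix- and drop everything else, set negate on the Filter transformation. The alias keepPrefixed is an illustrative name.

```json
{
  "transforms": "keepPrefixed",
  "transforms.keepPrefixed.type": "org.apache.kafka.connect.transforms.Filter",
  "transforms.keepPrefixed.predicate": "has-my-prefix",
  "transforms.keepPrefixed.negate": "true",
  "predicates": "has-my-prefix",
  "predicates.has-my-prefix.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.has-my-prefix.pattern": "my-prefix-.*"
}
```

Here Filter runs only when has-my-prefix is false, so a record from a topic such as my-prefix-orders passes through, while a record from any topic without the prefix is dropped.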