Filter(Apache Kafka)¶
ここでは、Apache Kafka® SMT org.apache.kafka.connect.transforms.Filter
の使用方法を説明します。述語と組み合わせて使用します。
説明¶
すべてのレコードをドロップし、チェーンのそれ以降の変換から除外します。述語と一致する(または一致しない)レコードを条件に応じて除外するために使用します。
述語¶
述語を使用することにより、一定の条件を満たすレコードのみに変換が適用されるように、変換を構成できます。述語は変換チェーンで使用することができ、Apache Kafka® フィルターと組み合わせると、条件に基づいて特定のレコードを除外できます。
述語は、コネクターの構成で指定します。使用するプロパティは次のとおりです。
predicates
: 1 つ以上の変換に適用される述語の一連のエイリアス。predicates.$alias.type
: 述語の完全修飾クラス名。predicates.$alias.$predicateSpecificConfig
: 述語の構成プロパティ。
すべての変換には、predicate
と negate
という暗黙的な構成プロパティがあります。変換の述語の構成に述語のエイリアスを設定することにより、述語が変換と関連付けられます。negate
構成プロパティを使用すると、述語の値を反転することができます。
Kafka Connect には以下の述語が含まれています。
org.apache.kafka.connect.transforms.predicates.TopicNameMatches
: 特定の Java 正規表現と一致する名前を持つトピックのレコードを一致とします。org.apache.kafka.connect.transforms.predicates.HasHeaderKey
: 指定されたキーがヘッダーに含まれているレコードを一致とします。org.apache.kafka.connect.transforms.predicates.RecordIsTombstone
: tombstone レコード(null 値を持つレコード)を一致とします。
述語の例¶
例 1:
複数のトピックに対してレコードを生成するソースコネクターがあり、以下のことを行う必要があります。
foo
トピックのレコードを完全に除外します。- フィールド名
other_field
を指定してExtractField
の変換を、トピックbar
を除くすべてのトピックのレコードに適用します。
これを行うには、まず、トピック foo
へのレコードを除外する必要があります。Filter の変換により、それ以降の処理からレコードが除外されます。
次に、TopicNameMatches
の述語を使用して、特定の正規表現と一致するトピック内のレコードのみに変換を適用します。TopicNameMatches
の構成プロパティは、トピック名とのマッチングを行うためのパターンとして使用される Java 正規表現のみです。次の例は、この構成を示しています。
"transforms": "Filter",
"transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.Filter.predicate": "IsFoo",
"predicates": "IsFoo",
"predicates.IsFoo.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsFoo.pattern": "foo"
この構成を使用して、レコードのトピック名が bar
でない場合にのみ ExtractField
を適用します。TopicNameMatches
を直接使用できないのは、この述語では、一致しないトピック名ではなく、一致するトピック名に対して変換が適用されるためです。変換の暗黙的な negate
構成プロパティを使用すると、述語と一致するレコードのセットが反転されます。この構成を追加した例を以下に示します。
"transforms": "Filter", "Extract",
"transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.Filter.predicate": "IsFoo",
"transforms.Extract.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.Extract.field": "other_field",
"transforms.Extract.predicate": "=IsBar",
"transforms.Extract.negate": "true",
"predicates": "IsFoo", "IsBar",
"predicates.IsFoo.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsFoo.pattern": "foo",
"predicates.IsBar.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsBar.pattern": "bar"
例 2:
以下の構成は、ExtractField
変換と negate=true
構成プロパティを使用して、変換チェーンで述語を使用する方法を示しています。
"transforms": "t2",
"transforms.t2.predicate": "has-my-prefix",
"transforms.t2.negate": "true",
"transforms.t2.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.t2.field": "c1",
"predicates": "has-my-prefix",
"predicates.has-my-prefix.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.has-my-prefix.pattern": "my-prefix-.*"
変換 t2
は、述語 has-my-prefix
が false の場合(negate=true
パラメーターを使用)にのみ適用されます。述語は、プレフィックス predicates.has-my-prefix
を持つキーで構成されています。述語のクラスは org.apache.kafka.connect.transforms.predicates.TopicNameMatches
であり、パターンパラメーターの値は my-prefix-.*
です。この構成では、トピック名の先頭が my-prefix-
では ない レコードのみに変換が適用されます。
ちなみに
- 述語を変換とは切り離して定義すると、同じ述語を複数の変換に簡単に適用できるという利点があります。たとえば、1 組の変換で 1 つの述語を使用し、別の変換では同じ述語を否定に使用することができます。
- その他の例については、マネージド型コネクターの Filter(Apache Kafka) を参照してください。
述語のプロパティ¶
Name | 説明 | 型 | デフォルト | 指定可能な値 | 重要度 |
---|---|---|---|---|---|
TopicNameMatches |
構成されている正規表現と一致するトピック名を持つレコードに対して true になる述語。 | string | 空でない文字列、有効な正規表現 | 中 | |
HasHeaderKey |
構成されている名前のヘッダーが少なくとも 1 つあるレコードに対して true になる述語。 | string | 空でない文字列 | 中 | |
RecordIsTombstone |
tombstone のレコード(null 値を持つレコード)に対して true になる述語。 | 中 |