Filter(Apache Kafka)

ここでは、Apache Kafka® SMT org.apache.kafka.connect.transforms.Filter の使用方法を説明します。述語と組み合わせて使用します。

説明

すべてのレコードをドロップし、チェーンのそれ以降の変換から除外します。述語と一致する(または一致しない)レコードを条件に応じて除外するために使用します。

述語

述語を使用することにより、一定の条件を満たすレコードのみに変換が適用されるように、変換を構成できます。述語は変換チェーンで使用することができ、Apache Kafka® フィルターと組み合わせると、条件に基づいて特定のレコードを除外できます。

述語は、コネクターの構成で指定します。使用するプロパティは次のとおりです。

  • predicates: 1 つ以上の変換に適用される述語の一連のエイリアス。
  • predicates.$alias.type: 述語の完全修飾クラス名。
  • predicates.$alias.$predicateSpecificConfig: 述語の構成プロパティ。

すべての変換には、predicatenegate という暗黙的な構成プロパティがあります。変換の述語の構成に述語のエイリアスを設定することにより、述語が変換と関連付けられます。negate 構成プロパティを使用すると、述語の値を反転することができます。

Kafka Connect には以下の述語が含まれています。

  • org.apache.kafka.connect.transforms.predicates.TopicNameMatches: 特定の Java 正規表現と一致する名前を持つトピックのレコードを一致とします。
  • org.apache.kafka.connect.transforms.predicates.HasHeaderKey: 指定されたキーがヘッダーに含まれているレコードを一致とします。
  • org.apache.kafka.connect.transforms.predicates.RecordIsTombstone: tombstone レコード(null 値を持つレコード)を一致とします。

述語の例

例 1:

複数のトピックに対してレコードを生成するソースコネクターがあり、以下のことを行う必要があります。

  • foo トピックのレコードを完全に除外します。
  • フィールド名 other_field を指定して ExtractField の変換を、トピック bar を除くすべてのトピックのレコードに適用します。

これを行うには、まず、トピック foo へのレコードを除外する必要があります。Filter の変換により、それ以降の処理からレコードが除外されます。

次に、TopicNameMatches の述語を使用して、特定の正規表現と一致するトピック内のレコードのみに変換を適用します。TopicNameMatches の構成プロパティは、トピック名とのマッチングを行うためのパターンとして使用される Java 正規表現のみです。次の例は、この構成を示しています。

"transforms": "Filter",
"transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.Filter.predicate": "IsFoo",

"predicates": "IsFoo",
"predicates.IsFoo.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsFoo.pattern": "foo"

この構成を使用して、レコードのトピック名が bar でない場合にのみ ExtractField を適用します。TopicNameMatches を直接使用できないのは、この述語では、一致しないトピック名ではなく、一致するトピック名に対して変換が適用されるためです。変換の暗黙的な negate 構成プロパティを使用すると、述語と一致するレコードのセットが反転されます。この構成を追加した例を以下に示します。

"transforms": "Filter", "Extract",
"transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.Filter.predicate": "IsFoo",

"transforms.Extract.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.Extract.field": "other_field",
"transforms.Extract.predicate": "=IsBar",
"transforms.Extract.negate": "true",

"predicates": "IsFoo", "IsBar",
"predicates.IsFoo.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsFoo.pattern": "foo",

"predicates.IsBar.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsBar.pattern": "bar"

例 2:

以下の構成は、ExtractField 変換と negate=true 構成プロパティを使用して、変換チェーンで述語を使用する方法を示しています。

"transforms": "t2",
"transforms.t2.predicate": "has-my-prefix",
"transforms.t2.negate": "true",
"transforms.t2.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.t2.field": "c1",
"predicates": "has-my-prefix",
"predicates.has-my-prefix.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.has-my-prefix.pattern": "my-prefix-.*"

変換 t2 は、述語 has-my-prefix が false の場合(negate=true パラメーターを使用)にのみ適用されます。述語は、プレフィックス predicates.has-my-prefix を持つキーで構成されています。述語のクラスは org.apache.kafka.connect.transforms.predicates.TopicNameMatches であり、パターンパラメーターの値は my-prefix-.* です。この構成では、トピック名の先頭が my-prefix- では ない レコードのみに変換が適用されます。

ちなみに

  • 述語を変換とは切り離して定義すると、同じ述語を複数の変換に簡単に適用できるという利点があります。たとえば、1 組の変換で 1 つの述語を使用し、別の変換では同じ述語を否定に使用することができます。
  • その他の例については、マネージド型コネクターの Filter(Apache Kafka) を参照してください。

述語のプロパティ

Name 説明 デフォルト 指定可能な値 重要度
TopicNameMatches 構成されている正規表現と一致するトピック名を持つレコードに対して true になる述語。 string   空でない文字列、有効な正規表現
HasHeaderKey 構成されている名前のヘッダーが少なくとも 1 つあるレコードに対して true になる述語。 string   空でない文字列
RecordIsTombstone tombstone のレコード(null 値を持つレコード)に対して true になる述語。