Filter(Confluent)

ここでは、Confluent SMT io.confluent.connect.transforms.Filter の使用方法を説明します。filter.condition 述語 と一致するレコードを含めるか、ドロップします。

説明

filter.condition 述語と一致するレコードを含めるか、ドロップします。

filter.condition は、処理対象の各レコードに適用される JSON パス を指定する述語です。この述語が一致すると判断された場合、レコードは包含( filter.type=include)または除外(filter.type=exclude)されます。

missing.or.null.behavior プロパティでは、フィルター条件の述語で使用されているフィールドがレコードに存在しない場合の変換の動作を指定します。デフォルトの動作は "失敗" です。このプロパティは、述語のフィールドが存在しないレコードを包含または除外するために設定することもできます。

レコードキー用(io.confluent.connect.transforms.Filter$Key)または値用(io.confluent.connect.transforms.Filter$Value)の固有の変換タイプを使用します。

インストール

この変換は Confluent が開発したものであり、Kafka または Confluent Platform にデフォルトで同梱されるものではありません。この変換は、Confluent Hub クライアント を使用してインストールできます。

confluent-hub install confluentinc/connect-transforms:latest

以下の構成スニペットは、Confluent Filter SMT の使用方法と構成方法を示しています。

filter.condition は、JSON パスを指定する述語です。JSON パスフォーマットの詳細については、https://github.com/json-path/JsonPath を参照してください。

以下の例では、フィールド 'key' にネストされたフィールド 'nestedKey''value1' または 'value2' と一致する場合に、指定されている filter.condition の条件が満たされたと判断されます。

filterExample1 では、フィルター条件を満たす including レコードの構成を定義します。フィルター条件は、レコードの value に適用されます。構成で fail の動作を指定すると、フィルター条件で使用されているフィールドがレコードに存在しない場合、例外がスローされコネクタータスクが失敗 となります。

"transforms": "filterExample1",
"transforms.filterExample1.type": "io.confluent.connect.transforms.Filter$Value",
"transforms.filterExample1.filter.condition": "$.key[?(@.nestedKey == "value1" || @.nestedKey == "value2")]",
"transforms.filterExample1.filter.type": "include", "transforms.filterExample1.missing.or.null.behavior": "fail"

filterExample2 では、フィルター条件を満たす excluding レコードの構成を定義します。フィルター条件は、レコードの key に適用されます。構成で include の動作を指定すると、フィルター条件で使用されているフィールドがレコードに存在しない場合、レコードはそのまま維持 されます。

"transforms": "filterExample2",
"transforms.filterExample2.type": "io.confluent.connect.transforms.Filter$Key", "transforms.filterExample2.filter.condition": "$.key[?(@.nestedKey == "value1" || @.nestedKey == "value2")]", "transforms.filterExample2.filter.type": "exclude", "transforms.filterExample2.missing.or.null.behavior": "include"

ちなみに

その他の例については、マネージド型コネクターの Filter(Confluent) を参照してください。

特徴

Name 説明 デフォルト 指定可能な値 重要度
filter.condition この変換の対象となるレコード、またはこの変換から除外されるレコードのマッチングに使用される条件を指定します。https://github.com/json-path/JsonPath で定義されている JSON パスの述語の表記を使用します。 string    
filter.type filter.condition 述語と一致するレコードで実行するアクションを指定します。include を使用すると、述語と一致するレコードはすべてそのまま維持され、述語の条件を満たさないレコードはすべてドロップされます。 exclude を使用すると、述語と一致するレコードがすべてドロップされます。 string   [include、exclude]
missing.or.null.behavior filter.condition で使用されているフィールドがレコードに存在しない場合の動作を指定します。fail を使用すると、例外がスローされ、コネクタータスクは失敗となります。include を使用すると、レコードはそのまま維持され、exclude を使用すると、レコードはドロップされます。 string fail [fail、include、exclude]