MessageTimestampRouter

ここでは、Confluent SMT io.confluent.connect.transforms.MessageTimestampRouter の使用方法を説明します。

説明

元のトピックの値とレコードのタイムスタンプフィールドに応じて、レコードのトピックフィールドを更新します。

トピックフィールドにより、送信先システムでの対応するエンティティ名(データベーステーブルや検索インデックス名など)が決まることが多いため、この変換はシンクコネクターで役立ちます。この SMT は、メッセージ値の指定されたフィールドからタイムスタンプを抽出します。特に、ログデータでは、タイムスタンプがメッセージのフィールドとして保存されるため、この SMT が役立ちます。メッセージ値は Map インスタンス である必要があります(Structs は現在サポートされていません)。基本的なトピックパターンおよびタイムスタンプフォーマットを指定するには、「TimestampRouter」を参照してください。

インストール

この変換は Confluent が開発したものであり、Apache Kafka® または Confluent Platform にデフォルトで同梱されるものではありません。この変換は、Confluent Hub クライアント を使用してインストールできます。

confluent-hub install confluentinc/connect-transforms:latest

以下の例では、メッセージ値から timestamptime、または ts という名前のフィールドが、message.timestamp.keys の構成で指定された順番で抽出されます。このタイムスタンプ値は、元は message.timestamp.format で指定されたフォーマットです。メッセージトピックにトピックプレフィックスが追加され、topic.timestamp.format で指定されたフォーマットでタイムスタンプが末尾に追加されます。

"transforms": "MessageTimestampRouter",
"transforms.MessageTimestampRouter.type": "io.confluent.connect.transforms.MessageTimestampRouter",
"transforms.MessageTimestampRouter.topic.format": "foo-${topic}-${timestamp}",
"transforms.MessageTimestampRouter.message.timestamp.format": "yyyy-MM-dd",
"transforms.MessageTimestampRouter.topic.timestamp.format": "yyyy.MM.dd",
"transforms.MessageTimestampRouter.message.timestamp.keys": "timestamp,time,ts"

メッセージ値 : {"time":"2019-08-06"}

トピック(変換前) : bar

トピック(変換後): foo-bar-2019.08.06

ちなみに

その他の例については、マネージド型コネクターの Message Timestamp Router を参照してください。

特徴

Name 説明 デフォルト 指定可能な値 重要度
topic.format フォーマット文字列。${topic}${timestamp} を、それぞれトピックおよびタイムスタンプのプレースホルダーとして使用できます。 string ${topic}-${timestamp}  
message.timestamp.format java.time.format.DateTimeFormatter と互換性のある、メッセージのタイムスタンプのフォーマット文字列。詳細については、「DateTimeFormatter」を参照してください。構成が指定されていない場合または空の文字列が指定されている場合は、デフォルトの ISO8601 標準 形式のタイムスタンプのフォーマット文字列が使用されます(日付は必須、時刻は省略可能)。 string ""  
topic.timestamp.format java.time.format.DateTimeFormatter と互換性のある、トピックのタイムスタンプのフォーマット文字列。詳細については、「DateTimeFormatter」を参照してください。 string yyyy.MM.dd  
message.timestamp.keys フィールド名のコンマ区切りのリスト。名前が指定されている順に、メッセージ値のタイムスタンプが検索されます。最初に見つかったフィールドからタイムスタンプが取得されます。 string    

述語

"述語" を使用することにより、一定の条件を満たすレコードのみに変換が適用されるように、変換を構成することができます。述語は変換チェーンで使用することができ、Filter(Apache Kafka) と組み合わせると、条件に基づいて特定のレコードを除外できます。詳細と例については、「述語」を参照してください。