Important

You are viewing documentation for an older version of Confluent Platform. For the latest, click here.

MessageTimestampRouter

The following provides usage information for the Confluent SMT io.confluent.connect.transforms.MessageTimestampRouter.

Description

Update the record’s topic field as a function of the original topic value and the record’s timestamp field.

This is useful for sink connectors, because the topic field often determines the equivalent entity name in the destination system (for example, a database table or search index name). This SMT extracts the timestamp from the message’s specified field, which is especially useful for log data in which the timestamp is stored as a field in the message. See TimestampRouter to specify a basic topic pattern and timestamp format.

Installation

This transformation is developed by Confluent and does not ship by default with Apache Kafka® or Confluent Platform. You can install this transformation via the Confluent Hub Client:

confluent-hub install confluentinc/connect-transforms:latest

Example

The following example extracts a field named timestamp, time, or ts from the key, in the order specified by the message.timestamp.keys configuration. This timestamp value is originally in the format specified by message.timestamp.format. It adds a topic prefix and appends the timestamp of the format specified by topic.timestamp.format to the message topic.

"transforms": "MessageTimestampRouter",
"transforms.MessageTimestampRouter.type": "io.confluent.connect.transforms.MessageTimestampRouter",
"transforms.MessageTimestampRouter.topic.format": "foo-${topic}-${timestamp}",
"transforms.MessageTimestampRouter.message.timestamp.format": "yyyy-MM-dd",
"transforms.MessageTimestampRouter.topic.timestamp.format": "yyyy.MM.dd",
"transforms.MessageTimestampRouter.message.timestamp.keys": "timestamp,time,ts"

Message value: {"time":"2019-08-06"}

Topic (before): bar

Topic (after): foo-bar-2019.08.06

Properties

Name Description Type Default Valid Values Importance
topic.format Format string which can contain ${topic} and ${timestamp} as placeholders for the topic and timestamp, respectively. string ${topic}-${timestamp}   high
message.timestamp.format Format string for the message’s timestamp that is compatible with java.time.format.DateTimeFormatter. For additional details, see DateTimeFormatter. If no configuration or an empty string is provided, defaults to the format string for timestamp of ISO8601 standard, with mandatory date and optional time. string “”   low
topic.timestamp.format Format string for the topic’s timestamp that is compatible with java.time.format.DateTimeFormatter. For additional details, see DateTimeFormatter. string yyyy.MM.dd   high
message.timestamp.keys Comma-separated list of key names to look up the timestamp value in the message, in the order the names are listed. The timestamp is taken from the first found value. string     high