MessageTimestampRouter

The following provides usage information for the Confluent Single Message Transformation (SMT) io.confluent.connect.transforms.MessageTimestampRouter.

Description

Update the record’s topic field as a function of the original topic value and the record’s timestamp field.

This is useful for sink connectors, because the topic field often determines the name of the equivalent entity in the destination system (for example, a database table or search index). This SMT extracts the timestamp from a specified field in the message value, which is especially useful for log data in which the timestamp is stored as a field in the message. The message value must be a Map instance (Structs are not currently supported). To route on the record’s own timestamp instead, using a basic topic pattern and timestamp format, see TimestampRouter.

Installation

This transformation is developed by Confluent and does not ship by default with Apache Kafka® or Confluent Platform. You can install this transformation using the confluent connect plugin install command:

confluent connect plugin install confluentinc/connect-transforms:latest

Example

The following example extracts the timestamp from the first field named timestamp, time, or ts that is found in the message value, checking the fields in the order listed in message.timestamp.keys. The field value is parsed using the format specified by message.timestamp.format. The transformation then builds the new topic name from topic.format: in this example it adds the prefix foo- to the original topic and appends the timestamp, rendered in the format specified by topic.timestamp.format.

"transforms": "MessageTimestampRouter",
"transforms.MessageTimestampRouter.type": "io.confluent.connect.transforms.MessageTimestampRouter",
"transforms.MessageTimestampRouter.topic.format": "foo-${topic}-${timestamp}",
"transforms.MessageTimestampRouter.message.timestamp.format": "yyyy-MM-dd",
"transforms.MessageTimestampRouter.topic.timestamp.format": "yyyy.MM.dd",
"transforms.MessageTimestampRouter.message.timestamp.keys": "timestamp,time,ts"

Message value: {"time":"2019-08-06"}

Topic (before): bar

Topic (after): foo-bar-2019.08.06
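The routing in this example can be approximated with a short Java sketch. This is not the SMT's source code: the class MessageTimestampRouterSketch and its route method are hypothetical names introduced only for illustration. The sketch assumes a date-only timestamp (parsed with LocalDate) and assumes that a missing timestamp field is an error. The real transformation may behave differently in both cases.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;

public class MessageTimestampRouterSketch {

    // Hypothetical helper (not part of the Confluent API) that mimics the documented behavior.
    public static String route(String topic,
                               Map<String, Object> value,
                               List<String> timestampKeys,
                               String messageTimestampFormat,
                               String topicTimestampFormat,
                               String topicFormat) {
        DateTimeFormatter in = DateTimeFormatter.ofPattern(messageTimestampFormat);
        DateTimeFormatter out = DateTimeFormatter.ofPattern(topicTimestampFormat);

        // Check the candidate field names in the configured order; the first one found wins.
        for (String key : timestampKeys) {
            Object raw = value.get(key);
            if (raw == null) {
                continue;
            }
            // Assumption: the field holds a date-only string matching messageTimestampFormat.
            LocalDate date = LocalDate.parse(raw.toString(), in);
            String timestamp = out.format(date);
            // String.replace is a literal (non-regex) substitution of the two placeholders.
            return topicFormat
                    .replace("${topic}", topic)
                    .replace("${timestamp}", timestamp);
        }
        // Assumption: treat a record without any of the listed fields as an error.
        throw new IllegalArgumentException("No timestamp field found among " + timestampKeys);
    }

    public static void main(String[] args) {
        String routed = route(
                "bar",
                Map.of("time", "2019-08-06"),
                List.of("timestamp", "time", "ts"),
                "yyyy-MM-dd",
                "yyyy.MM.dd",
                "foo-${topic}-${timestamp}");
        System.out.println(routed); // foo-bar-2019.08.06
    }
}
```

Because the field names are checked in list order, a message value that contains both time and ts is routed on time in this sketch.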

Tip

For additional examples, see Message Timestamp Router for managed connectors.

Properties

Name: topic.format
Description: Format string that can contain ${topic} and ${timestamp} as placeholders for the topic and timestamp, respectively.
Type: string
Default: ${topic}-${timestamp}
Valid Values:
Importance: high

Name: message.timestamp.format
Description: Format string for the message’s timestamp, compatible with java.time.format.DateTimeFormatter. For additional details, see DateTimeFormatter. If this property is not set or is set to an empty string, the transformation uses an ISO 8601 timestamp format with a mandatory date and an optional time.
Type: string
Default: "" (empty string)
Valid Values:
Importance: low

Name: topic.timestamp.format
Description: Format string for the topic’s timestamp, compatible with java.time.format.DateTimeFormatter. For additional details, see DateTimeFormatter.
Type: string
Default: yyyy.MM.dd
Valid Values:
Importance: high

Name: message.timestamp.keys
Description: Comma-separated list of field names in which to look up the timestamp in the message value, in the order the names are listed. The timestamp is taken from the first field found.
Type: string
Default:
Valid Values:
Importance: high
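To illustrate what an ISO 8601 format with a mandatory date and an optional time can look like, the following Java sketch builds such a formatter with DateTimeFormatterBuilder. It is an assumption for illustration only, not the formatter the SMT actually uses when message.timestamp.format is empty. The class name OptionalTimeIsoSketch and the method parseDate are hypothetical, and the sketch does not handle zone offsets such as a trailing Z.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.TemporalAccessor;

public class OptionalTimeIsoSketch {

    // ISO local date, optionally followed by 'T' and an ISO local time.
    // Assumption: this only approximates the default described for message.timestamp.format.
    static final DateTimeFormatter ISO_DATE_OPTIONAL_TIME = new DateTimeFormatterBuilder()
            .append(DateTimeFormatter.ISO_LOCAL_DATE)
            .optionalStart()
            .appendLiteral('T')
            .append(DateTimeFormatter.ISO_LOCAL_TIME)
            .optionalEnd()
            .toFormatter();

    // Hypothetical helper: returns the date part whether or not a time is present.
    public static LocalDate parseDate(String text) {
        // parseBest tries each query in order and returns the first type that can be built.
        TemporalAccessor parsed =
                ISO_DATE_OPTIONAL_TIME.parseBest(text, LocalDateTime::from, LocalDate::from);
        if (parsed instanceof LocalDateTime) {
            return ((LocalDateTime) parsed).toLocalDate();
        }
        return (LocalDate) parsed;
    }

    public static void main(String[] args) {
        System.out.println(parseDate("2019-08-06"));
        System.out.println(parseDate("2019-08-06T12:30:00"));
    }
}
```

Both calls in main resolve to the same date, which is the part a date-based topic.timestamp.format such as yyyy.MM.dd needs.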

Predicates

Transformations can be configured with predicates so that the transformation is applied only to records that satisfy a condition. You can use predicates in a transformation chain and, when combined with the Filter (Apache Kafka) SMT, predicates can conditionally filter out specific records. For details and examples, see Predicates.