Kafka Connect ExtractTimeStamp SMT Usage Reference for Confluent Cloud
The ExtractTimeStamp Single Message Transform (SMT) is used to extract a specific time value from a field within a Kafka record’s key or value and promote that value to the record’s official timestamp metadata.
Note
The ExtractTimeStamp SMT is supported on the HTTP V2 Sink, MongoDB Sink, and JDBC Sink connectors.
To apply the ExtractTimeStamp SMT, add the following to your connector configuration.
{
"transforms" : "extractTimestamp",
"transforms.extractTimestamp.type" : "com.github.jcustenborder.kafka.connect.transform.common.ExtractTimestamp$Value",
"transforms.extractTimestamp.field.name" : "<TIMESTAMP_FIELD_NAME>"
}
Update the <TIMESTAMP_FIELD_NAME> placeholder to the name of the field containing the record’s event time.
Examples
The example below shows how the ExtractTimeStamp SMT promotes the timestamp field in a record’s value to the record’s timestamp:
Input:
{ "topic" : "topic", "kafkaPartition" : 1, "value" : { "timestamp" : 1512164613123 }, "timestampType" : "NO_TIMESTAMP_TYPE", "offset" : 1, "headers" : [ ] }
Adding the SMT to your connector configuration:
To apply the ExtractTimeStamp SMT on the timestamp field, add the following to your connector configuration:
{
"transforms" : "extractTimestamp",
"transforms.extractTimestamp.type" : "com.github.jcustenborder.kafka.connect.transform.common.ExtractTimestamp$Value",
"transforms.extractTimestamp.field.name" : "timestamp"
}
Output:
After the ExtractTimeStamp SMT applies, the record’s timestamp is set from the timestamp field in the value:
{ "topic" : "topic", "kafkaPartition" : 1, "value" : { "timestamp" : 1512164613123 }, "timestamp" : 1512164613123, "timestampType" : "NO_TIMESTAMP_TYPE", "offset" : 1, "headers" : [ ] }
Properties
| Name | Description | Type | Default | Valid Values | Importance |
|---|---|---|---|---|---|
| field.name | Specifies the field in the record that contains the timestamp value. The field’s value must be a timestamp or a 64-bit integer (int64). | STRING | | | HIGH |
Predicates
Transformations can be configured with predicates so that the transformation is applied only to records that satisfy a condition. You can use predicates in a transformation chain and, when combined with the Filter (Kafka) SMT (see Kafka Connect Filter (Kafka) SMT Usage Reference for Confluent Cloud), predicates can conditionally filter out specific records. For details and examples, see Predicates.
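As a minimal sketch of how a predicate might gate this SMT, the configuration below applies ExtractTimeStamp only to records from topics whose names match a pattern. It assumes the standard Apache Kafka TopicNameMatches predicate is available to your connector. The alias isOrdersTopic and the pattern orders-.* are hypothetical values chosen for illustration, not names from this reference.

```json
{
"transforms" : "extractTimestamp",
"transforms.extractTimestamp.type" : "com.github.jcustenborder.kafka.connect.transform.common.ExtractTimestamp$Value",
"transforms.extractTimestamp.field.name" : "timestamp",
"transforms.extractTimestamp.predicate" : "isOrdersTopic",
"predicates" : "isOrdersTopic",
"predicates.isOrdersTopic.type" : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isOrdersTopic.pattern" : "orders-.*"
}
```

Under these assumptions, records from topics that do not match the pattern pass through with their timestamp unchanged. In standard Kafka Connect, setting "transforms.extractTimestamp.negate" to "true" inverts the condition, so the SMT would apply only to non-matching topics.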