Kafka Connect ExtractTimeStamp SMT Usage Reference for Confluent Cloud
The ExtractTimeStamp Single Message Transform (SMT) is used to extract a specific time value from a field within a Kafka record’s key or value and promote that value to the record’s official timestamp metadata.
Note
The ExtractTimeStamp SMT is available only for managed Sink connectors.
To apply the ExtractTimeStamp SMT, add the following to your connector configuration.
{
"transforms" : "extractTimestamp",
"transforms.extractTimestamp.type" : "com.github.jcustenborder.kafka.connect.transform.common.ExtractTimestamp$Value",
"transforms.extractTimestamp.field.name" : "<TIMESTAMP_FIELD_NAME>"
}
Update the <TIMESTAMP_FIELD_NAME> placeholder to the name of the field containing the record’s event time.
Examples
The example below shows how to use ExtractTimeStamp SMT:
Input:
{ "topic" : "topic", "kafkaPartition" : 1, "value" : { "timestamp" : 1512164613123 }, "timestampType" : "NO_TIMESTAMP_TYPE", "offset" : 1, "headers" : [ ] }
Adding the SMT to your connector configuration:
To apply the
ExtractTimeStampSMT ontimestampfield, add the following to your connector configuration:{ "transforms" : "extractTimestamp", "transforms.extractTimestamp.type" : "com.github.jcustenborder.kafka.connect.transform.common.ExtractTimestamp$Value", "transforms.extractTimestamp.field.name" : "timestamp" }
Output:
After the
ExtractTimeStampSMT applies, the value transforms as follows:{ "topic" : "topic", "kafkaPartition" : 1, "value" : { "timestamp" : 1512164613123 }, "timestamp" : 1512164613123, "timestampType" : "NO_TIMESTAMP_TYPE", "offset" : 1, "headers" : [ ] }
Properties
Name | Description | Type | Default | Valid Values | Importance |
|---|---|---|---|---|---|
| Specifies the field in the record that contains the timestamp value. The field’s value must be a timestamp or a 64-bit integer (int64). | STRING | HIGH |
Predicates
Transformations can be configured with predicates so that the transformation is applied only to records which satisfy a condition. You can use predicates in a transformation chain and, when combined with the Kafka Connect Filter (Kafka) SMT Usage Reference for Confluent Cloud, predicates can conditionally filter out specific records. For details and examples, see Predicates.