Kafka Connect ExtractTimeStamp SMT Usage Reference for Confluent Cloud

The ExtractTimeStamp Single Message Transform (SMT) is used to extract a specific time value from a field within a Kafka record’s key or value and promote that value to the record’s official timestamp metadata.

Note

The ExtractTimeStamp SMT is supported on the HTTP V2 Sink , MongoDB Sink, and JDBC Sink connectors.

To apply the ExtractTimeStamp SMT, add the following to your connector configuration.

{
  "transforms" : "extractTimestamp",
  "transforms.extractTimestamp.type" : "com.github.jcustenborder.kafka.connect.transform.common.ExtractTimestamp$Value",
  "transforms.extractTimestamp.field.name" : "<TIMESTAMP_FIELD_NAME>"
 }

Update the <TIMESTAMP_FIELD_NAME> placeholder to the name of the field containing the record’s event time.

Examples

The example below shows how to use ExtractTimeStamp SMT:

  • Input:

    {
       "topic" : "topic",
       "kafkaPartition" : 1,
       "value" : {
         "timestamp" : 1512164613123
       },
       "timestampType" : "NO_TIMESTAMP_TYPE",
       "offset" : 1,
       "headers" : [ ]
    }
    
  • Adding the SMT to your connector configuration:

    To apply the ExtractTimeStamp SMT on timestamp field, add the following to your connector configuration:

    {
      "transforms" : "extractTimestamp",
      "transforms.extractTimestamp.type" : "com.github.jcustenborder.kafka.connect.transform.common.ExtractTimestamp$Value",
      "transforms.extractTimestamp.field.name" : "timestamp"
    }
    
  • Output:

    After the ExtractTimeStamp SMT applies, the value transforms as follows:

    {
      "topic" : "topic",
      "kafkaPartition" : 1,
      "value" : {
        "timestamp" : 1512164613123
       },
      "timestamp" : 1512164613123,
      "timestampType" : "NO_TIMESTAMP_TYPE",
      "offset" : 1,
      "headers" : [ ]
    }
    

Properties

Name

Description

Type

Default

Valid Values

Importance

field.name

Specifies the field in the record that contains the timestamp value. The field’s value must be a timestamp or a 64-bit integer (int64).

STRING

HIGH

Predicates

Transformations can be configured with predicates so that the transformation is applied only to records which satisfy a condition. You can use predicates in a transformation chain and, when combined with the Kafka Connect Filter (Kafka) SMT Usage Reference for Confluent Cloud, predicates can conditionally filter out specific records. For details and examples, see Predicates.