Kafka Connect ExtractTimeStamp SMT Usage Reference for Confluent Cloud

The ExtractTimeStamp Single Message Transform (SMT) extracts a time value from a field in a Kafka record’s key or value and sets that value as the record’s timestamp.

Note

The ExtractTimeStamp SMT is available only for managed Sink connectors.

To apply the ExtractTimeStamp SMT, add the following properties to your connector configuration:

{
  "transforms" : "extractTimestamp",
  "transforms.extractTimestamp.type" : "com.github.jcustenborder.kafka.connect.transform.common.ExtractTimestamp$Value",
  "transforms.extractTimestamp.field.name" : "<TIMESTAMP_FIELD_NAME>"
}

Replace the <TIMESTAMP_FIELD_NAME> placeholder with the name of the field that contains the record’s event time.

Examples

The example below shows how to use the ExtractTimeStamp SMT:

  • Input:

    {
       "topic" : "topic",
       "kafkaPartition" : 1,
       "value" : {
         "timestamp" : 1512164613123
       },
       "timestampType" : "NO_TIMESTAMP_TYPE",
       "offset" : 1,
       "headers" : [ ]
    }
    
  • Adding the SMT to your connector configuration:

    To apply the ExtractTimeStamp SMT to the timestamp field, add the following to your connector configuration:

    {
      "transforms" : "extractTimestamp",
      "transforms.extractTimestamp.type" : "com.github.jcustenborder.kafka.connect.transform.common.ExtractTimestamp$Value",
      "transforms.extractTimestamp.field.name" : "timestamp"
    }
    
  • Output:

    After the ExtractTimeStamp SMT is applied, the record carries a top-level timestamp copied from the timestamp field in its value. The value itself is unchanged:

    {
      "topic" : "topic",
      "kafkaPartition" : 1,
      "value" : {
        "timestamp" : 1512164613123
      },
      "timestamp" : 1512164613123,
      "timestampType" : "NO_TIMESTAMP_TYPE",
      "offset" : 1,
      "headers" : [ ]
    }
    

Properties

Name: field.name

Description: Specifies the field in the record that contains the timestamp value. The field’s value must be a timestamp or a 64-bit integer (int64).

Type: STRING

Default:

Valid Values:

Importance: HIGH

Predicates

Transformations can be configured with predicates so that a transformation is applied only to records that satisfy a condition. You can use predicates in a transformation chain and, when combined with the Filter (Kafka) SMT, predicates can conditionally filter out specific records (see Kafka Connect Filter (Kafka) SMT Usage Reference for Confluent Cloud). For details and examples, see Predicates.
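
As a rough illustration of the paragraph above, the following sketch attaches a predicate to the ExtractTimeStamp SMT so that it runs only for records from matching topics. It assumes the standard Apache Kafka TopicNameMatches predicate is available for your connector. The predicate alias isOrdersTopic and the pattern orders-.* are hypothetical names chosen for this example, not values taken from this reference.

```json
{
  "transforms" : "extractTimestamp",
  "transforms.extractTimestamp.type" : "com.github.jcustenborder.kafka.connect.transform.common.ExtractTimestamp$Value",
  "transforms.extractTimestamp.field.name" : "timestamp",
  "transforms.extractTimestamp.predicate" : "isOrdersTopic",
  "predicates" : "isOrdersTopic",
  "predicates.isOrdersTopic.type" : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.isOrdersTopic.pattern" : "orders-.*"
}
```

With a configuration along these lines, records from topics whose names match the pattern would have their timestamp set from the timestamp field, and records from other topics would pass through unchanged.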