How to Use a Custom Timestamp Column in ksqlDB for Confluent Platform
Context
You have events that have a timestamp attribute. You want to do time-related processing over them and want ksqlDB to use those timestamps during processing. Because ksqlDB defaults to using the timestamp metadata of the underlying Kafka records, you need to tell ksqlDB where to find the timestamp attribute within the events. This is called using event-time.
In action
CREATE STREAM s1 (
    k VARCHAR KEY,
    ts VARCHAR,
    v1 INT,
    v2 VARCHAR
) WITH (
    kafka_topic = 's1',
    partitions = 1,
    value_format = 'avro',
    timestamp = 'ts',                        -- the column to use as a timestamp
    timestamp_format = 'yyyy-MM-dd HH:mm:ss' -- the format to parse the timestamp
);
Using event-time
Using event-time allows ksqlDB to handle out-of-order events during
time-related processing. Set the timestamp property when creating a
stream or table to denote which column to use as the timestamp. If the
timestamp column is a string, also set the timestamp_format property
to tell ksqlDB how to parse it.
Begin by telling ksqlDB to start all queries from the earliest point in each topic.
SET 'auto.offset.reset' = 'earliest';
Create a stream s1 that has a timestamp column, ts. Notice that
the timestamp property hasn’t been set yet. This will make it easier
to see how the functionality behaves later in this guide.
CREATE STREAM s1 (
    k VARCHAR KEY,
    ts VARCHAR,
    v1 INT,
    v2 VARCHAR
) WITH (
    kafka_topic = 's1',
    partitions = 1,
    value_format = 'avro'
);
Insert some rows into s1, setting the ts column to dates that
are not “now”.
INSERT INTO s1 (
    k, ts, v1, v2
) VALUES (
    'k1', '2020-05-04 01:00:00', 0, 'a'
);
INSERT INTO s1 (
    k, ts, v1, v2
) VALUES (
    'k2', '2020-05-04 02:00:00', 1, 'b'
);
Query the stream for its columns, including ROWTIME. ROWTIME is
a system column that ksqlDB reserves to track the timestamp of the
event.
SELECT k,
       ROWTIME,
       TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') AS rowtime_formatted,
       ts,
       v1,
       v2
FROM s1
EMIT CHANGES;
Your results should look similar to the output below, except for
ROWTIME and ROWTIME_FORMATTED, which will mirror your wall
clock. Because you haven’t yet instructed ksqlDB to use event-time,
ROWTIME is inherited from the underlying Kafka record. By default,
Kafka sets a record’s timestamp to the time at which it was produced
to the topic.
+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+
|K |ROWTIME |ROWTIME_FORMATTED |TS |V1 |V2 |
+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+
|k1 |1589564380616 |2020-05-15 17:39:40.616 |2020-05-04 01:00:00 |0 |a |
|k2 |1589564380731 |2020-05-15 17:39:40.731 |2020-05-04 02:00:00 |1 |b |
Derive a new stream, s2, from s1 and tell ksqlDB to use
event-time. Set the timestamp property to the ts column.
CREATE STREAM s2 WITH (
    timestamp = 'ts',
    timestamp_format = 'yyyy-MM-dd HH:mm:ss'
) AS
    SELECT *
    FROM s1
    EMIT CHANGES;
Now compare the timestamps again. This time, notice that ROWTIME has
been set to the same value as ts. s2 is now using event-time.
SELECT k,
       ROWTIME,
       TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') AS rowtime_formatted,
       ts,
       v1,
       v2
FROM s2
EMIT CHANGES;
The query should return the following results.
+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+
|K |ROWTIME |ROWTIME_FORMATTED |TS |V1 |V2 |
+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+
|k1 |1588554000000 |2020-05-04 01:00:00.000 |2020-05-04 01:00:00 |0 |a |
|k2 |1588557600000 |2020-05-04 02:00:00.000 |2020-05-04 02:00:00 |1 |b |
Any new streams or tables derived from s2 will continue to have
their timestamp set to ts unless an operation instructs otherwise.
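To see why event-time matters, a windowed aggregation is a useful check.
The following is a minimal sketch, assuming the s2 stream and the two
rows inserted earlier in this guide. The one-hour window size and the
window_start, window_end, and event_count aliases are illustrative
choices, not part of the original example.

```sql
-- Sketch: count events per key in one-hour tumbling windows over s2.
-- s2 takes its timestamp from ts, so rows are assigned to windows by
-- event time rather than by the time they were produced to the topic.
SELECT k,
       TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss') AS window_start,
       TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss') AS window_end,
       COUNT(*) AS event_count
FROM s2
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY k
EMIT CHANGES;
```

Each row should land in the window that contains its ts value (for
example, the k1 row at 2020-05-04 01:00:00), not in a window around
your wall clock.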
Timestamps on base streams/tables
You can change the timestamp not only as you derive new streams
and tables, but also on base ones. Set the timestamp and
timestamp_format properties in the WITH clause.
CREATE STREAM s3 (
    k VARCHAR KEY,
    ts VARCHAR,
    v1 INT
) WITH (
    kafka_topic = 's3',
    partitions = 1,
    value_format = 'avro',
    timestamp = 'ts',
    timestamp_format = 'yyyy-MM-dd HH:mm:ss'
);
Note that the underlying timestamp metadata for the Kafka records in
topic s3 is not modified. ksqlDB has merely marked that any
streams or tables derived from s3 should use the value of ts for
ROWTIME.
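One way to check this behavior is to insert a row into s3 and compare
ROWTIME with ts, in the same way as for s1 and s2 earlier in this
guide. This is a sketch only; the row values are made up for
illustration.

```sql
-- Hypothetical sample row; the values are illustrative only.
INSERT INTO s3 (
    k, ts, v1
) VALUES (
    'k1', '2020-05-04 01:00:00', 0
);

-- Compare ROWTIME with the ts column for rows read from s3.
SELECT k,
       ROWTIME,
       TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') AS rowtime_formatted,
       ts,
       v1
FROM s3
EMIT CHANGES;
```

If the timestamp property is picked up as described, ROWTIME_FORMATTED
should line up with ts rather than with your wall clock.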
Timestamps as long values
You can use timestamps that are represented as milliseconds since the Unix epoch, too.
Create a stream s4 with a timestamp column of type BIGINT.
Because the timestamp is a number, ksqlDB doesn’t need to know how to
parse its timestamp format; it can interpret it directly as
milliseconds since the Unix epoch. This means you can omit the
timestamp_format property.
CREATE STREAM s4 (
    k VARCHAR KEY,
    ts BIGINT,
    v1 INT,
    v2 VARCHAR
) WITH (
    kafka_topic = 's4',
    partitions = 1,
    value_format = 'avro',
    timestamp = 'ts'
);
Insert some rows with millisecond timestamps.
INSERT INTO s4 (
    k, ts, v1, v2
) VALUES (
    'k1', 1562634000000, 0, 'a'
);
INSERT INTO s4 (
    k, ts, v1, v2
) VALUES (
    'k2', 1588509000000, 1, 'b'
);
INSERT INTO s4 (
    k, ts, v1, v2
) VALUES (
    'k3', 1588736700000, 2, 'c'
);
Run a query similar to the one above. Remember to set auto.offset.reset
to earliest if you haven’t yet.
SELECT k,
       ROWTIME,
       ts,
       TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') AS rowtime_formatted,
       TIMESTAMPTOSTRING(ts, 'yyyy-MM-dd HH:mm:ss.SSS') AS ts_formatted,
       v1,
       v2
FROM s4
EMIT CHANGES;
The query should return the following results.
+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+
|K |ROWTIME |TS |ROWTIME_FORMATTED |TS_FORMATTED |V1 |V2 |
+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+
|k1 |1562634000000 |1562634000000 |2019-07-09 01:00:00.000 |2019-07-09 01:00:00.000 |0 |a |
|k2 |1588509000000 |1588509000000 |2020-05-03 12:30:00.000 |2020-05-03 12:30:00.000 |1 |b |
|k3 |1588736700000 |1588736700000 |2020-05-06 03:45:00.000 |2020-05-06 03:45:00.000 |2 |c |
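If your events carry the timestamp as a string but you would rather
work with a numeric column, one option is to convert it while deriving
a stream. The sketch below assumes the s1 stream from earlier in this
guide; the stream name s1_millis and the column name ts_millis are
hypothetical. It uses the STRINGTOTIMESTAMP function to produce a
BIGINT of milliseconds since the Unix epoch and points the timestamp
property at that column, so no timestamp_format is set.

```sql
-- Sketch: derive a stream whose timestamp column is a BIGINT of
-- epoch milliseconds computed from the string column ts in s1.
-- s1_millis and ts_millis are hypothetical names for illustration.
CREATE STREAM s1_millis WITH (
    timestamp = 'ts_millis'
) AS
    SELECT k,
           STRINGTOTIMESTAMP(ts, 'yyyy-MM-dd HH:mm:ss') AS ts_millis,
           v1,
           v2
    FROM s1
    EMIT CHANGES;
```

Querying s1_millis for ROWTIME and ts_millis, as done for s4, is a
quick way to confirm that the two columns agree.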
Timestamps represented by TIMESTAMP columns
Similar to the previous section, you can also use columns of type
TIMESTAMP. These columns require data to be in
yyyy-mm-ddThh:mm:ss[.S] format, so there is no need to provide the
timestamp_format property.
Create a stream s5 with a column ts of type TIMESTAMP.
CREATE STREAM s5 (
    k VARCHAR KEY,
    ts TIMESTAMP,
    v1 INT,
    v2 VARCHAR
) WITH (
    kafka_topic = 's5',
    partitions = 1,
    value_format = 'avro',
    timestamp = 'ts'
);
Insert some rows with timestamps.
INSERT INTO s5 (
    k, ts, v1, v2
) VALUES (
    'k1', '2019-07-09T01:00', 0, 'a'
);
INSERT INTO s5 (
    k, ts, v1, v2
) VALUES (
    'k2', '2020-05-03T12:30:00', 1, 'b'
);
INSERT INTO s5 (
    k, ts, v1, v2
) VALUES (
    'k3', '2020-05-06T03:45', 2, 'c'
);
And run the following query. Remember to set auto.offset.reset to
earliest if you haven’t yet.
SELECT k,
       ROWTIME,
       ts,
       v1,
       v2
FROM s5
EMIT CHANGES;
The query should return the following results.
+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+
|K |ROWTIME |TS |V1 |V2 |
+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+
|k1 |1562634000000 |2019-07-09T01:00:00.000 |0 |a |
|k2 |1588509000000 |2020-05-03T12:30:00.000 |1 |b |
|k3 |1588736700000 |2020-05-06T03:45:00.000 |2 |c |
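To confirm that ROWTIME and ts agree, it can help to render both as
strings. The sketch below assumes the s5 stream above and a ksqlDB
version that provides the FORMAT_TIMESTAMP function for TIMESTAMP
values; the ts_formatted and rowtime_formatted aliases are
illustrative.

```sql
-- Sketch: format the TIMESTAMP column and ROWTIME with the same
-- pattern so the two values can be compared side by side.
SELECT k,
       ROWTIME,
       TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') AS rowtime_formatted,
       FORMAT_TIMESTAMP(ts, 'yyyy-MM-dd HH:mm:ss.SSS') AS ts_formatted
FROM s5
EMIT CHANGES;
```

TIMESTAMPTOSTRING takes the BIGINT ROWTIME value, while
FORMAT_TIMESTAMP takes the TIMESTAMP column, which is why the two
functions are used together here.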