Timezone Types in Confluent Cloud for Apache Flink¶
Confluent Cloud for Apache Flink®️ provides rich data types for date and time, including DATE, TIME, TIMESTAMP, and TIMESTAMP_LTZ.
These datetime types and the related datetime functions enable processing business data across timezones.
TIMESTAMP vs TIMESTAMP_LTZ¶
TIMESTAMP type¶
TIMESTAMP(p) is an abbreviation for TIMESTAMP(p) WITHOUT TIME ZONE. The precision p supports a range from 0 to 9. The default is 6.
TIMESTAMP describes a timestamp that represents year, month, day, hour, minute, second, and fractional seconds.
TIMESTAMP can be specified from a string literal. The following code example shows a SELECT statement that creates a timestamp from a string.
SELECT TIMESTAMP '1970-01-01 00:00:04.001';
Your output should resemble:
+----+-------------------------+
| op | EXPR$0                  |
+----+-------------------------+
| +I | 1970-01-01 00:00:04.001 |
+----+-------------------------+
Received a total of 1 row
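As a rough analogy outside Flink, a TIMESTAMP value behaves like a wall-clock reading with no zone attached. The following Python sketch is an illustration only and says nothing about how Flink stores the value: it parses the same literal into a naive `datetime` and shows that the result carries calendar and clock fields but no timezone.

```python
from datetime import datetime

# Parse the same literal used in the SELECT statement above.
# A naive datetime (tzinfo is None) stands in for TIMESTAMP WITHOUT TIME ZONE.
ts = datetime.fromisoformat("1970-01-01 00:00:04.001")

# The value holds calendar and clock fields only; the last field is microseconds.
fields = (ts.year, ts.month, ts.day, ts.hour, ts.minute, ts.second, ts.microsecond)
has_zone = ts.tzinfo is not None

print(fields)    # (1970, 1, 1, 0, 0, 4, 1000)
print(has_zone)  # False
```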
TIMESTAMP_LTZ type¶
TIMESTAMP_LTZ(p) is an abbreviation for TIMESTAMP(p) WITH LOCAL TIME ZONE. The precision p supports a range from 0 to 9. The default is 6.
TIMESTAMP_LTZ describes an absolute time point on the timeline. It stores a LONG value representing epoch milliseconds and an INT representing nanosecond-of-millisecond. The epoch time is measured from the standard Java epoch of 1970-01-01T00:00:00Z. Every datum of TIMESTAMP_LTZ type is interpreted in the local timezone configured in the current session. Typically, the local timezone is used for computation and visualization.
TIMESTAMP_LTZ can be used in business that spans timezones, because an absolute time point, for example, 4001 milliseconds, describes the same instant in every timezone. If the local system clocks of all machines in the world return the same value at the same moment, for example, 4001 milliseconds, that value is an “absolute time point”.
TIMESTAMP_LTZ has no literal representation, so you can’t create it from a literal. It can be derived from a LONG epoch time, as shown in the following code example.
CREATE VIEW T1 AS SELECT TO_TIMESTAMP_LTZ(4001, 3);
SET 'table.local-time-zone' = 'UTC';
SELECT * FROM T1;
Your output should resemble:
+----+-------------------------+
| op | EXPR$0                  |
+----+-------------------------+
| +I | 1970-01-01 00:00:04.001 |
+----+-------------------------+
Received a total of 1 row
Change the timezone and query the time again:
SET 'table.local-time-zone' = 'Asia/Shanghai';
SELECT * FROM T1;
Your output should resemble:
+----+-------------------------+
| op | EXPR$0                  |
+----+-------------------------+
| +I | 1970-01-01 08:00:04.001 |
+----+-------------------------+
Received a total of 1 row
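The same behavior can be modeled outside Flink. The following Python sketch is an analogy under stated assumptions, not Flink's implementation: the helper name `render_ltz` is hypothetical, and the `session_zone` argument plays the role of `table.local-time-zone`. It requires Python 3.9 or later with a timezone database available. One epoch value is stored, and only the rendering depends on the zone.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def render_ltz(epoch_millis: int, session_zone: str) -> str:
    """Format an epoch-millisecond instant as wall-clock text in session_zone."""
    instant = EPOCH + timedelta(milliseconds=epoch_millis)
    local = instant.astimezone(ZoneInfo(session_zone))
    # Print milliseconds with three digits, like the TIMESTAMP_LTZ(3) output above.
    return local.strftime("%Y-%m-%d %H:%M:%S.") + f"{local.microsecond // 1000:03d}"

utc_text = render_ltz(4001, "UTC")
shanghai_text = render_ltz(4001, "Asia/Shanghai")

print(utc_text)       # 1970-01-01 00:00:04.001
print(shanghai_text)  # 1970-01-01 08:00:04.001
```

The stored value, 4001 milliseconds, never changes. Only the text produced for the reader differs between sessions.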
Set the timezone¶
The local timezone defines the timezone ID of the current session. You can configure the timezone in the Flink SQL shell or in your applications.
-- set to UTC timezone
SET 'table.local-time-zone' = 'UTC';
-- set to Shanghai timezone
SET 'table.local-time-zone' = 'Asia/Shanghai';
-- set to Los_Angeles timezone
SET 'table.local-time-zone' = 'America/Los_Angeles';
Datetime functions and timezones¶
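The return values of the following datetime functions depend on the configured timezone: LOCALTIME, LOCALTIMESTAMP, CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_ROW_TIMESTAMP(), NOW(), and PROCTIME().
The following example code shows the return types of these datetime functions.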
SET 'sql-client.execution.result-mode' = 'tableau';
CREATE VIEW timeview AS SELECT
  LOCALTIME,
  LOCALTIMESTAMP,
  CURRENT_DATE,
  CURRENT_TIME,
  CURRENT_TIMESTAMP,
  CURRENT_ROW_TIMESTAMP(),
  NOW(),
  PROCTIME();
DESC timeview;
Your output should resemble:
+-------------------+-----------------------------+-------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+-------------------+-----------------------------+-------+-----+--------+-----------+
| LOCALTIME | TIME(0) | FALSE | | | |
| LOCALTIMESTAMP | TIMESTAMP(3) | FALSE | | | |
| CURRENT_DATE | DATE | FALSE | | | |
| CURRENT_TIME | TIME(0) | FALSE | | | |
| CURRENT_TIMESTAMP | TIMESTAMP_LTZ(3) | FALSE | | | |
| EXPR$5 | TIMESTAMP_LTZ(3) | FALSE | | | |
| EXPR$6 | TIMESTAMP_LTZ(3) | FALSE | | | |
| EXPR$7 | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | | |
+-------------------+-----------------------------+-------+-----+--------+-----------+
8 rows in set
Set the timezone to UTC and query the view.
SET 'table.local-time-zone' = 'UTC';
SELECT * FROM timeview;
Your output should resemble:
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op | LOCALTIME | LOCALTIMESTAMP | CURRENT_DATE | CURRENT_TIME | CURRENT_TIMESTAMP | EXPR$5 | EXPR$6 | EXPR$7 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I | 20:03:22 | 2023-03-20 20:03:22.090 | 2023-03-20 | 20:03:22 | 2023-03-20 20:03:22.090 | 2023-03-20 20:03:22.090 | 2023-03-20 20:03:22.090 | 2023-03-20 20:03:22.100 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
Received a total of 1 row
Change the timezone and query the view again.
SET 'table.local-time-zone' = 'Asia/Shanghai';
SELECT * FROM timeview;
Your output should resemble:
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op | LOCALTIME | LOCALTIMESTAMP | CURRENT_DATE | CURRENT_TIME | CURRENT_TIMESTAMP | EXPR$5 | EXPR$6 | EXPR$7 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I | 04:03:58 | 2023-03-21 04:03:58.955 | 2023-03-21 | 04:03:58 | 2023-03-21 04:03:58.955 | 2023-03-21 04:03:58.955 | 2023-03-21 04:03:58.955 | 2023-03-21 04:03:58.965 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
Received a total of 1 row
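The two outputs above were captured at different moments, so they differ by more than the zone offset. To isolate the timezone effect, the following Python sketch pins a single instant, chosen to match the UTC output, and derives rough analogs of CURRENT_DATE, LOCALTIME, and LOCALTIMESTAMP for two session zones. The `session_view` helper and its keys are hypothetical names used only for illustration.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

# One fixed instant, taken from the UTC example output above.
instant = datetime(2023, 3, 20, 20, 3, 22, 90_000, tzinfo=timezone.utc)

def session_view(instant: datetime, session_zone: str) -> dict:
    """Return rough analogs of CURRENT_DATE, LOCALTIME, and LOCALTIMESTAMP."""
    local = instant.astimezone(ZoneInfo(session_zone))
    return {
        "current_date": local.date().isoformat(),
        "localtime": local.strftime("%H:%M:%S"),
        # Drop the zone to get a wall-clock value, like TIMESTAMP(3).
        "localtimestamp": local.replace(tzinfo=None).isoformat(
            sep=" ", timespec="milliseconds"
        ),
    }

utc_view = session_view(instant, "UTC")
shanghai_view = session_view(instant, "Asia/Shanghai")

print(utc_view["current_date"], utc_view["localtime"])            # 2023-03-20 20:03:22
print(shanghai_view["current_date"], shanghai_view["localtime"])  # 2023-03-21 04:03:22
```

Note that the date itself moves to the next day in the Asia/Shanghai session, even though the instant is unchanged.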
TIMESTAMP_LTZ string representation¶
The session timezone is used whenever a TIMESTAMP_LTZ value is represented in string format, for example, when you print the value, cast it to STRING, cast it to TIMESTAMP, or cast a TIMESTAMP value to TIMESTAMP_LTZ:
CREATE VIEW timeview2 AS SELECT
  TO_TIMESTAMP_LTZ(4001, 3) AS ltz,
  TIMESTAMP '1970-01-01 00:00:01.001' AS ntz;
DESC timeview2;
+------+------------------+-------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+------+------------------+-------+-----+--------+-----------+
| ltz | TIMESTAMP_LTZ(3) | TRUE | | | |
| ntz | TIMESTAMP(3) | FALSE | | | |
+------+------------------+-------+-----+--------+-----------+
2 rows in set
SET 'table.local-time-zone' = 'UTC';
SELECT * FROM timeview2;
+----+-------------------------+-------------------------+
| op | ltz | ntz |
+----+-------------------------+-------------------------+
| +I | 1970-01-01 00:00:04.001 | 1970-01-01 00:00:01.001 |
+----+-------------------------+-------------------------+
Received a total of 1 row
SET 'table.local-time-zone' = 'Asia/Shanghai';
SELECT * FROM timeview2;
+----+-------------------------+-------------------------+
| op | ltz | ntz |
+----+-------------------------+-------------------------+
| +I | 1970-01-01 08:00:04.001 | 1970-01-01 00:00:01.001 |
+----+-------------------------+-------------------------+
Received a total of 1 row
CREATE VIEW timeview3 AS SELECT
  ltz,
  CAST(ltz AS TIMESTAMP(3)),
  CAST(ltz AS STRING),
  ntz,
  CAST(ntz AS TIMESTAMP_LTZ(3))
FROM timeview2;
DESC timeview3;
+--------+------------------+-------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+--------+------------------+-------+-----+--------+-----------+
| ltz | TIMESTAMP_LTZ(3) | TRUE | | | |
| EXPR$1 | TIMESTAMP(3) | TRUE | | | |
| EXPR$2 | STRING | TRUE | | | |
| ntz | TIMESTAMP(3) | FALSE | | | |
| EXPR$4 | TIMESTAMP_LTZ(3) | FALSE | | | |
+--------+------------------+-------+-----+--------+-----------+
5 rows in set
SELECT * FROM timeview3;
+----+-------------------------+-------------------------+--------------------------------+-------------------------+-------------------------+
| op | ltz | EXPR$1 | EXPR$2 | ntz | EXPR$4 |
+----+-------------------------+-------------------------+--------------------------------+-------------------------+-------------------------+
| +I | 1970-01-01 08:00:04.001 | 1970-01-01 08:00:04.001 | 1970-01-01 08:00:04.001 | 1970-01-01 00:00:01.001 | 1970-01-01 00:00:01.001 |
+----+-------------------------+-------------------------+--------------------------------+-------------------------+-------------------------+
Received a total of 1 row
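The cast results above follow from one rule: the session timezone is applied whenever a value crosses between the two types. The following Python sketch models that rule as an analogy only. The helper names `ltz_to_timestamp` and `timestamp_to_ltz` are hypothetical, and a naive `datetime` stands in for TIMESTAMP while a UTC-aware `datetime` stands in for TIMESTAMP_LTZ.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

def ltz_to_timestamp(instant: datetime, session_zone: str) -> datetime:
    """Analog of CAST(ltz AS TIMESTAMP): read the wall clock in the session zone."""
    return instant.astimezone(ZoneInfo(session_zone)).replace(tzinfo=None)

def timestamp_to_ltz(wall_clock: datetime, session_zone: str) -> datetime:
    """Analog of CAST(ntz AS TIMESTAMP_LTZ): treat the wall clock as session-local."""
    return wall_clock.replace(tzinfo=ZoneInfo(session_zone)).astimezone(timezone.utc)

# The same two values as in timeview2.
ltz = datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(milliseconds=4001)
ntz = datetime(1970, 1, 1, 0, 0, 1, 1000)

cast_ltz = ltz_to_timestamp(ltz, "Asia/Shanghai")
cast_ntz = timestamp_to_ltz(ntz, "Asia/Shanghai")

print(cast_ltz)  # 1970-01-01 08:00:04.001000
print(cast_ntz)  # 1969-12-31 16:00:01.001000+00:00

# Rendering the cast result back in the same session zone recovers the wall clock,
# which is why EXPR$4 prints the same text as ntz in the output above.
round_trip = ltz_to_timestamp(cast_ntz, "Asia/Shanghai")
```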
Time attribute and timezone¶
For more information about time attributes, see Time Attributes.
Processing time and time zone¶
Flink SQL defines the processing-time attribute with the PROCTIME function. The return type of the PROCTIME function is TIMESTAMP_LTZ.
Note
In Flink versions earlier than 1.13, the return type of PROCTIME is TIMESTAMP, and the return value is the TIMESTAMP in the UTC timezone. For example, if the wall-clock time in Shanghai is 2021-03-01 12:00:00, the PROCTIME() function returns 2021-03-01 04:00:00. Flink 1.13 fixes this issue and uses the TIMESTAMP_LTZ type as the PROCTIME() return type, so you don’t need to correct the timezone manually.
The PROCTIME function always represents your local timestamp value by using the TIMESTAMP_LTZ type, which also supports Daylight Saving Time.
SET 'table.local-time-zone' = 'UTC';
SELECT PROCTIME();
Your output should resemble:
+----+-------------------------+
| op | EXPR$0 |
+----+-------------------------+
| +I | 2023-03-20 21:59:18.380 |
+----+-------------------------+
Received a total of 1 row
Change the local time zone and run the PROCTIME function again.
SET 'table.local-time-zone' = 'Asia/Shanghai';
SELECT PROCTIME();
Your output should resemble:
+----+-------------------------+
| op | EXPR$0 |
+----+-------------------------+
| +I | 2023-03-21 06:00:32.202 |
+----+-------------------------+
Received a total of 1 row
CREATE TABLE ProcessTimeTable (
  item STRING,
  price DOUBLE,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'socket',
  'hostname' = '127.0.0.1',
  'port' = '9999',
  'format' = 'csv'
);
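-- The name timeview3 was already used for the cast example, so drop that view first.
DROP VIEW IF EXISTS timeview3;
CREATE VIEW timeview3 AS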
SELECT
  TUMBLE_START(proctime, INTERVAL '10' MINUTES) AS window_start,
  TUMBLE_END(proctime, INTERVAL '10' MINUTES) AS window_end,
  TUMBLE_PROCTIME(proctime, INTERVAL '10' MINUTES) AS window_proctime,
  item,
  MAX(price) AS max_price
FROM ProcessTimeTable
GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTES), item;
DESC timeview3;
+-----------------+-----------------------------+-------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+-----------------+-----------------------------+-------+-----+--------+-----------+
| window_start | TIMESTAMP(3) | FALSE | | | |
| window_end | TIMESTAMP(3) | FALSE | | | |
| window_proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | | |
| item | STRING | TRUE | | | |
| max_price | DOUBLE | TRUE | | | |
+-----------------+-----------------------------+-------+-----+--------+-----------+
5 rows in set
Use the following command to ingest data for ProcessTimeTable in a terminal:
> nc -lk 9999
A,1.1
B,1.2
A,1.8
B,2.5
C,3.8
SET 'table.local-time-zone' = 'UTC';
SELECT * FROM timeview3;
+-------------------------+-------------------------+-------------------------+------+-----------+
| window_start | window_end | window_proctime | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.005 | A | 1.8 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.007 | B | 2.5 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.007 | C | 3.8 |
+-------------------------+-------------------------+-------------------------+------+-----------+
SET 'table.local-time-zone' = 'Asia/Shanghai';
SELECT * FROM timeview3;
The query returns different window start, window end, and window proctime values compared with the calculation in the UTC timezone.
+-------------------------+-------------------------+-------------------------+------+-----------+
| window_start | window_end | window_proctime | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.005 | A | 1.8 |
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.007 | B | 2.5 |
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.007 | C | 3.8 |
+-------------------------+-------------------------+-------------------------+------+-----------+
Note
A processing-time window is non-deterministic, so each run gets different windows and different aggregations. The previous example shows only how timezone affects a processing-time window.
Event time and timezone¶
Flink SQL supports defining an event-time attribute on TIMESTAMP and TIMESTAMP_LTZ columns.
Event-time attribute on TIMESTAMP¶
If the timestamp data in the source is represented as year-month-day-hour-minute-second, usually a string value without timezone information, for example, 2020-04-15 20:13:40.564, you can define the event-time attribute as a TIMESTAMP column.
The following code example shows how to create a view with windows based on a table’s TIMESTAMP column.
CREATE TABLE EventTimeTable (
  item STRING,
  price DOUBLE,
  ts TIMESTAMP(3), -- TIMESTAMP data type
  WATERMARK FOR ts AS ts - INTERVAL '10' SECOND
) WITH (
  'connector' = 'socket',
  'hostname' = '127.0.0.1',
  'port' = '9999',
  'format' = 'csv'
);
CREATE VIEW timeview4 AS
SELECT
  TUMBLE_START(ts, INTERVAL '10' MINUTES) AS window_start,
  TUMBLE_END(ts, INTERVAL '10' MINUTES) AS window_end,
  TUMBLE_ROWTIME(ts, INTERVAL '10' MINUTES) AS window_rowtime,
  item,
  MAX(price) AS max_price
FROM EventTimeTable
GROUP BY TUMBLE(ts, INTERVAL '10' MINUTES), item;
DESC timeview4;
+----------------+------------------------+------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+----------------+------------------------+------+-----+--------+-----------+
| window_start | TIMESTAMP(3) | true | | | |
| window_end | TIMESTAMP(3) | true | | | |
| window_rowtime | TIMESTAMP(3) *ROWTIME* | true | | | |
| item | STRING | true | | | |
| max_price | DOUBLE | true | | | |
+----------------+------------------------+------+-----+--------+-----------+
Use the following command to ingest data for EventTimeTable in a terminal:
> nc -lk 9999
A,1.1,2021-04-15 14:01:00
B,1.2,2021-04-15 14:02:00
A,1.8,2021-04-15 14:03:00
B,2.5,2021-04-15 14:04:00
C,3.8,2021-04-15 14:05:00
C,3.8,2021-04-15 14:11:00
SET 'table.local-time-zone' = 'UTC';
SELECT * FROM timeview4;
+-------------------------+-------------------------+-------------------------+------+-----------+
| window_start | window_end | window_rowtime | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | A | 1.8 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | B | 2.5 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | C | 3.8 |
+-------------------------+-------------------------+-------------------------+------+-----------+
SET 'table.local-time-zone' = 'Asia/Shanghai';
SELECT * FROM timeview4;
The query returns the same window start, window end, and window rowtime values as the calculation in the UTC timezone.
+-------------------------+-------------------------+-------------------------+------+-----------+
| window_start | window_end | window_rowtime | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | A | 1.8 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | B | 2.5 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | C | 3.8 |
+-------------------------+-------------------------+-------------------------+------+-----------+
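The window columns in this output can be reproduced with plain wall-clock arithmetic, which is why the session timezone has no effect here. The following Python sketch is an approximation based only on the output shown above, not on Flink's internals: the `tumble` helper is hypothetical, and it assumes that the window rowtime is the window end minus one millisecond, as the tables suggest.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=10)

def tumble(ts: datetime, size: timedelta = WINDOW):
    """Return (window_start, window_end, window_rowtime) for a naive timestamp."""
    # Floor the timestamp to a multiple of the window size, counted from a fixed origin.
    origin = datetime(1970, 1, 1)
    start = origin + ((ts - origin) // size) * size
    end = start + size
    # Assumption taken from the output above: rowtime is the last millisecond in the window.
    rowtime = end - timedelta(milliseconds=1)
    return start, end, rowtime

# First input row: A,1.1,2021-04-15 14:01:00
start, end, rowtime = tumble(datetime(2021, 4, 15, 14, 1, 0))

print(start)    # 2021-04-15 14:00:00
print(end)      # 2021-04-15 14:10:00
print(rowtime)  # 2021-04-15 14:09:59.999000
```

No timezone appears anywhere in the calculation, so changing the session timezone cannot change the result.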
Event-time attribute on TIMESTAMP_LTZ¶
If the timestamp data in the source is represented as an epoch time, usually as a LONG value, for example, 1618989564564, you can define an event-time attribute as a TIMESTAMP_LTZ column.
CREATE TABLE EventTimeTable2 (
  item STRING,
  price DOUBLE,
  ts BIGINT, -- long time value in epoch milliseconds
  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '10' SECOND
) WITH (
  'connector' = 'socket',
  'hostname' = '127.0.0.1',
  'port' = '9999',
  'format' = 'csv'
);
CREATE VIEW timeview5 AS
SELECT
  TUMBLE_START(ts_ltz, INTERVAL '10' MINUTES) AS window_start,
  TUMBLE_END(ts_ltz, INTERVAL '10' MINUTES) AS window_end,
  TUMBLE_ROWTIME(ts_ltz, INTERVAL '10' MINUTES) AS window_rowtime,
  item,
  MAX(price) AS max_price
FROM EventTimeTable2
GROUP BY TUMBLE(ts_ltz, INTERVAL '10' MINUTES), item;
DESC timeview5;
+----------------+----------------------------+-------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+----------------+----------------------------+-------+-----+--------+-----------+
| window_start | TIMESTAMP(3) | false | | | |
| window_end | TIMESTAMP(3) | false | | | |
| window_rowtime | TIMESTAMP_LTZ(3) *ROWTIME* | true | | | |
| item | STRING | true | | | |
| max_price | DOUBLE | true | | | |
+----------------+----------------------------+-------+-----+--------+-----------+
The input data of EventTimeTable2 is:
A,1.1,1618495260000 # The corresponding UTC timestamp is 2021-04-15 14:01:00
B,1.2,1618495320000 # The corresponding UTC timestamp is 2021-04-15 14:02:00
A,1.8,1618495380000 # The corresponding UTC timestamp is 2021-04-15 14:03:00
B,2.5,1618495440000 # The corresponding UTC timestamp is 2021-04-15 14:04:00
C,3.8,1618495500000 # The corresponding UTC timestamp is 2021-04-15 14:05:00
C,3.8,1618495860000 # The corresponding UTC timestamp is 2021-04-15 14:11:00
SET 'table.local-time-zone' = 'UTC';
SELECT * FROM timeview5;
+-------------------------+-------------------------+-------------------------+------+-----------+
| window_start | window_end | window_rowtime | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | A | 1.8 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | B | 2.5 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | C | 3.8 |
+-------------------------+-------------------------+-------------------------+------+-----------+
SET 'table.local-time-zone' = 'Asia/Shanghai';
SELECT * FROM timeview5;
The query returns different window start, window end, and window rowtime values compared with the calculation in the UTC timezone.
+-------------------------+-------------------------+-------------------------+------+-----------+
| window_start | window_end | window_rowtime | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999 | A | 1.8 |
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999 | B | 2.5 |
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999 | C | 3.8 |
+-------------------------+-------------------------+-------------------------+------+-----------+
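With an epoch-based column, the window boundaries are expressed in the session's wall-clock time, so they shift with the timezone. The following Python sketch is an analogy that reproduces the window_start column above, not a description of Flink's window assigner. The `tumble_start_ltz` helper is hypothetical, and `session_zone` stands in for `table.local-time-zone`.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

def tumble_start_ltz(epoch_millis: int, session_zone: str, minutes: int = 10) -> str:
    """Return the window start, as session-local wall-clock text, for an epoch value."""
    instant = datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(milliseconds=epoch_millis)
    # Convert the instant to the session's wall clock before flooring.
    local = instant.astimezone(ZoneInfo(session_zone)).replace(tzinfo=None)
    size = timedelta(minutes=minutes)
    origin = datetime(1970, 1, 1)
    start = origin + ((local - origin) // size) * size
    return start.isoformat(sep=" ", timespec="milliseconds")

# First input row: A,1.1,1618495260000
utc_start = tumble_start_ltz(1618495260000, "UTC")
shanghai_start = tumble_start_ltz(1618495260000, "Asia/Shanghai")

print(utc_start)       # 2021-04-15 14:00:00.000
print(shanghai_start)  # 2021-04-15 22:00:00.000
```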
Daylight Saving Time support¶
Flink SQL supports defining time attributes on a TIMESTAMP_LTZ column, and it uses the TIMESTAMP and TIMESTAMP_LTZ types in window processing to support Daylight Saving Time.
Flink SQL uses a timestamp literal to split the window and assigns windows to data according to the epoch time of each row. This means that Flink SQL uses the TIMESTAMP type for window start and window end, like TUMBLE_START and TUMBLE_END, and it uses TIMESTAMP_LTZ for window-time attributes, like TUMBLE_PROCTIME and TUMBLE_ROWTIME.
For example, consider a tumble window in the America/Los_Angeles timezone, where Daylight Saving Time starts at 2021-03-14 02:00:00:
long epoch1 = 1615708800000L; // 2021-03-14 00:00:00
long epoch2 = 1615712400000L; // 2021-03-14 01:00:00
long epoch3 = 1615716000000L; // 2021-03-14 03:00:00, skip one hour (2021-03-14 02:00:00)
long epoch4 = 1615719600000L; // 2021-03-14 04:00:00
The tumble window [2021-03-14 00:00:00, 2021-03-14 04:00:00] collects 3 hours’ worth of data in the America/Los_Angeles timezone, but it collects 4 hours’ worth of data in non-DST timezones. You only need to define the time attribute on a TIMESTAMP_LTZ column.
All windows in Flink SQL, like Hop, Session, and Cumulative windows, follow this pattern, and all operations in Flink SQL support TIMESTAMP_LTZ, so Flink SQL provides complete support for Daylight Saving Time.
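The skipped hour can be checked independently of Flink. The following Python sketch converts the four epoch values from the example to America/Los_Angeles wall-clock time and measures the real time that elapses between the first and the last. The `local_wall_clock` helper is a hypothetical name, and the sketch requires Python 3.9 or later with a timezone database available.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

LA = ZoneInfo("America/Los_Angeles")
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def local_wall_clock(epoch_millis: int) -> str:
    """Render an epoch-millisecond instant as Los Angeles wall-clock text."""
    instant = EPOCH + timedelta(milliseconds=epoch_millis)
    return instant.astimezone(LA).strftime("%Y-%m-%d %H:%M:%S")

# The four epoch values from the example above, one hour apart.
epochs = [1615708800000, 1615712400000, 1615716000000, 1615719600000]
wall_clocks = [local_wall_clock(e) for e in epochs]

# Real time elapsed between wall-clock 00:00:00 and 04:00:00 on that day.
elapsed_hours = (epochs[-1] - epochs[0]) / 3_600_000

for text in wall_clocks:
    print(text)
# 2021-03-14 00:00:00
# 2021-03-14 01:00:00
# 2021-03-14 03:00:00
# 2021-03-14 04:00:00
print(elapsed_hours)  # 3.0
```

The wall clock advances four hours while only three hours of real time pass, which is why the window holds three hours' worth of data.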
Batch and streaming mode differences¶
The behavior of the following time functions depends on the execution mode: LOCALTIME, LOCALTIMESTAMP, CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP, and NOW().
Flink SQL evaluates the returned values according to the execution mode:
- In streaming mode, they’re evaluated for each record.
- In batch mode, they’re evaluated once as the query starts and return the same result for every row.
The CURRENT_ROW_TIMESTAMP time function is evaluated for each record in both batch and streaming mode.
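The difference between the two modes can be pictured with a toy model. The following Python sketch is purely illustrative and does not use any Flink API: `make_clock`, `run_streaming`, and `run_batch` are hypothetical names, and the clock is a counter that advances by one tick on every read so that the result is deterministic.

```python
from itertools import count

def make_clock():
    """Hypothetical clock that advances by one tick on every read."""
    ticks = count(start=100)
    return lambda: next(ticks)

rows = ["A", "B", "C"]

def run_streaming(rows, now):
    # Streaming mode: the time function is evaluated once per record.
    return [(row, now()) for row in rows]

def run_batch(rows, now):
    # Batch mode: the time function is evaluated once, when the query starts.
    started_at = now()
    return [(row, started_at) for row in rows]

streaming_result = run_streaming(rows, make_clock())
batch_result = run_batch(rows, make_clock())

print(streaming_result)  # [('A', 100), ('B', 101), ('C', 102)]
print(batch_result)      # [('A', 100), ('B', 100), ('C', 100)]
```

In this model, a function like CURRENT_ROW_TIMESTAMP would follow the per-record path in both modes.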