Timezone Types in Confluent Cloud for Apache Flink

Confluent Cloud for Apache Flink® provides rich data types for date and time, including TIMESTAMP and TIMESTAMP_LTZ.

These datetime types and the related datetime functions enable processing business data across timezones.

TIMESTAMP vs TIMESTAMP_LTZ

TIMESTAMP type

  • TIMESTAMP(p) is an abbreviation for TIMESTAMP(p) WITHOUT TIME ZONE. The precision p supports a range from 0 to 9. The default is 6.

  • TIMESTAMP describes a timestamp that represents year, month, day, hour, minute, second, and fractional seconds.

  • TIMESTAMP can be specified from a string literal. The following code example shows a SELECT statement that creates a timestamp from a string.

    SELECT TIMESTAMP '1970-01-01 00:00:04.001';
    

    Your output should resemble:

    +----+-------------------------+
    | op |                  EXPR$0 |
    +----+-------------------------+
    | +I | 1970-01-01 00:00:04.001 |
    +----+-------------------------+
    Received a total of 1 row
    

TIMESTAMP_LTZ type

  • TIMESTAMP_LTZ(p) is an abbreviation for TIMESTAMP(p) WITH LOCAL TIME ZONE. The precision p supports a range from 0 to 9. The default is 6.

  • TIMESTAMP_LTZ describes an absolute time point on the time-line. It stores a LONG value representing epoch-milliseconds and an INT representing nanosecond-of-millisecond. The epoch time is measured from the standard Java epoch of 1970-01-01T00:00:00Z. Every datum of TIMESTAMP_LTZ type is interpreted in the local timezone configured in the current session. Typically, the local timezone is used for computation and visualization.

  • TIMESTAMP_LTZ is suited to applications that span timezones, because an absolute point in time, for example, 4001 milliseconds after the epoch, describes the same instant in every timezone. In other words, if the clocks of all machines in the world are read at the same instant, they all return the same epoch value, for example, 4001 milliseconds. This is the meaning of “absolute time point”.

  • TIMESTAMP_LTZ has no literal representation, so you can’t create it from a literal. It can be derived from a LONG epoch time, as shown in the following code example.

    CREATE VIEW T1 AS SELECT TO_TIMESTAMP_LTZ(4001, 3);
    SET 'table.local-time-zone' = 'UTC';
    SELECT * FROM T1;
    

    Your output should resemble:

    +----+-------------------------+
    | op |                  EXPR$0 |
    +----+-------------------------+
    | +I | 1970-01-01 00:00:04.001 |
    +----+-------------------------+
    Received a total of 1 row
    

    Change the timezone and query the time again:

    SET 'table.local-time-zone' = 'Asia/Shanghai';
    SELECT * FROM T1;
    

    Your output should resemble:

    +----+-------------------------+
    | op |                  EXPR$0 |
    +----+-------------------------+
    | +I | 1970-01-01 08:00:04.001 |
    +----+-------------------------+
    Received a total of 1 row
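
The same behavior can be reproduced outside Flink with the java.time API. The following is a minimal sketch, not Flink's implementation: it assumes that a TIMESTAMP_LTZ value behaves like a java.time.Instant and that rendering it in a session timezone behaves like converting the instant to a LocalDateTime in that zone. The class name LtzRendering and the method render are hypothetical names used only for illustration.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class LtzRendering {
    private static final DateTimeFormatter FMT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Render an epoch-millisecond value as wall-clock text in the given zone.
    // The zone plays the role of 'table.local-time-zone' (an analogy, not Flink code).
    static String render(long epochMillis, String zoneId) {
        Instant instant = Instant.ofEpochMilli(epochMillis);
        return LocalDateTime.ofInstant(instant, ZoneId.of(zoneId)).format(FMT);
    }

    public static void main(String[] args) {
        // The instant is the same in both calls; only its rendering changes.
        System.out.println(render(4001L, "UTC"));           // 1970-01-01 00:00:04.001
        System.out.println(render(4001L, "Asia/Shanghai")); // 1970-01-01 08:00:04.001
    }
}
```

The stored value, 4001 milliseconds, never changes. Only the text produced for the reader depends on the zone, which matches the two query results above.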
    

Set the timezone

The local timezone defines the timezone ID of the current session. You can configure the timezone in the Flink SQL shell or in your applications.

-- set to UTC timezone
SET 'table.local-time-zone' = 'UTC';

-- set to Shanghai timezone
SET 'table.local-time-zone' = 'Asia/Shanghai';

-- set to Los_Angeles timezone
SET 'table.local-time-zone' = 'America/Los_Angeles';

Datetime functions and timezones

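The return values of the following datetime functions depend on the configured timezone: LOCALTIME, LOCALTIMESTAMP, CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_ROW_TIMESTAMP(), NOW(), and PROCTIME().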

The following example code shows the return types of these datetime functions.

SET 'sql-client.execution.result-mode' = 'tableau';

CREATE VIEW timeview AS SELECT
  LOCALTIME,
  LOCALTIMESTAMP,
  CURRENT_DATE,
  CURRENT_TIME,
  CURRENT_TIMESTAMP,
  CURRENT_ROW_TIMESTAMP(),
  NOW(),
  PROCTIME();

DESC timeview;

Your output should resemble:

+-------------------+-----------------------------+-------+-----+--------+-----------+
|              name |                        type |  null | key | extras | watermark |
+-------------------+-----------------------------+-------+-----+--------+-----------+
|         LOCALTIME |                     TIME(0) | FALSE |     |        |           |
|    LOCALTIMESTAMP |                TIMESTAMP(3) | FALSE |     |        |           |
|      CURRENT_DATE |                        DATE | FALSE |     |        |           |
|      CURRENT_TIME |                     TIME(0) | FALSE |     |        |           |
| CURRENT_TIMESTAMP |            TIMESTAMP_LTZ(3) | FALSE |     |        |           |
|            EXPR$5 |            TIMESTAMP_LTZ(3) | FALSE |     |        |           |
|            EXPR$6 |            TIMESTAMP_LTZ(3) | FALSE |     |        |           |
|            EXPR$7 | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     |        |           |
+-------------------+-----------------------------+-------+-----+--------+-----------+
8 rows in set

Set the timezone to UTC and query the view.

SET 'table.local-time-zone' = 'UTC';
SELECT * FROM timeview;

Your output should resemble:

+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op | LOCALTIME |          LOCALTIMESTAMP | CURRENT_DATE | CURRENT_TIME |       CURRENT_TIMESTAMP |                  EXPR$5 |                  EXPR$6 |                  EXPR$7 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I |  20:03:22 | 2023-03-20 20:03:22.090 |   2023-03-20 |     20:03:22 | 2023-03-20 20:03:22.090 | 2023-03-20 20:03:22.090 | 2023-03-20 20:03:22.090 | 2023-03-20 20:03:22.100 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
Received a total of 1 row

Change the timezone and query the view again.

SET 'table.local-time-zone' = 'Asia/Shanghai';
SELECT * FROM timeview;

Your output should resemble:

+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op | LOCALTIME |          LOCALTIMESTAMP | CURRENT_DATE | CURRENT_TIME |       CURRENT_TIMESTAMP |                  EXPR$5 |                  EXPR$6 |                  EXPR$7 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I |  04:03:58 | 2023-03-21 04:03:58.955 |   2023-03-21 |     04:03:58 | 2023-03-21 04:03:58.955 | 2023-03-21 04:03:58.955 | 2023-03-21 04:03:58.955 | 2023-03-21 04:03:58.965 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
Received a total of 1 row
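
To see why the date and time columns shift with the session timezone, it can help to hold the instant fixed and vary only the zone. The following is a rough Java analogue under stated assumptions: the helper names currentDate and localTime are hypothetical, they only imitate the semantics of CURRENT_DATE and LOCALTIME, and the fixed instant is taken from the UTC output above so that the result is reproducible.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;

public class SessionZoneFunctions {
    // Analogue of CURRENT_DATE: the calendar date of an instant in the session zone.
    static LocalDate currentDate(Instant now, ZoneId sessionZone) {
        return now.atZone(sessionZone).toLocalDate();
    }

    // Analogue of LOCALTIME with TIME(0) precision: wall-clock time truncated to seconds.
    static LocalTime localTime(Instant now, ZoneId sessionZone) {
        return now.atZone(sessionZone).toLocalTime().truncatedTo(ChronoUnit.SECONDS);
    }

    public static void main(String[] args) {
        // Fixed instant instead of the system clock, so both zones see the same moment.
        Instant now = Instant.parse("2023-03-20T20:03:22.090Z");
        for (String id : new String[] {"UTC", "Asia/Shanghai"}) {
            ZoneId zone = ZoneId.of(id);
            System.out.println(id + ": " + currentDate(now, zone) + " " + localTime(now, zone));
        }
        // UTC: 2023-03-20 20:03:22
        // Asia/Shanghai: 2023-03-21 04:03:22
    }
}
```

Note that the calendar date also changes: the same instant falls on March 20 in UTC and on March 21 in Asia/Shanghai.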

TIMESTAMP_LTZ string representation

The session timezone is used when a TIMESTAMP_LTZ value is converted to its string representation, for example, when you print the value, cast it to STRING, or cast it to TIMESTAMP. The session timezone is also used when you cast a TIMESTAMP value to TIMESTAMP_LTZ.

CREATE VIEW timeview2 AS SELECT
  TO_TIMESTAMP_LTZ(4001, 3) AS ltz,
  TIMESTAMP '1970-01-01 00:00:01.001' AS ntz;

DESC timeview2;

Your output should resemble:

+------+------------------+-------+-----+--------+-----------+
| name |             type |  null | key | extras | watermark |
+------+------------------+-------+-----+--------+-----------+
|  ltz | TIMESTAMP_LTZ(3) |  TRUE |     |        |           |
|  ntz |     TIMESTAMP(3) | FALSE |     |        |           |
+------+------------------+-------+-----+--------+-----------+
2 rows in set

Set the timezone to UTC and query the view.

SET 'table.local-time-zone' = 'UTC';
SELECT * FROM timeview2;

Your output should resemble:

+----+-------------------------+-------------------------+
| op |                     ltz |                     ntz |
+----+-------------------------+-------------------------+
| +I | 1970-01-01 00:00:04.001 | 1970-01-01 00:00:01.001 |
+----+-------------------------+-------------------------+
Received a total of 1 row

Change the timezone and query the view again.

SET 'table.local-time-zone' = 'Asia/Shanghai';
SELECT * FROM timeview2;

Your output should resemble:

+----+-------------------------+-------------------------+
| op |                     ltz |                     ntz |
+----+-------------------------+-------------------------+
| +I | 1970-01-01 08:00:04.001 | 1970-01-01 00:00:01.001 |
+----+-------------------------+-------------------------+
Received a total of 1 row

Create a view that casts the values between the types.

CREATE VIEW timeview2_cast AS SELECT ltz,
  CAST(ltz AS TIMESTAMP(3)),
  CAST(ltz AS STRING),
  ntz,
  CAST(ntz AS TIMESTAMP_LTZ(3)) FROM timeview2;

DESC timeview2_cast;

Your output should resemble:

+--------+------------------+-------+-----+--------+-----------+
|   name |             type |  null | key | extras | watermark |
+--------+------------------+-------+-----+--------+-----------+
|    ltz | TIMESTAMP_LTZ(3) |  TRUE |     |        |           |
| EXPR$1 |     TIMESTAMP(3) |  TRUE |     |        |           |
| EXPR$2 |           STRING |  TRUE |     |        |           |
|    ntz |     TIMESTAMP(3) | FALSE |     |        |           |
| EXPR$4 | TIMESTAMP_LTZ(3) | FALSE |     |        |           |
+--------+------------------+-------+-----+--------+-----------+
5 rows in set

With the session timezone still set to Asia/Shanghai, query the view.

SELECT * FROM timeview2_cast;

Your output should resemble:

+----+-------------------------+-------------------------+--------------------------------+-------------------------+-------------------------+
| op |                     ltz |                  EXPR$1 |                         EXPR$2 |                     ntz |                  EXPR$4 |
+----+-------------------------+-------------------------+--------------------------------+-------------------------+-------------------------+
| +I | 1970-01-01 08:00:04.001 | 1970-01-01 08:00:04.001 |        1970-01-01 08:00:04.001 | 1970-01-01 00:00:01.001 | 1970-01-01 00:00:01.001 |
+----+-------------------------+-------------------------+--------------------------------+-------------------------+-------------------------+
Received a total of 1 row
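
The two cast directions above can be sketched with java.time, assuming that TIMESTAMP behaves like LocalDateTime and TIMESTAMP_LTZ behaves like Instant. This is an illustrative analogue, not Flink's cast implementation, and the class and method names are hypothetical. It shows what the output table hides: casting the TIMESTAMP value to TIMESTAMP_LTZ produces a different instant for each session timezone, even though the rendered text looks unchanged.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class LtzCasts {
    // Analogue of CAST(ltz AS TIMESTAMP): read the instant as wall-clock time in the session zone.
    static LocalDateTime toTimestamp(Instant ltz, ZoneId sessionZone) {
        return LocalDateTime.ofInstant(ltz, sessionZone);
    }

    // Analogue of CAST(ntz AS TIMESTAMP_LTZ): treat the wall-clock time as local to the session zone.
    static Instant toTimestampLtz(LocalDateTime ntz, ZoneId sessionZone) {
        return ntz.atZone(sessionZone).toInstant();
    }

    public static void main(String[] args) {
        ZoneId shanghai = ZoneId.of("Asia/Shanghai");
        Instant ltz = Instant.ofEpochMilli(4001L);
        LocalDateTime ntz = LocalDateTime.of(1970, 1, 1, 0, 0, 1, 1_000_000);

        System.out.println(toTimestamp(ltz, shanghai));                           // 1970-01-01T08:00:04.001
        System.out.println(toTimestampLtz(ntz, shanghai).toEpochMilli());         // -28798999
        System.out.println(toTimestampLtz(ntz, ZoneId.of("UTC")).toEpochMilli()); // 1001
    }
}
```

In the Asia/Shanghai session, the wall-clock value 00:00:01.001 maps to an instant eight hours before the same wall-clock value in UTC, which is why the epoch value is negative.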

Time attribute and timezone

For more information about time attributes, see Time Attributes.

Processing time and timezone

Flink SQL defines the processing-time attribute with the PROCTIME function. The return type of the PROCTIME function is TIMESTAMP_LTZ.

Note

In Flink versions earlier than 1.13, the return type of PROCTIME is TIMESTAMP, and the return value is the TIMESTAMP in the UTC timezone. For example, if the wall-clock time in Shanghai is 2021-03-01 12:00:00, the PROCTIME() function returns 2021-03-01 04:00:00. Flink 1.13 fixes this issue and uses TIMESTAMP_LTZ type as the PROCTIME() return type, so you don’t need to correct the timezone manually.

The PROCTIME function always represents your local timestamp value by using the TIMESTAMP_LTZ type, which also supports Daylight Saving Time.

SET 'table.local-time-zone' = 'UTC';
SELECT PROCTIME();

Your output should resemble:

+----+-------------------------+
| op |                  EXPR$0 |
+----+-------------------------+
| +I | 2023-03-20 21:59:18.380 |
+----+-------------------------+
Received a total of 1 row

Change the local time zone and run the PROCTIME function again.

SET 'table.local-time-zone' = 'Asia/Shanghai';
SELECT PROCTIME();

Your output should resemble:

+----+-------------------------+
| op |                  EXPR$0 |
+----+-------------------------+
| +I | 2023-03-21 06:00:32.202 |
+----+-------------------------+
Received a total of 1 row
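
The following code example shows how to create a view with windows based on a table’s processing-time attribute.

CREATE TABLE ProcessTimeTable (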
       item STRING,
       price DOUBLE,
       proctime as PROCTIME()
 ) WITH (
     'connector' = 'socket',
     'hostname' = '127.0.0.1',
     'port' = '9999',
     'format' = 'csv'
);

CREATE VIEW timeview3 AS
 SELECT
     TUMBLE_START(proctime, INTERVAL '10' MINUTES) AS window_start,
     TUMBLE_END(proctime, INTERVAL '10' MINUTES) AS window_end,
     TUMBLE_PROCTIME(proctime, INTERVAL '10' MINUTES) as window_proctime,
     item,
     MAX(price) as max_price
 FROM ProcessTimeTable
     GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTES), item;

DESC timeview3;

Your output should resemble:

+-----------------+-----------------------------+-------+-----+--------+-----------+
|            name |                        type |  null | key | extras | watermark |
+-----------------+-----------------------------+-------+-----+--------+-----------+
|    window_start |                TIMESTAMP(3) | FALSE |     |        |           |
|      window_end |                TIMESTAMP(3) | FALSE |     |        |           |
| window_proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     |        |           |
|            item |                      STRING |  TRUE |     |        |           |
|       max_price |                      DOUBLE |  TRUE |     |        |           |
+-----------------+-----------------------------+-------+-----+--------+-----------+
5 rows in set

Use the following command to ingest data for ProcessTimeTable in a terminal:

> nc -lk 9999
A,1.1
B,1.2
A,1.8
B,2.5
C,3.8

Set the timezone to UTC and query the view.

SET 'table.local-time-zone' = 'UTC';
SELECT * FROM timeview3;

Your output should resemble:

+-------------------------+-------------------------+-------------------------+------+-----------+
|            window_start |              window_end |         window_proctime | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.005 |    A |       1.8 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.007 |    B |       2.5 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.007 |    C |       3.8 |
+-------------------------+-------------------------+-------------------------+------+-----------+

Change the timezone and query the view again.

SET 'table.local-time-zone' = 'Asia/Shanghai';
SELECT * FROM timeview3;

The query returns different window start, window end, and window proctime values compared with the calculation in the UTC timezone.

+-------------------------+-------------------------+-------------------------+------+-----------+
|            window_start |              window_end |         window_proctime | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.005 |    A |       1.8 |
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.007 |    B |       2.5 |
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.007 |    C |       3.8 |
+-------------------------+-------------------------+-------------------------+------+-----------+

Note

A processing-time window is non-deterministic, so each run gets different windows and different aggregations. The previous example shows only how timezone affects a processing-time window.

Event time and timezone

Flink SQL supports defining an event-time attribute on TIMESTAMP and TIMESTAMP_LTZ columns.

Event-time attribute on TIMESTAMP

If the timestamp data in the source is represented as year-month-day-hour-minute-second, usually a string value without timezone information, for example, 2020-04-15 20:13:40.564, you can define the event-time attribute as a TIMESTAMP column.

The following code example shows how to create a view with windows based on a table’s TIMESTAMP column.

CREATE TABLE EventTimeTable (
       item STRING,
       price DOUBLE,
       ts TIMESTAMP(3), -- TIMESTAMP data type
       WATERMARK FOR ts AS ts - INTERVAL '10' SECOND
 ) WITH (
     'connector' = 'socket',
     'hostname' = '127.0.0.1',
     'port' = '9999',
     'format' = 'csv'
);

CREATE VIEW timeview4 AS
 SELECT
     TUMBLE_START(ts, INTERVAL '10' MINUTES) AS window_start,
     TUMBLE_END(ts, INTERVAL '10' MINUTES) AS window_end,
     TUMBLE_ROWTIME(ts, INTERVAL '10' MINUTES) as window_rowtime,
     item,
     MAX(price) as max_price
 FROM EventTimeTable
     GROUP BY TUMBLE(ts, INTERVAL '10' MINUTES), item;

DESC timeview4;

Your output should resemble:

+----------------+------------------------+------+-----+--------+-----------+
|           name |                   type | null | key | extras | watermark |
+----------------+------------------------+------+-----+--------+-----------+
|   window_start |           TIMESTAMP(3) | true |     |        |           |
|     window_end |           TIMESTAMP(3) | true |     |        |           |
| window_rowtime | TIMESTAMP(3) *ROWTIME* | true |     |        |           |
|           item |                 STRING | true |     |        |           |
|      max_price |                 DOUBLE | true |     |        |           |
+----------------+------------------------+------+-----+--------+-----------+

Use the following command to ingest data for EventTimeTable in a terminal:

> nc -lk 9999
A,1.1,2021-04-15 14:01:00
B,1.2,2021-04-15 14:02:00
A,1.8,2021-04-15 14:03:00
B,2.5,2021-04-15 14:04:00
C,3.8,2021-04-15 14:05:00
C,3.8,2021-04-15 14:11:00

Set the timezone to UTC and query the view.

SET 'table.local-time-zone' = 'UTC';
SELECT * FROM timeview4;

Your output should resemble:

+-------------------------+-------------------------+-------------------------+------+-----------+
|            window_start |              window_end |          window_rowtime | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 |    A |       1.8 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 |    B |       2.5 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 |    C |       3.8 |
+-------------------------+-------------------------+-------------------------+------+-----------+

Change the timezone and query the view again.

SET 'table.local-time-zone' = 'Asia/Shanghai';
SELECT * FROM timeview4;

The query returns the same window start, window end, and window rowtime values as the calculation in the UTC timezone, because a TIMESTAMP value carries no timezone information.

+-------------------------+-------------------------+-------------------------+------+-----------+
|            window_start |              window_end |          window_rowtime | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 |    A |       1.8 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 |    B |       2.5 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 |    C |       3.8 |
+-------------------------+-------------------------+-------------------------+------+-----------+

Event-time attribute on TIMESTAMP_LTZ

If the timestamp data in the source is represented as an epoch time, usually as a LONG value, for example, 1618989564564, you can define an event-time attribute as a TIMESTAMP_LTZ column.

CREATE TABLE EventTimeTable2 (
       item STRING,
       price DOUBLE,
       ts BIGINT, -- long time value in epoch milliseconds
       ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
       WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '10' SECOND
 ) WITH (
     'connector' = 'socket',
     'hostname' = '127.0.0.1',
     'port' = '9999',
     'format' = 'csv'
);

CREATE VIEW timeview5 AS
 SELECT
     TUMBLE_START(ts_ltz, INTERVAL '10' MINUTES) AS window_start,
     TUMBLE_END(ts_ltz, INTERVAL '10' MINUTES) AS window_end,
     TUMBLE_ROWTIME(ts_ltz, INTERVAL '10' MINUTES) as window_rowtime,
     item,
     MAX(price) as max_price
 FROM EventTimeTable2
     GROUP BY TUMBLE(ts_ltz, INTERVAL '10' MINUTES), item;

DESC timeview5;

Your output should resemble:

+----------------+----------------------------+-------+-----+--------+-----------+
|           name |                       type |  null | key | extras | watermark |
+----------------+----------------------------+-------+-----+--------+-----------+
|   window_start |               TIMESTAMP(3) | false |     |        |           |
|     window_end |               TIMESTAMP(3) | false |     |        |           |
| window_rowtime | TIMESTAMP_LTZ(3) *ROWTIME* |  true |     |        |           |
|           item |                     STRING |  true |     |        |           |
|      max_price |                     DOUBLE |  true |     |        |           |
+----------------+----------------------------+-------+-----+--------+-----------+

The input data of EventTimeTable2 is:

A,1.1,1618495260000  # The corresponding UTC timestamp is 2021-04-15 14:01:00
B,1.2,1618495320000  # The corresponding UTC timestamp is 2021-04-15 14:02:00
A,1.8,1618495380000  # The corresponding UTC timestamp is 2021-04-15 14:03:00
B,2.5,1618495440000  # The corresponding UTC timestamp is 2021-04-15 14:04:00
C,3.8,1618495500000  # The corresponding UTC timestamp is 2021-04-15 14:05:00
C,3.8,1618495860000  # The corresponding UTC timestamp is 2021-04-15 14:11:00

Set the timezone to UTC and query the view.

SET 'table.local-time-zone' = 'UTC';
SELECT * FROM timeview5;

Your output should resemble:

+-------------------------+-------------------------+-------------------------+------+-----------+
|            window_start |              window_end |          window_rowtime | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 |    A |       1.8 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 |    B |       2.5 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 |    C |       3.8 |
+-------------------------+-------------------------+-------------------------+------+-----------+

Change the timezone and query the view again.

SET 'table.local-time-zone' = 'Asia/Shanghai';
SELECT * FROM timeview5;

The query returns different window start, window end, and window rowtime values compared with the calculation in the UTC timezone.

+-------------------------+-------------------------+-------------------------+------+-----------+
|            window_start |              window_end |          window_rowtime | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999 |    A |       1.8 |
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999 |    B |       2.5 |
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999 |    C |       3.8 |
+-------------------------+-------------------------+-------------------------+------+-----------+
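
One way to picture why the window boundaries move with the session timezone is to assign windows on the local wall-clock axis. The following Java sketch does that under stated assumptions: it is an illustration of the idea, not Flink's window assigner, and the class name LocalTumbleWindow and the method windowStart are hypothetical. The input is the first epoch value from the EventTimeTable2 data.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class LocalTumbleWindow {
    // Start of the tumbling window that contains the instant, as wall-clock time in the session zone.
    static LocalDateTime windowStart(long epochMillis, ZoneId sessionZone, Duration size) {
        // Step 1: convert the absolute instant to wall-clock time in the session zone.
        LocalDateTime local = LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), sessionZone);
        // Step 2: place the wall-clock value on a zone-free millisecond axis.
        long localMillis = local.toInstant(ZoneOffset.UTC).toEpochMilli();
        // Step 3: round down to a multiple of the window size.
        long startMillis = localMillis - Math.floorMod(localMillis, size.toMillis());
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(startMillis), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        Duration tenMinutes = Duration.ofMinutes(10);
        long ts = 1618495260000L; // 2021-04-15 14:01:00 UTC
        System.out.println(windowStart(ts, ZoneId.of("UTC"), tenMinutes));           // 2021-04-15T14:00
        System.out.println(windowStart(ts, ZoneId.of("Asia/Shanghai"), tenMinutes)); // 2021-04-15T22:00
    }
}
```

The row belongs to the same absolute ten-minute interval in both sessions. Only the wall-clock labels of window_start and window_end differ, which matches the two result tables.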

Daylight Saving Time support

Flink SQL supports defining time attributes on a TIMESTAMP_LTZ column, and it uses the TIMESTAMP and TIMESTAMP_LTZ types in window processing to support Daylight Saving Time.

Flink SQL uses a timestamp literal to split the window and assigns each row to a window according to the row’s epoch time. This means that Flink SQL uses the TIMESTAMP type for window start and window end, like TUMBLE_START and TUMBLE_END, and it uses TIMESTAMP_LTZ for window-time attributes, like TUMBLE_PROCTIME and TUMBLE_ROWTIME. For example, consider a tumble window in the America/Los_Angeles timezone, where Daylight Saving Time starts at 2021-03-14 02:00:00:

long epoch1 = 1615708800000L; // 2021-03-14 00:00:00 in America/Los_Angeles
long epoch2 = 1615712400000L; // 2021-03-14 01:00:00
long epoch3 = 1615716000000L; // 2021-03-14 03:00:00, one hour skipped (02:00:00 does not exist)
long epoch4 = 1615719600000L; // 2021-03-14 04:00:00

The tumble window [2021-03-14 00:00:00, 2021-03-14 04:00:00] collects 3 hours’ worth of data in the America/Los_Angeles timezone, but it collects 4 hours’ worth of data in non-DST timezones. To get this behavior, you only need to define the time attribute on a TIMESTAMP_LTZ column.

All windows in Flink SQL, like Hop, Session, and Cumulative windows, follow this pattern, and all operations in Flink SQL support TIMESTAMP_LTZ, so Flink SQL provides complete support for Daylight Saving Time.
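
The three-hour versus four-hour difference can be checked with plain java.time arithmetic. This is a small verification sketch, independent of Flink. The class name DstWindowLength and the method elapsedHours are hypothetical, and the window bounds are the wall-clock values from the example above.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class DstWindowLength {
    // Real elapsed hours between two wall-clock times when both are read in the given zone.
    static long elapsedHours(LocalDateTime start, LocalDateTime end, ZoneId zone) {
        return Duration.between(start.atZone(zone), end.atZone(zone)).toHours();
    }

    public static void main(String[] args) {
        LocalDateTime start = LocalDateTime.of(2021, 3, 14, 0, 0);
        LocalDateTime end = LocalDateTime.of(2021, 3, 14, 4, 0);
        ZoneId losAngeles = ZoneId.of("America/Los_Angeles");

        System.out.println(elapsedHours(start, end, losAngeles));       // 3
        System.out.println(elapsedHours(start, end, ZoneId.of("UTC"))); // 4

        // The wall-clock time 03:00:00 in Los Angeles corresponds to epoch3 in the listing above.
        long epoch3 = LocalDateTime.of(2021, 3, 14, 3, 0).atZone(losAngeles).toInstant().toEpochMilli();
        System.out.println(epoch3); // 1615716000000
    }
}
```

Because the clock jumps from 02:00:00 to 03:00:00, the window labeled 00:00 to 04:00 covers only three hours of real time in America/Los_Angeles.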

Batch and streaming mode differences

The behavior of the following time functions depends on the execution mode: LOCALTIME, LOCALTIMESTAMP, CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP, and NOW().

Flink SQL evaluates the returned values according to execution mode:

  • In streaming mode, they’re evaluated for each record.
  • In batch mode, they’re evaluated once as the query starts and return the same result for every row.

The CURRENT_ROW_TIMESTAMP time function is evaluated for each record in both batch and streaming mode.
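
The difference between the two evaluation strategies can be sketched in a few lines of Java. This is only a toy model of the semantics described above, not how Flink executes queries. The class name EvaluationModes, its methods, and the deterministic tickingClock helper are all hypothetical and exist only to make the result reproducible.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class EvaluationModes {
    // Streaming-style evaluation: read the clock once for each record.
    static List<Instant> perRecord(int records, Supplier<Instant> clock) {
        List<Instant> out = new ArrayList<>();
        for (int i = 0; i < records; i++) {
            out.add(clock.get());
        }
        return out;
    }

    // Batch-style evaluation: read the clock once at query start and reuse the value.
    static List<Instant> oncePerQuery(int records, Supplier<Instant> clock) {
        Instant queryStart = clock.get();
        List<Instant> out = new ArrayList<>();
        for (int i = 0; i < records; i++) {
            out.add(queryStart);
        }
        return out;
    }

    // Hypothetical deterministic clock that advances one second on every call.
    static Supplier<Instant> tickingClock() {
        long[] calls = {0};
        return () -> Instant.ofEpochSecond(calls[0]++);
    }

    public static void main(String[] args) {
        System.out.println(perRecord(3, tickingClock()));
        // [1970-01-01T00:00:00Z, 1970-01-01T00:00:01Z, 1970-01-01T00:00:02Z]
        System.out.println(oncePerQuery(3, tickingClock()));
        // [1970-01-01T00:00:00Z, 1970-01-01T00:00:00Z, 1970-01-01T00:00:00Z]
    }
}
```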