Windowing Table-Valued Functions (Windowing TVFs)

Confluent Cloud for Apache Flink®️ provides several window table-valued functions (TVFs) for dividing the elements of a table into windows.

Description

Windows are central to processing infinite streams. Windows split the stream into “buckets” of finite size, over which you can apply computations. This document focuses on how windowing is performed in Confluent Cloud for Apache Flink and how you can benefit from windowed functions.

Flink provides several window table-valued functions (TVFs) to divide the elements of your table into windows, including:

  • TUMBLE
  • HOP
  • CUMULATE

Note that each element can logically belong to more than one window, depending on the windowing table-valued function you use. For example, HOP windowing creates overlapping windows in which a single element can be assigned to multiple windows.

Windowing TVFs are Flink-defined Polymorphic Table Functions (PTFs). PTFs are part of the SQL:2016 standard. A PTF is a special table function that can take a table as a parameter, which makes PTFs a powerful way to change the shape of a table. Because PTFs are used semantically like tables, they are invoked in the FROM clause of a SELECT statement.

Frequently used computations based on windowing TVFs include window aggregation, window Top-N, window join, and window deduplication.

Window functions

Flink provides three built-in windowing TVFs: TUMBLE, HOP, and CUMULATE. The return value of a windowing TVF is a new relation that includes all columns of the original relation, plus three additional columns named window_start, window_end, and window_time that indicate the assigned window. In streaming mode, the window_time field is a time attribute of the window. In batch mode, the window_time field is an attribute of type TIMESTAMP or TIMESTAMP_LTZ, based on the input time field type. The window_time field can be used in subsequent time-based operations, for example, another windowing TVF, an interval join, or an over aggregation. The value of window_time always equals window_end - 1ms.
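Because window_time remains a time attribute in streaming mode, the output of one windowing TVF can feed another. The following sketch is an illustration, not a definitive pattern: it sums points in 5-minute tumbling windows and then rolls those partial sums up into 10-minute windows. It assumes that a common table expression (WITH clause) can be passed to a windowing TVF in your environment; if it can't, a view with the same definition should serve the same purpose. The names five_minute_totals, w5_start, w5_end, w5_time, and partial_points are hypothetical.

```sql
-- Hypothetical names: five_minute_totals, w5_start, w5_end, w5_time, partial_points.
WITH five_minute_totals AS (
  SELECT
    window_start AS w5_start,    -- renamed to avoid clashing with the outer window columns
    window_end   AS w5_end,
    window_time  AS w5_time,     -- remains a time attribute in streaming mode
    SUM(points)  AS partial_points
  FROM TABLE(
    TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES))
  GROUP BY window_start, window_end, window_time
)
-- Second level: 10-minute windows over the 5-minute partial sums.
SELECT window_start, window_end, SUM(partial_points) AS total_points
  FROM TABLE(
    TUMBLE(TABLE five_minute_totals, DESCRIPTOR(w5_time), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;
```

The inner window columns are renamed because the outer TUMBLE call adds its own window_start, window_end, and window_time columns.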

Window alignment

Time-based window boundaries align with clock seconds, minutes, hours, and days. For example, assume that you have events with these timestamps (in UTC):

  • 00:59:00.000
  • 00:59:30.000
  • 01:00:15.000

If you put these events into hour-long tumbling windows, the first two land in the window for 00:00:00-00:59:59.999, and the third event lands in the following hour.
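As a minimal sketch of the hour-long case described above, the following query counts events per one-hour tumbling window on the example table used later in this document. The alias event_count is an assumption chosen for illustration.

```sql
-- Count events per hour-long tumbling window.
-- Window boundaries are expected to fall on whole clock hours.
SELECT window_start, window_end, COUNT(*) AS event_count
  FROM TABLE(
    TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '1' HOUR))
  GROUP BY window_start, window_end;
```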

Supported time units

Window TVFs support the following time units:

  • SECOND
  • MINUTE
  • HOUR
  • DAY

MONTH and YEAR time units are not currently supported.
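The unit is given as part of the INTERVAL literal that you pass as the window size. The following sketch uses the SECOND unit with the example table from this document; the alias sum_points is an assumption chosen for illustration.

```sql
-- 30-second tumbling windows; the unit is part of the INTERVAL literal.
SELECT window_start, window_end, SUM(points) AS sum_points
  FROM TABLE(
    TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '30' SECOND))
  GROUP BY window_start, window_end;
```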

Examples

The following examples show Window TVFs over mock data produced by the Datagen Source Connector configured with the Gaming Player Activity quickstart.

Note

To show the behavior of windowing more clearly in the following examples, TIMESTAMP(3) values may be simplified so that trailing zeroes aren’t shown. For example, 2020-04-15 08:05:00.000 may be shown as 2020-04-15 08:05. Columns may be hidden intentionally to enhance the readability of the content.

TUMBLE

The TUMBLE function assigns each element to a window of a specified size. Tumbling windows have a fixed size and do not overlap. For example, if you specify a tumbling window with a size of 5 minutes, Flink evaluates the current window and starts a new window every five minutes, as illustrated by the following figure.

Tumbling windows in Confluent Cloud for Apache Flink®️

The TUMBLE function assigns a window for each row of a relation based on a time attribute field. In streaming mode, the time attribute field must be either an event-time or a processing-time attribute. In batch mode, the time attribute field of the window table function must be an attribute of type TIMESTAMP or TIMESTAMP_LTZ.

The return value of TUMBLE is a new relation that includes all columns of the original relation, as well as three additional columns named window_start, window_end, and window_time to indicate the assigned window. The original time attribute, timecol, is a regular timestamp column after the windowing TVF.

The TUMBLE function takes three required parameters and one optional parameter:

TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])
  • data: is a table parameter that can be any relation with a time attribute column.
  • timecol: is a column descriptor indicating which time attribute column of data should be mapped to tumbling windows.
  • size: is a duration specifying the width of the tumbling windows.
  • offset: is an optional parameter that specifies the offset by which the window start is shifted.

Here is an example invocation on the gaming_player_activity_source table:

DESCRIBE gaming_player_activity_source;

The output resembles:

+--------------+-----------+----------+---------------+
| Column Name  | Data Type | Nullable |    Extras     |
+--------------+-----------+----------+---------------+
| key          | BYTES     | NULL     | PARTITION KEY |
| player_id    | INT       | NOT NULL |               |
| game_room_id | INT       | NOT NULL |               |
| points       | INT       | NOT NULL |               |
| coordinates  | STRING    | NOT NULL |               |
+--------------+-----------+----------+---------------+

The following query returns all rows in the gaming_player_activity_source table.

SELECT * FROM gaming_player_activity_source;

The output resembles:

player_id game_room_id points coordinates
1051      1144         371    [65,36]
1079      3451         38     [20,71]
1017      4177         419    [63,05]
1092      1801         209    [31,67]
1074      3013         401    [32,69]
1003      1038         284    [18,32]
1081      2265         196    [78,68]

The following queries return all rows in the gaming_player_activity_source table in 10-minute tumbling windows.

SELECT * FROM TABLE(
   TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES));

-- or with the named params
-- note: the DATA param must be the first
SELECT * FROM TABLE(
   TUMBLE(
     DATA => TABLE gaming_player_activity_source,
     TIMECOL => DESCRIPTOR($rowtime),
     SIZE => INTERVAL '10' MINUTES));

The output resembles:

player_id game_room_id points coordinates window_start     window_end       window_time
1067      1338         416    [34,29]     2023-11-02 13:20 2023-11-02 13:30 2023-11-02 13:29:59.999
1057      3169         106    [09,12]     2023-11-02 13:20 2023-11-02 13:30 2023-11-02 13:29:59.999
1029      3218         430    [58,86]     2023-11-02 13:20 2023-11-02 13:30 2023-11-02 13:29:59.999
1009      2593         150    [35,67]     2023-11-02 13:20 2023-11-02 13:30 2023-11-02 13:29:59.999
1061      2131         360    [66,98]     2023-11-02 13:20 2023-11-02 13:30 2023-11-02 13:29:59.999
1011      3058         30     [43,17]     2023-11-02 13:20 2023-11-02 13:30 2023-11-02 13:29:59.999
1100      1831         445    [78,21]     2023-11-02 13:20 2023-11-02 13:30 2023-11-02 13:29:59.999
1058      3316         261    [61,66]     2023-11-02 13:20 2023-11-02 13:30 2023-11-02 13:29:59.999
1061      3761         388    [55,97]     2023-11-02 13:20 2023-11-02 13:30 2023-11-02 13:29:59.999

The following query computes the sum of the points column in the gaming_player_activity_source table within 10-minute tumbling windows.

-- apply aggregation on the tumbling windowed table
SELECT window_start, window_end, SUM(points) as `sum`
  FROM TABLE(
    TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;

The output resembles:

window_start     window_end       sum
2023-11-02 10:40 2023-11-02 10:50 734
2023-11-02 10:50 2023-11-02 11:00 2398
2023-11-02 11:00 2023-11-02 11:10 2388
2023-11-02 11:10 2023-11-02 11:20 2391
2023-11-02 11:20 2023-11-02 11:30 2434
2023-11-02 11:30 2023-11-02 11:40 2374
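Window aggregations can also be broken down by other columns. As a hedged sketch, the following variant adds game_room_id to the GROUP BY clause so that each 10-minute window produces one row per game room. The alias room_points is an assumption chosen for illustration.

```sql
-- One result row per (window, game_room_id) pair.
SELECT window_start, window_end, game_room_id, SUM(points) AS room_points
  FROM TABLE(
    TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end, game_room_id;
```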

HOP

The HOP function assigns elements to windows of fixed length. Like a TUMBLE windowing function, the size of the windows is configured by the window size parameter. An additional window slide parameter controls how frequently a hopping window is started. Hence, hopping windows can be overlapping if the slide is smaller than the window size. In this case, elements are assigned to multiple windows. Hopping windows are also known as “sliding windows”.

For example, you could have windows of size 10 minutes that slide by 5 minutes. With this configuration, every 5 minutes you get a window that contains the events that arrived during the last 10 minutes, as depicted by the following figure.

Hopping windows in Confluent Cloud for Apache Flink®️

The HOP function assigns windows that cover rows within an interval of length size, shifting by slide each time, based on a time attribute field. In streaming mode, the time attribute field must be either an event-time or a processing-time attribute. In batch mode, the time attribute field of the window table function must be an attribute of type TIMESTAMP or TIMESTAMP_LTZ.

The return value of HOP is a new relation that includes all columns of the original relation as well as an additional 3 columns named window_start, window_end, and window_time to indicate the assigned window. The original time attribute, timecol, is a regular timestamp column after windowing TVF.

The HOP function takes four required parameters and one optional parameter:

HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
  • data: is a table parameter that can be any relation with a time attribute column.
  • timecol: is a column descriptor indicating which time attribute column of data should be mapped to hopping windows.
  • slide: is a duration specifying the interval between the starts of sequential hopping windows.
  • size: is a duration specifying the width of the hopping windows.
  • offset: is an optional parameter that specifies the offset by which the window start is shifted.

The following queries return all rows in the gaming_player_activity_source table in hopping windows with a 5-minute slide and 10-minute size.

SELECT * FROM TABLE(
    HOP(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));

-- or with the named params
-- note: the DATA param must be the first
SELECT * FROM TABLE(
    HOP(
      DATA => TABLE gaming_player_activity_source,
      TIMECOL => DESCRIPTOR($rowtime),
      SLIDE => INTERVAL '5' MINUTES,
      SIZE => INTERVAL '10' MINUTES));

The output resembles:

player_id game_room_id points coordinates window_start     window_end       window_time
1012      4445         410    [71,06]     2023-11-02 13:30 2023-11-02 13:40 2023-11-02 13:39:59.999
1012      4445         410    [71,06]     2023-11-02 13:25 2023-11-02 13:35 2023-11-02 13:34:59.999
1088      3611         322    [53,24]     2023-11-02 13:30 2023-11-02 13:40 2023-11-02 13:39:59.999
1088      3611         322    [53,24]     2023-11-02 13:25 2023-11-02 13:35 2023-11-02 13:34:59.999
1037      4047         231    [09,96]     2023-11-02 13:30 2023-11-02 13:40 2023-11-02 13:39:59.999
1037      4047         231    [09,96]     2023-11-02 13:25 2023-11-02 13:35 2023-11-02 13:34:59.999
1015      2722         334    [89,08]     2023-11-02 13:30 2023-11-02 13:40 2023-11-02 13:39:59.999
1015      2722         334    [89,08]     2023-11-02 13:25 2023-11-02 13:35 2023-11-02 13:34:59.999
1035      1361         432    [88,87]     2023-11-02 13:30 2023-11-02 13:40 2023-11-02 13:39:59.999
1035      1361         432    [88,87]     2023-11-02 13:25 2023-11-02 13:35 2023-11-02 13:34:59.999
1042      2973         251    [02,81]     2023-11-02 13:30 2023-11-02 13:40 2023-11-02 13:39:59.999
1042      2973         251    [02,81]     2023-11-02 13:25 2023-11-02 13:35 2023-11-02 13:34:59.999

The following query computes the sum of the points column in the gaming_player_activity_source table within hopping windows that have a 5-minute slide and 10-minute size.

-- apply aggregation on the hopping windowed table
SELECT window_start, window_end, SUM(points) as `sum`
  FROM TABLE(
    HOP(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;

The output resembles:

window_start            window_end              sum
2023-11-02 11:10:00.000 2023-11-02 11:20:00.000 612161
2023-11-02 11:15:00.000 2023-11-02 11:25:00.000 612540
2023-11-02 11:20:00.000 2023-11-02 11:30:00.000 625610
2023-11-02 11:25:00.000 2023-11-02 11:35:00.000 610675
2023-11-02 11:30:00.000 2023-11-02 11:40:00.000 604854
2023-11-02 11:35:00.000 2023-11-02 11:45:00.000 612940

CUMULATE

Cumulating windows are useful in some scenarios, such as tumbling windows with early firing in a fixed window interval. For example, a daily dashboard might display cumulative unique views (UVs) from 00:00 to every minute, and the UV at 10:00 might represent the total number of UVs from 00:00 to 10:00. This can be implemented easily and efficiently by CUMULATE windowing.

The CUMULATE function assigns elements to windows that initially cover an interval of one step size and then expand by one step size at every step, keeping the window start fixed, until the maximum window size is reached.

Windows produced by the CUMULATE function share the same window start, and each successive window is one step size larger than the previous one until the maximum size is reached, so the window size keeps changing and the windows overlap. When the maximum size is reached, the window start advances to the end of the last window, and the size resets to the step size. In comparison, windows produced by the TUMBLE function all have the same size and do not overlap.

Cumulating windows in Confluent Cloud for Apache Flink®️

For example, you could have a cumulating window with a 1-hour step and 1-day maximum size, and you will get these windows for every day:

  • [00:00, 01:00)
  • [00:00, 02:00)
  • [00:00, 03:00)
  • ...
  • [00:00, 24:00)
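The 1-hour step and 1-day maximum size described above can be sketched as follows, in the spirit of the daily dashboard scenario. This is an illustration on the example table used in this document, and the alias unique_players is an assumption.

```sql
-- Running count of distinct players since the start of each day,
-- emitted once per 1-hour step.
SELECT window_start, window_end, COUNT(DISTINCT player_id) AS unique_players
  FROM TABLE(
    CUMULATE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '1' HOUR, INTERVAL '1' DAY))
  GROUP BY window_start, window_end;
```

The size of 1 day is an integral multiple of the 1-hour step, so each day is covered by 24 cumulating windows.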

The CUMULATE function assigns windows based on a time attribute column. In streaming mode, the time attribute field must be either an event-time or a processing-time attribute. In batch mode, the time attribute field of the window table function must be an attribute of type TIMESTAMP or TIMESTAMP_LTZ.

The return value of CUMULATE is a new relation that includes all columns of the original relation, as well as an additional 3 columns named window_start, window_end, and window_time to indicate the assigned window. The original time attribute, timecol, is a regular timestamp column after window TVF.

The CUMULATE function takes four required parameters and one optional parameter:

CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size [, offset ])
  • data: is a table parameter that can be any relation with a time attribute column.
  • timecol: is a column descriptor indicating which time attribute column of data should be mapped to cumulating windows.
  • step: is a duration specifying the increase in window size between the ends of sequential cumulating windows.
  • size: is a duration specifying the maximum width of the cumulating windows. size must be an integral multiple of step.
  • offset: is an optional parameter that specifies the offset by which the window start is shifted.

The following queries return all rows in the gaming_player_activity_source table in CUMULATE windows that have a 2-minute step and 10-minute size.

SELECT * FROM TABLE(
    CUMULATE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));

-- or with the named params
-- note: the DATA param must be the first
SELECT * FROM TABLE(
    CUMULATE(
      DATA => TABLE gaming_player_activity_source,
      TIMECOL => DESCRIPTOR($rowtime),
      STEP => INTERVAL '2' MINUTES,
      SIZE => INTERVAL '10' MINUTES));

The output resembles:

player_id game_room_id points coordinates window_start     window_end       window_time
1087      3134         464    [97,70]     2023-11-02 12:00 2023-11-02 12:06 2023-11-02 12:05:59.999
1087      3134         464    [97,70]     2023-11-02 12:00 2023-11-02 12:08 2023-11-02 12:07:59.999
1087      3134         464    [97,70]     2023-11-02 12:00 2023-11-02 12:10 2023-11-02 12:09:59.999
1024      4177         93     [88,96]     2023-11-02 12:00 2023-11-02 12:02 2023-11-02 12:01:59.999
1024      4177         93     [88,96]     2023-11-02 12:00 2023-11-02 12:04 2023-11-02 12:03:59.999
1024      4177         93     [88,96]     2023-11-02 12:00 2023-11-02 12:06 2023-11-02 12:05:59.999
1024      4177         93     [88,96]     2023-11-02 12:00 2023-11-02 12:08 2023-11-02 12:07:59.999
1024      4177         93     [88,96]     2023-11-02 12:00 2023-11-02 12:10 2023-11-02 12:09:59.999
1024      2852         299    [91,61]     2023-11-02 12:00 2023-11-02 12:02 2023-11-02 12:01:59.999
1024      2852         299    [91,61]     2023-11-02 12:00 2023-11-02 12:04 2023-11-02 12:03:59.999
1024      2852         299    [91,61]     2023-11-02 12:00 2023-11-02 12:06 2023-11-02 12:05:59.999
1024      2852         299    [91,61]     2023-11-02 12:00 2023-11-02 12:08 2023-11-02 12:07:59.999
1024      2852         299    [91,61]     2023-11-02 12:00 2023-11-02 12:10 2023-11-02 12:09:59.999

The following query computes the sum of the points column in the gaming_player_activity_source table within CUMULATE windows that have a 2-minute step and 10-minute size.

-- apply aggregation on the cumulating windowed table
SELECT window_start, window_end, SUM(points) as `sum`
  FROM TABLE(
    CUMULATE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;

The output resembles:

window_start            window_end              sum
2023-11-02 12:40:00.000 2023-11-02 12:46:00.000 364387
2023-11-02 12:40:00.000 2023-11-02 12:48:00.000 482213
2023-11-02 12:40:00.000 2023-11-02 12:50:00.000 604723
2023-11-02 12:50:00.000 2023-11-02 12:52:00.000 122034
2023-11-02 12:50:00.000 2023-11-02 12:54:00.000 240866
2023-11-02 12:50:00.000 2023-11-02 12:56:00.000 368831
2023-11-02 12:50:00.000 2023-11-02 12:58:00.000 483390
2023-11-02 12:50:00.000 2023-11-02 13:00:00.000 600559

Window Offset

Offset is an optional parameter that you can use to change the window assignment. It can be a positive or a negative duration. The default value for a window offset is 0. The same record may be assigned to a different window when a different offset value is set.

For example, which window would a record with a timestamp of 2021-06-30 00:00:04 be assigned to, for a TUMBLE window with a size of 10 MINUTE?

  • If the offset is -16 MINUTE, the record is assigned to window [2021-06-29 23:54:00, 2021-06-30 00:04:00).
  • If the offset is -6 MINUTE, the record is assigned to window [2021-06-29 23:54:00, 2021-06-30 00:04:00).
  • If the offset is -4 MINUTE, the record is assigned to window [2021-06-29 23:56:00, 2021-06-30 00:06:00).
  • If the offset is 0, the record is assigned to window [2021-06-30 00:00:00, 2021-06-30 00:10:00).
  • If the offset is 4 MINUTE, the record is assigned to window [2021-06-29 23:54:00, 2021-06-30 00:04:00).
  • If the offset is 6 MINUTE, the record is assigned to window [2021-06-29 23:56:00, 2021-06-30 00:06:00).
  • If the offset is 16 MINUTE, the record is assigned to window [2021-06-29 23:56:00, 2021-06-30 00:06:00).

Some window offset parameters may have the same effect on the assignment of windows. In the above case, -16 MINUTE, -6 MINUTE and 4 MINUTE have the same effect for a tumble window with a 10 MINUTE size.

Note

The window offset affects only window assignment. It has no effect on watermarks.

Examples

The following SQL examples show how to use offset in a tumbling window.

SELECT * FROM TABLE(
   TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES));

-- or with the named params
-- note: the DATA param must be the first
SELECT * FROM TABLE(
   TUMBLE(
     DATA => TABLE gaming_player_activity_source,
     TIMECOL => DESCRIPTOR($rowtime),
     SIZE => INTERVAL '10' MINUTES,
     OFFSET => INTERVAL '1' MINUTES));

The output resembles:

player_id game_room_id points coordinates window_start     window_end       window_time
1027      1018         183    [02,87]     2023-11-02 13:41 2023-11-02 13:51 2023-11-02 13:50:59.999
1053      1137         469    [12,47]     2023-11-02 13:41 2023-11-02 13:51 2023-11-02 13:50:59.999
1057      4691         388    [86,09]     2023-11-02 13:41 2023-11-02 13:51 2023-11-02 13:50:59.999
1055      1978         234    [66,59]     2023-11-02 13:41 2023-11-02 13:51 2023-11-02 13:50:59.999
1053      1437         333    [31,57]     2023-11-02 13:41 2023-11-02 13:51 2023-11-02 13:50:59.999
1001      3222         342    [80,76]     2023-11-02 13:41 2023-11-02 13:51 2023-11-02 13:50:59.999
1091      1617         487    [17,70]     2023-11-02 13:41 2023-11-02 13:51 2023-11-02 13:50:59.999
1053      4700         417    [84,51]     2023-11-02 13:41 2023-11-02 13:51 2023-11-02 13:50:59.999
1099      2472         237    [70,89]     2023-11-02 13:41 2023-11-02 13:51 2023-11-02 13:50:59.999
1072      3667         41     [86,23]     2023-11-02 13:41 2023-11-02 13:51 2023-11-02 13:50:59.999
1069      3747         475    [75,62]     2023-11-02 13:41 2023-11-02 13:51 2023-11-02 13:50:59.999
1043      4107         70     [52,64]     2023-11-02 13:41 2023-11-02 13:51 2023-11-02 13:50:59.999

The following query computes the sum of the points column in the gaming_player_activity_source table within 10-minute tumbling windows that have an offset of 1 minute.

-- apply aggregation on the tumbling windowed table
SELECT window_start, window_end, SUM(points) as `sum`
  FROM TABLE(
    TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES))
  GROUP BY window_start, window_end;

The output resembles:

window_start            window_end              sum
2023-11-02 10:41:00.000 2023-11-02 10:51:00.000 241958
2023-11-02 10:51:00.000 2023-11-02 11:01:00.000 618202
2023-11-02 11:01:00.000 2023-11-02 11:11:00.000 607575
2023-11-02 11:11:00.000 2023-11-02 11:21:00.000 617255
2023-11-02 11:21:00.000 2023-11-02 11:31:00.000 622046
2023-11-02 11:31:00.000 2023-11-02 11:41:00.000 605814
2023-11-02 11:41:00.000 2023-11-02 11:51:00.000 599619
2023-11-02 11:51:00.000 2023-11-02 12:01:00.000 609895
2023-11-02 12:01:00.000 2023-11-02 12:11:00.000 606674
2023-11-02 12:11:00.000 2023-11-02 12:21:00.000 590244
2023-11-02 12:21:00.000 2023-11-02 12:31:00.000 599929
2023-11-02 12:31:00.000 2023-11-02 12:41:00.000 628830
2023-11-02 12:41:00.000 2023-11-02 12:51:00.000 597203
2023-11-02 12:51:00.000 2023-11-02 13:01:00.000 600874
2023-11-02 13:01:00.000 2023-11-02 13:11:00.000 616678
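The optional offset parameter is also listed for HOP. As a hedged sketch, the following query passes a 1-minute offset as the fifth positional argument to a hopping window with a 5-minute slide and a 10-minute size. By the definition of offset, the window starts would be expected to shift by one minute, for example to 11:01 and 11:06 instead of 11:00 and 11:05.

```sql
-- Hopping windows (5-minute slide, 10-minute size) shifted by a 1-minute offset.
SELECT window_start, window_end, SUM(points) AS `sum`
  FROM TABLE(
    HOP(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime),
        INTERVAL '5' MINUTES, INTERVAL '10' MINUTES, INTERVAL '1' MINUTES))
  GROUP BY window_start, window_end;
```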