Windowing Table-Valued Functions (Windowing TVFs)
Important
Confluent Cloud for Apache Flink®️ is currently available for Preview. A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing Preview releases of the Preview features at any time in Confluent’s sole discretion. Check out Getting Help for questions, feedback and requests.
For Flink SQL features and limitations in the preview program, see Notable Limitations in Public Preview.
Confluent Cloud for Apache Flink®️ provides several window table-valued functions (TVFs) for dividing the elements of a table into windows.
Description
Windows are central to processing infinite streams. Windows split the stream into “buckets” of finite size, over which you can apply computations. This document focuses on how windowing is performed in Confluent Cloud for Apache Flink®️ and how you can benefit from windowed functions.
Flink SQL provides several window table-valued functions (TVFs) to divide the elements of your table into windows, including TUMBLE, HOP, and CUMULATE.
Note that each element can logically belong to more than one window, depending on the windowing table-valued function you use. For example, HOP windowing creates overlapping windows in which a single element can be assigned to multiple windows.
Windowing TVFs are Flink-defined Polymorphic Table Functions (PTFs). PTFs are part of the SQL:2016 standard. They are special table functions that can take a table as a parameter, which makes them a powerful way to change the shape of a table. Because PTFs are used semantically like tables, they are invoked in the FROM clause of a SELECT statement.
Frequently used computations based on windowing TVFs include window aggregation, window Top-N, window join, and window deduplication.
Window functions
Flink provides three built-in windowing TVFs: TUMBLE, HOP, and CUMULATE. A windowing TVF returns a new relation that includes all columns of the original relation, plus three additional columns named window_start, window_end, and window_time that indicate the assigned window. In streaming mode, the window_time field is a time attribute of the window. In batch mode, the window_time field has type TIMESTAMP or TIMESTAMP_LTZ, depending on the type of the input time field.
The window_time field can be used in subsequent time-based operations, for example, another windowing TVF, an interval join, or an over aggregation. The value of window_time is always equal to window_end - 1ms.
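The relationship between window_end and window_time is simple arithmetic. The following minimal Python sketch uses the standard datetime module to derive window_time from a window_end value. The function name window_time_for is hypothetical. It only mirrors the rule stated above and is not part of any Flink API.

```python
from datetime import datetime, timedelta


def window_time_for(window_end: datetime) -> datetime:
    """Hypothetical helper: window_time is always window_end minus 1 millisecond."""
    return window_end - timedelta(milliseconds=1)


end = datetime(2023, 11, 2, 13, 30)
print(window_time_for(end).isoformat(timespec="milliseconds"))  # prints 2023-11-02T13:29:59.999
```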
Window alignment
Time-based window boundaries align with clock seconds, minutes, hours, and days. For example, assume that you have events with these timestamps (in UTC):
- 00:59:00.000
- 00:59:30.000
- 01:00:15.000
If you put these events into hour-long tumbling windows, the first two land in the window for 00:00:00-00:59:59.999, and the third event lands in the following hour.
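The alignment above can be checked with a short Python sketch that floors each timestamp to the start of its clock hour. The calendar date 2023-01-01 is an assumption, because the example gives only times of day. The hour_window helper is hypothetical and models only hour-long tumbling windows.

```python
from datetime import datetime, timedelta, timezone


def hour_window(ts: datetime) -> tuple[datetime, datetime]:
    """Hypothetical helper: return the [start, end) hour-long window containing ts.

    The window start is aligned to the whole clock hour, as described above.
    """
    start = ts.replace(minute=0, second=0, microsecond=0)
    return start, start + timedelta(hours=1)


# The three example events, on an assumed date, in UTC.
events = [
    datetime(2023, 1, 1, 0, 59, 0, tzinfo=timezone.utc),
    datetime(2023, 1, 1, 0, 59, 30, tzinfo=timezone.utc),
    datetime(2023, 1, 1, 1, 0, 15, tzinfo=timezone.utc),
]
for ts in events:
    start, end = hour_window(ts)
    print(ts.time(), "->", f"[{start.time()}, {end.time()})")
```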
Example data
The following examples show Window TVFs over mock data produced by the Datagen Source Connector configured with the Gaming Player Activity quickstart.
Note
To show the behavior of windowing more clearly in the following examples, TIMESTAMP(3) values may be simplified so that trailing zeroes aren’t shown. For example, 2020-04-15 08:05:00.000 may be shown as 2020-04-15 08:05.
Columns may be hidden intentionally to enhance the readability of the content.
TUMBLE
The TUMBLE function assigns each element to a window of a specified size. Tumbling windows have a fixed size and do not overlap. For example, if you specify a tumbling window with a size of 5 minutes, Flink evaluates the current window and starts a new window every five minutes, as illustrated by the following figure.
The TUMBLE function assigns a window to each row of a relation based on a time attribute field. In streaming mode, the time attribute field must be an event-time or processing-time attribute. In batch mode, the time attribute field of the window table function must be of type TIMESTAMP or TIMESTAMP_LTZ.
The return value of TUMBLE is a new relation that includes all columns of the original relation, plus three additional columns named window_start, window_end, and window_time that indicate the assigned window. After the windowing TVF is applied, the original time attribute, timecol, becomes a regular timestamp column.
The TUMBLE function takes three required parameters and one optional parameter:
TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])
- data: a table parameter that can be any relation with a time attribute column.
- timecol: a column descriptor indicating which time attribute column of data should be mapped to tumbling windows.
- size: a duration specifying the width of the tumbling windows.
- offset: an optional duration by which the window start is shifted.
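To make the size and offset parameters concrete, here is a minimal Python sketch of tumbling-window assignment. It assumes that windows are aligned to the Unix epoch and that a window starts at ts - ((ts - offset) mod size). The tumble helper is hypothetical and models the semantics described above. It is not Flink's implementation. With a 10-minute size, a row at 13:24 lands in [13:20, 13:30), matching the example output further down.

```python
from datetime import datetime, timedelta

# Assumption: window boundaries are aligned to the Unix epoch (1970-01-01 00:00 UTC).
EPOCH = datetime(1970, 1, 1)


def tumble(ts: datetime, size: timedelta, offset: timedelta = timedelta(0)):
    """Hypothetical helper: return (window_start, window_end, window_time) for ts.

    window_start = ts - ((ts - EPOCH - offset) mod size)
    window_end   = window_start + size
    window_time  = window_end - 1 ms
    """
    start = ts - (ts - EPOCH - offset) % size
    end = start + size
    return start, end, end - timedelta(milliseconds=1)


# A row at 13:24 in 10-minute windows, with and without a 1-minute offset.
print(*tumble(datetime(2023, 11, 2, 13, 24), timedelta(minutes=10)))
print(*tumble(datetime(2023, 11, 2, 13, 24), timedelta(minutes=10), timedelta(minutes=1)))
```

Python's % operator on timedelta values always returns a non-negative remainder for a positive size, so the same expression works for negative offsets.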
The following examples use the gaming_player_activity_source table. To view the table's schema, run the following statement:
DESCRIBE gaming_player_activity_source;
The output resembles:
+--------------+-----------+----------+---------------+
| Column Name | Data Type | Nullable | Extras |
+--------------+-----------+----------+---------------+
| key | BYTES | NULL | PARTITION KEY |
| player_id | INT | NOT NULL | |
| game_room_id | INT | NOT NULL | |
| points | INT | NOT NULL | |
| coordinates | STRING | NOT NULL | |
+--------------+-----------+----------+---------------+
The following query returns all rows in the gaming_player_activity_source table.
SELECT * FROM gaming_player_activity_source;
The output resembles:
player_id game_room_id points coordinates
1051 1144 371 [65,36]
1079 3451 38 [20,71]
1017 4177 419 [63,05]
1092 1801 209 [31,67]
1074 3013 401 [32,69]
1003 1038 284 [18,32]
1081 2265 196 [78,68]
The following queries return all rows in the gaming_player_activity_source table in 10-minute tumbling windows.
SELECT * FROM TABLE(
TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
SELECT * FROM TABLE(
TUMBLE(
DATA => TABLE gaming_player_activity_source,
TIMECOL => DESCRIPTOR($rowtime),
SIZE => INTERVAL '10' MINUTES));
The output resembles:
player_id game_room_id points coordinates window_start window_end window_time
1067 1338 416 [34,29] 2023-11-02 13:20 2023-11-02 13:30 2023-11-02 13:29:59.999
1057 3169 106 [09,12] 2023-11-02 13:20 2023-11-02 13:30 2023-11-02 13:29:59.999
1029 3218 430 [58,86] 2023-11-02 13:20 2023-11-02 13:30 2023-11-02 13:29:59.999
1009 2593 150 [35,67] 2023-11-02 13:20 2023-11-02 13:30 2023-11-02 13:29:59.999
1061 2131 360 [66,98] 2023-11-02 13:20 2023-11-02 13:30 2023-11-02 13:29:59.999
1011 3058 30 [43,17] 2023-11-02 13:20 2023-11-02 13:30 2023-11-02 13:29:59.999
1100 1831 445 [78,21] 2023-11-02 13:20 2023-11-02 13:30 2023-11-02 13:29:59.999
1058 3316 261 [61,66] 2023-11-02 13:20 2023-11-02 13:30 2023-11-02 13:29:59.999
1061 3761 388 [55,97] 2023-11-02 13:20 2023-11-02 13:30 2023-11-02 13:29:59.999
The following query computes the sum of the points column in the gaming_player_activity_source table within 10-minute tumbling windows.
-- apply aggregation on the tumbling windowed table
SELECT window_start, window_end, SUM(points) as `sum`
FROM TABLE(
TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
The output resembles:
window_start window_end sum
2023-11-02 10:40 2023-11-02 10:50 734
2023-11-02 10:50 2023-11-02 11:00 2398
2023-11-02 11:00 2023-11-02 11:10 2388
2023-11-02 11:10 2023-11-02 11:20 2391
2023-11-02 11:20 2023-11-02 11:30 2434
2023-11-02 11:30 2023-11-02 11:40 2374
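The GROUP BY window_start, window_end aggregation above can be modeled in plain Python to show which rows contribute to each window's sum. This sketch assumes epoch-aligned 10-minute windows. It uses a few hypothetical (rowtime, points) pairs instead of the Datagen data. It reproduces only the final per-window totals and does not model when a streaming Flink job emits them.

```python
from collections import defaultdict
from datetime import datetime, timedelta

# Assumption: window boundaries are aligned to the Unix epoch.
EPOCH = datetime(1970, 1, 1)
SIZE = timedelta(minutes=10)


def tumble_start(ts: datetime, size: timedelta) -> datetime:
    """Hypothetical helper: start of the tumbling window that contains ts."""
    return ts - (ts - EPOCH) % size


# Hypothetical (rowtime, points) pairs, not taken from the example output above.
rows = [
    (datetime(2023, 11, 2, 10, 52), 100),
    (datetime(2023, 11, 2, 10, 58), 250),
    (datetime(2023, 11, 2, 11, 3), 40),
]

# Models: SELECT window_start, window_end, SUM(points) ... GROUP BY window_start, window_end
sums: dict[tuple[datetime, datetime], int] = defaultdict(int)
for rowtime, points in rows:
    start = tumble_start(rowtime, SIZE)
    sums[(start, start + SIZE)] += points

for (start, end), total in sorted(sums.items()):
    print(start, end, total)
```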
HOP
The HOP function assigns elements to windows of fixed length. As with TUMBLE, the window size parameter configures the size of the windows. An additional window slide parameter controls how frequently a hopping window is started. As a result, hopping windows overlap if the slide is smaller than the window size, and elements are then assigned to multiple windows. Hopping windows are also known as “sliding windows”.
For example, you could have windows with a size of 10 minutes that slide by 5 minutes. This gives you a new window every 5 minutes that contains the events that arrived during the last 10 minutes, as depicted by the following figure.
The HOP function assigns windows that cover rows within an interval of length size and that shift by slide, based on a time attribute field. In streaming mode, the time attribute field must be an event-time or processing-time attribute. In batch mode, the time attribute field of the window table function must be of type TIMESTAMP or TIMESTAMP_LTZ.
The return value of HOP is a new relation that includes all columns of the original relation, plus three additional columns named window_start, window_end, and window_time that indicate the assigned window. After the windowing TVF is applied, the original time attribute, timecol, becomes a regular timestamp column.
The HOP function takes four required parameters and one optional parameter:
HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
- data: a table parameter that can be any relation with a time attribute column.
- timecol: a column descriptor indicating which time attribute column of data should be mapped to hopping windows.
- slide: a duration specifying the time between the starts of sequential hopping windows.
- size: a duration specifying the width of the hopping windows.
- offset: an optional duration by which the window start is shifted.
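As a sketch of how slide and size interact, the following Python function lists every hopping window that contains a given timestamp. It assumes that window starts are aligned to multiples of slide (shifted by offset) from the Unix epoch. The hop helper is hypothetical and is not Flink's implementation. With a 5-minute slide and a 10-minute size, an assumed row time of 13:32 falls into [13:30, 13:40) and [13:25, 13:35). This is the same pair of windows that each row receives in the example output that follows.

```python
from datetime import datetime, timedelta

# Assumption: window boundaries are aligned to the Unix epoch.
EPOCH = datetime(1970, 1, 1)


def hop(ts: datetime, slide: timedelta, size: timedelta, offset: timedelta = timedelta(0)):
    """Hypothetical helper: return every [start, end) hopping window containing ts.

    Starts are multiples of slide (shifted by offset). A window contains ts when
    start <= ts < start + size. Windows are listed latest start first.
    """
    start = ts - (ts - EPOCH - offset) % slide  # latest window start at or before ts
    windows = []
    while start > ts - size:  # earlier windows still cover ts
        windows.append((start, start + size))
        start -= slide
    return windows


for start, end in hop(datetime(2023, 11, 2, 13, 32), timedelta(minutes=5), timedelta(minutes=10)):
    print(start, end)
```

When size is an integral multiple of slide, each row lands in exactly size / slide windows. That is why every row appears twice in the example output.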
The following queries return all rows in the gaming_player_activity_source table in hopping windows with a 5-minute slide and a 10-minute size.
SELECT * FROM TABLE(
HOP(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
SELECT * FROM TABLE(
HOP(
DATA => TABLE gaming_player_activity_source,
TIMECOL => DESCRIPTOR($rowtime),
SLIDE => INTERVAL '5' MINUTES,
SIZE => INTERVAL '10' MINUTES));
The output resembles:
player_id game_room_id points coordinates window_start window_end window_time
1012 4445 410 [71,06] 2023-11-02 13:30 2023-11-02 13:40 2023-11-02 13:39:59.999
1012 4445 410 [71,06] 2023-11-02 13:25 2023-11-02 13:35 2023-11-02 13:34:59.999
1088 3611 322 [53,24] 2023-11-02 13:30 2023-11-02 13:40 2023-11-02 13:39:59.999
1088 3611 322 [53,24] 2023-11-02 13:25 2023-11-02 13:35 2023-11-02 13:34:59.999
1037 4047 231 [09,96] 2023-11-02 13:30 2023-11-02 13:40 2023-11-02 13:39:59.999
1037 4047 231 [09,96] 2023-11-02 13:25 2023-11-02 13:35 2023-11-02 13:34:59.999
1015 2722 334 [89,08] 2023-11-02 13:30 2023-11-02 13:40 2023-11-02 13:39:59.999
1015 2722 334 [89,08] 2023-11-02 13:25 2023-11-02 13:35 2023-11-02 13:34:59.999
1035 1361 432 [88,87] 2023-11-02 13:30 2023-11-02 13:40 2023-11-02 13:39:59.999
1035 1361 432 [88,87] 2023-11-02 13:25 2023-11-02 13:35 2023-11-02 13:34:59.999
1042 2973 251 [02,81] 2023-11-02 13:30 2023-11-02 13:40 2023-11-02 13:39:59.999
1042 2973 251 [02,81] 2023-11-02 13:25 2023-11-02 13:35 2023-11-02 13:34:59.999
The following query computes the sum of the points column in the gaming_player_activity_source table within hopping windows that have a 5-minute slide and a 10-minute size.
-- apply aggregation on the hopping windowed table
SELECT window_start, window_end, SUM(points) as `sum`
FROM TABLE(
HOP(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
The output resembles:
window_start window_end sum
2023-11-02 11:10:00.000 2023-11-02 11:20:00.000 612161
2023-11-02 11:15:00.000 2023-11-02 11:25:00.000 612540
2023-11-02 11:20:00.000 2023-11-02 11:30:00.000 625610
2023-11-02 11:25:00.000 2023-11-02 11:35:00.000 610675
2023-11-02 11:30:00.000 2023-11-02 11:40:00.000 604854
2023-11-02 11:35:00.000 2023-11-02 11:45:00.000 612940
CUMULATE
Cumulating windows are useful in some scenarios, such as tumbling windows with early firing in a fixed window interval. For example, a daily dashboard might display cumulative unique views (UVs) from 00:00 up to every minute, so that the UV count at 10:00 represents the total number of UVs from 00:00 to 10:00. CUMULATE windowing implements this easily and efficiently.
The CUMULATE function assigns elements to windows that cover rows within an initial interval of the specified step size. At each step, the window expands by one more step size while the window start stays fixed, until the maximum window size is reached.
All CUMULATE windows in a period share the same window start, and each successive window adds one step to the window size until the maximum size is reached. As a result, the window size keeps changing and the windows overlap. When the maximum size is reached, the window start advances to the end of the last window, and the size resets to the step size. In contrast, TUMBLE windows all have the same fixed size and do not overlap.
For example, a cumulating window with a 1-hour step and a 1-day maximum size produces the following windows for each day:
[00:00, 01:00)
[00:00, 02:00)
[00:00, 03:00)
…
[00:00, 24:00)
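The window list above can be reproduced with a short Python sketch of cumulating-window assignment. It assumes that every window in a period starts at the epoch-aligned start of the size period containing the row. It also assumes that the window ends run from the first step boundary after the row up to start + size, and that size is an integral multiple of step. The cumulate helper is hypothetical and is not Flink's implementation.

```python
from datetime import datetime, timedelta

# Assumption: window boundaries are aligned to the Unix epoch.
EPOCH = datetime(1970, 1, 1)


def cumulate(ts: datetime, step: timedelta, size: timedelta):
    """Hypothetical helper: return every [start, end) cumulating window containing ts.

    All windows share the start of the size-long period that contains ts.
    Ends begin at the first step boundary after ts and grow by step up to
    start + size. Assumes size is an integral multiple of step.
    """
    start = ts - (ts - EPOCH) % size
    end = ts - (ts - EPOCH) % step + step  # first step boundary strictly after ts
    windows = []
    while end <= start + size:
        windows.append((start, end))
        end += step
    return windows


# 1-hour step, 1-day maximum size: a row at 02:30 is in [00:00, 03:00) through [00:00, 24:00).
for start, end in cumulate(datetime(2023, 11, 2, 2, 30), timedelta(hours=1), timedelta(days=1)):
    print(start, end)
```

A row at 00:30 belongs to all 24 windows of the day, while a row at 02:30 misses the first two windows because they end before it arrives.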
The CUMULATE function assigns windows based on a time attribute column. In streaming mode, the time attribute field must be an event-time or processing-time attribute. In batch mode, the time attribute field of the window table function must be of type TIMESTAMP or TIMESTAMP_LTZ.
The return value of CUMULATE is a new relation that includes all columns of the original relation, plus three additional columns named window_start, window_end, and window_time that indicate the assigned window. After the windowing TVF is applied, the original time attribute, timecol, becomes a regular timestamp column.
The CUMULATE function takes four required parameters and one optional parameter:
CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size [, offset ])
- data: a table parameter that can be any relation with a time attribute column.
- timecol: a column descriptor indicating which time attribute column of data should be mapped to cumulating windows.
- step: a duration specifying the increase in window size between the ends of sequential cumulating windows.
- size: a duration specifying the maximum width of the cumulating windows. size must be an integral multiple of step.
- offset: an optional duration by which the window start is shifted.
The following queries return all rows in the gaming_player_activity_source table in CUMULATE windows that have a 2-minute step and a 10-minute size.
SELECT * FROM TABLE(
CUMULATE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
SELECT * FROM TABLE(
CUMULATE(
DATA => TABLE gaming_player_activity_source,
TIMECOL => DESCRIPTOR($rowtime),
STEP => INTERVAL '2' MINUTES,
SIZE => INTERVAL '10' MINUTES));
The output resembles:
player_id game_room_id points coordinates window_start window_end window_time
1087 3134 464 [97,70] 2023-11-02 12:00:00 2023-11-02 12:06 2023-11-02 12:05:59.999
1087 3134 464 [97,70] 2023-11-02 12:00:00 2023-11-02 12:08 2023-11-02 12:07:59.999
1087 3134 464 [97,70] 2023-11-02 12:00:00 2023-11-02 12:10 2023-11-02 12:09:59.999
1024 4177 93 [88,96] 2023-11-02 12:00:00 2023-11-02 12:02 2023-11-02 12:01:59.999
1024 4177 93 [88,96] 2023-11-02 12:00:00 2023-11-02 12:04 2023-11-02 12:03:59.999
1024 4177 93 [88,96] 2023-11-02 12:00:00 2023-11-02 12:06 2023-11-02 12:05:59.999
1024 4177 93 [88,96] 2023-11-02 12:00:00 2023-11-02 12:08 2023-11-02 12:07:59.999
1024 4177 93 [88,96] 2023-11-02 12:00:00 2023-11-02 12:10 2023-11-02 12:09:59.999
1024 2852 299 [91,61] 2023-11-02 12:00:00 2023-11-02 12:02 2023-11-02 12:01:59.999
1024 2852 299 [91,61] 2023-11-02 12:00:00 2023-11-02 12:04 2023-11-02 12:03:59.999
1024 2852 299 [91,61] 2023-11-02 12:00:00 2023-11-02 12:06 2023-11-02 12:05:59.999
1024 2852 299 [91,61] 2023-11-02 12:00:00 2023-11-02 12:08 2023-11-02 12:07:59.999
1024 2852 299 [91,61] 2023-11-02 12:00:00 2023-11-02 12:10 2023-11-02 12:09:59.999
The following query computes the sum of the points column in the gaming_player_activity_source table within CUMULATE windows that have a 2-minute step and a 10-minute size.
-- apply aggregation on the cumulating windowed table
SELECT window_start, window_end, SUM(points) as `sum`
FROM TABLE(
CUMULATE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
The output resembles:
window_start window_end sum
2023-11-02 12:40:00.000 2023-11-02 12:46:00.000 364387
2023-11-02 12:40:00.000 2023-11-02 12:48:00.000 482213
2023-11-02 12:40:00.000 2023-11-02 12:50:00.000 604723
2023-11-02 12:50:00.000 2023-11-02 12:52:00.000 122034
2023-11-02 12:50:00.000 2023-11-02 12:54:00.000 240866
2023-11-02 12:50:00.000 2023-11-02 12:56:00.000 368831
2023-11-02 12:50:00.000 2023-11-02 12:58:00.000 483390
2023-11-02 12:50:00.000 2023-11-02 13:00:00.000 600559
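The sums in each 10-minute period grow from one window to the next because a row counts toward every cumulating window in its period that ends after the row's time. The following Python sketch models the aggregation using a few hypothetical (rowtime, points) pairs, not the Datagen data. It assumes epoch-aligned periods with a 2-minute step and a 10-minute size.

```python
from collections import defaultdict
from datetime import datetime, timedelta

# Assumption: window boundaries are aligned to the Unix epoch.
EPOCH = datetime(1970, 1, 1)
STEP = timedelta(minutes=2)
SIZE = timedelta(minutes=10)

# Hypothetical (rowtime, points) pairs, not taken from the example output above.
rows = [
    (datetime(2023, 11, 2, 12, 51), 10),
    (datetime(2023, 11, 2, 12, 55), 20),
    (datetime(2023, 11, 2, 12, 59), 30),
]

sums = defaultdict(int)
for rowtime, points in rows:
    start = rowtime - (rowtime - EPOCH) % SIZE        # shared start of the period
    end = rowtime - (rowtime - EPOCH) % STEP + STEP   # first step boundary after the row
    while end <= start + SIZE:                        # the row counts toward every later window
        sums[(start, end)] += points
        end += STEP

for (start, end), total in sorted(sums.items()):
    print(start, end, total)
```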
Window Offset
offset is an optional parameter that you can use to change the window assignment. It can be a positive or a negative duration. The default window offset is 0. The same record may be assigned to a different window if you set a different offset value.
For example, which window is a record with a timestamp of 2021-06-30 00:00:04 assigned to, for a tumbling window with a size of 10 MINUTE?
- If the offset value is -16 MINUTE, the record is assigned to window [2021-06-29 23:54:00, 2021-06-30 00:04:00).
- If the offset value is -6 MINUTE, the record is assigned to window [2021-06-29 23:54:00, 2021-06-30 00:04:00).
- If the offset is -4 MINUTE, the record is assigned to window [2021-06-29 23:56:00, 2021-06-30 00:06:00).
- If the offset is 0, the record is assigned to window [2021-06-30 00:00:00, 2021-06-30 00:10:00).
- If the offset is 4 MINUTE, the record is assigned to window [2021-06-29 23:54:00, 2021-06-30 00:04:00).
- If the offset is 6 MINUTE, the record is assigned to window [2021-06-29 23:56:00, 2021-06-30 00:06:00).
- If the offset is 16 MINUTE, the record is assigned to window [2021-06-29 23:56:00, 2021-06-30 00:06:00).
Different offset values can have the same effect on window assignment. In the example above, -16 MINUTE, -6 MINUTE, and 4 MINUTE have the same effect for a tumbling window with a size of 10 MINUTE, because they differ from each other by multiples of the window size.
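To check the list above, the following Python sketch assumes that a tumbling window starts at ts - ((ts - offset) mod size), with times measured from the Unix epoch. It prints the window for each offset. The tumble_window helper is hypothetical and models only the assignment rule. It is not Flink code.

```python
from datetime import datetime, timedelta

# Assumption: window boundaries are aligned to the Unix epoch.
EPOCH = datetime(1970, 1, 1)
SIZE = timedelta(minutes=10)
TS = datetime(2021, 6, 30, 0, 0, 4)  # the record timestamp from the example


def tumble_window(ts: datetime, size: timedelta, offset: timedelta):
    """Hypothetical helper: [start, end) of the tumbling window containing ts, shifted by offset."""
    start = ts - (ts - EPOCH - offset) % size
    return start, start + size


for minutes in (-16, -6, -4, 0, 4, 6, 16):
    start, end = tumble_window(TS, SIZE, timedelta(minutes=minutes))
    print(f"offset {minutes:>3} MINUTE -> [{start}, {end})")
```

Because only (offset mod size) affects the result, -16, -6, and 4 MINUTE all reduce to the same remainder (4 minutes) and produce the same window.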
Note
Window offset affects only window assignment. It has no effect on watermarks.
The following SQL examples show how to use offset in a tumbling window.
SELECT * FROM TABLE(
TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
SELECT * FROM TABLE(
TUMBLE(
DATA => TABLE gaming_player_activity_source,
TIMECOL => DESCRIPTOR($rowtime),
SIZE => INTERVAL '10' MINUTES,
OFFSET => INTERVAL '1' MINUTES));
The output resembles:
player_id game_room_id points coordinates window_start window_end window_time
1027 1018 183 [02,87] 2023-11-02 13:41 2023-11-02 13:51 2023-11-02 13:50:59.999
1053 1137 469 [12,47] 2023-11-02 13:41 2023-11-02 13:51 2023-11-02 13:50:59.999
1057 4691 388 [86,09] 2023-11-02 13:41 2023-11-02 13:51 2023-11-02 13:50:59.999
1055 1978 234 [66,59] 2023-11-02 13:41 2023-11-02 13:51 2023-11-02 13:50:59.999
1053 1437 333 [31,57] 2023-11-02 13:41 2023-11-02 13:51 2023-11-02 13:50:59.999
1001 3222 342 [80,76] 2023-11-02 13:41 2023-11-02 13:51 2023-11-02 13:50:59.999
1091 1617 487 [17,70] 2023-11-02 13:41 2023-11-02 13:51 2023-11-02 13:50:59.999
1053 4700 417 [84,51] 2023-11-02 13:41 2023-11-02 13:51 2023-11-02 13:50:59.999
1099 2472 237 [70,89] 2023-11-02 13:41 2023-11-02 13:51 2023-11-02 13:50:59.999
1072 3667 41 [86,23] 2023-11-02 13:41 2023-11-02 13:51 2023-11-02 13:50:59.999
1069 3747 475 [75,62] 2023-11-02 13:41 2023-11-02 13:51 2023-11-02 13:50:59.999
1043 4107 70 [52,64] 2023-11-02 13:41 2023-11-02 13:51 2023-11-02 13:50:59.999
The following query computes the sum of the points column in the gaming_player_activity_source table within 10-minute tumbling windows that have an offset of 1 minute.
-- apply aggregation on the tumbling windowed table
SELECT window_start, window_end, SUM(points) as `sum`
FROM TABLE(
TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES))
GROUP BY window_start, window_end;
The output resembles:
window_start window_end sum
2023-11-02 10:41:00.000 2023-11-02 10:51:00.000 241958
2023-11-02 10:51:00.000 2023-11-02 11:01:00.000 618202
2023-11-02 11:01:00.000 2023-11-02 11:11:00.000 607575
2023-11-02 11:11:00.000 2023-11-02 11:21:00.000 617255
2023-11-02 11:21:00.000 2023-11-02 11:31:00.000 622046
2023-11-02 11:31:00.000 2023-11-02 11:41:00.000 605814
2023-11-02 11:41:00.000 2023-11-02 11:51:00.000 599619
2023-11-02 11:51:00.000 2023-11-02 12:01:00.000 609895
2023-11-02 12:01:00.000 2023-11-02 12:11:00.000 606674
2023-11-02 12:11:00.000 2023-11-02 12:21:00.000 590244
2023-11-02 12:21:00.000 2023-11-02 12:31:00.000 599929
2023-11-02 12:31:00.000 2023-11-02 12:41:00.000 628830
2023-11-02 12:41:00.000 2023-11-02 12:51:00.000 597203
2023-11-02 12:51:00.000 2023-11-02 13:01:00.000 600874
2023-11-02 13:01:00.000 2023-11-02 13:11:00.000 616678