Windowing Table-Valued Functions (Windowing TVFs) in Confluent Cloud for Apache Flink¶
Confluent Cloud for Apache Flink® provides several window table-valued functions (TVFs) for dividing the elements of a table into windows.
Description¶
Windows are central to processing infinite streams. Windows split the stream into “buckets” of finite size, over which you can apply computations. This document focuses on how windowing is performed in Confluent Cloud for Apache Flink and how you can benefit from windowed functions.
Flink provides several window table-valued functions (TVF) to divide the elements of your table into windows, including:
Note that each element can logically belong to more than one window, depending on the windowing table-valued function you use. For example, HOP windowing creates overlapping windows in which a single element can be assigned to multiple windows.
Windowing TVFs are Flink-defined Polymorphic Table Functions
(abbreviated PTF). PTF is part of the
SQL 2016 standard,
a special table-function, but can have a table as a parameter. PTF is a
powerful feature to change the shape of a table. Because PTFs are used
semantically like tables, their invocation occurs in a FROM
clause
of a SELECT
statement.
These are frequently-used computations based on windowing TVF:
Window functions¶
Flink provides 4 built-in windowing TVFs: TUMBLE
, HOP
, CUMULATE
and SESSION
. The return value of windowing TVF is a new relation that
includes all columns of original relation as well as additional 3
columns named “window_start”, “window_end”, “window_time” to indicate
the assigned window. In streaming mode, the “window_time” field is a
time attribute
of the window. In batch mode, the “window_time” field is an attribute of
type TIMESTAMP
or TIMESTAMP_LTZ
based on input time field type.
The “window_time” field can be used in subsequent time-based operations,
e.g. another windowing TVF, interval-join, or
over aggregation. The value of
window_time
always equal to window_end - 1ms
.
Window alignment¶
Time-based window boundaries align with clock seconds, minutes, hours, and days. For example, assume that you have events with these timestamps (in UTC):
- 00:59:00.000
- 00:59:30.000
- 01:00:15.000
If you put these events into hour-long tumbling windows, the first two land in
the window for 00:00:00-00:59:59.999
, and the third event lands in the
following hour.
Supported time units¶
Window TVFs support the following time units:
- SECOND
- MINUTE
- HOUR
- DAY
MONTH and YEAR time units are not currently supported.
Examples¶
The following examples show Window TVFs over over example data streams that you can experiment with.
Note
To show the behavior of windowing more clearly in the following examples,
TIMESTAMP(3)
values may be simplified so that trailing zeroes aren’t shown.
For example, 2020-04-15 08:05:00.000
may be shown as 2020-04-15 08:05
.
Columns may be hidden intentionally to enhance the readability of the content.
TUMBLE¶
The TUMBLE
function assigns each element to a window of specified
window size. Tumbling windows have a fixed size and do not overlap. For
example, suppose you specify a tumbling window with a size of 5 minutes.
In that case, Flink will evaluate the current window, and a new window
started every five minutes, as illustrated by the following figure.
The TUMBLE
function assigns a window for each row of a relation
based on a time attribute field. In streaming mode, the time attribute
field must be an event time attribute.
In batch mode, the time attribute field of window table function must be an attribute of type
TIMESTAMP
or TIMESTAMP_LTZ
.
The return value of TUMBLE
is a new relation that includes all columns of
the original relation, as well as an additional 3 columns named window_start
,
window_end
, and window_time
to indicate the assigned window. The original
time attribute, timecol
is a regular timestamp column after windowing TVF.
The TUMBLE
function takes three required parameters and one optional
parameter:
TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])
data
: is a table parameter that can be any relation with a time attribute column.timecol
: is a column descriptor indicating which time attributes column of data should be mapped to tumbling windows.size
: is a duration specifying the width of the tumbling windows.offset
: is an optional parameter to specify the offset which window start would be shifted by.
Here is an example invocation on the orders
table:
DESCRIBE `examples`.`marketplace`.`orders`;
The output resembles:
+--------------+-----------+----------+---------------+
| Column Name | Data Type | Nullable | Extras |
+--------------+-----------+----------+---------------+
| order_id | STRING | NOT NULL | |
| customer_id | INT | NOT NULL | |
| product_id | STRING | NOT NULL | |
| price | DOUBLE | NOT NULL | |
+--------------+-----------+----------+---------------+
The following query returns all rows in the orders
table.
SELECT * FROM `examples`.`marketplace`.`orders`;
The output resembles:
order_id customer_id product_id price
d770a538-a70c-4de6-9d06-e6c16c5bef5a 3075 1379 32.21
787ee1f4-d0d0-4c39-bdb9-44dc2d203d55 3028 1335 34.74
7ab7ce23-5f61-4398-afad-b1e3f548fee3 3148 1045 69.26
6fea712c-9454-497e-8038-ebaf6dfc7a17 3247 1390 67.26
dc9daf5e-98d5-4bcd-8839-251fed13b75e 3167 1309 12.04
ab3151d0-2950-49cd-9783-016ccc6a3281 3105 1094 21.52
d27ca945-3cff-48a4-afcc-7b17446aa95d 3168 1250 99.95
The following queries return all rows in the orders
table in 10-minute tumbling windows.
SELECT * FROM TABLE(
TUMBLE(TABLE `examples`.`marketplace`.`orders`, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
-- or with the named params
-- note: the DATA param must be the first
SELECT * FROM TABLE(
TUMBLE(
DATA => TABLE `examples`.`marketplace`.`orders`,
TIMECOL => DESCRIPTOR($rowtime),
SIZE => INTERVAL '10' MINUTES));
The output resembles:
order_id customer_id product_id price $rowtime window_start window_end window_time
e69058b5-7ed9-44fa-86ff-4d6f8baff028 3145 1488 63.94 2023-11-02 13:20:27 2023-11-02 13:20:00 2023-11-02 13:30:00 2023-11-02 13:29:59.999
92e81cc4-93c4-488b-9386-ae9300d7cd21 3223 1328 29.37 2023-11-02 13:20:27 2023-11-02 13:20:00 2023-11-02 13:30:00 2023-11-02 13:29:59.999
7ca2ddaa-dd5e-41dc-ac47-c9aa7477d913 3223 1402 49.78 2023-11-02 13:20:27 2023-11-02 13:20:00 2023-11-02 13:30:00 2023-11-02 13:29:59.999
84efa0d0-7157-4cd3-a893-e7d2780cefdd 3076 1321 47.38 2023-11-02 13:20:27 2023-11-02 13:20:00 2023-11-02 13:30:00 2023-11-02 13:29:59.999
d72a37d2-ef15-4740-8ae8-1199ddf84ea9 3211 1234 56.27 2023-11-02 13:20:27 2023-11-02 13:20:00 2023-11-02 13:30:00 2023-11-02 13:29:59.999
4d57c754-63e1-413a-8af8-768d54d128ee 3126 1223 21.52 2023-11-02 13:20:27 2023-11-02 13:20:00 2023-11-02 13:30:00 2023-11-02 13:29:59.999
80f9fe0b-3e5d-4c25-aa6e-0b3dacfa36de 3087 1393 70.26 2023-11-02 13:20:27 2023-11-02 13:20:00 2023-11-02 13:30:00 2023-11-02 13:29:59.999
ea733533-1516-41b6-b5e3-cadcb6f71529 3079 1488 17.55 2023-11-02 13:20:27 2023-11-02 13:20:00 2023-11-02 13:30:00 2023-11-02 13:29:59.999
cef1cd9f-379e-4791-8a0d-69eec8adae35 3211 1293 91.20 2023-11-02 13:20:27 2023-11-02 13:20:00 2023-11-02 13:30:00 2023-11-02 13:29:59.999
The following query computes the sum of the price
column in the
orders
table within 10-minute tumbling windows.
-- apply aggregation on the tumbling windowed table
SELECT window_start, window_end, SUM(price) as `sum`
FROM TABLE(
TUMBLE(TABLE `examples`.`marketplace`.`orders`, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
The output resembles:
window_start window_end sum
2023-11-02 10:40:00 2023-11-02 10:50:00 258484.93
2023-11-02 10:50:00 2023-11-02 11:00:00 287632.15
2023-11-02 11:00:00 2023-11-02 11:10:00 271945.78
2023-11-02 11:10:00 2023-11-02 11:20:00 315207.46
2023-11-02 11:20:00 2023-11-02 11:30:00 342618.92
2023-11-02 11:30:00 2023-11-02 11:40:00 329754.31
HOP¶
The HOP
function assigns elements to windows of fixed length. Like a
TUMBLE
windowing function, the size of the windows is configured by
the window size parameter. An additional window slide parameter controls
how frequently a hopping window is started. Hence, hopping windows can
be overlapping if the slide is smaller than the window size. In this
case, elements are assigned to multiple windows. Hopping windows are
also known as “sliding windows”.
For example, you could have windows of size 10 minutes that slides by 5 minutes. With this, you get every 5 minutes a window that contains the events that arrived during the last 10 minutes, as depicted by the following figure.
The HOP
function assigns windows that cover rows within the interval
of size and shifting every slide based on a time attribute field. In
streaming mode, the time attribute field must be an event time attribute. In batch mode, the time
attribute field of window table function must be an attribute of type
TIMESTAMP
or TIMESTAMP_LTZ
.
The return value of HOP
is a new relation that includes all columns of
the original relation as well as an additional 3 columns named window_start
,
window_end
, and window_time
to indicate the assigned window. The
original time attribute, timecol
, is a regular timestamp column after
windowing TVF.
The HOP
takes four required parameters and one optional parameter:
HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
data
: is a table parameter that can be any relation with an time attribute column.timecol
: is a column descriptor indicating which time attributes column of data should be mapped to hopping windows.slide
: is a duration specifying the duration between the start of sequential hopping windowssize
: is a duration specifying the width of the hopping windows.offset
: is an optional parameter to specify the offset which window start would be shifted by.
The following queries return all rows in the orders
table in hopping windows with a 5-minute slide and 10-minute size.
SELECT * FROM TABLE(
HOP(TABLE `examples`.`marketplace`.`orders`, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
-- or with the named params
-- note: the DATA param must be the first
SELECT * FROM TABLE(
HOP(
DATA => TABLE `examples`.`marketplace`.`orders`,
TIMECOL => DESCRIPTOR($rowtime),
SLIDE => INTERVAL '5' MINUTES,
SIZE => INTERVAL '10' MINUTES));
The output resembles:
order_id customer_id product_id price $rowtime window_start window_end window_time
10ae1386-496e-4c6c-9436-7f7e2e7a59f9 3160 1015 26.20 2023-11-02 19:24:46 2023-11-02 19:20:00 2023-11-02 19:30:00 2023-11-02 19:29:59.999
10ae1386-496e-4c6c-9436-7f7e2e7a59f9 3160 1015 26.20 2023-11-02 19:24:46 2023-11-02 19:15:00 2023-11-02 19:25:00 2023-11-02 19:24:59.999
66ecb3b3-7a3d-43ac-b3a2-4c35e06a8d7c 3046 1081 20.24 2023-11-02 19:24:46 2023-11-02 19:20:00 2023-11-02 19:30:00 2023-11-02 19:29:59.999
66ecb3b3-7a3d-43ac-b3a2-4c35e06a8d7c 3046 1081 20.24 2023-11-02 19:24:46 2023-11-02 19:15:00 2023-11-02 19:25:00 2023-11-02 19:24:59.999
4d86db03-a573-4fc2-9699-85455331a7c4 3023 1346 85.45 2023-11-02 19:24:46 2023-11-02 19:20:00 2023-11-02 19:30:00 2023-11-02 19:29:59.999
4d86db03-a573-4fc2-9699-85455331a7c4 3023 1346 85.45 2023-11-02 19:24:46 2023-11-02 19:15:00 2023-11-02 19:25:00 2023-11-02 19:24:59.999
d1460cf7-9472-45e0-9c2d-40537c9f34c0 3114 1333 49.56 2023-11-02 19:24:47 2023-11-02 19:20:00 2023-11-02 19:30:00 2023-11-02 19:29:59.999
d1460cf7-9472-45e0-9c2d-40537c9f34c0 3114 1333 49.56 2023-11-02 19:24:47 2023-11-02 19:15:00 2023-11-02 19:25:00 2023-11-02 19:24:59.999
e38984d8-5683-4e55-9f7a-e43350de7c3d 3024 1402 90.75 2023-11-02 19:24:47 2023-11-02 19:20:00 2023-11-02 19:30:00 2023-11-02 19:29:59.999
e38984d8-5683-4e55-9f7a-e43350de7c3d 3024 1402 90.75 2023-11-02 19:24:47 2023-11-02 19:15:00 2023-11-02 19:25:00 2023-11-02 19:24:59.999
The following query computes the sum of the price
column in the
orders
table within hopping windows that have a
5-minute slide and 10-minute size.
-- apply aggregation on the hopping windowed table
SELECT window_start, window_end, SUM(price) as `sum`
FROM TABLE(
HOP(TABLE `examples`.`marketplace`.`orders`, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
The output resembles:
window_start window_end sum
2023-11-02 11:10:00 2023-11-02 11:20:00 296049.38
2023-11-02 11:15:00 2023-11-02 11:25:00 1122455.07
2023-11-02 11:20:00 2023-11-02 11:30:00 1648270.20
2023-11-02 11:25:00 2023-11-02 11:35:00 2143271.00
2023-11-02 11:30:00 2023-11-02 11:40:00 2701592.45
2023-11-02 11:35:00 2023-11-02 11:45:00 3214376.78
CUMULATE¶
Cumulating windows are useful in some scenarios, such as tumbling windows with
early firing in a fixed window interval. For example, a daily dashboard might
display cumulative unique views (UVs) from 00:00 to every minute, and the UV
at 10:00 might represent the total number of UVs from 00:00 to 10:00. This can
be implemented easily and efficiently by CUMULATE
windowing.
The CUMULATE
function assigns elements to windows that cover rows within an
initial interval of a specified step size, and it expands by one more step size,
keeping the window start fixed, for every step, until the maximum window size
is reached.
CUMULATE
function windows all have the same window start but add a step
size to each window until the max value is reached, so the window size is
always changing, and the windows overlap. When the max value is reached, the
window start is advanced to the end of the last window, and the size resets to
the step size. In comparison, TUMBLE
function windows all have the same
size, the step size, and do not overlap.
For example, you could have a cumulating window with a 1-hour step and 1-day maximum size, and you will get these windows for every day:
[00:00, 01:00)
[00:00, 02:00)
[00:00, 03:00)
…[00:00, 24:00)
The CUMULATE
function assigns windows based on a time attribute
column. In streaming mode, the time attribute field must be an
event time attribute.
In batch mode, the time attribute field of window table function must be an
attribute of type TIMESTAMP
or TIMESTAMP_LTZ
.
The return value of CUMULATE
is a new relation that includes all columns
of the original relation, as well as an additional 3 columns named
window_start
, window_end
, and window_time
to indicate the assigned
window. The original time attribute, timecol
, is a regular timestamp column
after window TVF.
The CUMULATE
takes four required parameters and one optional parameter:
CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
data
: is a table parameter that can be any relation with an time attribute column.timecol
: is a column descriptor indicating which time attributes column of data should be mapped to cumulating windows.step
: is a duration specifying the increased window size between the end of sequential cumulating windows.size
: is a duration specifying the max width of the cumulating windows.size
must be an integral multiple ofstep
.offset
: is an optional parameter to specify the offset which window start would be shifted by.
The following queries return all rows in the orders
table in CUMULATE windows that have a 2-minute step and 10-minute size.
SELECT * FROM TABLE(
CUMULATE(TABLE `examples`.`marketplace`.`orders`, DESCRIPTOR($rowtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
SELECT * FROM TABLE(
CUMULATE(
DATA => TABLE `examples`.`marketplace`.`orders`,
TIMECOL => DESCRIPTOR($rowtime),
STEP => INTERVAL '2' MINUTES,
SIZE => INTERVAL '10' MINUTES));
The output resembles:
order_id customer_id product_id price $rowtime window_start window_end window_time
2572a2e0-2ba2-4947-8926-e70e31b68df3 3239 1015 13.59 2023-11-02 19:27:39 2023-11-02 19:20:00 2023-11-02 19:28:00 2023-11-02 19:27:59.999
2572a2e0-2ba2-4947-8926-e70e31b68df3 3239 1015 13.59 2023-11-02 19:27:39 2023-11-02 19:20:00 2023-11-02 19:30:00 2023-11-02 19:29:59.999
7f791e40-a524-4a9b-bb0d-35a2c1b5a7c4 3102 1374 93.59 2023-11-02 19:27:39 2023-11-02 19:20:00 2023-11-02 19:28:00 2023-11-02 19:27:59.999
7f791e40-a524-4a9b-bb0d-35a2c1b5a7c4 3102 1374 93.59 2023-11-02 19:27:39 2023-11-02 19:20:00 2023-11-02 19:30:00 2023-11-02 19:29:59.999
47e70310-8fa4-4568-b521-7e2b68b06634 3026 1142 58.26 2023-11-02 19:27:39 2023-11-02 19:20:00 2023-11-02 19:28:00 2023-11-02 19:27:59.999
47e70310-8fa4-4568-b521-7e2b68b06634 3026 1142 58.26 2023-11-02 19:27:39 2023-11-02 19:20:00 2023-11-02 19:30:00 2023-11-02 19:29:59.999
fe1b440e-dc75-4092-be11-8e1c3afe55c7 3106 1057 11.37 2023-11-02 19:27:39 2023-11-02 19:20:00 2023-11-02 19:28:00 2023-11-02 19:27:59.999
fe1b440e-dc75-4092-be11-8e1c3afe55c7 3106 1057 11.37 2023-11-02 19:27:39 2023-11-02 19:20:00 2023-11-02 19:30:00 2023-11-02 19:29:59.999
6668e4dc-d574-44db-8f0f-2b8e1b1f3c2e 3061 1049 26.20 2023-11-02 19:27:39 2023-11-02 19:20:00 2023-11-02 19:28:00 2023-11-02 19:27:59.999
6668e4dc-d574-44db-8f0f-2b8e1b1f3c2e 3061 1049 26.20 2023-11-02 19:27:39 2023-11-02 19:20:00 2023-11-02 19:30:00 2023-11-02 19:29:59.999
The following query computes the sum of the price
column in the
orders
table within CUMULATE windows that have a
2-minute step and 10-minute size.
-- apply aggregation on the cumulating windowed table
SELECT window_start, window_end, SUM(price) as `sum`
FROM TABLE(
CUMULATE(TABLE `examples`.`marketplace`.`orders`, DESCRIPTOR($rowtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
The output resembles:
window_start window_end sum
2023-11-02 12:40:00.000 2023-11-02 12:46:00.000 327376.23
2023-11-02 12:40:00.000 2023-11-02 12:48:00.000 661272.70
2023-11-02 12:40:00.000 2023-11-02 12:50:00.000 989294.13
2023-11-02 12:50:00.000 2023-11-02 12:52:00.000 1316596.58
2023-11-02 12:50:00.000 2023-11-02 12:54:00.000 1648097.20
2023-11-02 12:50:00.000 2023-11-02 12:56:00.000 1977881.53
2023-11-02 12:50:00.000 2023-11-02 12:58:00.000 2304080.32
2023-11-02 12:50:00.000 2023-11-02 13:00:00.000 2636795.56
SESSION¶
The SESSION
function groups elements by sessions of activity. Unlike TUMBLE
and HOP
windows, session
windows do not overlap and do not have a fixed start and end time. Instead, a session window closes when it doesn’t
receive elements for a certain period of time, i.e., when a gap of inactivity occurs. A session window is configured
with a static session gap that defines the duration of inactivity. When this period expires, the current session closes
and subsequent elements are assigned to a new session window.
For example, you could have windows with a gap of 1 minute. With this configuration, when the interval between two events is less than 1 minute, these events will be grouped into the same session window. If there is no data for 1 minute following the latest event, then this session window will close and be sent downstream. Subsequent events will be assigned to a new session window.
The SESSION
function assigns windows that cover rows based on a time attribute. In streaming mode, the time
attribute field must be an event time attribute.
The return value of SESSION
is a new relation that includes all columns of the original relation, as well as
three additional columns named window_start
, window_end
, and window_time
to indicate the assigned window.
The original time attribute timecol
becomes a regular timestamp column after the windowing TVF.
The SESSION
function takes three required parameters and one optional parameter:
SESSION(TABLE data [PARTITION BY(keycols, ...)], DESCRIPTOR(timecol), gap)
data
: is a table parameter that can be any relation with a time attribute column.keycols
: is a column or set of columns indicating which columns should be used to partition the data prior to session windows.timecol
: is a column descriptor indicating which time attribute column of data should be mapped to session windows.gap
: is the maximum interval in timestamp for two events to be considered part of the same session window.
The following query returns all columns from the orders
table within SESSION windows that have a 1-minute gap,
partitioned by product_id
:
SELECT * FROM TABLE(
SESSION(TABLE `examples`.`marketplace`.`orders` PARTITION BY product_id, DESCRIPTOR($rowtime), INTERVAL '1' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
SELECT * FROM TABLE(
SESSION(
DATA => TABLE `examples`.`marketplace`.`orders` PARTITION BY product_id,
TIMECOL => DESCRIPTOR($rowtime),
GAP => INTERVAL '1' MINUTES));
The output resembles:
order_id customer_id product_id price $rowtime window_start window_end window_time
d7ef1f9a-4f5f-406e-bbad-25db521c38bf 3068 1234 17.08 2023-11-02T19:43:58.626Z 2023-11-02 21:43:58.626 2023-11-02 21:44:58.626 2023-11-02T19:44:58.625Z
804f0c86-a59a-4425-a293-b28bafaa9674 3071 1332 48.12 2023-11-02T19:44:00.506Z 2023-11-02 21:44:00.506 2023-11-02 21:45:00.506 2023-11-02T19:45:00.505Z
61ea63e3-f040-4501-b78e-8db1fdcf45fc 3179 1267 12.35 2023-11-02T19:43:58.405Z 2023-11-02 21:43:58.405 2023-11-02 21:45:07.925 2023-11-02T19:45:07.924Z
b70ba5bc-428c-41d7-b8fc-8014dd3fd429 3234 1267 40.81 2023-11-02T19:44:00.365Z 2023-11-02 21:43:58.405 2023-11-02 21:45:07.925 2023-11-02T19:45:07.924Z
37688f8c-65ee-4e27-a567-4890e6c7663b 3179 1267 98.17 2023-11-02T19:44:07.925Z 2023-11-02 21:43:58.405 2023-11-02 21:45:07.925 2023-11-02T19:45:07.924Z
4cfa0cc6-881a-43b3-bb34-1746c3b93094 3077 1047 16.78 2023-11-02T19:44:01.985Z 2023-11-02 21:44:01.985 2023-11-02 21:45:23.285 2023-11-02T19:45:23.284Z
e007ce6e-5a76-4390-8fb3-50f46025b965 3095 1047 77.48 2023-11-02T19:44:11.365Z 2023-11-02 21:44:01.985 2023-11-02 21:45:23.285 2023-11-02T19:45:23.284Z
487a0248-a534-489e-bbc5-733e87d19cc7 3200 1047 47.86 2023-11-02T19:44:23.285Z 2023-11-02 21:44:01.985 2023-11-02 21:45:23.285 2023-11-02T19:45:23.284Z
4dd1ab51-8ca4-4de6-9f79-bb2ad7ab2498 3043 1235 36.5 2023-11-02T19:43:57.785Z 2023-11-02 21:43:57.785 2023-11-02 21:45:24.625 2023-11-02T19:45:24.624Z
bb524ec6-1b21-40f1-8c54-3aac7b454c5b 3232 1235 36.98 2023-11-02T19:44:07.265Z 2023-11-02 21:43:57.785 2023-11-02 21:45:24.625 2023-11-02T19:45:24.624Z
9c218c8a-1566-4982-9640-a0deb9ac203c 3065 1235 30.17 2023-11-02T19:44:16.966Z 2023-11-02 21:43:57.785 2023-11-02 21:45:24.625 2023-11-02T19:45:24.624Z
6623c41b-04fa-4df0-a312-45b6dfcdc639 3143 1235 12.2 2023-11-02T19:44:24.625Z 2023-11-02 21:43:57.785 2023-11-02 21:45:24.625 2023-11-02T19:45:24.624Z
The following query computes the sum of the price
column in the orders
table within SESSION windows that have a 5-minute gap.
SELECT window_start, window_end, customer_id, SUM(price) as `sum`
FROM TABLE(
SESSION(TABLE `examples`.`marketplace`.`orders` PARTITION BY customer_id, DESCRIPTOR($rowtime), INTERVAL '1' MINUTES))
GROUP BY window_start, window_end, customer_id;
The output resembles:
window_start window_end sum
2023-11-02 12:40:00 2023-11-02 12:46:00 327376.23
2023-11-02 12:40:00 2023-11-02 12:48:00 661272.70
2023-11-02 12:40:00 2023-11-02 12:50:00 989294.13
2023-11-02 12:50:00 2023-11-02 12:52:00 1316596.58
2023-11-02 12:50:00 2023-11-02 12:54:00 1648097.20
2023-11-02 12:50:00 2023-11-02 12:56:00 1977881.53
2023-11-02 12:50:00 2023-11-02 12:58:00 2304080.32
2023-11-02 12:50:00 2023-11-02 13:00:00 2636795.56
Window Offset¶
Offset
is an optional parameter that you can use to change the window
assignment. It can be a positive duration or a negative duration. The default
value for a window offset is 0. The same record may be assigned to a
different window if set to a different offset value.
For example, which window would a record be assigned to if it has a timestamp
of 2021-06-30 00:00:04
, for a Tumble window with 10 MINUTE as size?
- If the
offset
value is-16 MINUTE
, the record assigns to window [2021-06-29 23:54:00
,2021-06-30 00:04:00
). - If the
offset
value is-6 MINUTE
, the record assigns to window [2021-06-29 23:54:00
,2021-06-30 00:04:00
). - If the
offset
is-4 MINUTE
, the record assigns to window [2021-06-29 23:56:00
,2021-06-30 00:06:00
). - If the
offset
is0
, the record assigns to window [2021-06-30 00:00:00
,2021-06-30 00:10:00
). - If the
offset
is4 MINUTE
, the record assigns to window [2021-06-29 23:54:00
,2021-06-30 00:04:00
). - If the
offset
is6 MINUTE
, the record assigns to window [2021-06-29 23:56:00
,2021-06-30 00:06:00
). - If the
offset
is16 MINUTE
, the record assigns to window [2021-06-29 23:56:00
,2021-06-30 00:06:00
).
Some window offset parameters may have the same effect on the assignment of
windows. In the above case, -16 MINUTE
, -6 MINUTE
and 4 MINUTE
have the same effect for a tumble window with a 10 MINUTE size.
Note
The effect of window offset is only for updating window assignment. It has no effect on Watermark.
Examples¶
The following SQL examples show how to use offset
in a tumbling window.
SELECT * FROM TABLE(
TUMBLE(TABLE `examples`.`marketplace`.`orders`, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
SELECT * FROM TABLE(
TUMBLE(
DATA => TABLE `examples`.`marketplace`.`orders`,
TIMECOL => DESCRIPTOR($rowtime),
SIZE => INTERVAL '10' MINUTES,
OFFSET => INTERVAL '1' MINUTES));
The output resembles:
order_id customer_id product_id price $rowtime window_start window_end window_time
0932497b-a3c2-4f80-9b1f-9d099b091696 3063 1035 75.85 2023-11-02 19:29:51 2023-11-02 19:21:00 2023-11-02 19:31:00 2023-11-02 19:30:59.999
20f4529c-9c86-4a54-8c38-f6c3caa1d7b8 3131 1207 89.00 2023-11-02 19:29:51 2023-11-02 19:21:00 2023-11-02 19:31:00 2023-11-02 19:30:59.999
cbda6c08-e0c7-41cb-ae04-c50f5b1f5e3c 3074 1312 63.71 2023-11-02 19:29:51 2023-11-02 19:21:00 2023-11-02 19:31:00 2023-11-02 19:30:59.999
d049ed28-cbbb-479b-8df6-8c637c1b68f5 3006 1201 72.14 2023-11-02 19:29:51 2023-11-02 19:21:00 2023-11-02 19:31:00 2023-11-02 19:30:59.999
63b6f2ef-c0e9-4737-ab81-f5acb93e4a64 3182 1346 76.18 2023-11-02 19:29:51 2023-11-02 19:21:00 2023-11-02 19:31:00 2023-11-02 19:30:59.999
00c088db-9cb7-4128-a4fd-4e06c0e95f7a 3198 1166 63.49 2023-11-02 19:29:51 2023-11-02 19:21:00 2023-11-02 19:31:00 2023-11-02 19:30:59.999
b9ca292e-635a-4ef7-a6ee-bcf099df7c1b 3236 1462 69.13 2023-11-02 19:29:51 2023-11-02 19:21:00 2023-11-02 19:31:00 2023-11-02 19:30:59.999
3299fd08-264e-4e49-8bb9-82cae18c5d7c 3058 1226 59.53 2023-11-02 19:29:51 2023-11-02 19:21:00 2023-11-02 19:31:00 2023-11-02 19:30:59.999
45878388-7cb3-409d-91a4-8ef1f02c8576 3028 1228 16.63 2023-11-02 19:29:51 2023-11-02 19:21:00 2023-11-02 19:31:00 2023-11-02 19:30:59.999
c2fef024-c0c2-4c0f-9880-bc423d1c2db6 3219 1071 80.66 2023-11-02 19:29:51 2023-11-02 19:21:00 2023-11-02 19:31:00 2023-11-02 19:30:59.999
The following query computes the sum of the price
column in the
orders
table within 10-minute tumbling windows
that have an offset of 1 minute.
-- apply aggregation on the tumbling windowed table
SELECT window_start, window_end, SUM(price) as `sum`
FROM TABLE(
TUMBLE(TABLE `examples`.`marketplace`.`orders`, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES))
GROUP BY window_start, window_end;
The output resembles:
window_start window_end sum
2023-11-02 19:21:00 2023-11-02 19:31:00 7285.64
2023-11-02 19:22:00 2023-11-02 19:32:00 6932.18
2023-11-02 19:23:00 2023-11-02 19:33:00 7104.53
2023-11-02 19:24:00 2023-11-02 19:34:00 7456.92
2023-11-02 19:25:00 2023-11-02 19:35:00 7198.75
2023-11-02 19:26:00 2023-11-02 19:36:00 6875.39
2023-11-02 19:27:00 2023-11-02 19:37:00 7312.87
2023-11-02 19:28:00 2023-11-02 19:38:00 7089.26
2023-11-02 19:29:00 2023-11-02 19:39:00 7401.58
2023-11-02 19:30:00 2023-11-02 19:40:00 7156.43