Window Aggregation Queries in Confluent Cloud for Apache Flink¶
Confluent Cloud for Apache Flink® enables aggregating data over windows in a table.
Syntax¶
SELECT ...
FROM <windowed_table> -- relation with a windowing TVF applied
GROUP BY window_start, window_end, ...
Description¶
Window TVF Aggregation¶
Window aggregations are defined in a GROUP BY clause that contains the window_start and window_end columns of the relation to which a windowing TVF is applied.
Just like queries with regular GROUP BY clauses, queries with a window aggregation compute a single result row per group.
Unlike other aggregations on continuous tables, window aggregations don’t emit intermediate results. They emit only a final result: the total aggregation at the end of the window. Moreover, window aggregations purge all intermediate state when it’s no longer needed.
Windowing TVFs¶
Flink supports the TUMBLE, HOP, CUMULATE, and SESSION types of window aggregations. The time attribute field of a window table-valued function must be an event-time attribute.
For more information, see Windowing TVF.
In batch mode, the time attribute field of a window table-valued function must be an attribute of type TIMESTAMP or TIMESTAMP_LTZ.
Examples¶
The following examples show window aggregations over example data streams that you can experiment with.
Note
To show the behavior of windowing more clearly in the following examples, TIMESTAMP(3) values may be simplified so that trailing zeroes aren’t shown. For example, 2020-04-15 08:05:00.000 may be shown as 2020-04-15 08:05.
Columns may be hidden intentionally to enhance the readability of the content.
Here are some examples for TUMBLE, HOP, CUMULATE, and SESSION window aggregations.
DESCRIBE `examples`.`marketplace`.`orders`;
+--------------+-----------+----------+---------------+
| Column Name | Data Type | Nullable | Extras |
+--------------+-----------+----------+---------------+
| order_id | STRING | NOT NULL | |
| customer_id | INT | NOT NULL | |
| product_id | STRING | NOT NULL | |
| price | DOUBLE | NOT NULL | |
+--------------+-----------+----------+---------------+
SELECT * FROM `examples`.`marketplace`.`orders`;
order_id customer_id product_id price
d770a538-a70c-4de6-9d06-e6c16c5bef5a 3075 1379 32.21
787ee1f4-d0d0-4c39-bdb9-44dc2d203d55 3028 1335 34.74
7ab7ce23-5f61-4398-afad-b1e3f548fee3 3148 1045 69.26
6fea712c-9454-497e-8038-ebaf6dfc7a17 3247 1390 67.26
dc9daf5e-98d5-4bcd-8839-251fed13b75e 3167 1309 12.04
ab3151d0-2950-49cd-9783-016ccc6a3281 3105 1094 21.52
d27ca945-3cff-48a4-afcc-7b17446aa95d 3168 1250 99.95
-- apply aggregation on the tumbling windowed table
SELECT window_start, window_end, SUM(price) as `sum`
FROM TABLE(
TUMBLE(TABLE `examples`.`marketplace`.`orders`, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
window_start window_end sum
2023-11-02 10:40:00 2023-11-02 10:50:00 258484.93
2023-11-02 10:50:00 2023-11-02 11:00:00 287632.15
2023-11-02 11:00:00 2023-11-02 11:10:00 271945.78
2023-11-02 11:10:00 2023-11-02 11:20:00 315207.46
2023-11-02 11:20:00 2023-11-02 11:30:00 342618.92
2023-11-02 11:30:00 2023-11-02 11:40:00 329754.31
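To make the tumbling-window grouping concrete, the following is a minimal Python sketch of how each row could be mapped to exactly one fixed-size window and summed per window. It models the semantics only and says nothing about how Flink implements them; the helper names `tumble_window` and `tumble_sum` and the sample rows are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def tumble_window(ts, size):
    """Return (window_start, window_end) of the tumbling window containing ts."""
    offset = (ts - EPOCH) % size  # time elapsed since the last window boundary
    start = ts - offset
    return start, start + size  # window_end is exclusive


def tumble_sum(rows, size):
    """Sum prices per tumbling window; rows are (timestamp, price) pairs."""
    sums = defaultdict(float)
    for ts, price in rows:
        sums[tumble_window(ts, size)] += price
    return dict(sorted(sums.items()))


# Hypothetical sample rows, not taken from the example stream.
rows = [
    (datetime(2023, 11, 2, 10, 41, 0), 10.0),
    (datetime(2023, 11, 2, 10, 49, 59), 5.5),
    (datetime(2023, 11, 2, 10, 50, 0), 2.0),  # lands in the next window
]
for (start, end), total in tumble_sum(rows, timedelta(minutes=10)).items():
    print(start, end, total)
```

A row whose timestamp equals a window boundary belongs to the window that starts at that boundary, which is why the third sample row is counted separately.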
-- apply aggregation on the hopping windowed table
SELECT window_start, window_end, SUM(price) as `sum`
FROM TABLE(
HOP(TABLE `examples`.`marketplace`.`orders`, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
window_start window_end sum
2023-11-02 11:10:00 2023-11-02 11:20:00 296049.38
2023-11-02 11:15:00 2023-11-02 11:25:00 1122455.07
2023-11-02 11:20:00 2023-11-02 11:30:00 1648270.20
2023-11-02 11:25:00 2023-11-02 11:35:00 2143271.00
2023-11-02 11:30:00 2023-11-02 11:40:00 2701592.45
2023-11-02 11:35:00 2023-11-02 11:45:00 3214376.78
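Hopping windows overlap, so one row can contribute to several windows. As a rough illustration, this Python sketch lists every window that contains a given timestamp for a slide of 5 minutes and a size of 10 minutes. The function name `hop_windows` is hypothetical, and the sketch assumes window starts are aligned to multiples of the slide.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def hop_windows(ts, slide, size):
    """Return every (window_start, window_end) hopping window that contains ts."""
    # Start of the latest window that begins at or before ts.
    start = ts - (ts - EPOCH) % slide
    windows = []
    while start + size > ts:  # the window still covers ts (end is exclusive)
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


# Hypothetical timestamp: 11:17 falls into two overlapping 10-minute windows.
for start, end in hop_windows(
    datetime(2023, 11, 2, 11, 17), timedelta(minutes=5), timedelta(minutes=10)
):
    print(start, end)
```

With a size that is twice the slide, every row is counted in two windows, so the per-window sums of a hopping aggregation add up to more than the total of the input.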
-- apply aggregation on the cumulating windowed table
SELECT window_start, window_end, SUM(price) as `sum`
FROM TABLE(
CUMULATE(TABLE `examples`.`marketplace`.`orders`, DESCRIPTOR($rowtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
window_start window_end sum
2023-11-02 12:40:00.000 2023-11-02 12:46:00.000 327376.23
2023-11-02 12:40:00.000 2023-11-02 12:48:00.000 661272.70
2023-11-02 12:40:00.000 2023-11-02 12:50:00.000 989294.13
2023-11-02 12:50:00.000 2023-11-02 12:52:00.000 1316596.58
2023-11-02 12:50:00.000 2023-11-02 12:54:00.000 1648097.20
2023-11-02 12:50:00.000 2023-11-02 12:56:00.000 1977881.53
2023-11-02 12:50:00.000 2023-11-02 12:58:00.000 2304080.32
2023-11-02 12:50:00.000 2023-11-02 13:00:00.000 2636795.56
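Cumulating windows share a start and grow by a fixed step until they reach the maximum size. The Python sketch below, offered as an illustration of the semantics rather than of Flink's implementation, lists the windows that a single row contributes to for a 2-minute step and a 10-minute maximum size. The name `cumulate_windows` is hypothetical.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def cumulate_windows(ts, step, max_size):
    """Return every cumulating (window_start, window_end) that contains ts."""
    # All cumulating windows share the start of the enclosing max-size window.
    start = ts - (ts - EPOCH) % max_size
    windows = []
    end = start + step
    while end <= start + max_size:
        if end > ts:  # keep only the windows that have not closed before ts
            windows.append((start, end))
        end += step
    return windows


# Hypothetical timestamp: 12:43 is part of the windows ending 12:44 to 12:50.
for start, end in cumulate_windows(
    datetime(2023, 11, 2, 12, 43), timedelta(minutes=2), timedelta(minutes=10)
):
    print(start, end)
```

Because every later window in the same cycle also contains the row, the sum for a given window_start can only grow as window_end advances.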
-- apply aggregation on the session windowed table
SELECT window_start, window_end, customer_id, SUM(price) as `sum`
FROM TABLE(
SESSION(TABLE `examples`.`marketplace`.`orders` PARTITION BY customer_id, DESCRIPTOR($rowtime), INTERVAL '1' MINUTES))
GROUP BY window_start, window_end, customer_id;
window_start window_end sum
2023-11-02 12:40:00 2023-11-02 12:46:00 327376.23
2023-11-02 12:40:00 2023-11-02 12:48:00 661272.70
2023-11-02 12:40:00 2023-11-02 12:50:00 989294.13
2023-11-02 12:50:00 2023-11-02 12:52:00 1316596.58
2023-11-02 12:50:00 2023-11-02 12:54:00 1648097.20
2023-11-02 12:50:00 2023-11-02 12:56:00 1977881.53
2023-11-02 12:50:00 2023-11-02 12:58:00 2304080.32
2023-11-02 12:50:00 2023-11-02 13:00:00 2636795.56
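Session windows have no fixed size: a window stays open per partition key until no row arrives within the gap. The following Python sketch approximates that behavior for a 1-minute gap. The function `session_windows` and the sample rows are hypothetical, and treating a row that arrives exactly at the gap boundary as the start of a new session is an assumption of the sketch.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def session_windows(rows, gap):
    """Group (key, timestamp, price) rows into per-key sessions.

    Returns sorted (window_start, window_end, key, sum) tuples, where
    window_end is the last row's timestamp plus the gap.
    """
    by_key = defaultdict(list)
    for key, ts, price in rows:
        by_key[key].append((ts, price))

    results = []
    for key, events in by_key.items():
        events.sort()
        start, end, total = None, None, 0.0
        for ts, price in events:
            if start is not None and ts >= end:
                # The gap elapsed: close the current session (assumed boundary rule).
                results.append((start, end, key, total))
                start = None
            if start is None:
                start, total = ts, 0.0
            end = ts + gap  # each row extends the session
            total += price
        results.append((start, end, key, total))
    return sorted(results)


# Hypothetical rows for two customers.
rows = [
    (3075, datetime(2023, 11, 2, 12, 40, 0), 1.0),
    (3075, datetime(2023, 11, 2, 12, 40, 30), 2.0),
    (3028, datetime(2023, 11, 2, 12, 41, 0), 5.0),
    (3075, datetime(2023, 11, 2, 12, 42, 0), 4.0),
]
for row in session_windows(rows, timedelta(minutes=1)):
    print(*row)
```

In this sample, customer 3075 produces two sessions because more than one minute passes between the second and third row.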
GROUPING SETS¶
Window aggregations also support GROUPING SETS syntax. Grouping sets allow for more complex grouping operations than those describable by a standard GROUP BY. Rows are grouped separately by each specified grouping set, and aggregates are computed for each group just as for simple GROUP BY clauses.
Window aggregations with GROUPING SETS require that both the window_start and window_end columns be in the GROUP BY clause, but not in the GROUPING SETS clause.
SELECT window_start, window_end, player_id, SUM(points) as `sum`
FROM TABLE(
TUMBLE(TABLE `examples`.`marketplace`.`orders`, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, GROUPING SETS ((player_id), ());
window_start window_end player_id sum
2023-11-03 11:20 2023-11-03 11:30 (NULL) 6596
2023-11-03 11:20 2023-11-03 11:30 1025 6232
2023-11-03 11:20 2023-11-03 11:30 1007 4486
2023-11-03 11:30 2023-11-03 11:40 (NULL) 6073
2023-11-03 11:30 2023-11-03 11:40 1025 6953
2023-11-03 11:30 2023-11-03 11:40 1007 3723
Each sublist of GROUPING SETS may specify zero or more columns or expressions and is interpreted the same way as though used directly in the GROUP BY clause. An empty grouping set means that all rows are aggregated down to a single group, which is output even if no input rows were present.
References to the grouping columns or expressions are replaced by null values in result rows for grouping sets in which those columns do not appear.
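As a rough illustration of these rules, the Python sketch below aggregates the rows of a single window once per grouping set and fills the columns that a set omits with None, which stands in for SQL NULL. The function `grouping_sets_sum` and the sample rows are hypothetical, and the sketch doesn't model the case of an empty input.

```python
from collections import defaultdict


def grouping_sets_sum(rows, grouping_sets, all_cols, value_col):
    """Sum value_col once per grouping set.

    Result keys list one entry per column in all_cols; a column that is
    not part of the current grouping set is reported as None.
    """
    out = defaultdict(int)
    for cols in grouping_sets:
        for row in rows:
            key = tuple(row[c] if c in cols else None for c in all_cols)
            out[key] += row[value_col]
    return dict(out)


# Hypothetical rows that all fall into the same window.
rows = [
    {"player_id": 1025, "points": 10},
    {"player_id": 1007, "points": 5},
    {"player_id": 1025, "points": 7},
]
result = grouping_sets_sum(rows, [("player_id",), ()], ("player_id",), "points")
print(result)
```

The empty grouping set yields the key `(None,)`, which corresponds to the row with a null player_id that carries the total over all players in the window.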
ROLLUP¶
ROLLUP is a shorthand notation for specifying a common type of grouping set. It represents the given list of expressions and all prefixes of the list, including the empty list.
Window aggregations with ROLLUP require that both the window_start and window_end columns be in the GROUP BY clause, but not in the ROLLUP clause.
For example, the following query is equivalent to the one above.
SELECT window_start, window_end, player_id, SUM(points) as `sum`
FROM TABLE(
TUMBLE(TABLE `examples`.`marketplace`.`orders`, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, ROLLUP (player_id);
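The prefix expansion that ROLLUP stands for can be written out in a few lines of Python. This is only an illustration of the definition, and the function name `rollup` is hypothetical.

```python
def rollup(columns):
    """Expand a ROLLUP column list into its grouping sets: every prefix, longest first."""
    return [tuple(columns[:n]) for n in range(len(columns), -1, -1)]


print(rollup(["player_id"]))
# [('player_id',), ()]
print(rollup(["game_room_id", "player_id"]))
# [('game_room_id', 'player_id'), ('game_room_id',), ()]
```

For a single column, the expansion is the column itself plus the empty set, which is why ROLLUP (player_id) matches GROUPING SETS ((player_id), ()).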
CUBE¶
CUBE is a shorthand notation for specifying a common type of grouping set. It represents the given list and all of its possible subsets, that is, the power set.
Window aggregations with CUBE require that both the window_start and window_end columns be in the GROUP BY clause, but not in the CUBE clause.
For example, the following two queries are equivalent.
SELECT window_start, window_end, game_room_id, player_id, SUM(points) as `sum`
FROM TABLE(
TUMBLE(TABLE `examples`.`marketplace`.`orders`, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, CUBE (player_id, game_room_id);
SELECT window_start, window_end, game_room_id, player_id, SUM(points) as `sum`
FROM TABLE(
TUMBLE(TABLE `examples`.`marketplace`.`orders`, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, GROUPING SETS (
(player_id, game_room_id),
(player_id ),
( game_room_id),
( )
);
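The power-set expansion behind CUBE can be illustrated with Python's `itertools.combinations`. The function name `cube` is hypothetical; the sketch only enumerates the grouping sets and doesn't run any aggregation.

```python
from itertools import combinations


def cube(columns):
    """Expand a CUBE column list into its grouping sets: every subset, largest first."""
    sets = []
    for n in range(len(columns), -1, -1):
        sets.extend(combinations(columns, n))
    return sets


print(cube(["player_id", "game_room_id"]))
# [('player_id', 'game_room_id'), ('player_id',), ('game_room_id',), ()]
```

A list of n columns expands to 2^n grouping sets, so the two-column CUBE above corresponds to the four sets spelled out in the GROUPING SETS form.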
Selecting Group Window Start and End Timestamps¶
The start and end timestamps of group windows can be selected with the grouped window_start and window_end columns.
Cascading Window Aggregation¶
The window_start and window_end columns are regular timestamp columns, not time attributes, so they can’t be used as time attributes in subsequent time-based operations. To propagate time attributes, you also need to add the window_time column to the GROUP BY clause. The window_time column is the third column produced by windowing TVFs, and it is a time attribute of the assigned window.
Adding window_time to a GROUP BY clause also makes window_time a group key that can be selected. Subsequent queries can then use this column for time-based operations, like cascading window aggregations and Window TopN.
The following code shows a cascading window aggregation in which the first window aggregation propagates the time attribute for the second window aggregation.
-- tumbling 5 minutes for each customer_id
WITH fiveminutewindow AS (
-- Note: The window start and window end fields of inner Window TVF
-- are optional in the SELECT clause. But if they appear in the clause,
-- they must be aliased to prevent name conflicts with the window start
-- and window end of the outer Window TVF.
SELECT window_start AS window_5mintumble_start, window_end AS window_5mintumble_end, window_time AS rowtime, SUM(price) AS `partial_price`
FROM TABLE(
TUMBLE(TABLE `examples`.`marketplace`.`orders`, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES))
GROUP BY customer_id, window_start, window_end, window_time
)
-- tumbling 10 minutes on the first window
SELECT window_start, window_end, SUM(partial_price) AS total_price
FROM TABLE(
TUMBLE(TABLE fiveminutewindow, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
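The two-stage roll-up can be sketched in Python to show why the propagated timestamp matters: the second stage assigns each partial result to a 10-minute window based on the first stage's window_time, which is the window end minus one millisecond. The function names, the generic `key` and `value` fields, and the sample rows are hypothetical, and the sketch ignores watermarks and late data.

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)
MILLISECOND = timedelta(milliseconds=1)


def window_start(ts, size):
    """Start of the tumbling window of the given size that contains ts."""
    return ts - (ts - EPOCH) % size


def first_stage(rows, size):
    """Partial sums per (window, key); emits (window_time, key, partial) tuples."""
    partial = defaultdict(float)
    for ts, key, value in rows:
        # window_time is the last instant inside the window: window_end - 1 ms.
        window_time = window_start(ts, size) + size - MILLISECOND
        partial[(window_time, key)] += value
    return [(wt, key, total) for (wt, key), total in sorted(partial.items())]


def second_stage(partials, size):
    """Re-window the partial sums by window_time and add them up per window."""
    totals = defaultdict(float)
    for window_time, _key, partial in partials:
        start = window_start(window_time, size)
        totals[(start, start + size)] += partial
    return dict(sorted(totals.items()))


# Hypothetical (timestamp, key, value) rows.
rows = [
    (datetime(2023, 11, 2, 11, 1), 3075, 10.0),
    (datetime(2023, 11, 2, 11, 4), 3028, 5.0),
    (datetime(2023, 11, 2, 11, 7), 3075, 2.5),
    (datetime(2023, 11, 2, 11, 12), 3028, 1.0),
]
partials = first_stage(rows, timedelta(minutes=5))
totals = second_stage(partials, timedelta(minutes=10))
for (start, end), total in totals.items():
    print(start, end, total)
```

Because window_time lies strictly inside its 5-minute window, each partial result falls into the 10-minute window that encloses it and never into the following one.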