Window Aggregation Queries in Confluent Cloud for Apache Flink
Confluent Cloud for Apache Flink® enables aggregating data over windows in a table.
Syntax
SELECT ...
FROM <windowed_table> -- relation with a windowing TVF applied
GROUP BY window_start, window_end, ...
Description
Window TVF Aggregation
Window aggregations are defined in a GROUP BY clause that contains the window_start and window_end columns of a relation to which a windowing TVF has been applied.
Just like queries with regular GROUP BY clauses, queries with a window aggregation compute a single result row per group.
Unlike other aggregations on continuous tables, window aggregations do not emit intermediate results. They emit only a final result: the total aggregation at the end of the window. Moreover, window aggregations purge all intermediate state when it is no longer needed.
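The emit-once, purge-afterwards behavior can be pictured with a small Python model. This is a hypothetical sketch, not Flink's operator. The TumblingSum class, the minute-valued integer timestamps, and the rule that a window fires once the watermark reaches window_end are all simplifying assumptions. Late rows are ignored.

```python
from collections import defaultdict


class TumblingSum:
    """Hypothetical model of a tumbling SUM that emits only final results.

    Assumptions: timestamps are integer minutes, a window fires once the
    watermark reaches window_end, and late rows are not handled.
    """

    def __init__(self, size):
        self.size = size
        self.state = defaultdict(int)  # window_start -> running sum (never emitted directly)

    def on_row(self, ts, value):
        start = ts - ts % self.size
        self.state[start] += value
        return []  # no intermediate result is emitted for an incoming row

    def on_watermark(self, watermark):
        # Collect every window whose end the watermark has reached.
        results = [
            (start, start + self.size, self.state[start])
            for start in sorted(self.state)
            if start + self.size <= watermark
        ]
        for start, _, _ in results:
            del self.state[start]  # purge state that is no longer needed
        return results
```

Rows only update state. A result appears once per window, when the watermark closes it, and that window's state is dropped at the same moment.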
Windowing TVFs
Flink supports TUMBLE, HOP, and CUMULATE window aggregations. In streaming mode, the time attribute field of a window table-valued function must be either an event-time or a processing-time attribute. For more information, see Windowing TVF.
In batch mode, the time attribute field of a window table-valued function must be of type TIMESTAMP or TIMESTAMP_LTZ.
Examples
The following examples show Window aggregations over mock data produced by the Datagen Source Connector configured with the Gaming Player Activity quickstart.
Note
To show the behavior of windowing more clearly in the following examples, TIMESTAMP(3) values may be simplified so that trailing zeroes aren’t shown. For example, 2020-04-15 08:05:00.000 may be shown as 2020-04-15 08:05.
Columns may be hidden intentionally to enhance the readability of the content.
The following examples show TUMBLE, HOP, and CUMULATE window aggregations.
DESCRIBE gaming_player_activity_source;
+--------------+-----------+----------+---------------+
| Column Name | Data Type | Nullable | Extras |
+--------------+-----------+----------+---------------+
| key | BYTES | NULL | PARTITION KEY |
| player_id | INT | NOT NULL | |
| game_room_id | INT | NOT NULL | |
| points | INT | NOT NULL | |
| coordinates | STRING | NOT NULL | |
+--------------+-----------+----------+---------------+
SELECT * FROM gaming_player_activity_source;
player_id  game_room_id  points  coordinates
1051       1144          371     [65,36]
1079       3451          38      [20,71]
1017       4177          419     [63,05]
1092       1801          209     [31,67]
1074       3013          401     [32,69]
1003       1038          284     [18,32]
1081       2265          196     [78,68]
-- apply aggregation on the tumbling windowed table
SELECT window_start, window_end, SUM(points) as `sum`
FROM TABLE(
TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
window_start      window_end        sum
2023-11-02 10:40  2023-11-02 10:50  734
2023-11-02 10:50  2023-11-02 11:00  2398
2023-11-02 11:00  2023-11-02 11:10  2388
2023-11-02 11:10  2023-11-02 11:20  2391
2023-11-02 11:20  2023-11-02 11:30  2434
2023-11-02 11:30  2023-11-02 11:40  2374
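Each row falls into exactly one tumbling window. The following Python sketch models that assignment and the per-window SUM. It assumes windows aligned to the Unix epoch with no offset, and it uses naive datetimes to stand in for UTC timestamps. The tumble and tumble_sum helpers are hypothetical names, not part of any Flink API.

```python
from collections import defaultdict
from datetime import datetime, timedelta

# Assumption: windows are aligned to the Unix epoch with no offset, and
# naive datetimes stand in for UTC timestamps.
EPOCH = datetime(1970, 1, 1)


def tumble(ts, size):
    """Return (window_start, window_end) of the one tumbling window containing ts."""
    start = ts - (ts - EPOCH) % size  # round ts down to a multiple of size
    return start, start + size


def tumble_sum(rows, size):
    """Sum points per window, like SUM(points) ... GROUP BY window_start, window_end."""
    sums = defaultdict(int)
    for ts, points in rows:
        sums[tumble(ts, size)] += points
    return dict(sorted(sums.items()))
```

For example, with a 10-minute size, a row at 10:47:30 is assigned to the window [10:40, 10:50). A row at exactly 10:50 starts the next window, because window_end is exclusive.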
-- apply aggregation on the hopping windowed table
SELECT window_start, window_end, SUM(points) as `sum`
FROM TABLE(
HOP(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
window_start             window_end               sum
2023-11-02 11:10:00.000  2023-11-02 11:20:00.000  612161
2023-11-02 11:15:00.000  2023-11-02 11:25:00.000  612540
2023-11-02 11:20:00.000  2023-11-02 11:30:00.000  625610
2023-11-02 11:25:00.000  2023-11-02 11:35:00.000  610675
2023-11-02 11:30:00.000  2023-11-02 11:40:00.000  604854
2023-11-02 11:35:00.000  2023-11-02 11:45:00.000  612940
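With HOP, a row belongs to every window whose range covers it. For a 10-minute size and a 5-minute slide, that means two windows, which is why consecutive windows in the output above overlap in time and share rows. The following Python sketch lists those windows for one timestamp. It assumes epoch-aligned windows with no offset, and it uses naive datetimes to stand in for UTC. The hop helper is a hypothetical name.

```python
from datetime import datetime, timedelta

# Assumption: epoch-aligned windows, no offset; naive datetimes stand in for UTC.
EPOCH = datetime(1970, 1, 1)


def hop(ts, slide, size):
    """Return every (window_start, window_end) hopping window that contains ts."""
    start = ts - (ts - EPOCH) % slide  # latest window start at or before ts
    windows = []
    while start > ts - size:  # [start, start + size) still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)
```

A row at 11:17 is counted in both [11:10, 11:20) and [11:15, 11:25).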
-- apply aggregation on the cumulating windowed table
SELECT window_start, window_end, SUM(points) as `sum`
FROM TABLE(
CUMULATE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
window_start             window_end               sum
2023-11-02 12:40:00.000  2023-11-02 12:46:00.000  364387
2023-11-02 12:40:00.000  2023-11-02 12:48:00.000  482213
2023-11-02 12:40:00.000  2023-11-02 12:50:00.000  604723
2023-11-02 12:50:00.000  2023-11-02 12:52:00.000  122034
2023-11-02 12:50:00.000  2023-11-02 12:54:00.000  240866
2023-11-02 12:50:00.000  2023-11-02 12:56:00.000  368831
2023-11-02 12:50:00.000  2023-11-02 12:58:00.000  483390
2023-11-02 12:50:00.000  2023-11-02 13:00:00.000  600559
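In the CUMULATE output above, all windows in one cycle share the same window_start and grow by one step until they reach the maximum size. A row is therefore counted in every window of its cycle that ends after it. The following Python sketch makes this concrete. It assumes epoch-aligned cycles and a maximum size that is an integral multiple of the step, and it uses naive datetimes to stand in for UTC. The cumulate helper is a hypothetical name.

```python
from datetime import datetime, timedelta

# Assumptions: epoch-aligned cycles, max_size is an integral multiple of step,
# and naive datetimes stand in for UTC.
EPOCH = datetime(1970, 1, 1)


def cumulate(ts, step, max_size):
    """Return every (window_start, window_end) cumulating window that contains ts."""
    start = ts - (ts - EPOCH) % max_size  # all windows in this cycle share this start
    windows = []
    end = start + step
    while end <= start + max_size:
        if ts < end:  # [start, end) has grown far enough to include ts
            windows.append((start, end))
        end += step
    return windows
```

A row at 12:53 falls in the cycle starting at 12:50. It is counted in the windows ending at 12:54, 12:56, 12:58, and 13:00, but not in the window ending at 12:52.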
GROUPING SETS
Window aggregations also support GROUPING SETS syntax. Grouping sets allow for more complex grouping operations than those describable by a standard GROUP BY. Rows are grouped separately by each specified grouping set, and aggregates are computed for each group just as for simple GROUP BY clauses.
Window aggregations with GROUPING SETS require that both the window_start and window_end columns be in the GROUP BY clause, but not in the GROUPING SETS clause.
SELECT window_start, window_end, player_id, SUM(points) as `sum`
FROM TABLE(
TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, GROUPING SETS ((player_id), ());
window_start      window_end        player_id  sum
2023-11-03 11:20  2023-11-03 11:30  (NULL)     6596
2023-11-03 11:20  2023-11-03 11:30  1025       6232
2023-11-03 11:20  2023-11-03 11:30  1007       4486
2023-11-03 11:30  2023-11-03 11:40  (NULL)     6073
2023-11-03 11:30  2023-11-03 11:40  1025       6953
2023-11-03 11:30  2023-11-03 11:40  1007       3723
Each sublist of GROUPING SETS may specify zero or more columns or expressions and is interpreted the same way as though it were used directly in the GROUP BY clause. An empty grouping set means that all rows are aggregated down to a single group, which is output even if no input rows were present.
References to the grouping columns or expressions are replaced by null values in result rows for grouping sets in which those columns do not appear.
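The following Python sketch models GROUPING SETS semantics. The window columns are always part of the key, each grouping set is aggregated separately, and key columns missing from a set come out as None, standing in for NULL. The function name and the dict-based rows are hypothetical illustrations, not a Flink API. Results are returned as a list rather than merged into one dict, so rows from different grouping sets cannot overwrite each other.

```python
from collections import defaultdict


def window_grouping_sets(rows, window_cols, key_cols, grouping_sets, value_col):
    """Hypothetical model of GROUP BY <window_cols>, GROUPING SETS (...) with SUM."""
    out = []
    for gs in grouping_sets:
        sums = defaultdict(int)
        for row in rows:
            # Window columns are always grouped, as in GROUP BY window_start, window_end.
            window = tuple(row[c] for c in window_cols)
            # Key columns not in this grouping set become None (NULL).
            key = tuple(row[c] if c in gs else None for c in key_cols)
            sums[window + key] += row[value_col]
        out.extend(sums.items())
    return out
```

With grouping sets ((player_id), ()), each window yields one row per player plus one row with a NULL player_id that holds the window's total.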
ROLLUP
ROLLUP is a shorthand notation for specifying a common type of grouping set. It represents the given list of expressions and all prefixes of the list, including the empty list.
Window aggregations with ROLLUP require that both the window_start and window_end columns be in the GROUP BY clause, but not in the ROLLUP clause.
For example, the following query is equivalent to the GROUPING SETS query above.
SELECT window_start, window_end, player_id, SUM(points) as `sum`
FROM TABLE(
TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, ROLLUP (player_id);
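The prefix rule can be written out directly. The following Python sketch expands a ROLLUP column list into its grouping sets, longest prefix first. The rollup function is a hypothetical helper that only illustrates the expansion. It shows why ROLLUP (player_id) is the same as GROUPING SETS ((player_id), ()).

```python
def rollup(cols):
    """Expand ROLLUP (c1, ..., cn) into its grouping sets: every prefix, longest first."""
    return [tuple(cols[:i]) for i in range(len(cols), -1, -1)]
```

For two columns, rollup(["game_room_id", "player_id"]) gives (game_room_id, player_id), (game_room_id), and (). Column order matters for ROLLUP.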
CUBE
CUBE is a shorthand notation for specifying a common type of grouping set. It represents the given list and all of its possible subsets (the power set).
Window aggregations with CUBE require that both the window_start and window_end columns be in the GROUP BY clause, but not in the CUBE clause.
For example, the following two queries are equivalent.
SELECT window_start, window_end, game_room_id, player_id, SUM(points) as `sum`
FROM TABLE(
TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, CUBE (player_id, game_room_id);
SELECT window_start, window_end, game_room_id, player_id, SUM(points) as `sum`
FROM TABLE(
TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, GROUPING SETS (
    (player_id, game_room_id),
    (player_id),
    (game_room_id),
    ()
);
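The power-set rule behind the CUBE equivalence can be checked with a short Python sketch built on itertools.combinations. The cube function is a hypothetical helper. For n columns it yields 2**n grouping sets, largest first. For (player_id, game_room_id) it reproduces the four sets listed in the GROUPING SETS query above, in the same order.

```python
from itertools import combinations


def cube(cols):
    """Expand CUBE (c1, ..., cn) into all 2**n grouping sets (the power set), largest first."""
    return [combo for r in range(len(cols), -1, -1) for combo in combinations(cols, r)]
```

Unlike ROLLUP, the number of grouping sets doubles with every column added to CUBE, so the amount of output and state grows quickly.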
Selecting Group Window Start and End Timestamps
The start and end timestamps of group windows can be selected with the grouped window_start and window_end columns.
Cascading Window Aggregation
The window_start and window_end columns are regular timestamp columns, not time attributes, so they can’t be used as time attributes in subsequent time-based operations. To propagate time attributes, you must also add the window_time column to the GROUP BY clause. The window_time column is the third column produced by windowing TVFs, and it is a time attribute of the assigned window.
Adding window_time to the GROUP BY clause makes window_time a group key that can also be selected. Subsequent queries can then use this column for time-based operations, like cascading window aggregations and Window TopN.
The following code shows a cascading window aggregation in which the first window aggregation propagates the time attribute for the second window aggregation.
-- tumbling 5 minutes for each player_id
WITH fiveminutewindow AS (
  -- Note: The window_start and window_end fields of the inner window TVF
  -- are optional in the SELECT clause. But if they appear in the clause,
  -- they must be aliased to prevent name conflicts with the window_start
  -- and window_end of the outer window TVF.
  -- window_time is aliased as rowtime so the outer TUMBLE can use it
  -- as its time attribute.
  SELECT window_start AS window_5mintumble_start, window_end AS window_5mintumble_end, window_time AS rowtime, SUM(points) AS `partial_sum`
  FROM TABLE(
    TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES))
  GROUP BY player_id, window_start, window_end, window_time
)
-- tumbling 10 minutes on the first window
SELECT window_start, window_end, SUM(partial_sum) AS total_sum
FROM TABLE(
  TUMBLE(TABLE fiveminutewindow, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
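To see why window_time, and not window_end, is the column to propagate, the following Python sketch recomputes a cascade like the one above on a few made-up rows. It then checks that the result matches a direct 10-minute tumbling sum. The sketch relies on several assumptions:

- window_time equals window_end minus 1 ms, as the open-source Flink documentation describes it.
- Windows are epoch-aligned.
- The outer size is a multiple of the inner size.

All helper names are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta

# Assumptions: epoch-aligned windows; naive datetimes stand in for UTC;
# window_time = window_end - 1 ms (per open-source Flink docs).
EPOCH = datetime(1970, 1, 1)
ONE_MS = timedelta(milliseconds=1)


def tumble_start(ts, size):
    return ts - (ts - EPOCH) % size


def tumble_sums(rows, size):
    """Direct tumbling SUM over (ts, player_id, points) rows: {window_start: sum}."""
    sums = defaultdict(int)
    for ts, _, points in rows:
        sums[tumble_start(ts, size)] += points
    return dict(sums)


def cascaded_sums(rows, inner, outer):
    """Inner per-player tumbling SUM, then an outer tumbling SUM keyed on window_time."""
    partial = defaultdict(int)  # (player_id, inner window_start) -> partial_sum
    for ts, player_id, points in rows:
        partial[(player_id, tumble_start(ts, inner))] += points
    totals = defaultdict(int)
    for (_, start), partial_sum in partial.items():
        window_time = start + inner - ONE_MS  # assumed window_time semantics
        totals[tumble_start(window_time, outer)] += partial_sum
    return dict(totals)
```

Suppose window_end were used as the rowtime instead. The 5-minute window [11:05, 11:10) would then be assigned to the outer window starting at 11:10 rather than the one starting at 11:00, and the cascaded totals would no longer match the direct ones.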