Window Aggregation Queries in Confluent Cloud for Apache Flink

Confluent Cloud for Apache Flink® enables aggregating data over windows in a table.

Syntax

SELECT ...
FROM <windowed_table> -- relation produced by applying a windowing TVF
GROUP BY window_start, window_end, ...

Description

Window TVF Aggregation

Window aggregations are defined by a GROUP BY clause that contains the window_start and window_end columns of a relation to which a windowing TVF has been applied.

Just like queries with regular GROUP BY clauses, queries with a group by window aggregation compute a single result row per group.

Unlike other aggregations on continuous tables, window aggregations do not emit intermediate results but only a final result: the total aggregation at the end of the window. Moreover, window aggregations purge all intermediate state when they’re no longer needed.
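As a sketch of "one result row per group", the following query groups by an extra key (player_id) after the window columns and computes several aggregates at once. It assumes the gaming_player_activity_source table used in the Examples section; the aliases events, total_points, and best_score are hypothetical names chosen for illustration. Under the final-result semantics described above, each row is expected to be emitted once, when the watermark passes the window's end.

```sql
-- Hypothetical example: one row per player per 10-minute tumbling window.
-- Additional group keys follow window_start and window_end.
SELECT window_start, window_end, player_id,
       COUNT(*)    AS events,        -- number of activity events in the window
       SUM(points) AS total_points,  -- points accumulated in the window
       MAX(points) AS best_score     -- highest single score in the window
  FROM TABLE(
    TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end, player_id;
```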

Windowing TVFs

Flink supports TUMBLE, HOP, and CUMULATE window aggregations. In streaming mode, the time attribute field of a window table-valued function must be either an event-time or a processing-time attribute. In batch mode, the time attribute field of a window table-valued function must be an attribute of type TIMESTAMP or TIMESTAMP_LTZ.

For more information, see Windowing TVF.
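To see what a windowing TVF contributes before any aggregation is applied, the TVF can be queried on its own. This is a minimal sketch, assuming the gaming_player_activity_source table from the Examples section. Each input row comes back with three extra columns, window_start, window_end, and window_time, and it is these columns that a window aggregation groups by.

```sql
-- Apply the TUMBLE windowing TVF without aggregating, to inspect the
-- window columns it appends to every input row.
SELECT player_id, points, window_start, window_end, window_time
  FROM TABLE(
    TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES));
```

In upstream Apache Flink, window_time equals window_end minus 1 millisecond, which keeps it inside the window it belongs to.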

Examples

The following examples show window aggregations over mock data produced by the Datagen Source Connector configured with the Gaming Player Activity quickstart.

Note

To show the behavior of windowing more clearly in the following examples, TIMESTAMP(3) values may be simplified so that trailing zeroes aren't shown. For example, 2020-04-15 08:05:00.000 may be shown as 2020-04-15 08:05. Some columns may be intentionally hidden to improve readability.

Here are some examples for TUMBLE, HOP and CUMULATE window aggregations.

DESCRIBE gaming_player_activity_source;
+--------------+-----------+----------+---------------+
| Column Name  | Data Type | Nullable |    Extras     |
+--------------+-----------+----------+---------------+
| key          | BYTES     | NULL     | PARTITION KEY |
| player_id    | INT       | NOT NULL |               |
| game_room_id | INT       | NOT NULL |               |
| points       | INT       | NOT NULL |               |
| coordinates  | STRING    | NOT NULL |               |
+--------------+-----------+----------+---------------+
SELECT * FROM gaming_player_activity_source;
player_id game_room_id points coordinates
1051      1144         371    [65,36]
1079      3451         38     [20,71]
1017      4177         419    [63,05]
1092      1801         209    [31,67]
1074      3013         401    [32,69]
1003      1038         284    [18,32]
1081      2265         196    [78,68]
-- apply aggregation on the tumbling windowed table
SELECT window_start, window_end, SUM(points) as `sum`
  FROM TABLE(
    TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;
window_start     window_end       sum
2023-11-02 10:40 2023-11-02 10:50 734
2023-11-02 10:50 2023-11-02 11:00 2398
2023-11-02 11:00 2023-11-02 11:10 2388
2023-11-02 11:10 2023-11-02 11:20 2391
2023-11-02 11:20 2023-11-02 11:30 2434
2023-11-02 11:30 2023-11-02 11:40 2374
-- apply aggregation on the hopping windowed table
SELECT window_start, window_end, SUM(points) as `sum`
  FROM TABLE(
    HOP(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;
window_start            window_end              sum
2023-11-02 11:10:00.000 2023-11-02 11:20:00.000 612161
2023-11-02 11:15:00.000 2023-11-02 11:25:00.000 612540
2023-11-02 11:20:00.000 2023-11-02 11:30:00.000 625610
2023-11-02 11:25:00.000 2023-11-02 11:35:00.000 610675
2023-11-02 11:30:00.000 2023-11-02 11:40:00.000 604854
2023-11-02 11:35:00.000 2023-11-02 11:45:00.000 612940
-- apply aggregation on the cumulating windowed table
SELECT window_start, window_end, SUM(points) as `sum`
  FROM TABLE(
    CUMULATE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;
window_start            window_end              sum
2023-11-02 12:40:00.000 2023-11-02 12:46:00.000 364387
2023-11-02 12:40:00.000 2023-11-02 12:48:00.000 482213
2023-11-02 12:40:00.000 2023-11-02 12:50:00.000 604723
2023-11-02 12:50:00.000 2023-11-02 12:52:00.000 122034
2023-11-02 12:50:00.000 2023-11-02 12:54:00.000 240866
2023-11-02 12:50:00.000 2023-11-02 12:56:00.000 368831
2023-11-02 12:50:00.000 2023-11-02 12:58:00.000 483390
2023-11-02 12:50:00.000 2023-11-02 13:00:00.000 600559

GROUPING SETS

Window aggregations also support GROUPING SETS syntax. Grouping sets allow for more complex grouping operations than those describable by a standard GROUP BY. Rows are grouped separately by each specified grouping set and aggregates are computed for each group just as for simple GROUP BY clauses.

Window aggregations with GROUPING SETS require the window_start and window_end columns to be in the GROUP BY clause, but not in the GROUPING SETS clause.

SELECT window_start, window_end, player_id, SUM(points) as `sum`
  FROM TABLE(
    TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end, GROUPING SETS ((player_id), ());
window_start     window_end       player_id sum
2023-11-03 11:20 2023-11-03 11:30 (NULL)    6596
2023-11-03 11:20 2023-11-03 11:30 1025      6232
2023-11-03 11:20 2023-11-03 11:30 1007      4486
2023-11-03 11:30 2023-11-03 11:40 (NULL)    6073
2023-11-03 11:30 2023-11-03 11:40 1025      6953
2023-11-03 11:30 2023-11-03 11:40 1007      3723

Each sublist of GROUPING SETS may specify zero or more columns or expressions and is interpreted the same way as though used directly in the GROUP BY clause. An empty grouping set means that all rows are aggregated down to a single group, which is output even if no input rows were present.

References to the grouping columns or expressions are replaced by null values in result rows for grouping sets in which those columns do not appear.
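Because grouping-set rows carry NULL in place of the aggregated-away columns, it can help to label those rows explicitly. The following sketch uses the standard GROUPING function, assuming it is available for window aggregations in your environment; the alias is_total is hypothetical. GROUPING(player_id) returns 1 for rows produced by the empty grouping set (player_id aggregated away) and 0 for per-player rows.

```sql
-- Label the grand-total row of each window instead of relying on a NULL check.
SELECT window_start, window_end, player_id,
       GROUPING(player_id) AS is_total,  -- 1 for the () grouping set, 0 otherwise
       SUM(points) AS `sum`
  FROM TABLE(
    TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end, GROUPING SETS ((player_id), ());
```

This matters most when a grouping column is nullable, because a genuine NULL value and a "rolled up" NULL would otherwise look identical.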

ROLLUP

ROLLUP is a shorthand notation for specifying a common type of grouping set. It represents the given list of expressions and all prefixes of the list, including the empty list.

Window aggregations with ROLLUP require the window_start and window_end columns to be in the GROUP BY clause, but not in the ROLLUP clause.

For example, the following query is equivalent to the one above.

SELECT window_start, window_end, player_id, SUM(points) as `sum`
    FROM TABLE(
      TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
    GROUP BY window_start, window_end, ROLLUP (player_id);
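With a single column, ROLLUP produces only two grouping sets. The prefix rule is easier to see with two columns: ROLLUP (game_room_id, player_id) expands to (game_room_id, player_id), (game_room_id), and (). The following sketch, assuming the same gaming_player_activity_source table, shows the ROLLUP form and its explicit GROUPING SETS equivalent.

```sql
-- ROLLUP over two columns: per room and player, per room, and grand total.
SELECT window_start, window_end, game_room_id, player_id, SUM(points) AS `sum`
  FROM TABLE(
    TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end, ROLLUP (game_room_id, player_id);

-- Equivalent explicit form listing every prefix, including the empty one.
SELECT window_start, window_end, game_room_id, player_id, SUM(points) AS `sum`
  FROM TABLE(
    TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end, GROUPING SETS (
    (game_room_id, player_id),
    (game_room_id),
    ()
  );
```

Unlike CUBE, the column order inside ROLLUP matters: ROLLUP (player_id, game_room_id) would produce a per-player subtotal instead of a per-room one.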

CUBE

CUBE is a shorthand notation for specifying a common type of grouping set. It represents the given list and all of its possible subsets, that is, the power set.

Window aggregations with CUBE require the window_start and window_end columns to be in the GROUP BY clause, but not in the CUBE clause.

For example, the following two queries are equivalent.

SELECT window_start, window_end, game_room_id, player_id, SUM(points) as `sum`
   FROM TABLE(
     TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
   GROUP BY window_start, window_end, CUBE (player_id, game_room_id);

SELECT window_start, window_end, game_room_id, player_id, SUM(points) as `sum`
   FROM TABLE(
     TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
   GROUP BY window_start, window_end, GROUPING SETS (
            (player_id, game_room_id),
            (player_id              ),
            (           game_room_id),
            (                 )
      );

Selecting Group Window Start and End Timestamps

The start and end timestamps of group windows can be selected with the grouped window_start and window_end columns.
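Because window_start and window_end are ordinary group keys, they can also be used in expressions in the SELECT list. The following sketch, assuming the gaming_player_activity_source table and Flink's TIMESTAMPDIFF function, derives each window's length in minutes; the alias window_minutes is hypothetical. With CUMULATE, window_start stays fixed within a cycle while window_end advances by one step, so window_minutes grows in 2-minute increments up to the 10-minute maximum.

```sql
-- Select the window bounds and compute the window length from them.
SELECT window_start, window_end,
       TIMESTAMPDIFF(MINUTE, window_start, window_end) AS window_minutes,
       SUM(points) AS `sum`
  FROM TABLE(
    CUMULATE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;
```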

Cascading Window Aggregation

The window_start and window_end columns are regular timestamp columns, not time attributes, so they can't be used as time attributes in subsequent time-based operations. To propagate a time attribute, you must also add the window_time column to the GROUP BY clause. window_time is the third column produced by windowing TVFs, and it is a time attribute of the assigned window.

Adding window_time to the GROUP BY clause makes window_time a group key that can also be selected. Downstream queries can then use this column for subsequent time-based operations, such as cascading window aggregations and Window Top-N.

The following code shows a cascading window aggregation in which the first window aggregation propagates the time attribute for the second window aggregation.

-- tumbling 5 minutes for each player_id
WITH fiveminutewindow AS (
  -- Note: The window_start and window_end fields of the inner window TVF
  -- are optional in the SELECT clause. If they appear in the clause, they
  -- must be aliased to prevent name conflicts with the window_start and
  -- window_end columns of the outer window TVF.
  SELECT window_start AS window_5mintumble_start, window_end AS window_5mintumble_end,
         window_time AS rowtime, SUM(points) AS `partial_sum`
    FROM TABLE(
      TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES))
    GROUP BY player_id, window_start, window_end, window_time
)
-- tumbling 10 minutes on the first window; rowtime (the propagated
-- window_time) is the time attribute for the outer window TVF
SELECT window_start, window_end, SUM(partial_sum) AS total_sum
  FROM TABLE(
      TUMBLE(TABLE fiveminutewindow, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;
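Window Top-N, mentioned above as another consumer of a window aggregation's output, can be sketched in the same style. The following query is a hedged example modeled on the upstream Apache Flink pattern of a Window Top-N placed directly after a window aggregation; it assumes the gaming_player_activity_source table, and the aliases total_points and rownum are hypothetical. It keeps the three highest-scoring players in each 10-minute tumbling window.

```sql
-- Top 3 players by total points in each 10-minute tumbling window.
SELECT *
  FROM (
    SELECT *,
           ROW_NUMBER() OVER (
             PARTITION BY window_start, window_end  -- rank within each window
             ORDER BY total_points DESC) AS rownum
      FROM (
        -- inner window aggregation: one row per player per window
        SELECT window_start, window_end, player_id, SUM(points) AS total_points
          FROM TABLE(
            TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
          GROUP BY window_start, window_end, player_id
      )
  )
  WHERE rownum <= 3;
```

Partitioning the ranking by window_start and window_end is what lets the engine treat this as a windowed Top-N that emits a final ranking per window rather than a continuously updating one.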