Window Top-N Queries

Confluent Cloud for Apache Flink®️ enables Window Top-N queries in dynamic tables.

Syntax

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]
       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
   FROM table_name) -- relation with a windowing TVF applied
WHERE rownum <= N [AND conditions]

Description

Window Top-N is a special Top-N that returns the N smallest or largest values for each window and each combination of the other partition keys.
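As a minimal sketch of the "smallest" case, the following query sorts in ascending order to return the three lowest-scoring records in each window. It assumes the gaming_player_activity_source table described in the Examples section and a 10-minute tumbling window. Both choices are illustrative only.

```sql
-- Sketch: bottom 3 records by points per 10-minute tumbling window.
-- ORDER BY ... ASC selects the smallest values instead of the largest.
SELECT *
  FROM (
    SELECT window_start, window_end, player_id, points,
      ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY points ASC) AS rownum
    FROM TABLE(
      TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
  ) WHERE rownum <= 3;
```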

For streaming queries, unlike regular Top-N on continuous tables, Window Top-N doesn’t emit intermediate results. It emits only a final result, the complete set of Top-N records, at the end of each window. Moreover, Window Top-N purges all intermediate state when it’s no longer needed, so Window Top-N queries perform better when you don’t need results updated per record.
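For contrast, a regular Top-N query over the same data might look like the following sketch. It has no window columns in the PARTITION BY clause, so the result is updated as records arrive. The choice of game_room_id as the partition key is an assumption made for illustration.

```sql
-- Sketch: regular (non-windowed) Top-N, shown only for comparison.
-- The result updates continuously because there is no window boundary
-- at which a final result could be emitted.
SELECT player_id, game_room_id, points, rownum
  FROM (
    SELECT player_id, game_room_id, points,
      ROW_NUMBER() OVER (PARTITION BY game_room_id ORDER BY points DESC) AS rownum
    FROM gaming_player_activity_source
  ) WHERE rownum <= 3;
```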

Usually, Window Top-N is used directly with a windowing TVF, but it can also be used with other operations based on windowing TVFs, like Window Aggregation and Window Join.

You can define Window Top-N with the same syntax as regular Top-N. For more information, see Top-N.

In addition, Window Top-N requires that the PARTITION BY clause contains the window_start and window_end columns of the relation produced by the windowing TVF or Window Aggregation. Otherwise, the optimizer can’t translate the query.
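The PARTITION BY clause can list further keys after window_start and window_end, as the syntax above allows. The following sketch assumes the gaming_player_activity_source table from the Examples section and adds game_room_id as an extra key, so the ranking restarts for each game room within each window.

```sql
-- Sketch: top 3 records per game room per 10-minute tumbling window.
-- window_start and window_end must appear in PARTITION BY;
-- game_room_id is an additional, illustrative partition key.
SELECT *
  FROM (
    SELECT window_start, window_end, game_room_id, player_id, points,
      ROW_NUMBER() OVER (
        PARTITION BY window_start, window_end, game_room_id
        ORDER BY points DESC) AS rownum
    FROM TABLE(
      TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
  ) WHERE rownum <= 3;
```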

Examples

The following examples show Window Top-N aggregations over mock data produced by the Datagen Source Connector configured with the Gaming Player Activity quickstart.

Note

To show the behavior of windowing more clearly in the following examples, TIMESTAMP(3) values may be simplified so that trailing zeroes aren’t shown. For example, 2020-04-15 08:05:00.000 may be shown as 2020-04-15 08:05. Columns may be hidden intentionally to enhance the readability of the content.

Window Top-N after Window Aggregation

The following example shows how to find the top 3 players with the highest total points in each 10-minute tumbling window.

DESCRIBE gaming_player_activity_source;

+--------------+-----------+----------+---------------+
| Column Name  | Data Type | Nullable |    Extras     |
+--------------+-----------+----------+---------------+
| key          | BYTES     | NULL     | PARTITION KEY |
| player_id    | INT       | NOT NULL |               |
| game_room_id | INT       | NOT NULL |               |
| points       | INT       | NOT NULL |               |
| coordinates  | STRING    | NOT NULL |               |
+--------------+-----------+----------+---------------+

SELECT * FROM gaming_player_activity_source;

player_id game_room_id points coordinates
1051      1144         371    [65,36]
1079      3451         38     [20,71]
1017      4177         419    [63,05]
1092      1801         209    [31,67]
1074      3013         401    [32,69]
1003      1038         284    [18,32]
1081      2265         196    [78,68]

SELECT *
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY points DESC) as rownum
    FROM (
      SELECT window_start, window_end, player_id, SUM(points) as points, COUNT(*) as cnt
      FROM TABLE(
        TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
      GROUP BY window_start, window_end, player_id
    )
  ) WHERE rownum <= 3;

window_start     window_end       player_id points cnt rownum
2023-11-02 17:50 2023-11-02 18:00 1078      9313   34  1
2023-11-02 17:50 2023-11-02 18:00 1086      9226   28  2
2023-11-02 17:50 2023-11-02 18:00 1007      8883   32  3
2023-11-02 18:00 2023-11-02 18:10 1079      9162   27  1
2023-11-02 18:00 2023-11-02 18:10 1087      8571   33  2
2023-11-02 18:00 2023-11-02 18:10 1026      8315   30  3

Window Top-N after Windowing TVF

The following example shows how to find the 3 records with the highest points, along with their game rooms, in each 10-minute tumbling window.

SELECT *
  FROM (
    SELECT $rowtime, points, game_room_id, player_id, window_start, window_end,
      ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY points DESC) as rownum
    FROM TABLE(
      TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
  ) WHERE rownum <= 3;

$rowtime                points game_room_id player_id window_start     window_end       rownum
2023-11-05 19:38:55.592 499    3484         1012      2023-11-05 19:30 2023-11-05 19:40 1
2023-11-05 19:32:25.521 499    4754         1059      2023-11-05 19:30 2023-11-05 19:40 2
2023-11-05 19:32:59.130 499    1334         1002      2023-11-05 19:30 2023-11-05 19:40 3
2023-11-05 19:42:26.568 499    1163         1090      2023-11-05 19:40 2023-11-05 19:50 1
2023-11-05 19:47:07.202 499    2418         1002      2023-11-05 19:40 2023-11-05 19:50 2
2023-11-05 19:42:47.027 499    4881         1019      2023-11-05 19:40 2023-11-05 19:50 3
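The query above ranks individual records, so the same game room could appear more than once in a window. If the goal is to rank game rooms by their total points instead, one possible approach is to aggregate per room first and then apply Window Top-N, as in this sketch. The total_points alias is a name chosen here for illustration.

```sql
-- Sketch: top 3 game rooms by summed points per 10-minute tumbling window.
-- The inner query is a Window Aggregation; the outer query ranks its rows.
SELECT *
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY total_points DESC) AS rownum
    FROM (
      SELECT window_start, window_end, game_room_id, SUM(points) AS total_points
      FROM TABLE(
        TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
      GROUP BY window_start, window_end, game_room_id
    )
  ) WHERE rownum <= 3;
```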

Limitations

  • When Window Top-N is applied directly after a windowing TVF, Flink SQL supports only TUMBLE, HOP, and CUMULATE windows.
  • Window Top-N applied directly after a windowing TVF with SESSION windows isn’t supported.
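As a sketch of one of the supported non-tumbling cases, the following query applies Window Top-N directly after a HOP windowing TVF. It assumes the gaming_player_activity_source table from the examples above. The 5-minute slide and 10-minute window size are arbitrary values chosen for illustration.

```sql
-- Sketch: top 3 records by points per hopping window.
-- HOP takes the slide interval first, then the window size.
-- Windows overlap, so one record can be ranked in more than one window.
SELECT *
  FROM (
    SELECT window_start, window_end, player_id, points,
      ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY points DESC) AS rownum
    FROM TABLE(
      HOP(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime),
          INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
  ) WHERE rownum <= 3;
```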