Window Top-N Queries
Important
Confluent Cloud for Apache Flink®️ is currently available for Preview. A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing Preview releases of the Preview features at any time in Confluent’s sole discretion. Check out Getting Help for questions, feedback and requests.
For Flink SQL features and limitations in the preview program, see Notable Limitations in Public Preview.
Confluent Cloud for Apache Flink®️ supports Window Top-N queries on dynamic tables.
Syntax
SELECT [column_list]
FROM (
  SELECT [column_list],
    ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]
      ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
  FROM table_name) -- relation with a windowing TVF applied
WHERE rownum <= N [AND conditions]
Description
Window Top-N is a special Top-N that returns the N smallest or largest values for each window and any other partition keys.
In streaming queries, Window Top-N differs from regular Top-N on continuous tables: it doesn’t emit intermediate results. It emits only a final result, which is the top N records of each window, when that window ends. Window Top-N also purges all intermediate state once it’s no longer needed. As a result, Window Top-N queries perform better if you don’t need results updated per record.
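The following Python sketch models this emission behavior. It isn't Flink code. It makes simplifying assumptions: timestamps are integer minutes, the window size is a hypothetical 10, and a window counts as complete once the watermark reaches its end. The class `WindowTopN` and its methods are illustrative names only. The model buffers rows per window and emits nothing per record. When a window closes, it emits that window's final top N rows and drops the window's state.

```python
from collections import defaultdict

WINDOW_SIZE = 10  # hypothetical tumbling window size, in minutes


class WindowTopN:
    """Toy tumbling-window Top-N: buffer rows, emit once per closed window."""

    def __init__(self, n, order_key):
        self.n = n
        self.order_key = order_key
        self.state = defaultdict(list)  # window_start -> buffered rows

    def on_row(self, ts, row):
        """Buffer a row in its window; no intermediate result is emitted."""
        start = ts - ts % WINDOW_SIZE
        self.state[start].append(row)
        return []

    def on_watermark(self, watermark):
        """Emit the final top N for every window whose end the watermark reached."""
        out = []
        for start in sorted(self.state):  # sorted() copies the keys, so popping is safe
            end = start + WINDOW_SIZE
            if end > watermark:
                continue  # window still open: keep buffering
            rows = self.state.pop(start)  # purge this window's state
            ranked = sorted(rows, key=lambda r: r[self.order_key], reverse=True)
            for rownum, row in enumerate(ranked[: self.n], start=1):
                out.append((start, end, rownum, row))
        return out
```

In this model, a second `on_watermark` call for an already-closed window returns nothing, because that window's state is gone.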
Usually, Window Top-N is applied directly to a windowing TVF, but it can also follow other operations based on windowing TVFs, such as Window Aggregation and Window Join.
You can define Window Top-N with the same syntax as regular Top-N. For more information, see Top-N.
In addition, Window Top-N requires the PARTITION BY clause to contain the window_start and window_end columns of the relation produced by the windowing TVF or Window Aggregation. Otherwise, the optimizer can’t translate the query.
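Here is a small Python model of the partitioning rule. It assumes rows are dicts that already carry `window_start` and `window_end`, as rows produced by a windowing TVF do. The function name `window_top_n` and its parameters are hypothetical. The window columns always lead the partition key, so each window is ranked on its own. `extra_keys` plays the role of `col_key1...` in the syntax. Passing `descending=False` keeps the N smallest values instead of the N largest.

```python
from collections import defaultdict


def window_top_n(rows, n, order_key, extra_keys=(), descending=True):
    """Return {partition_key: rows} with at most n rows per partition."""
    partitions = defaultdict(list)
    for row in rows:
        # window_start and window_end always lead the partition key.
        key = (row["window_start"], row["window_end"]) + tuple(row[k] for k in extra_keys)
        partitions[key].append(row)
    result = {}
    for key, group in partitions.items():
        # Sort within one partition only, so rankings never mix windows.
        ranked = sorted(group, key=lambda r: r[order_key], reverse=descending)
        result[key] = ranked[:n]
    return result
```

Adding `("game_room_id",)` as `extra_keys` would return one ranking per game room per window rather than one per window.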
Examples
The following examples show Window Top-N queries over mock data produced by the Datagen Source Connector configured with the Gaming Player Activity quickstart.
Note
To show the behavior of windowing more clearly in the following examples, TIMESTAMP(3) values may be simplified so that trailing zeroes aren’t shown. For example, 2020-04-15 08:05:00.000 may be shown as 2020-04-15 08:05. Some columns may also be hidden intentionally to improve readability.
Window Top-N after Window Aggregation
The following example shows how to calculate the top 3 players with the highest total points in each 10-minute tumbling window.
DESCRIBE gaming_player_activity_source;
+--------------+-----------+----------+---------------+
| Column Name | Data Type | Nullable | Extras |
+--------------+-----------+----------+---------------+
| key | BYTES | NULL | PARTITION KEY |
| player_id | INT | NOT NULL | |
| game_room_id | INT | NOT NULL | |
| points | INT | NOT NULL | |
| coordinates | STRING | NOT NULL | |
+--------------+-----------+----------+---------------+
SELECT * FROM gaming_player_activity_source;
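+-----------+--------------+--------+-------------+
| player_id | game_room_id | points | coordinates |
+-----------+--------------+--------+-------------+
| 1051      | 1144         | 371    | [65,36]     |
| 1079      | 3451         | 38     | [20,71]     |
| 1017      | 4177         | 419    | [63,05]     |
| 1092      | 1801         | 209    | [31,67]     |
| 1074      | 3013         | 401    | [32,69]     |
| 1003      | 1038         | 284    | [18,32]     |
| 1081      | 2265         | 196    | [78,68]     |
+-----------+--------------+--------+-------------+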
SELECT *
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY points DESC) AS rownum
  FROM (
    SELECT window_start, window_end, player_id, SUM(points) AS points, COUNT(*) AS cnt
    FROM TABLE(
      TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
    GROUP BY window_start, window_end, player_id
  )
) WHERE rownum <= 3;
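+------------------+------------------+-----------+--------+-----+--------+
| window_start     | window_end       | player_id | points | cnt | rownum |
+------------------+------------------+-----------+--------+-----+--------+
| 2023-11-02 17:50 | 2023-11-02 18:00 | 1078      | 9313   | 34  | 1      |
| 2023-11-02 17:50 | 2023-11-02 18:00 | 1086      | 9226   | 28  | 2      |
| 2023-11-02 17:50 | 2023-11-02 18:00 | 1007      | 8883   | 32  | 3      |
| 2023-11-02 18:00 | 2023-11-02 18:10 | 1079      | 9162   | 27  | 1      |
| 2023-11-02 18:00 | 2023-11-02 18:10 | 1087      | 8571   | 33  | 2      |
| 2023-11-02 18:00 | 2023-11-02 18:10 | 1026      | 8315   | 30  | 3      |
+------------------+------------------+-----------+--------+-----+--------+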
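The previous query works in two steps. First it sums points per player and window, then it ranks the players within each window. The Python sketch below mirrors these steps so you can check the results by hand. It isn't Flink's implementation. Event times are hypothetical integer minutes, and `top_players` and `tumble_start` are illustrative names. The player IDs and points come from the sample rows earlier on this page. Their timestamps are made up, and so is the second event for player 1051.

```python
from collections import defaultdict


def tumble_start(minute, size=10):
    """Start of the tumbling window that contains `minute`."""
    return minute - minute % size


def top_players(events, n=3, size=10):
    """SUM points and COUNT events per (window, player), then keep the top n per window."""
    totals = defaultdict(lambda: [0, 0])  # (window_start, player_id) -> [points, cnt]
    for minute, player_id, points in events:
        acc = totals[(tumble_start(minute, size), player_id)]
        acc[0] += points
        acc[1] += 1
    per_window = defaultdict(list)
    for (start, player_id), (points, cnt) in totals.items():
        per_window[start].append((player_id, points, cnt))
    result = []
    for start in sorted(per_window):
        ranked = sorted(per_window[start], key=lambda t: t[1], reverse=True)
        for rownum, (player_id, points, cnt) in enumerate(ranked[:n], start=1):
            result.append((start, start + size, player_id, points, cnt, rownum))
    return result


# Hypothetical (minute, player_id, points) events.
events = [(1, 1051, 371), (2, 1079, 38), (3, 1051, 100), (4, 1017, 419),
          (5, 1092, 209), (12, 1074, 401), (15, 1003, 284)]
for row in top_players(events):
    print(row)
```

With these events, player 1051 ranks first in the window starting at minute 0, with 471 points from two events. This corresponds to the `points` and `cnt` columns of the query. Player 1079 falls outside the top 3 for that window.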
Window Top-N after Windowing TVF
The following example shows how to find the 3 player activity records with the highest points in each 10-minute tumbling window, along with their game rooms.
SELECT *
FROM (
  SELECT $rowtime, points, game_room_id, player_id, window_start, window_end,
    ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY points DESC) AS rownum
  FROM TABLE(
    TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
) WHERE rownum <= 3;
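+-------------------------+--------+--------------+-----------+------------------+------------------+--------+
| $rowtime                | points | game_room_id | player_id | window_start     | window_end       | rownum |
+-------------------------+--------+--------------+-----------+------------------+------------------+--------+
| 2023-11-05 19:38:55.592 | 499    | 3484         | 1012      | 2023-11-05 19:30 | 2023-11-05 19:40 | 1      |
| 2023-11-05 19:32:25.521 | 499    | 4754         | 1059      | 2023-11-05 19:30 | 2023-11-05 19:40 | 2      |
| 2023-11-05 19:32:59.130 | 499    | 1334         | 1002      | 2023-11-05 19:30 | 2023-11-05 19:40 | 3      |
| 2023-11-05 19:42:26.568 | 499    | 1163         | 1090      | 2023-11-05 19:40 | 2023-11-05 19:50 | 1      |
| 2023-11-05 19:47:07.202 | 499    | 2418         | 1002      | 2023-11-05 19:40 | 2023-11-05 19:50 | 2      |
| 2023-11-05 19:42:47.027 | 499    | 4881         | 1019      | 2023-11-05 19:40 | 2023-11-05 19:50 | 3      |
+-------------------------+--------+--------------+-----------+------------------+------------------+--------+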
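In the previous output, every row has 499 points, yet `ROW_NUMBER()` still assigns the distinct numbers 1, 2, and 3. Which tied rows make the cut therefore depends on an order the query doesn't specify. Adding another column to the `ORDER BY` clause, as the syntax allows, can make the choice explicit. The Python sketch below uses hypothetical rows and a hypothetical `row_number_top_n` helper to illustrate the effect. Python's `sorted()` is stable, so tied rows keep their input order, and reversing the input changes which tied rows are kept. This isn't a description of how Flink orders ties. It only shows why the result can vary.

```python
def row_number_top_n(rows, n, order_key):
    """Number rows like ROW_NUMBER() ... ORDER BY order_key DESC and keep rownum <= n."""
    # sorted() is stable, so tied rows keep their input order in this model.
    ranked = sorted(rows, key=lambda r: r[order_key], reverse=True)
    return [(rownum, row) for rownum, row in enumerate(ranked, start=1) if rownum <= n]


# Hypothetical rows: three are tied at 499 points.
rows = [{"id": "a", "points": 499}, {"id": "b", "points": 499},
        {"id": "c", "points": 120}, {"id": "d", "points": 499}]
print([(k, r["id"]) for k, r in row_number_top_n(rows, 2, "points")])
print([(k, r["id"]) for k, r in row_number_top_n(rows[::-1], 2, "points")])
```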
Limitations
- Flink SQL supports Window Top-N directly after a windowing TVF only for TUMBLE, HOP, and CUMULATE windows.
- Window Top-N directly after a windowing TVF with SESSION windows isn’t supported.