Window Top-N Queries in Confluent Cloud for Apache Flink¶
Confluent Cloud for Apache Flink® enables Window Top-N queries in dynamic tables.
Syntax¶
SELECT [column_list]
FROM (
  SELECT [column_list],
    ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]
      ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
  FROM table_name) -- relation applied windowing TVF
WHERE rownum <= N [AND conditions]
Description¶
Window Top-N is a special Top-N that returns the N smallest or largest values for each window and for each combination of any other partition keys.
For streaming queries, unlike regular Top-N on continuously updating tables, Window Top-N doesn’t emit intermediate results. It emits only the final result, the Top-N records of the window, when the window ends. Window Top-N also purges intermediate state as soon as it is no longer needed, so Window Top-N queries perform better if you don’t need results updated per record.
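For contrast, the following is a minimal sketch of a regular (non-windowed) Top-N query, assuming the `examples`.`marketplace`.`orders` table used in the examples later on this page. Because there is no window in the PARTITION BY clause, the ranking for each product_id can change whenever a new order arrives, so a query like this is expected to emit updated results continuously rather than one final result per window.

```sql
-- Regular Top-N: the 3 highest-priced orders per product, with no window.
-- The result keeps updating as new rows arrive.
SELECT order_id, customer_id, product_id, price, rownum
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY product_id ORDER BY price DESC) AS rownum
  FROM `examples`.`marketplace`.`orders`)
WHERE rownum <= 3;
```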
Usually, Window Top-N is applied directly to a Windowing TVF, but it can also be used with other operations based on a Windowing TVF, like Window Aggregation and Window Join.
You can define Window Top-N with the same syntax as regular Top-N. For more information, see Top-N.
In addition, Window Top-N requires that the PARTITION BY clause contains the window_start and window_end columns of the relation to which the Windowing TVF or Window Aggregation is applied. Otherwise, the optimizer can’t translate the query.
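As an illustration of this requirement combined with an additional partition key, the following sketch ranks orders within each window and each product. It assumes the `examples`.`marketplace`.`orders` table and the $rowtime column used in the examples on this page; the choice of product_id as the extra key and of N = 2 is arbitrary.

```sql
-- Window Top-N with an extra partition key:
-- the 2 highest-priced orders per product in every 10-minute tumbling window.
-- window_start and window_end must both appear in PARTITION BY.
SELECT window_start, window_end, product_id, customer_id, price, rownum
FROM (
  SELECT window_start, window_end, product_id, customer_id, price,
    ROW_NUMBER() OVER (
      PARTITION BY window_start, window_end, product_id
      ORDER BY price DESC) AS rownum
  FROM TABLE(
    TUMBLE(TABLE `examples`.`marketplace`.`orders`, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES)))
WHERE rownum <= 2;
```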
Examples¶
The following examples show Window Top-N queries over example data streams that you can experiment with.
Note
To show the behavior of windowing more clearly in the following examples, TIMESTAMP(3) values may be simplified so that trailing zeroes aren’t shown. For example, 2020-04-15 08:05:00.000 may be shown as 2020-04-15 08:05.
Columns may be hidden intentionally to enhance the readability of the content.
Window Top-N follows after Window Aggregation¶
The following example shows how to calculate the top 3 customers with the highest total order value in every 10-minute tumbling window.
DESCRIBE `examples`.`marketplace`.`orders`;
+-------------+-----------+----------+--------+
| Column Name | Data Type | Nullable | Extras |
+-------------+-----------+----------+--------+
| order_id    | STRING    | NOT NULL |        |
| customer_id | INT       | NOT NULL |        |
| product_id  | STRING    | NOT NULL |        |
| price       | DOUBLE    | NOT NULL |        |
+-------------+-----------+----------+--------+
SELECT * FROM `examples`.`marketplace`.`orders`;
order_id customer_id product_id price
d770a538-a70c-4de6-9d06-e6c16c5bef5a 3075 1379 32.21
787ee1f4-d0d0-4c39-bdb9-44dc2d203d55 3028 1335 34.74
7ab7ce23-5f61-4398-afad-b1e3f548fee3 3148 1045 69.26
6fea712c-9454-497e-8038-ebaf6dfc7a17 3247 1390 67.26
dc9daf5e-98d5-4bcd-8839-251fed13b75e 3167 1309 12.04
ab3151d0-2950-49cd-9783-016ccc6a3281 3105 1094 21.52
d27ca945-3cff-48a4-afcc-7b17446aa95d 3168 1250 99.95
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) AS rownum
  FROM (
    SELECT window_start, window_end, customer_id, SUM(price) AS price, COUNT(*) AS cnt
    FROM TABLE(
      TUMBLE(TABLE `examples`.`marketplace`.`orders`, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
    GROUP BY window_start, window_end, customer_id
  )
) WHERE rownum <= 3;
window_start window_end customer_id price cnt rownum
2023-11-02 17:50 2023-11-02 18:00 3084 1523.75 18 1
2023-11-02 17:50 2023-11-02 18:00 3092 1487.32 15 2
2023-11-02 17:50 2023-11-02 18:00 3082 1452.18 17 3
2023-11-02 18:00 2023-11-02 18:10 3095 1698.50 20 1
2023-11-02 18:00 2023-11-02 18:10 3088 1645.23 19 2
2023-11-02 18:00 2023-11-02 18:10 3079 1589.75 16 3
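Window Top-N can also return the smallest values by sorting in ascending order. The following sketch is a variation on the query above, assuming the same `examples`.`marketplace`.`orders` table. It returns the 3 customers with the lowest total order value in each window. The alias total_price is introduced here only for illustration.

```sql
-- Bottom 3 customers by total order value per 10-minute tumbling window.
-- ORDER BY ... ASC ranks the smallest totals first.
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY window_start, window_end
      ORDER BY total_price ASC) AS rownum
  FROM (
    SELECT window_start, window_end, customer_id, SUM(price) AS total_price
    FROM TABLE(
      TUMBLE(TABLE `examples`.`marketplace`.`orders`, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
    GROUP BY window_start, window_end, customer_id
  )
) WHERE rownum <= 3;
```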
Window Top-N follows after Windowing TVF¶
The following example shows how to calculate the top 3 orders with the highest price in every 10-minute tumbling window.
SELECT *
FROM (
  SELECT $rowtime, price, product_id, customer_id, window_start, window_end,
    ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) AS rownum
  FROM TABLE(
    TUMBLE(TABLE `examples`.`marketplace`.`orders`, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
) WHERE rownum <= 3;
$rowtime price product_id customer_id window_start window_end rownum
2023-11-05 19:35:38 99.53 1382 3120 2023-11-05 19:30 2023-11-05 19:40 1
2023-11-05 19:35:39 99.04 1216 3204 2023-11-05 19:30 2023-11-05 19:40 2
2023-11-05 19:35:32 98.95 1364 3114 2023-11-05 19:30 2023-11-05 19:40 3
2023-11-05 19:42:41 97.75 1295 3187 2023-11-05 19:40 2023-11-05 19:50 1
2023-11-05 19:41:53 97.30 1428 3256 2023-11-05 19:40 2023-11-05 19:50 2
2023-11-05 19:43:17 96.80 1173 3092 2023-11-05 19:40 2023-11-05 19:50 3
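The same pattern should carry over to other windowing TVFs. The following sketch swaps TUMBLE for HOP, assuming the HOP function takes the slide interval followed by the window size, as in Apache Flink SQL. The 5-minute slide and 10-minute size are arbitrary values chosen for illustration. Because hopping windows overlap, the same order can appear in the result for more than one window.

```sql
-- Top 3 highest-priced orders in every 10-minute window that advances every 5 minutes.
-- Assumption: HOP(TABLE ..., DESCRIPTOR(...), slide, size) argument order.
SELECT *
FROM (
  SELECT $rowtime, price, product_id, customer_id, window_start, window_end,
    ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) AS rownum
  FROM TABLE(
    HOP(TABLE `examples`.`marketplace`.`orders`, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
) WHERE rownum <= 3;
```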