Window Deduplication Queries¶
Important
Confluent Cloud for Apache Flink®️ is currently available for Preview. A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing Preview releases of the Preview features at any time in Confluent’s sole discretion. Check out Getting Help for questions, feedback and requests.
For SQL features and limitations in the preview program, see Notable Limitations in Public Preview.
Confluent Cloud for Apache Flink®️ enables removing duplicate rows over a set of columns in a windowed table.
Syntax¶
SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, column_key1...]
ORDER BY time_attr [asc|desc]) AS rownum
FROM table_name) -- relation applied windowing TVF
WHERE (rownum = 1 | rownum <=1 | rownum < 2) [AND conditions]
Parameter Specification
Note
This query pattern must be followed exactly, otherwise, the optimizer won’t translate the query to Window Deduplication.
ROW_NUMBER()
: Assigns an unique, sequential number to each row, starting with one.PARTITION BY window_start, window_end [, column_key1...]
: Specifies the partition columns which containwindow_start
,window_end
and other partition keys.ORDER BY time_attr [asc|desc]
: Specifies the ordering column, which must be a time attribute. SQL supports the processing time attribute and the event time attribute. Ordering by ASC means keeping the first row, ordering by DESC means keeping the last row.WHERE (rownum = 1 | rownum <=1 | rownum < 2)
: Therownum = 1 | rownum <=1 | rownum < 2
is required for the optimizer to recognize the query should be translated to Window Deduplication.
Description¶
Window Deduplication is a special deduplication that removes duplicate rows over a set of columns, keeping the first row or the last row for each window and partitioned keys.
For streaming queries, unlike regular deduplicate on continuous tables, Window Deduplication doesn’t emit intermediate results, instead emitting only a final result at the end of the window. Also, window Deduplication purges all intermediate state when it’s no longer needed. As a result, Window Deduplication queries have better performance, if you don’t need results updated per row.
Usually, Window Deduplication is used with Windowing TVF directly. Window Deduplication can be used with other operations based on Windowing TVF, like Window Aggregation, Window TopN, and Window Join.
Window Deduplication can be defined in the same syntax as regular
Deduplication. For more information, see Deduplication Queries.
Window Deduplication requires that the PARTITION BY
clause contains
window_start
and window_end
columns of the relation, otherwise,
the optimizer can’t translate the query.
Flink uses ROW_NUMBER()
to remove duplicates, similar to its usage in
Top-N Queries. Deduplication is a special case of the Top-N query,
in which N
is one and order is by processing time or event time.
Example¶
The following example shows how to keep the last record for every 10-minute tumbling window.
The mock data is produced by the Datagen Source Connector configured with the Gaming Player Activity quickstart.
DESCRIBE gaming_player_activity_source;
+--------------+-----------+----------+---------------+
| Column Name | Data Type | Nullable | Extras |
+--------------+-----------+----------+---------------+
| key | BYTES | NULL | PARTITION KEY |
| player_id | INT | NOT NULL | |
| game_room_id | INT | NOT NULL | |
| points | INT | NOT NULL | |
| coordinates | STRING | NOT NULL | |
+--------------+-----------+----------+---------------+
SELECT * FROM gaming_player_activity_source;
player_id game_room_id points coordinates
1051 1144 371 [65,36]
1079 3451 38 [20,71]
1017 4177 419 [63,05]
1092 1801 209 [31,67]
1074 3013 401 [32,69]
1003 1038 284 [18,32]
1081 2265 196 [78,68]
SELECT *
FROM (
SELECT $rowtime, points, game_room_id, player_id, window_start, window_end,
ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY $rowtime DESC) AS rownum
FROM TABLE(
TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
) WHERE rownum <= 1;
$rowtime points game_room_id player_id window_start window_end rownum
2023-11-03 19:59:59.407 371 2504 1094 2023-11-03 19:50 2023-11-03 20:00 1
2023-11-03 20:09:59.921 188 4342 1036 2023-11-03 20:00 2023-11-03 20:10 1
2023-11-03 20:19:59.741 128 3427 1046 2023-11-03 20:10 2023-11-03 20:20 1
2023-11-03 20:29:59.992 311 1000 1049 2023-11-03 20:20 2023-11-03 20:30 1
2023-11-03 20:39:59.569 429 1217 1062 2023-11-03 20:30 2023-11-03 20:40 1
Limitations¶
Limitation on Window Deduplication following Windowing TVFs directly¶
If Window Deduplication follows Windowing Table-Valued Functions (Windowing TVFs), the Windowing Table-Valued Functions (Windowing TVFs) must be with TUMBLE windows, HOP windows, or CUMULATE windows, instead of Session windows.
Limitation on time attribute of order key¶
Window Deduplication requires that the order key must be Event time instead of Processing time.