Window Join Queries in Confluent Cloud for Apache Flink¶
Confluent Cloud for Apache Flink® enables joining data over time windows in dynamic tables.
Syntax¶
The following shows the syntax of the INNER/LEFT/RIGHT/FULL OUTER Window Join statement.
SELECT ...
FROM L [LEFT|RIGHT|FULL OUTER] JOIN R -- L and R are relations with a windowing TVF applied
ON L.window_start = R.window_start AND L.window_end = R.window_end AND ...
Description¶
A window join adds the dimension of time into the join criteria themselves. In doing so, the window join joins the elements of two streams that share a common key and are in the same window.
For streaming queries, unlike other joins on continuous tables, a window join does not emit intermediate results. It emits only final results at the end of the window. Moreover, a window join purges all intermediate state when it is no longer needed.
Usually, Window Join is used with a Windowing TVF. Window Join can also follow other operations based on a Windowing TVF, such as Window Aggregation and Window TopN.
Window Join requires that the join ON condition contains an equality on the window_start columns of the input tables and an equality on their window_end columns.
Window Join supports INNER/LEFT/RIGHT/FULL OUTER/ANTI/SEMI JOIN. The syntax is very similar for all of the different joins.
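As a rough illustration of a window join that follows a window aggregation, the sketch below aggregates each input per five-minute tumbling window and then joins the two aggregates on their window bounds. It assumes the LeftTable and RightTable tables described in the Examples section; the aliases num_sum, left_sum, and right_sum are hypothetical names chosen for this sketch.

```sql
-- Sketch: join two window aggregations on matching window bounds.
-- Assumes LeftTable and RightTable from the Examples section.
SELECT L.window_start, L.window_end,
       L.num_sum AS left_sum,   -- hypothetical alias
       R.num_sum AS right_sum   -- hypothetical alias
FROM (
    -- Per-window sum over the left input
    SELECT window_start, window_end, SUM(num) AS num_sum
    FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES))
    GROUP BY window_start, window_end
) L
JOIN (
    -- Per-window sum over the right input, using the same windowing TVF
    SELECT window_start, window_end, SUM(num) AS num_sum
    FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES))
    GROUP BY window_start, window_end
) R
ON L.window_start = R.window_start AND L.window_end = R.window_end;
```

Both subqueries keep window_start and window_end in their output, so the join condition can still include both equalities.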
Examples¶
The following examples show Window joins over mock data produced by the Datagen Source Connector configured with the Gaming Player Activity quickstart.
Note
To show the behavior of windowing more clearly in the following examples, TIMESTAMP(3) values may be simplified so that trailing zeroes aren’t shown. For example, 2020-04-15 08:05:00.000 may be shown as 2020-04-15 08:05. Columns may be hidden intentionally to enhance the readability of the content.
FULL OUTER JOIN¶
The following example shows a FULL OUTER JOIN, with a Window Join that works on a Tumble Window TVF.
When performing a window join, all elements with a common key and a common tumbling window are joined together. By scoping the region of time for the join into fixed five-minute intervals, the datasets are chopped into two distinct windows of time: [13:20, 13:25) and [13:25, 13:30). The L2 and R2 rows don’t join together because they fall into separate windows.
describe LeftTable;
+-------------+--------------+----------+--------+
| Column Name | Data Type | Nullable | Extras |
+-------------+--------------+----------+--------+
| row_time | TIMESTAMP(3) | NULL | |
| num | INT | NULL | |
| id | STRING | NULL | |
+-------------+--------------+----------+--------+
SELECT * FROM LeftTable;
row_time num id
2023-11-03 12:22:47.268 1 L1
2023-11-03 12:22:43.189 2 L2
2023-11-03 12:22:47.486 3 L3
describe RightTable;
+-------------+--------------+----------+--------+
| Column Name | Data Type | Nullable | Extras |
+-------------+--------------+----------+--------+
| row_time | TIMESTAMP(3) | NULL | |
| num | INT | NULL | |
| id | STRING | NULL | |
+-------------+--------------+----------+--------+
SELECT * FROM RightTable;
row_time num id
2023-11-03 12:23:22.045 2 R2
2023-11-03 12:23:16.437 3 R3
2023-11-03 12:23:18.349 4 R4
SELECT L.num as L_Num, L.id as L_Id, R.num as R_Num, R.id as R_Id,
COALESCE(L.window_start, R.window_start) as window_start,
COALESCE(L.window_end, R.window_end) as window_end
FROM (
SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES))
) L
FULL JOIN (
SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES))
) R
ON L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end;
The output resembles:
L_Num L_Id R_Num R_Id window_start window_end
1 L1 NULL NULL 2023-11-03 13:20 2023-11-03 13:25
NULL NULL 2 R2 2023-11-03 13:20 2023-11-03 13:25
3 L3 3 R3 2023-11-03 13:20 2023-11-03 13:25
2 L2 NULL NULL 2023-11-03 13:25 2023-11-03 13:30
NULL NULL 4 R4 2023-11-03 13:25 2023-11-03 13:30
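The other join types follow the same shape. As a minimal sketch, assuming the same LeftTable and RightTable, the INNER variant below swaps only the join keyword. Because every inner-join result has a match on both sides, the COALESCE calls are not needed and the window columns can be read from either input.

```sql
-- Sketch: INNER window join over the same five-minute tumbling windows.
SELECT L.num AS L_Num, L.id AS L_Id, R.num AS R_Num, R.id AS R_Id,
       L.window_start, L.window_end
FROM (
    SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES))
) L
INNER JOIN (
    SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES))
) R
ON L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end;
```

For the sample rows shown in the FULL OUTER JOIN output, only the L3/R3 pair has a match on both sides in the same window, so an inner join over that same data would be expected to return just that pair.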
SEMI¶
Semi Window Joins return a row from the left side if there is at least one matching row on the right side within the common window.
SELECT *
FROM (
SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES))
) L WHERE L.num IN (
SELECT num FROM (
SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES))
) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end);
row_time num id window_start window_end window_time
2023-11-03 12:43:57.095 1 L3 2023-11-03 13:40 2023-11-03 13:45 2023-11-03 13:44:59.999
2023-11-03 12:43:54.914 1 L2 2023-11-03 13:40 2023-11-03 13:45 2023-11-03 13:44:59.999
2023-11-03 12:43:56.898 1 L1 2023-11-03 13:40 2023-11-03 13:45 2023-11-03 13:44:59.999
2023-11-03 12:43:59.112 1 L1 2023-11-03 13:40 2023-11-03 13:45 2023-11-03 13:44:59.999
2023-11-03 12:43:59.626 1 L5 2023-11-03 13:40 2023-11-03 13:45 2023-11-03 13:44:59.999
SELECT *
FROM (
SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES))
) L WHERE EXISTS (
SELECT * FROM (
SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES))
) R WHERE L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end);
row_time num id window_start window_end window_time
2023-11-03 12:45:08.329 2 L4 2023-11-03 13:45 2023-11-03 13:50 2023-11-03 13:49:59.999
2023-11-03 12:45:06.702 2 L3 2023-11-03 13:45 2023-11-03 13:50 2023-11-03 13:49:59.999
2023-11-03 12:45:07.024 2 L4 2023-11-03 13:45 2023-11-03 13:50 2023-11-03 13:49:59.999
2023-11-03 12:45:05.581 2 L3 2023-11-03 13:45 2023-11-03 13:50 2023-11-03 13:49:59.999
ANTI¶
Anti Window Joins are the obverse of the Inner Window Join: they return the left-side rows that have no matching row on the right side within each common window.
SELECT *
FROM (
SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES))
) L WHERE L.num NOT IN (
SELECT num FROM (
SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES))
) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end);
row_time num id window_start window_end window_time
2023-11-03 12:23:42.865 1 L1 2023-11-03 13:20 2023-11-03 13:25 2023-11-03 13:24:59.999
2023-11-03 12:23:42.956 1 L5 2023-11-03 13:20 2023-11-03 13:25 2023-11-03 13:24:59.999
2023-11-03 12:23:41.029 2 L1 2023-11-03 13:20 2023-11-03 13:25 2023-11-03 13:24:59.999
2023-11-03 12:23:36.826 1 L1 2023-11-03 13:20 2023-11-03 13:25 2023-11-03 13:24:59.999
2023-11-03 12:23:36.435 1 L4 2023-11-03 13:20 2023-11-03 13:25 2023-11-03 13:24:59.999
SELECT *
FROM (
SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES))
) L WHERE NOT EXISTS (
SELECT * FROM (
SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR($rowtime), INTERVAL '5' MINUTES))
) R WHERE L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end);
row_time num id window_start window_end window_time
2023-11-03 12:23:14.693 2 L1 2023-11-03 13:20 2023-11-03 13:25 2023-11-03 13:24:59.999
2023-11-03 12:23:19.174 2 L1 2023-11-03 13:20 2023-11-03 13:25 2023-11-03 13:24:59.999
2023-11-03 12:23:11.035 2 L1 2023-11-03 13:20 2023-11-03 13:25 2023-11-03 13:24:59.999
2023-11-03 12:23:11.764 2 L3 2023-11-03 13:20 2023-11-03 13:25 2023-11-03 13:24:59.999
2023-11-03 12:23:16.240 2 L5 2023-11-03 13:20 2023-11-03 13:25 2023-11-03 13:24:59.999
Limitations¶
Limitation on Join clause¶
Currently, the window join requires that the join ON condition contains both an equality on the window_start columns of the input tables and an equality on their window_end columns. In the future, the join ON clause could be simplified to include only the window_start equality if the windowing TVF is TUMBLE or HOP.
Limitation on Windowing TVFs of inputs¶
Currently, the windowing TVFs must be the same for the left and right inputs. This could be extended in the future, for example, to join tumbling windows with sliding windows that have the same window size.
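To make both limitations concrete, the following sketch applies the same HOP windowing TVF, with identical slide and size, to both inputs and keeps both window equalities in the ON clause. It assumes the LeftTable and RightTable tables from the Examples section; the one-minute slide and five-minute size are arbitrary values chosen for illustration.

```sql
-- Sketch: both inputs use the same windowing TVF (HOP) with the same parameters,
-- and the ON clause contains both the window_start and window_end equalities.
SELECT L.num AS L_Num, L.id AS L_Id, R.id AS R_Id,
       L.window_start, L.window_end
FROM (
    SELECT * FROM TABLE(
        HOP(TABLE LeftTable, DESCRIPTOR($rowtime), INTERVAL '1' MINUTES, INTERVAL '5' MINUTES))
) L
JOIN (
    SELECT * FROM TABLE(
        HOP(TABLE RightTable, DESCRIPTOR($rowtime), INTERVAL '1' MINUTES, INTERVAL '5' MINUTES))
) R
ON L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end;
```

Because hopping windows overlap, a single input row belongs to several windows, so the same pair of rows can appear in the result once for each window they share.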