EXPLAIN Statement in Confluent Cloud for Apache Flink
Confluent Cloud for Apache Flink® enables viewing and analyzing the query plans of Flink SQL statements.
Syntax
EXPLAIN { <query_statement> | <insert_statement> | <statement_set> | CREATE TABLE ... AS SELECT ... }
<statement_set>:
STATEMENT SET
BEGIN
  -- one or more INSERT INTO statements
  { <insert_statement>; }+
END;
Description
The EXPLAIN statement provides detailed information about how Flink executes a query, an INSERT statement, a statement set, or a CREATE TABLE AS SELECT statement. EXPLAIN shows:
- The optimized physical execution plan
- The changelog mode of each operator whose changelog mode is not append-only
- Upsert keys and primary keys, where applicable
- Table source and sink details
This information is valuable for understanding query performance, optimizing complex queries, and debugging unexpected results.
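The output follows a regular layout, so scripts can process it. For example, a script could compare operator properties between two versions of a query. The Python sketch below is a hypothetical helper (parse_physical_details is not part of any Confluent or Flink API). It assumes the layout shown in the examples on this page: an "[N] OperatorName" header followed by "Key: value" lines for that operator.

```python
import re
from typing import Dict, Optional

# Excerpt of the Physical Details section from the first example on this page.
SAMPLE = """== Physical Details ==
[1] StreamPhysicalTableSourceScan
  Table: `examples`.`marketplace`.`clicks`
[7] StreamPhysicalGroupAggregate
  Changelog mode: retract
  Upsert key: (customer_id)
"""

HEADER = re.compile(r"^\[(\d+)\]\s+(\S+)$")


def parse_physical_details(explain_output: str) -> Dict[int, Dict[str, str]]:
    """Map each operator ID in the Physical Details section to its name and properties."""
    details: Dict[int, Dict[str, str]] = {}
    in_details = False
    current: Optional[int] = None
    for raw in explain_output.splitlines():
        line = raw.strip()
        if line.startswith("== "):
            # Only lines after the '== Physical Details ==' header are parsed.
            in_details = line == "== Physical Details =="
            continue
        if not in_details or not line:
            continue
        header = HEADER.match(line)
        if header:
            current = int(header.group(1))
            details[current] = {"operator": header.group(2)}
        elif current is not None and ":" in line:
            # Split on the first colon only; table names contain dots and backticks.
            key, value = line.split(":", 1)
            details[current][key.strip()] = value.strip()
    return details


if __name__ == "__main__":
    for op_id, props in sorted(parse_physical_details(SAMPLE).items()):
        print(op_id, props)
```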
Example Queries
Basic Query Analysis
This example analyzes a query finding users who clicked but never placed an order:
EXPLAIN
SELECT c.*
FROM `examples`.`marketplace`.`clicks` c
LEFT JOIN (
  SELECT DISTINCT customer_id
  FROM `examples`.`marketplace`.`orders`
) o ON c.user_id = o.customer_id
WHERE o.customer_id IS NULL;
The output shows the physical plan and operator details:
== Physical Plan ==
StreamPhysicalSink [11]
  +- StreamPhysicalCalc [10]
    +- StreamPhysicalJoin [9]
      +- StreamPhysicalExchange [3]
      : +- StreamPhysicalCalc [2]
      :   +- StreamPhysicalTableSourceScan [1]
      +- StreamPhysicalExchange [8]
        +- StreamPhysicalGroupAggregate [7]
          +- StreamPhysicalExchange [6]
            +- StreamPhysicalCalc [5]
              +- StreamPhysicalTableSourceScan [4]
== Physical Details ==
[1] StreamPhysicalTableSourceScan
  Table: `examples`.`marketplace`.`clicks`
[4] StreamPhysicalTableSourceScan
  Table: `examples`.`marketplace`.`orders`
[7] StreamPhysicalGroupAggregate
  Changelog mode: retract
  Upsert key: (customer_id)
[8] StreamPhysicalExchange
  Changelog mode: retract
  Upsert key: (customer_id)
[9] StreamPhysicalJoin
  Changelog mode: retract
[10] StreamPhysicalCalc
  Changelog mode: retract
[11] StreamPhysicalSink
  Table: Foreground
  Changelog mode: retract
Note that "Table: Foreground" under sink [11] indicates that this is a preview execution plan: the results go to a foreground preview sink rather than to a table. For a more accurate optimization analysis, explain the query as it will actually run. Use either an INSERT INTO the final target table or a CREATE TABLE AS statement, so that the planner can determine the optimal primary key and changelog mode for your specific use case.
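If you run EXPLAIN from scripts, a check like the following can flag plans produced against the preview sink instead of a real target table. It is a minimal Python sketch with hypothetical function names. It assumes that each sink appears in the Physical Details section as an "[N] StreamPhysicalSink" line followed by a "Table:" line, as in the examples on this page.

```python
import re
from typing import List

# '[N] StreamPhysicalSink' followed on the next line by 'Table: <name>'.
SINK_TABLE = re.compile(r"\[\d+\]\s+StreamPhysicalSink[ \t]*\n[ \t]*Table:[ \t]*(.+)")


def sink_tables(explain_output: str) -> List[str]:
    """Return the table name reported for every sink in the Physical Details section."""
    return [m.group(1).strip() for m in SINK_TABLE.finditer(explain_output)]


def is_preview_plan(explain_output: str) -> bool:
    """True if any sink is the 'Foreground' preview sink rather than a named table."""
    return "Foreground" in sink_tables(explain_output)


if __name__ == "__main__":
    preview = "[11] StreamPhysicalSink\n  Table: Foreground\n  Changelog mode: retract\n"
    print(sink_tables(preview), is_preview_plan(preview))
```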
Creating Tables
This example explains a CREATE TABLE AS SELECT statement that creates a new table from the same query:
EXPLAIN
CREATE TABLE clicks_without_orders AS
SELECT c.*
FROM `examples`.`marketplace`.`clicks` c
LEFT JOIN (
  SELECT DISTINCT customer_id
  FROM `examples`.`marketplace`.`orders`
) o ON c.user_id = o.customer_id
WHERE o.customer_id IS NULL;
The output includes sink information for the new table:
== Physical Plan ==
StreamPhysicalSink [11]
  +- StreamPhysicalCalc [10]
    +- StreamPhysicalJoin [9]
      +- StreamPhysicalExchange [3]
      : +- StreamPhysicalCalc [2]
      :   +- StreamPhysicalTableSourceScan [1]
      +- StreamPhysicalExchange [8]
        +- StreamPhysicalGroupAggregate [7]
          +- StreamPhysicalExchange [6]
            +- StreamPhysicalCalc [5]
              +- StreamPhysicalTableSourceScan [4]
== Physical Details ==
[1] StreamPhysicalTableSourceScan
  Table: `examples`.`marketplace`.`clicks`
[4] StreamPhysicalTableSourceScan
  Table: `examples`.`marketplace`.`orders`
[7] StreamPhysicalGroupAggregate
  Changelog mode: retract
  Upsert key: (customer_id)
[8] StreamPhysicalExchange
  Changelog mode: retract
  Upsert key: (customer_id)
[9] StreamPhysicalJoin
  Changelog mode: retract
[10] StreamPhysicalCalc
  Changelog mode: retract
[11] StreamPhysicalSink
  Table: `catalog`.`database`.`clicks_without_orders`
  Changelog mode: retract
Inserting Values
This example explains an INSERT statement with static values:
EXPLAIN
INSERT INTO orders VALUES
  (1, 1001, '2023-02-24', 50.0),
  (2, 1002, '2023-02-25', 60.0),
  (3, 1003, '2023-02-26', 70.0);
The output shows a simple insertion plan:
== Physical Plan ==
StreamPhysicalSink [3]
  +- StreamPhysicalCalc [2]
    +- StreamPhysicalValues [1]
== Physical Details ==
[3] StreamPhysicalSink
  Table: `catalog`.`database`.`orders`
Multiple Operations
This example demonstrates operation reuse across multiple inserts:
EXPLAIN STATEMENT SET
BEGIN
  INSERT INTO low_orders SELECT * FROM `orders` WHERE price < 100;
  INSERT INTO high_orders SELECT * FROM `orders` WHERE price > 100;
END;
The output shows that the second sink reuses the table scan [1] rather than scanning the orders table a second time:
== Physical Plan ==
StreamPhysicalSink [3]
  +- StreamPhysicalCalc [2]
    +- StreamPhysicalTableSourceScan [1]
StreamPhysicalSink [5]
  +- StreamPhysicalCalc [4]
    +- (reused) [1]
== Physical Details ==
[1] StreamPhysicalTableSourceScan
  Table: `catalog`.`database`.`orders`
[3] StreamPhysicalSink
  Table: `catalog`.`database`.`low_orders`
[5] StreamPhysicalSink
  Table: `catalog`.`database`.`high_orders`
Window Functions
This example analyzes an aggregation over a tumbling window table function combined with a self-join:
EXPLAIN
WITH windowed_customers AS (
  SELECT * FROM TABLE(
    TUMBLE(TABLE `examples`.`marketplace`.`customers`, DESCRIPTOR($rowtime), INTERVAL '1' MINUTE)
  )
)
SELECT
  c1.window_start,
  c1.city,
  COUNT(DISTINCT c1.customer_id) AS unique_customers,
  COUNT(c2.customer_id) AS total_connections
FROM
  windowed_customers c1
  JOIN windowed_customers c2
    ON c1.city = c2.city
    AND c1.customer_id < c2.customer_id
    AND c1.window_start = c2.window_start
GROUP BY
  c1.window_start,
  c1.city
HAVING
  COUNT(DISTINCT c1.customer_id) > 5;
The output shows the complex processing required for windowed aggregations:
== Physical Plan ==
StreamPhysicalSink [14]
  +- StreamPhysicalCalc [13]
    +- StreamPhysicalGroupAggregate [12]
      +- StreamPhysicalExchange [11]
        +- StreamPhysicalCalc [10]
          +- StreamPhysicalJoin [9]
            +- StreamPhysicalExchange [8]
            : +- StreamPhysicalCalc [7]
            :   +- StreamPhysicalWindowTableFunction [6]
            :     +- StreamPhysicalCalc [5]
            :       +- StreamPhysicalChangelogNormalize [4]
            :         +- StreamPhysicalExchange [3]
            :           +- StreamPhysicalCalc [2]
            :             +- StreamPhysicalTableSourceScan [1]
            +- (reused) [8]
== Physical Details ==
[1] StreamPhysicalTableSourceScan
  Table: `examples`.`marketplace`.`customers`
  Primary key: (customer_id)
  Changelog mode: upsert
  Upsert key: (customer_id)
[2] StreamPhysicalCalc
  Changelog mode: upsert
  Upsert key: (customer_id)
[3] StreamPhysicalExchange
  Changelog mode: upsert
  Upsert key: (customer_id)
[4] StreamPhysicalChangelogNormalize
  Changelog mode: retract
  Upsert key: (customer_id)
[5] StreamPhysicalCalc
  Changelog mode: retract
  Upsert key: (customer_id)
[6] StreamPhysicalWindowTableFunction
  Changelog mode: retract
[7] StreamPhysicalCalc
  Changelog mode: retract
[8] StreamPhysicalExchange
  Changelog mode: retract
[9] StreamPhysicalJoin
  Changelog mode: retract
[10] StreamPhysicalCalc
  Changelog mode: retract
[11] StreamPhysicalExchange
  Changelog mode: retract
[12] StreamPhysicalGroupAggregate
  Changelog mode: retract
  Upsert key: (window_start, city)
[13] StreamPhysicalCalc
  Changelog mode: retract
  Upsert key: (window_start, city)
[14] StreamPhysicalSink
  Table: Foreground
  Changelog mode: retract
  Upsert key: (window_start, city)
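Understanding the Output
Reading Physical Plans
The physical plan is a tree that shows how Flink executes your query. Each operator is labeled with a numeric ID in brackets, which matches its entry in the Physical Details section if it has one. Each operator is indented beneath the operator that consumes its output. Data therefore flows upward, from the table source scans at the most deeply indented leaves to the sink at the root. When an operator has more than one input, such as a join, lines prefixed with a colon (:) belong to the subtree of an earlier input.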
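The parent-child structure can be recovered from the indentation alone. This Python sketch is a hypothetical helper (plan_edges is not a Flink API). It assumes the layout shown on this page:

- Root operators (sinks) start at column 0.
- Every other operator is introduced by "+-", whose column grows with depth.
- A "(reused) [N]" line is reported under the ID it refers to.

```python
import re
from typing import List, Optional, Tuple

NODE = re.compile(r"(.+?)\s+\[(\d+)\]$")


def plan_edges(explain_output: str) -> List[Tuple[int, Optional[int]]]:
    """Return (operator_id, parent_id) pairs; data flows from each operator to its parent."""
    edges: List[Tuple[int, Optional[int]]] = []
    stack: List[Tuple[int, int]] = []  # (column of '+-', operator ID) along the current path
    for line in explain_output.splitlines():
        if line.startswith("== Physical Details"):
            break  # the tree ends where the details section begins
        if not line.strip() or line.startswith("=="):
            continue
        col = line.find("+- ")  # -1 for root operators such as sinks
        body = line[col + 3:] if col >= 0 else line
        match = NODE.match(body.strip())
        if not match:
            continue
        node_id = int(match.group(2))
        # Leave the subtrees of earlier siblings: drop entries at the same or deeper column.
        while stack and stack[-1][0] >= col:
            stack.pop()
        parent = stack[-1][1] if stack else None
        edges.append((node_id, parent))
        stack.append((col, node_id))
    return edges


if __name__ == "__main__":
    plan = "StreamPhysicalSink [3]\n  +- StreamPhysicalCalc [2]\n    +- StreamPhysicalValues [1]"
    print(plan_edges(plan))  # each pair is (operator, operator that consumes its output)
```

A stack keyed on the "+-" column is used because a sibling always appears at the same column as the earlier sibling it follows. Popping back to a shallower column therefore finds the shared parent.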
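Changelog Modes
Changelog modes show which kinds of changes an operator emits:
- When no changelog mode appears, the operator uses “append” mode (insert-only)
- “upsert” mode emits inserts, updates, and deletes identified by the upsert key; an update is a single message that replaces the previous row with the same key
- “retract” mode emits inserts, updates, and deletes; an update is sent as two messages, a retraction of the old row followed by the new row
An operator's changelog mode can differ from that of its input. In the first example, the scan of the orders table is append-only, but the aggregation StreamPhysicalGroupAggregate [7] that consumes it emits a retract stream. In the window functions example, StreamPhysicalChangelogNormalize [4] converts the upsert stream read from the customers table into a retract stream.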
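To make the difference between retract and upsert concrete, the following Python sketch simulates the changes that a per-key COUNT(*) would emit in each mode. It uses Flink's short row-kind notation: +I for insert, -U for update-before (the retraction), and +U for update-after. It is a conceptual model with a hypothetical function name, not how Flink implements its operators.

```python
from collections import Counter
from typing import Iterable, List, Tuple

# (row kind, key, count) using Flink's short row-kind notation:
# +I insert, -U update-before (retraction), +U update-after.
Change = Tuple[str, str, int]


def running_counts(keys: Iterable[str], mode: str) -> List[Change]:
    """Simulate the changelog of a per-key COUNT(*) in 'retract' or 'upsert' mode."""
    if mode not in ("retract", "upsert"):
        raise ValueError(f"unknown mode: {mode}")
    counts: Counter = Counter()
    out: List[Change] = []
    for key in keys:
        old = counts[key]
        counts[key] = old + 1
        if old == 0:
            out.append(("+I", key, 1))  # first row for this key
        elif mode == "retract":
            out.append(("-U", key, old))  # retract the previous result ...
            out.append(("+U", key, old + 1))  # ... then emit the new one
        else:
            out.append(("+U", key, old + 1))  # upsert: overwrite the row for this key
    return out


if __name__ == "__main__":
    clicks = ["alice", "bob", "alice"]
    print("retract:", running_counts(clicks, "retract"))
    print("upsert: ", running_counts(clicks, "upsert"))
```

In this model, each update to an existing key costs two messages in retract mode and one in upsert mode.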
Data Movement
The physical plan and its details show how data moves between operators. Watch for:
- StreamPhysicalExchange operators, which redistribute (shuffle) rows across parallel instances
- Changes in upsert keys, which show where data must be repartitioned on a different key
- Operator reuse, marked by “(reused)” references
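The Python sketch below lists every exchange in a plan together with the upsert key reported for it, which is a quick way to count shuffles and see what they are keyed on. It is a hypothetical helper, not a Flink API. It assumes that exchanges appear in the plan as "StreamPhysicalExchange [N]". It also assumes that upsert keys appear in the details section as "Upsert key: ..." lines, as on this page. An exchange with no details entry is reported with None.

```python
import re
from typing import Dict, Optional


def exchanges_with_keys(explain_output: str) -> Dict[int, Optional[str]]:
    """Map each StreamPhysicalExchange ID in the plan to its upsert key (None if not listed)."""
    plan, _, details = explain_output.partition("== Physical Details ==")
    # Every exchange in the plan tree is a point where rows are redistributed.
    keys: Dict[int, Optional[str]] = {
        int(i): None for i in re.findall(r"StreamPhysicalExchange \[(\d+)\]", plan)
    }
    current = None
    for raw in details.splitlines():
        line = raw.strip()
        header = re.match(r"\[(\d+)\]", line)
        if header:
            current = int(header.group(1))
        elif current in keys and line.startswith("Upsert key:"):
            keys[current] = line.split(":", 1)[1].strip()
    return keys


if __name__ == "__main__":
    sample = "StreamPhysicalExchange [8]\n== Physical Details ==\n[8] StreamPhysicalExchange\n  Upsert key: (customer_id)\n"
    print(exchanges_with_keys(sample))
```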
Optimizing Query Performance
Minimizing Data Movement
Shuffling data between parallel instances adds network and serialization cost. When examining EXPLAIN output:
- Look for exchange operators and upsert key changes
- Where possible, keep compatible partitioning keys throughout your query
- Look for opportunities to reduce data redistribution
Pay special attention to data skew when designing your queries. If one key value appears much more frequently than others, the single parallel instance that handles that key can become overwhelmed while the others sit idle. Strategies such as adding dimensions to your keys or pre-aggregating hot keys can distribute the workload more evenly.
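The following Python sketch simulates one such strategy, commonly called key salting. A synthetic salt is appended to each key so that a hot key's rows spread across several partial aggregates, and the partial results are then combined per original key. It is a plain-Python model with hypothetical function names, not Flink code. In Flink SQL, the same idea would be an inner GROUP BY on the key plus a salt column, wrapped in an outer GROUP BY on the key alone. The approach only works for aggregates that can be combined from partial results, such as COUNT and SUM.

```python
import zlib
from collections import Counter
from typing import Dict, List, Tuple


def instance_for(key: str, parallelism: int) -> int:
    """Deterministic hash partitioning (zlib.crc32 is stable across runs, unlike hash())."""
    return zlib.crc32(key.encode("utf-8")) % parallelism


def load_per_instance(keys: List[str], parallelism: int) -> List[int]:
    """Number of rows each parallel instance receives when rows are partitioned by key."""
    load = [0] * parallelism
    for key in keys:
        load[instance_for(key, parallelism)] += 1
    return load


def salted_count(events: List[str], buckets: int) -> Tuple[Dict[str, int], List[str]]:
    """Two-phase COUNT(*) per key: partial counts per (key, salt), then a final sum per key."""
    salted_keys = [f"{key}#{i % buckets}" for i, key in enumerate(events)]  # round-robin salt
    partial = Counter(salted_keys)  # phase 1: one partial count per salted key
    final: Counter = Counter()
    for salted, count in partial.items():
        final[salted.rsplit("#", 1)[0]] += count  # phase 2: strip the salt and combine
    return dict(final), salted_keys


if __name__ == "__main__":
    events = ["hot"] * 8 + ["cold"] * 2
    print("unsalted load:", load_per_instance(events, 4))
    totals, salted = salted_count(events, 4)
    print("salted load:  ", load_per_instance(salted, 4))
    print("totals:", totals)
```

Without salting, all rows for "hot" land on one instance. With salting, they are split across up to four salted keys, although hash collisions can still place several salted keys on the same instance.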
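Using Operator Reuse
Flink automatically reuses operators when possible. In EXPLAIN output:
- Look for “(reused)” references, which show that one operator's output feeds more than one consumer
- Consider restructuring queries to enable more reuse, for example by grouping related INSERT statements into one statement set, as in the Multiple Operations example
- Verify that similar operations share scan results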
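The following Python sketch summarizes reuse in a plan. For each "(reused) [N]" reference, it reports the name of operator N and how many extra consumers reuse it. It is a hypothetical helper that assumes the plan layout shown on this page.

```python
import re
from collections import Counter
from typing import Dict, Tuple


def reuse_report(explain_output: str) -> Dict[int, Tuple[str, int]]:
    """Map each reused operator ID to (operator name, number of '(reused)' references)."""
    plan = explain_output.split("== Physical Details ==", 1)[0]
    # Names of all operators that appear in the plan tree, keyed by ID.
    names = {int(i): name for name, i in re.findall(r"(StreamPhysical\w+) \[(\d+)\]", plan)}
    reused = Counter(int(i) for i in re.findall(r"\(reused\) \[(\d+)\]", plan))
    return {node_id: (names.get(node_id, "?"), count) for node_id, count in reused.items()}


if __name__ == "__main__":
    plan = "StreamPhysicalSink [3]\n  +- StreamPhysicalTableSourceScan [1]\nStreamPhysicalSink [5]\n  +- (reused) [1]"
    print(reuse_report(plan))
```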
Optimizing Sink Configuration
When a sink operates in upsert mode, align its primary key with the upsert key of its input for optimal performance:
- Whenever possible, configure the primary key to be identical to the upsert key
- Different primary and upsert keys in upsert mode can significantly degrade performance, because Flink may need additional state to reconcile changes identified by the upsert key with rows identified by the primary key
- If you must use different keys, carefully evaluate the performance impact and consider restructuring your query to align them
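This Python sketch flags sinks whose primary key and upsert key differ. It is a hypothetical helper. It assumes that the sink's entry in the Physical Details section lists both keys in the "Primary key: (...)" and "Upsert key: (...)" format used on this page. When either line is missing, it makes no judgment. Keys are compared as sets of column names. The city_stats table in the usage example is made up for illustration.

```python
import re
from typing import Dict, FrozenSet, List, Optional


def parse_key(value: str) -> FrozenSet[str]:
    """Turn '(window_start, city)' into {'window_start', 'city'}."""
    return frozenset(col.strip() for col in value.strip().strip("()").split(",") if col.strip())


def sink_key_mismatches(explain_output: str) -> List[str]:
    """Return one warning per sink whose primary key differs from its upsert key."""
    details = explain_output.split("== Physical Details ==", 1)[-1]
    entries: Dict[int, Dict[str, str]] = {}
    current: Optional[int] = None
    for raw in details.splitlines():
        line = raw.strip()
        header = re.match(r"\[(\d+)\]\s+(\S+)", line)
        if header:
            current = int(header.group(1))
            entries[current] = {"operator": header.group(2)}
        elif current is not None and ":" in line:
            key, value = line.split(":", 1)
            entries[current][key.strip()] = value.strip()
    warnings = []
    for node_id, props in entries.items():
        if props["operator"] != "StreamPhysicalSink":
            continue
        pk, uk = props.get("Primary key"), props.get("Upsert key")
        # Without both keys in the output there is nothing to compare.
        if pk and uk and parse_key(pk) != parse_key(uk):
            warnings.append(f"[{node_id}] {props.get('Table', '?')}: primary key {pk} differs from upsert key {uk}")
    return warnings


if __name__ == "__main__":
    # Hypothetical details fragment for a made-up sink table.
    sample = "[5] StreamPhysicalSink\n  Table: `catalog`.`database`.`city_stats`\n  Primary key: (city)\n  Upsert key: (window_start, city)\n"
    print(sink_key_mismatches(sample))
```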