EXPLAIN Statement in Confluent Cloud for Apache Flink

Confluent Cloud for Apache Flink® lets you view and analyze the query plans of Flink SQL statements with the EXPLAIN statement.

Syntax

EXPLAIN { <query_statement> | <insert_statement> | <statement_set> | CREATE TABLE ... AS SELECT ... }

<statement_set>:
STATEMENT SET
BEGIN
  -- one or more INSERT INTO statements
  { INSERT INTO <table_name> <select_statement>; }+
END;

Description

The EXPLAIN statement provides detailed information about how Flink executes a query, an INSERT statement, a statement set, or a CREATE TABLE AS SELECT statement. EXPLAIN shows:

  • The optimized physical execution plan
  • The changelog mode of each operator, when it is not append-only
  • Upsert keys and primary keys where applicable
  • Table source and sink details

This information is valuable for understanding query performance, optimizing complex queries, and debugging unexpected results.

Example Queries

Basic Query Analysis

This example analyzes a query that finds users who clicked but never placed an order:

EXPLAIN
SELECT c.*
FROM `examples`.`marketplace`.`clicks` c
LEFT JOIN (
  SELECT DISTINCT customer_id
  FROM `examples`.`marketplace`.`orders`
) o ON c.user_id = o.customer_id
WHERE o.customer_id IS NULL;

The output shows the physical plan and operator details:

== Physical Plan ==
StreamPhysicalSink [11]
  +- StreamPhysicalCalc [10]
    +- StreamPhysicalJoin [9]
      +- StreamPhysicalExchange [3]
      :  +- StreamPhysicalCalc [2]
      :    +- StreamPhysicalTableSourceScan [1]
      +- StreamPhysicalExchange [8]
        +- StreamPhysicalGroupAggregate [7]
          +- StreamPhysicalExchange [6]
            +- StreamPhysicalCalc [5]
              +- StreamPhysicalTableSourceScan [4]

== Physical Details ==
[1] StreamPhysicalTableSourceScan
Table: `examples`.`marketplace`.`clicks`

[4] StreamPhysicalTableSourceScan
Table: `examples`.`marketplace`.`orders`

[7] StreamPhysicalGroupAggregate
Changelog mode: retract
Upsert key: (customer_id)

[8] StreamPhysicalExchange
Changelog mode: retract
Upsert key: (customer_id)

[9] StreamPhysicalJoin
Changelog mode: retract

[10] StreamPhysicalCalc
Changelog mode: retract

[11] StreamPhysicalSink
Table: Foreground
Changelog mode: retract

In this output, “Table: Foreground” under [11] StreamPhysicalSink indicates that this is a preview execution plan. For a more accurate optimization analysis, test the query against the final target table or with a CREATE TABLE AS statement, which determines the optimal primary key and changelog mode for your use case.

Creating Tables

This example shows creating a new table from a query:

EXPLAIN
CREATE TABLE clicks_without_orders AS
SELECT c.*
FROM `examples`.`marketplace`.`clicks` c
LEFT JOIN (
  SELECT DISTINCT customer_id
  FROM `examples`.`marketplace`.`orders`
) o ON c.user_id = o.customer_id
WHERE o.customer_id IS NULL;

The output includes sink information for the new table:

== Physical Plan ==
StreamPhysicalSink [11]
  +- StreamPhysicalCalc [10]
    +- StreamPhysicalJoin [9]
      +- StreamPhysicalExchange [3]
      :  +- StreamPhysicalCalc [2]
      :    +- StreamPhysicalTableSourceScan [1]
      +- StreamPhysicalExchange [8]
        +- StreamPhysicalGroupAggregate [7]
          +- StreamPhysicalExchange [6]
            +- StreamPhysicalCalc [5]
              +- StreamPhysicalTableSourceScan [4]

== Physical Details ==
[1] StreamPhysicalTableSourceScan
Table: `examples`.`marketplace`.`clicks`

[4] StreamPhysicalTableSourceScan
Table: `examples`.`marketplace`.`orders`

[7] StreamPhysicalGroupAggregate
Changelog mode: retract
Upsert key: (customer_id)

[8] StreamPhysicalExchange
Changelog mode: retract
Upsert key: (customer_id)

[9] StreamPhysicalJoin
Changelog mode: retract

[10] StreamPhysicalCalc
Changelog mode: retract

[11] StreamPhysicalSink
Table: `catalog`.`database`.`clicks_without_orders`
Changelog mode: retract

Inserting Values

This example shows inserting static values:

EXPLAIN
INSERT INTO orders VALUES
  (1, 1001, '2023-02-24', 50.0),
  (2, 1002, '2023-02-25', 60.0),
  (3, 1003, '2023-02-26', 70.0);

The output shows a simple insertion plan:

== Physical Plan ==
StreamPhysicalSink [3]
  +- StreamPhysicalCalc [2]
    +- StreamPhysicalValues [1]

== Physical Details ==
[3] StreamPhysicalSink
Table: `catalog`.`database`.`orders`

Multiple Operations

This example demonstrates operator reuse across multiple inserts:

EXPLAIN STATEMENT SET
BEGIN
  INSERT INTO low_orders SELECT * FROM `orders` WHERE price < 100;
  INSERT INTO high_orders SELECT * FROM `orders` WHERE price > 100;
END;

The output shows table scan reuse:

== Physical Plan ==
StreamPhysicalSink [3]
  +- StreamPhysicalCalc [2]
    +- StreamPhysicalTableSourceScan [1]
StreamPhysicalSink [5]
  +- StreamPhysicalCalc [4]
    +- (reused) [1]

== Physical Details ==
[1] StreamPhysicalTableSourceScan
Table: `catalog`.`database`.`orders`

[3] StreamPhysicalSink
Table: `catalog`.`database`.`low_orders`

[5] StreamPhysicalSink
Table: `catalog`.`database`.`high_orders`

Window Functions

This example shows window functions and self-joins:

EXPLAIN
WITH windowed_customers AS (
  SELECT * FROM TABLE(
    TUMBLE(TABLE `examples`.`marketplace`.`customers`, DESCRIPTOR($rowtime), INTERVAL '1' MINUTE)
  )
)
SELECT
    c1.window_start,
    c1.city,
    COUNT(DISTINCT c1.customer_id) AS unique_customers,
    COUNT(c2.customer_id) AS total_connections
FROM
    windowed_customers c1
    JOIN windowed_customers c2
    ON c1.city = c2.city
    AND c1.customer_id < c2.customer_id
    AND c1.window_start = c2.window_start
GROUP BY
    c1.window_start,
    c1.city
HAVING
    COUNT(DISTINCT c1.customer_id) > 5;

The output shows the complex processing required for windowed aggregations:

== Physical Plan ==
StreamPhysicalSink [14]
  +- StreamPhysicalCalc [13]
    +- StreamPhysicalGroupAggregate [12]
      +- StreamPhysicalExchange [11]
        +- StreamPhysicalCalc [10]
          +- StreamPhysicalJoin [9]
            +- StreamPhysicalExchange [8]
            :  +- StreamPhysicalCalc [7]
            :    +- StreamPhysicalWindowTableFunction [6]
            :      +- StreamPhysicalCalc [5]
            :        +- StreamPhysicalChangelogNormalize [4]
            :          +- StreamPhysicalExchange [3]
            :            +- StreamPhysicalCalc [2]
            :              +- StreamPhysicalTableSourceScan [1]
            +- (reused) [8]

== Physical Details ==
[1] StreamPhysicalTableSourceScan
Table: `examples`.`marketplace`.`customers`
Primary key: (customer_id)
Changelog mode: upsert
Upsert key: (customer_id)

[2] StreamPhysicalCalc
Changelog mode: upsert
Upsert key: (customer_id)

[3] StreamPhysicalExchange
Changelog mode: upsert
Upsert key: (customer_id)

[4] StreamPhysicalChangelogNormalize
Changelog mode: retract
Upsert key: (customer_id)

[5] StreamPhysicalCalc
Changelog mode: retract
Upsert key: (customer_id)

[6] StreamPhysicalWindowTableFunction
Changelog mode: retract

[7] StreamPhysicalCalc
Changelog mode: retract

[8] StreamPhysicalExchange
Changelog mode: retract

[9] StreamPhysicalJoin
Changelog mode: retract

[10] StreamPhysicalCalc
Changelog mode: retract

[11] StreamPhysicalExchange
Changelog mode: retract

[12] StreamPhysicalGroupAggregate
Changelog mode: retract
Upsert key: (window_start, city)

[13] StreamPhysicalCalc
Changelog mode: retract
Upsert key: (window_start, city)

[14] StreamPhysicalSink
Table: Foreground
Changelog mode: retract
Upsert key: (window_start, city)

Understanding the Output

Reading Physical Plans

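The physical plan shows how Flink executes your query. Each operator is numbered and indented to show its position in the execution flow. Data flows from the most deeply indented operators, typically the table source scans, up to the sink at the top, with each operator passing its results to its parent. The bracketed numbers match the entries in the Physical Details section.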

Changelog Modes

Changelog modes show how operators handle data modifications:

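  • When no changelog mode appears, the operator uses “append” mode (insert-only)
  • “upsert” mode carries inserts, updates, and deletes identified by an upsert key; an update is a single message that replaces the previous row for that key
  • “retract” mode carries inserts, updates, and deletes without relying on a key; an update is sent as a retraction of the old row followed by the new row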

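Operators change changelog modes when a different update pattern is needed, such as when moving from streaming reads to aggregations. In the window example above, StreamPhysicalChangelogNormalize [4] converts the upsert stream read from the source into a retract stream.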
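
As a rough illustration of a mode change, the following sketch aggregates an insert-only table. It assumes that `examples`.`marketplace`.`clicks` is append-only, which the absence of a changelog mode on its scan in the earlier examples suggests. Under that assumption, the scan would be expected to show no changelog mode, while the group aggregation and the operators after it would be expected to report a non-append mode, because each new click updates a count that was already emitted. The exact plan may differ, so run the statement to confirm.

```sql
-- Sketch only: count clicks per user over an (assumed) append-only table.
-- Each new click changes an earlier result, so the aggregate cannot
-- stay in append mode.
EXPLAIN
SELECT user_id, COUNT(*) AS click_count
FROM `examples`.`marketplace`.`clicks`
GROUP BY user_id;
```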

Data Movement

The physical plan and physical details sections together show how data moves between operators. Watch for:

  • Exchange operators indicating data redistribution
  • Changes in upsert keys showing where data must be reshuffled
  • Operator reuse marked by “(reused)” references

Optimizing Query Performance

Minimizing Data Movement

Data shuffling impacts performance. When examining EXPLAIN output:

  • Look for exchange operators and upsert key changes
  • Consider keeping compatible partitioning keys through your query
  • Watch for opportunities to reduce data redistribution

Pay special attention to data skew when designing your queries. If a particular key value appears much more frequently than others, a single parallel instance can become overwhelmed handling that key’s data while the other instances sit mostly idle. Consider strategies like adding dimensions to your keys or pre-aggregating hot keys to distribute the workload more evenly.
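
One way to pre-aggregate hot keys is a two-stage aggregation, sketched below. The inner query adds a derived bucket column to the grouping key so that rows for a popular key are spread across several groups, and the outer query merges the partial counts. The url column, the bucket count of 8, and the choice of user_id as the value to bucket on are assumptions for illustration; substitute columns from your own table.

```sql
-- Sketch only: spread a potentially hot key (url, an assumed column)
-- across 8 buckets, then combine the partial counts per url.
EXPLAIN
SELECT url, SUM(partial_clicks) AS total_clicks
FROM (
  SELECT
    url,
    MOD(user_id, 8) AS bucket,       -- extra key dimension
    COUNT(*) AS partial_clicks
  FROM `examples`.`marketplace`.`clicks`
  GROUP BY url, MOD(user_id, 8)
)
GROUP BY url;
```

The extra stage adds another exchange and more state, so it tends to pay off only when skew is significant. Check the EXPLAIN output to see whether both aggregation stages are still present after optimization.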

Using Operator Reuse

Flink automatically reuses operators when possible. In EXPLAIN output:

  • Look for “(reused)” references showing optimization
  • Consider restructuring queries to enable more reuse
  • Verify that similar operations share scan results

Optimizing Sink Configuration

When working with sinks in upsert mode, it’s crucial to align your primary and upsert keys for optimal performance:

  • Whenever possible, configure the primary key to be identical to the upsert key
  • Having different primary and upsert keys in upsert mode can lead to significant performance degradation
  • If you must use different keys, carefully evaluate the performance impact and consider restructuring your query to align these keys
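
The following sketch shows one way to check this alignment. The orders_per_customer table and its column types are hypothetical; the primary key is chosen to match the GROUP BY column of the query that feeds the table. In the EXPLAIN output, compare the primary key reported for the sink with the upsert key reported for the operators that feed it.

```sql
-- Hypothetical sink table: the primary key matches the grouping
-- column of the INSERT query below. Column types are assumptions.
CREATE TABLE orders_per_customer (
  customer_id INT,
  order_count BIGINT,
  PRIMARY KEY (customer_id) NOT ENFORCED
);

-- Inspect the plan before running the statement.
EXPLAIN
INSERT INTO orders_per_customer
SELECT customer_id, COUNT(*) AS order_count
FROM `examples`.`marketplace`.`orders`
GROUP BY customer_id;
```

In the first example on this page, the aggregation over customer_id reports “Upsert key: (customer_id)”, so a sink keyed on the same column would be expected to line up with it. Whether the sink runs in upsert mode depends on the table’s changelog mode, so verify it in the output.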