Evolve a Streaming Pipeline with Confluent Cloud for Apache Flink

Confluent Cloud for Apache Flink® lets you evolve streaming pipelines in place by using materialized tables: persistent objects that combine a table definition with a continuous query. In this guide, you create a materialized table from a simple query and then evolve it by adding a new column, without recreating the table or switching output topics.

This guide uses the examples.marketplace.orders table, which is a read-only source of sample data available in every Flink environment.
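
Before building on the source table, you can inspect its columns and preview a few rows. The following statements are a minimal sketch. They use only standard Flink SQL: DESCRIBE, as used later in this guide, and a LIMIT clause. The exact columns and values returned depend on the sample data in your environment.

DESCRIBE examples.marketplace.orders;

SELECT * FROM examples.marketplace.orders LIMIT 10;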

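This guide walks through the following steps:

  1. Create a materialized table.
  2. Evolve the materialized table by adding a column.
  3. (Optional) Control reprocessing with START_MODE.
  4. Clean up.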

Prerequisites

Step 1: Create a materialized table

  1. Log in to Confluent Cloud and navigate to your Flink workspace.

  2. Create a materialized table that selects a subset of columns from the examples.marketplace.orders table.

    CREATE MATERIALIZED TABLE my_orders AS
    SELECT order_id, price
    FROM examples.marketplace.orders;
    
  3. Verify that the materialized table was created by describing its schema.

    DESCRIBE my_orders;
    

    Your output should resemble:

    +----------+--------+------+-----+--------+-----------+
    |     name |   type | null | key | extras | watermark |
    +----------+--------+------+-----+--------+-----------+
    | order_id | STRING | TRUE |     |        |           |
    |    price | DOUBLE | TRUE |     |        |           |
    +----------+--------+------+-----+--------+-----------+
    
  4. Query the materialized table to verify that data is flowing.

    SELECT * FROM my_orders;
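
A materialized table is backed by a continuous query, so a plain SELECT * typically keeps returning new rows as orders arrive. For a quicker spot check, the following sketch limits the output to a handful of rows. It assumes only standard Flink SQL LIMIT support.

SELECT order_id, price
FROM my_orders
LIMIT 10;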
    

Step 2: Evolve by adding a column

Now, evolve the materialized table to include the customer_id column by using CREATE OR ALTER MATERIALIZED TABLE.

  1. Run the following statement to add the customer_id column.

    CREATE OR ALTER MATERIALIZED TABLE my_orders AS
    SELECT order_id, customer_id, price
    FROM examples.marketplace.orders;
    

    Flink stops the previous query and starts a new one with the updated projection. Results continue to be written to the same output topic.

  2. Verify that the schema has been updated.

    DESCRIBE my_orders;
    

    Your output should now include the customer_id column:

    +-------------+---------+------+-----+--------+-----------+
    |        name |    type | null | key | extras | watermark |
    +-------------+---------+------+-----+--------+-----------+
    |    order_id |  STRING | TRUE |     |        |           |
    | customer_id |     INT | TRUE |     |        |           |
    |       price |  DOUBLE | TRUE |     |        |           |
    +-------------+---------+------+-----+--------+-----------+
    
  3. Query the materialized table to verify the new column is populated.

    SELECT * FROM my_orders;
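
To see how consistently the new column is populated, you can compare the total row count with the number of non-null customer_id values. This is a sketch that uses standard SQL aggregates. Because the query runs continuously, the counts update as new orders arrive. Rows written before the evolution may not carry a customer_id value, depending on where the new job started reading. This is an assumption worth checking against your own output.

SELECT
  COUNT(*) AS total_rows,
  COUNT(customer_id) AS rows_with_customer_id
FROM my_orders;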
    

Step 3: (Optional) Control reprocessing with START_MODE

By default, CREATE OR ALTER uses the RESUME_OR_FROM_BEGINNING start mode. In this mode, Flink resumes from the previous job’s position when possible and otherwise processes the source from the beginning. You can set the reprocessing behavior explicitly with the START_MODE clause.

For example, to force a full reprocessing of all available historical data:

CREATE OR ALTER MATERIALIZED TABLE my_orders
START_MODE = FROM_BEGINNING
AS
SELECT order_id, customer_id, price
FROM examples.marketplace.orders;

For the full list of START_MODE values, refer to START_MODE.
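
To make the default behavior explicit, you can name it in the same clause. The following sketch mirrors the statement above, using the default value described in this step:

CREATE OR ALTER MATERIALIZED TABLE my_orders
START_MODE = RESUME_OR_FROM_BEGINNING
AS
SELECT order_id, customer_id, price
FROM examples.marketplace.orders;

Stating the mode explicitly records the intended reprocessing behavior in the statement itself, so later readers don't have to rely on knowing the default.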

Clean up

Remove the materialized table when you are done.

DROP MATERIALIZED TABLE IF EXISTS my_orders;
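
To confirm that the table is gone, you can list tables with a matching name. This sketch assumes that materialized tables appear in SHOW TABLES output and that your environment supports the LIKE filter:

SHOW TABLES LIKE 'my_orders';

An empty result suggests that the drop succeeded.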