Evolve a Streaming Pipeline with Confluent Cloud for Apache Flink
Confluent Cloud for Apache Flink® enables evolving streaming pipelines in place by using materialized tables, persistent objects that combine a table definition with a continuous query. In this guide, you create a materialized table with a simple query and then evolve it by adding a new column, all without recreating the table or switching output topics.
This guide uses the examples.marketplace.orders table, which is a read-only source of sample data available in every Flink environment.
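This guide shows the following steps:
Step 1: Create a materialized table
Step 2: Evolve by adding a column
Step 3: (Optional) Control reprocessing with START_MODE
Clean up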
Prerequisites
Access to Confluent Cloud.
The FlinkDeveloper role or a higher role, such as EnvironmentAdmin. For more information, refer to Grant Role-Based Access in Confluent Cloud for Apache Flink.
A provisioned Flink compute pool.
Step 1: Create a materialized table
Log in to Confluent Cloud and navigate to your Flink workspace.
Create a materialized table that selects a subset of columns from the examples.marketplace.orders table.
CREATE MATERIALIZED TABLE my_orders AS
SELECT order_id, price
FROM examples.marketplace.orders;
Verify the materialized table was created by describing its schema.
DESCRIBE my_orders;
Your output should resemble:
+----------+--------+------+-----+--------+-----------+
| name     | type   | null | key | extras | watermark |
+----------+--------+------+-----+--------+-----------+
| order_id | STRING | TRUE |     |        |           |
| price    | DOUBLE | TRUE |     |        |           |
+----------+--------+------+-----+--------+-----------+
Query the materialized table to verify data is flowing.
SELECT * FROM my_orders;
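The SELECT * query above keeps streaming results for as long as it runs, so a bounded variant can be handier for a quick spot check. The following is a minimal sketch, assuming a LIMIT clause is acceptable for this table in your workspace. The row count of 10 is an arbitrary choice.

```sql
-- Spot check: fetch a handful of rows instead of streaming indefinitely.
-- The limit of 10 is arbitrary and only for illustration.
SELECT order_id, price
FROM my_orders
LIMIT 10;
```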
Step 2: Evolve by adding a column
Now, evolve the materialized table to include the customer_id column by using CREATE OR ALTER MATERIALIZED TABLE.
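Before changing the projection, it can help to confirm which columns the source table exposes and what their types are. This is a sketch that assumes DESCRIBE works on the read-only examples.marketplace.orders table the same way it does on my_orders.

```sql
-- Inspect the source schema to confirm that customer_id exists
-- and to see its type before adding it to the materialized table.
DESCRIBE examples.marketplace.orders;
```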
Run the following statement to add the customer_id column.
CREATE OR ALTER MATERIALIZED TABLE my_orders AS
SELECT order_id, customer_id, price
FROM examples.marketplace.orders;
Flink stops the previous query and starts a new one with the updated projection. Results continue to be written to the same output topic.
Verify the schema has been updated.
DESCRIBE my_orders;
Your output should now include the customer_id column:
+-------------+---------+------+-----+--------+-----------+
| name        | type    | null | key | extras | watermark |
+-------------+---------+------+-----+--------+-----------+
| order_id    | STRING  | TRUE |     |        |           |
| customer_id | INT     | TRUE |     |        |           |
| price       | DOUBLE  | TRUE |     |        |           |
+-------------+---------+------+-----+--------+-----------+
Query the materialized table to verify the new column is populated.
SELECT * FROM my_orders;
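Scanning the raw rows works, but an aggregate can make the check easier to read. The sketch below compares the total row count with the number of rows that carry a non-null customer_id. The aliases total_rows and rows_with_customer_id are hypothetical names chosen for this example. It is an assumption, not something this guide states, that rows written before the change appear with a NULL customer_id. If so, the two counts would differ until the table is reprocessed.

```sql
-- COUNT(*) counts every row; COUNT(customer_id) counts only rows
-- where customer_id is not NULL.
-- Assumption: rows written before the ALTER may have a NULL customer_id.
SELECT
  COUNT(*)           AS total_rows,
  COUNT(customer_id) AS rows_with_customer_id
FROM my_orders;
```

Because my_orders is continuously updated, the result of this query keeps changing as new orders arrive.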
Step 3: (Optional) Control reprocessing with START_MODE
By default, CREATE OR ALTER MATERIALIZED TABLE uses the RESUME_OR_FROM_BEGINNING start mode, which attempts to resume from the previous query's position. To control reprocessing behavior explicitly, add the START_MODE clause.
For example, to force a full reprocessing of all available historical data:
CREATE OR ALTER MATERIALIZED TABLE my_orders
START_MODE = FROM_BEGINNING
AS
SELECT order_id, customer_id, price
FROM examples.marketplace.orders;
For the full list of START_MODE values, refer to START_MODE.
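For comparison, the default behavior can also be spelled out, which documents the intent for anyone reading the statement later. This sketch assumes that START_MODE accepts RESUME_OR_FROM_BEGINNING in the same position as FROM_BEGINNING in the statement above. Confirm the exact spelling against the START_MODE reference before relying on it.

```sql
-- Same evolution as in Step 2, with the default start mode made explicit.
-- Assumption: RESUME_OR_FROM_BEGINNING is accepted as a literal START_MODE value.
CREATE OR ALTER MATERIALIZED TABLE my_orders
START_MODE = RESUME_OR_FROM_BEGINNING
AS
SELECT order_id, customer_id, price
FROM examples.marketplace.orders;
```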
Clean up
Remove the materialized table when you are done.
DROP MATERIALIZED TABLE IF EXISTS my_orders;