CREATE OR ALTER MATERIALIZED TABLE Statement in Confluent Cloud for Apache Flink
Confluent Cloud for Apache Flink® enables evolving a materialized table in place by using the CREATE OR ALTER MATERIALIZED TABLE statement. This statement creates a new materialized table if it does not exist, or triggers an evolution of an existing one.
An evolution updates the continuous query, schema, or both, while keeping the same output topic. This automates the complex manual process of stopping a statement, carrying over offsets, and migrating downstream consumers.
For details on table definition options like schemas, WITH properties, and DISTRIBUTED BY, see CREATE MATERIALIZED TABLE.
Syntax
CREATE OR ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name
[(
{ <physical_column_definition> |
<metadata_column_definition> |
<computed_column_definition> }[ , ...n]
[ <watermark_definition> ]
[ <table_constraint> ][ , ...n]
)]
[COMMENT table_comment]
[DISTRIBUTED BY (column_name1, column_name2, ...) INTO n BUCKETS]
[WITH (key1=value1, key2=value2, ...)]
[START_MODE = <start_mode_value>]
AS <select_query>
Description
When you run CREATE OR ALTER MATERIALIZED TABLE:
If the table does not exist, a new materialized table is created (same behavior as CREATE MATERIALIZED TABLE).
If the table already exists, an evolution is triggered.
How evolution works
An evolution performs an in-place migration:
The existing continuous query is stopped.
A new continuous query is created with the updated query, schema, and configuration.
The new query begins processing data according to the START_MODE setting.
Results are written to the same output Kafka topic as before.
State handling
All existing Flink processing state, including aggregation counts, join state, and window state, is discarded when an evolution is triggered. New state is built from scratch according to the reprocessing behavior specified by START_MODE.
For stateful queries, like GROUP BY aggregations, the results are recalculated by reprocessing the source data, not by migrating the previous state.
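As a minimal sketch of what this means for a stateful query, the statement below evolves an aggregation and requests a full rebuild. The table name customer_order_totals and the output column names are hypothetical. The source table and its columns are taken from the examples in this document, and the schema is omitted as in those examples.

```sql
-- Hypothetical table name; illustrates evolving a stateful (GROUP BY) query.
-- The previous job's aggregation state is discarded, so the counts and sums
-- below are recalculated by reprocessing the source data.
CREATE OR ALTER MATERIALIZED TABLE customer_order_totals
START_MODE = FROM_BEGINNING  -- rebuild from the earliest available offset
AS
SELECT
  customer_id,
  COUNT(*)   AS order_count,  -- recomputed from scratch, not migrated
  SUM(price) AS total_price   -- recomputed from scratch, not migrated
FROM examples.marketplace.orders
GROUP BY customer_id;
```

With a narrower START_MODE, the rebuilt aggregates reflect only the data that falls inside the reprocessing window.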
Important behavior
Not idempotent: Running the same CREATE OR ALTER command always triggers a new evolution, even if nothing has changed. Use caution when running this command in automated scripts to avoid unintentional reprocessing.
Concurrent evolutions are rejected: If an evolution is already in progress for the same materialized table, a second CREATE OR ALTER command is rejected with an error.
No automatic rollback: If the new query fails at runtime, for example, due to a permission error or a UDF failure, the materialized table enters a FAILED state. The previous version is not restored automatically.
START_MODE
The START_MODE clause controls how much historical data is processed when a materialized table is created or evolved. It answers the question: “What data should the new query logic process?”
If START_MODE is omitted, the default is RESUME_OR_FROM_BEGINNING.
| Value | Description |
|---|---|
| FROM_BEGINNING | Reprocesses all available data from the source, starting from the earliest available offset. |
| FROM_NOW | Processes only new data arriving after the command is executed. No historical data is reprocessed. |
| FROM_TIMESTAMP | Reprocesses data starting from a specific absolute timestamp. |
| FROM_NOW('<interval>') | Reprocesses data from a relative time in the past, evaluated at the time the command is executed. For example: FROM_NOW('7' DAY) |
| RESUME_OR_FROM_BEGINNING | On ALTER (existing table): Attempts to resume from the previous job’s last position. On CREATE (new table): Falls back to FROM_BEGINNING. |
| RESUME_OR_FROM_NOW | On ALTER: Attempts to resume from the previous job’s last position. On CREATE: Falls back to FROM_NOW. |
| RESUME_OR_FROM_TIMESTAMP | On ALTER: Attempts to resume from the previous job’s last position. On CREATE: Falls back to FROM_TIMESTAMP. |
| RESUME_OR_FROM_NOW('<interval>') | On ALTER: Attempts to resume from the previous job’s last position. On CREATE: Falls back to FROM_NOW('<interval>'). |
Behavior on CREATE vs ALTER
The RESUME_OR_* options behave differently depending on whether the materialized table is being created for the first time or being evolved:
On CREATE: No previous job exists, so the RESUME part has no effect. The system uses the fallback behavior (FROM_BEGINNING, FROM_NOW, FROM_TIMESTAMP, or FROM_NOW('<interval>')).
On ALTER: The system first attempts to resume from the previous job’s savepoint, starting exactly where the old job stopped. If resume is not possible, for example, because the query change is incompatible, it falls back to the specified mode.
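The difference can be seen by running one statement at two points in time. This sketch reuses the enriched_orders table from the examples in this document and spells out the default mode explicitly. The comments restate the behavior described above and do not show actual system output.

```sql
-- First run (enriched_orders does not exist yet):
--   there is no previous job to resume from, so the fallback applies and
--   processing starts from the earliest available offset.
-- Later run (enriched_orders already exists):
--   the system first attempts to resume where the previous job stopped;
--   if that is not possible, it falls back to FROM_BEGINNING.
CREATE OR ALTER MATERIALIZED TABLE enriched_orders
START_MODE = RESUME_OR_FROM_BEGINNING  -- same as omitting START_MODE
AS
SELECT order_id, customer_id, price, product_id
FROM examples.marketplace.orders;
```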
Interaction with source retention
For any option that reprocesses historical data, like FROM_BEGINNING or FROM_TIMESTAMP, if the specified start point is older than the earliest data available in the source, for example, due to topic retention policies, the job starts processing from the earliest available data point. No error is raised.
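To illustrate, consider a hypothetical source topic that retains only about seven days of data. The retention period is an assumption made for this sketch, not a property of the example source table. Requesting a 30-day lookback still succeeds, and the job starts from whatever data remains.

```sql
-- Assumption for illustration: the source retains roughly 7 days of data.
-- The requested start point (30 days ago) is older than the earliest
-- retained record, so the job starts from the earliest available data.
-- No error is raised.
CREATE OR ALTER MATERIALIZED TABLE enriched_orders
START_MODE = FROM_NOW('30' DAY)
AS
SELECT order_id, customer_id, price
FROM examples.marketplace.orders;
```

Because the clamp is silent, the results may cover a shorter window than the one requested.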
Examples
Add a column to an existing materialized table
CREATE OR ALTER MATERIALIZED TABLE enriched_orders (
`order_id` STRING,
`customer_id` INT,
`price` DOUBLE,
`product_id` STRING
) AS
SELECT order_id, customer_id, price, product_id
FROM examples.marketplace.orders;
The output topic schema is updated to include product_id. Data is reprocessed based on the START_MODE (default: RESUME_OR_FROM_BEGINNING).
Change query logic
Add a filter to an existing materialized table:
CREATE OR ALTER MATERIALIZED TABLE enriched_orders (
`order_id` STRING,
`customer_id` INT,
`price` DOUBLE,
`product_id` STRING
) AS
SELECT order_id, customer_id, price, product_id
FROM examples.marketplace.orders
WHERE price > 25.00;
Use START_MODE for full reprocessing
Force a full reprocessing of all available historical data:
CREATE OR ALTER MATERIALIZED TABLE enriched_orders
START_MODE = FROM_BEGINNING
AS
SELECT order_id, customer_id, price
FROM examples.marketplace.orders;
Use START_MODE with a relative interval
Reprocess only the last seven days of data:
CREATE OR ALTER MATERIALIZED TABLE enriched_orders
START_MODE = FROM_NOW('7' DAY)
AS
SELECT order_id, customer_id, price
FROM examples.marketplace.orders;
Downstream consumer considerations
Because evolutions discard state and rebuild from scratch, downstream consumers might observe specific behaviors. For detailed guidance, see Downstream consumer impact.
Upsert mode: Keys that are filtered out by the new query logic have no delete message emitted. Old records remain downstream as stale “zombie” data.
Append-only mode: Reprocessed records appear alongside the original records, resulting in duplicates in the output topic.
Retract mode: The new job emits INSERT messages for the reprocessed data but does not emit DELETE messages for the old result set.
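One way to limit these effects, sketched here as an inference from the START_MODE semantics rather than as documented guidance, is to evolve with FROM_NOW when historical output does not need to be recomputed. Because no historical data is reprocessed, the new job should not re-emit records for data that was already processed. Records written by the previous query version remain in the output topic unchanged.

```sql
-- Sketch under the assumption that historical results do not need to be
-- recomputed under the new logic.
-- FROM_NOW processes only data arriving after the command is executed, so
-- earlier source records are not reprocessed and re-emitted.
CREATE OR ALTER MATERIALIZED TABLE enriched_orders
START_MODE = FROM_NOW
AS
SELECT order_id, customer_id, price
FROM examples.marketplace.orders
WHERE price > 25.00;
```

The trade-off is that output produced before the evolution still reflects the old query logic, so downstream consumers see a mix of old and new results.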
Limitations
For the full list of current limitations, see Materialized Tables limitations.