CREATE OR ALTER MATERIALIZED TABLE Statement in Confluent Cloud for Apache Flink

Confluent Cloud for Apache Flink® enables evolving a materialized table in place by using the CREATE OR ALTER MATERIALIZED TABLE statement. This statement creates a new materialized table if it does not exist, or triggers an evolution of an existing one.

An evolution updates the continuous query, schema, or both, while keeping the same output topic. This automates the complex manual process of stopping a statement, carrying over offsets, and migrating downstream consumers.

For details on table definition options like schemas, WITH properties, and DISTRIBUTED BY, see CREATE MATERIALIZED TABLE.

Syntax

CREATE OR ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name
  [(
    { <physical_column_definition> |
      <metadata_column_definition> |
      <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )]
  [COMMENT table_comment]
  [DISTRIBUTED BY (column_name1, column_name2, ...) INTO n BUCKETS]
  [WITH (key1=value1, key2=value2, ...)]
  [START_MODE = <start_mode_value>]
  AS <select_query>
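
As a minimal sketch of how these clauses fit together, the following statement combines a column list with a primary key constraint, a comment, a distribution clause, and START_MODE, in the order given by the syntax above. The table name customer_totals, the comment text, and the bucket count are hypothetical and chosen only for illustration. The source table and column types are borrowed from the examples later on this page. The WITH clause is omitted.

```sql
-- Hypothetical table: total spend per customer.
-- Table name, comment, and bucket count are illustrative assumptions.
CREATE OR ALTER MATERIALIZED TABLE customer_totals (
  `customer_id` INT,
  `total_spent` DOUBLE,
  PRIMARY KEY (`customer_id`) NOT ENFORCED
)
COMMENT 'Total spend per customer'
DISTRIBUTED BY (customer_id) INTO 4 BUCKETS
START_MODE = FROM_BEGINNING
AS
SELECT customer_id, SUM(price) AS total_spent
FROM examples.marketplace.orders
GROUP BY customer_id;
```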

Description

When you run CREATE OR ALTER MATERIALIZED TABLE:

  • If the table does not exist, a new materialized table is created (same behavior as CREATE MATERIALIZED TABLE).

  • If the table already exists, an evolution is triggered.
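
The two cases can be sketched with one statement run at two different times. The table name order_prices is hypothetical, and the second statement is assumed to run only after the first has finished creating the table.

```sql
-- First run: order_prices does not exist yet, so the table is created.
CREATE OR ALTER MATERIALIZED TABLE order_prices
AS
SELECT order_id, price
FROM examples.marketplace.orders;

-- Later run: order_prices now exists, so this triggers an evolution
-- that replaces the continuous query with the filtered version.
CREATE OR ALTER MATERIALIZED TABLE order_prices
AS
SELECT order_id, price
FROM examples.marketplace.orders
WHERE price > 25.00;
```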

How evolution works

An evolution performs an in-place migration:

  1. The existing continuous query is stopped.

  2. A new continuous query is created with the updated query, schema, and configuration.

  3. The new query begins processing data according to the START_MODE setting.

  4. Results are written to the same output Kafka topic as before.

State handling

All existing Flink processing state, including aggregation counts, join state, and window state, is discarded when an evolution is triggered. New state is built from scratch, based on the data that the START_MODE setting selects for reprocessing.

For stateful queries, like GROUP BY aggregations, the results are recalculated by reprocessing the source data, not by migrating the previous state.
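
As an illustration, consider a per-customer order count. The table name orders_per_customer is hypothetical. When the statement below evolves an existing table, the counts held by the previous query are not carried over. They are rebuilt by reading the source again from the position that START_MODE selects.

```sql
-- Hypothetical stateful query: one running count per customer.
-- On evolution, the old counts are discarded and recomputed from the source.
-- FROM_BEGINNING rebuilds the counts from all available source data.
CREATE OR ALTER MATERIALIZED TABLE orders_per_customer
START_MODE = FROM_BEGINNING
AS
SELECT customer_id, COUNT(*) AS order_count
FROM examples.marketplace.orders
GROUP BY customer_id;
```

With START_MODE = FROM_NOW instead, the rebuilt counts would presumably reflect only orders that arrive after the command runs, because no historical data is reprocessed.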

Important behavior

  • Not idempotent: Running the same CREATE OR ALTER command always triggers a new evolution, even if nothing has changed. Use caution when running this command in automated scripts to avoid unintentional reprocessing.

  • Concurrent evolutions are rejected: If an evolution is already in progress for the same materialized table, a second CREATE OR ALTER command is rejected with an error.

  • No automatic rollback: If the new query fails at runtime, for example, due to a permission error or a UDF failure, the materialized table enters a FAILED state. The previous version is not restored automatically.

START_MODE

The START_MODE clause controls how much historical data is processed when a materialized table is created or evolved. It answers the question: “What data should the new query logic process?”

If START_MODE is omitted, the default is RESUME_OR_FROM_BEGINNING.

The following values are supported:

  • FROM_BEGINNING: Reprocesses all available data from the source, starting from the earliest available offset.

  • FROM_NOW: Processes only new data arriving after the command is executed. No historical data is reprocessed.

  • FROM_TIMESTAMP('...'): Reprocesses data starting from a specific absolute timestamp. For example: FROM_TIMESTAMP('2026-03-01T00:00:00Z').

  • FROM_NOW('...'): Reprocesses data from a relative time in the past, evaluated at the time the command is executed. For example: FROM_NOW('7' DAY).

  • RESUME_OR_FROM_BEGINNING (default): On ALTER (existing table): Attempts to resume from the previous job’s last position. On CREATE (new table): Falls back to FROM_BEGINNING and reprocesses all available data.

  • RESUME_OR_FROM_NOW: On ALTER: Attempts to resume from the previous job’s last position. On CREATE: Falls back to FROM_NOW and processes only new data.

  • RESUME_OR_FROM_TIMESTAMP('...'): On ALTER: Attempts to resume from the previous job’s last position. On CREATE: Falls back to FROM_TIMESTAMP.

  • RESUME_OR_FROM_NOW('...'): On ALTER: Attempts to resume from the previous job’s last position. On CREATE: Falls back to FROM_NOW('<interval>').
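
As a short sketch of the absolute-timestamp form, the following statement reuses the enriched_orders table from the Examples section and the timestamp literal shown above. The choice of timestamp is illustrative only.

```sql
-- Reprocess source data starting from an absolute point in time.
-- The timestamp value is the illustrative one from the description above.
CREATE OR ALTER MATERIALIZED TABLE enriched_orders
START_MODE = FROM_TIMESTAMP('2026-03-01T00:00:00Z')
AS
SELECT order_id, customer_id, price
FROM examples.marketplace.orders;
```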

Behavior on CREATE vs ALTER

The RESUME_OR_* options behave differently depending on whether the materialized table is being created for the first time or being evolved:

  • On CREATE: No previous job exists, so the RESUME part has no effect. The system uses the fallback behavior (FROM_BEGINNING, FROM_NOW, FROM_TIMESTAMP, or FROM_NOW('<interval>')).

  • On ALTER: The system first attempts to resume from the previous job’s savepoint, starting exactly where the old job stopped. If resume is not possible, for example, because the query change is incompatible, it falls back to the specified mode.
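
The difference can be sketched with a single statement that uses RESUME_OR_FROM_NOW. The comments restate the behavior described above for each case. The statement itself is identical in both.

```sql
-- On CREATE: no previous job exists, so the FROM_NOW fallback applies
--            and only new data is processed.
-- On ALTER:  the system first attempts to resume where the previous job
--            stopped, and uses FROM_NOW only if resuming is not possible.
CREATE OR ALTER MATERIALIZED TABLE enriched_orders
START_MODE = RESUME_OR_FROM_NOW
AS
SELECT order_id, customer_id, price
FROM examples.marketplace.orders;
```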

Interaction with source retention

If the specified start point is older than the earliest data available in the source, for example, because topic retention has removed older records, the job starts processing from the earliest available data point. This applies to any option that reprocesses historical data, like FROM_BEGINNING or FROM_TIMESTAMP. No error is raised.
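
For illustration, assume a source topic that retains only the last seven days of data. This retention value is a hypothetical assumption, not a property of the example source. The following statement asks for 30 days of history, using the same interval form as FROM_NOW('7' DAY).

```sql
-- Assumption: the source retains only about 7 days of data.
-- The requested start point (30 days ago) is older than the earliest
-- retained record, so processing starts at the earliest available data.
-- No error is raised.
CREATE OR ALTER MATERIALIZED TABLE enriched_orders
START_MODE = FROM_NOW('30' DAY)
AS
SELECT order_id, customer_id, price
FROM examples.marketplace.orders;
```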

Examples

Add a column to an existing materialized table

CREATE OR ALTER MATERIALIZED TABLE enriched_orders (
  `order_id` STRING,
  `customer_id` INT,
  `price` DOUBLE,
  `product_id` STRING
) AS
SELECT order_id, customer_id, price, product_id
FROM examples.marketplace.orders;

The output topic schema is updated to include product_id. Because START_MODE is omitted, the default RESUME_OR_FROM_BEGINNING determines which data is reprocessed.

Change query logic

Add a filter to an existing materialized table:

CREATE OR ALTER MATERIALIZED TABLE enriched_orders (
  `order_id` STRING,
  `customer_id` INT,
  `price` DOUBLE,
  `product_id` STRING
) AS
SELECT order_id, customer_id, price, product_id
FROM examples.marketplace.orders
WHERE price > 25.00;

Use START_MODE for full reprocessing

Force a full reprocessing of all available historical data:

CREATE OR ALTER MATERIALIZED TABLE enriched_orders
START_MODE = FROM_BEGINNING
AS
SELECT order_id, customer_id, price
FROM examples.marketplace.orders;

Use START_MODE with a relative interval

Reprocess only the last seven days of data:

CREATE OR ALTER MATERIALIZED TABLE enriched_orders
START_MODE = FROM_NOW('7' DAY)
AS
SELECT order_id, customer_id, price
FROM examples.marketplace.orders;

Downstream consumer considerations

Because evolutions discard state and rebuild from scratch, downstream consumers might observe stale records, duplicates, or missing deletes, depending on the mode of the output. For detailed guidance, see Downstream consumer impact.

  • Upsert mode: No delete message is emitted for keys that the new query logic filters out. Their old records remain downstream as stale “zombie” data.

  • Append-only mode: Reprocessed records appear alongside the original records, resulting in duplicates in the output topic.

  • Retract mode: The new job emits INSERT messages for the reprocessed data but does not emit DELETE messages for the old result set.

Limitations

For the full list of current limitations, see Materialized Tables limitations.