ALTER MATERIALIZED TABLE Statement in Confluent Cloud for Apache Flink

Use the ALTER MATERIALIZED TABLE statement in Confluent Cloud for Apache Flink® to modify an existing materialized table. You can change table properties, add metadata or computed columns, change the watermark, or evolve the continuous query.

Syntax

ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name {
   ADD (metadata_column_name metadata_column_type METADATA [FROM metadata_key] VIRTUAL [COMMENT column_comment])
 | ADD (computed_column_name AS computed_column_expression [COMMENT column_comment])
 | MODIFY WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
 | DROP WATERMARK
 | SET (key1='value1' [, key2='value2', ...])
 | RESET (key1 [, key2, ...])
}

ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name
  [(
    { <physical_column_definition> |
      <metadata_column_definition> |
      <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )]
  [COMMENT table_comment]
  [DISTRIBUTED BY (column_name1, column_name2, ...) INTO n BUCKETS]
  [WITH (key1=value1, key2=value2, ...)]
  [START_MODE = <start_mode_value>]
  AS <select_query>

Description

ALTER MATERIALIZED TABLE supports two categories of changes:

Property changes

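The following changes modify table metadata without triggering an evolution of the continuous query:

  • Adding a metadata column or a computed column.

  • Modifying or dropping the watermark definition.

  • Setting or resetting table properties.

Physical columns can't be added, modified, or dropped directly in Flink, but you can evolve the schema in Schema Registry.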

Query evolution

ALTER MATERIALIZED TABLE can also change the AS SELECT query, triggering a full evolution of the materialized table. This works the same way as CREATE OR ALTER MATERIALIZED TABLE, except that the materialized table must already exist.

When an evolution is triggered:

  • The existing continuous query is stopped.

  • All Flink processing state is discarded.

  • A new continuous query starts with the updated query.

  • Results are written to the same output topic.

  • The START_MODE clause controls data reprocessing behavior.

For the full list of START_MODE values, see START_MODE.

Examples

Add a metadata column to expose Apache Kafka® timestamps:

ALTER MATERIALIZED TABLE enriched_orders
ADD (`kafka_ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL);

Add a computed column:

ALTER MATERIALIZED TABLE enriched_orders
ADD (`price_with_vat` AS price * 1.21);

Change a table property:

ALTER MATERIALIZED TABLE enriched_orders
SET ('kafka.retention.time' = '30 d');
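
Reset a table property to its default value. This is a minimal sketch based on the RESET form in the Syntax section; quoting the key as a string literal, as in the SET example, is an assumption:

```sql
-- Assumption: the key is quoted the same way as in SET.
-- Reverts the retention setting changed in the previous example.
ALTER MATERIALIZED TABLE enriched_orders
RESET ('kafka.retention.time');
```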

Set a custom watermark strategy:

ALTER MATERIALIZED TABLE enriched_orders
MODIFY WATERMARK FOR `event_time` AS `event_time` - INTERVAL '5' SECOND;
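
Remove a custom watermark definition. This sketch uses the DROP WATERMARK form from the Syntax section and assumes that a custom watermark was set on the table earlier:

```sql
-- Removes the custom watermark strategy from the table.
ALTER MATERIALIZED TABLE enriched_orders
DROP WATERMARK;
```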

Evolve the query:

ALTER MATERIALIZED TABLE enriched_orders (
  `order_id` STRING,
  `customer_id` INT,
  `price` DOUBLE,
  `product_id` STRING
) AS
SELECT order_id, customer_id, price, product_id
FROM examples.marketplace.orders;
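
Evolve the query and control reprocessing with START_MODE. This is a minimal sketch that follows the clause order in the Syntax section. FROM_BEGINNING is an assumed value used only for illustration, so check the START_MODE reference for the values that are actually supported. The WHERE filter is hypothetical:

```sql
-- Assumption: FROM_BEGINNING is a valid <start_mode_value>.
ALTER MATERIALIZED TABLE enriched_orders
START_MODE = FROM_BEGINNING
AS
SELECT order_id, customer_id, price, product_id
FROM examples.marketplace.orders
-- Hypothetical filter, added only to show a changed query.
WHERE price > 10;
```

Because an evolution discards all Flink processing state, the start mode determines how much source data the new query reads again before it writes to the same output topic.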