CREATE MATERIALIZED TABLE Statement in Confluent Cloud for Apache Flink

Confluent Cloud for Apache Flink® lets you create materialized tables, which combine a table definition and a continuous query in a single persistent object. Use materialized tables for long-running streaming queries that act as incremental materialized views. For more information, see Materialized Tables.

Syntax

CREATE MATERIALIZED TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  [(
    { <physical_column_definition> |
      <metadata_column_definition> |
      <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )]
  [COMMENT table_comment]
  [DISTRIBUTED BY (column_name1, column_name2, ...) INTO n BUCKETS]
  [WITH (key1=value1, key2=value2, ...)]
  [START_MODE = <start_mode_value>]
  AS <select_query>

Description

The CREATE MATERIALIZED TABLE statement creates a new persistent materialized table in the current or specified catalog. When a materialized table is created, Flink performs the following steps:

  • Creates a backing Apache Kafka® topic for storing query results.

  • Registers the output schema in Schema Registry.

  • Starts a continuous query that processes data from the source tables and writes results to the backing topic.

Unlike a regular CREATE TABLE combined with an INSERT INTO statement, a materialized table is a single declarative object that owns both the table definition and the continuous query. This makes it possible to evolve the pipeline in place using CREATE OR ALTER MATERIALIZED TABLE.
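As an illustration, the following sketch raises the price threshold of the high_value_orders table from the Usage section by re-issuing its definition. It assumes that CREATE OR ALTER MATERIALIZED TABLE accepts the same table name and AS <select_query> clause shown in Syntax. Which changes can be applied in place is documented under CREATE OR ALTER MATERIALIZED TABLE, not here.

-- Assumption: CREATE OR ALTER accepts the same AS <select_query> form as CREATE.
-- Re-declares the pipeline with a new filter instead of dropping and recreating it.
CREATE OR ALTER MATERIALIZED TABLE high_value_orders
AS
SELECT order_id, customer_id, price
FROM examples.marketplace.orders
WHERE price > 100.00;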

If a materialized table with the same name already exists, the statement fails unless you specify IF NOT EXISTS.
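For example, the following statement, based on the inferred-schema example in Usage, is expected to complete without error when all_orders already exists. In that case, no new table is created.

-- IF NOT EXISTS suppresses the "table already exists" failure.
CREATE MATERIALIZED TABLE IF NOT EXISTS all_orders
AS
SELECT * FROM examples.marketplace.orders;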

Note

Materialized tables can only be created as new tables. Existing tables cannot be converted to materialized tables.

Explicit and inferred schemas

You can define the schema explicitly by listing column definitions, or let Flink infer the schema from the SELECT query.

  • Explicit schema: Specify columns, types, and optional constraints.

  • Inferred schema: Omit the column list and let Flink derive the schema from the query output.
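An explicit schema can also carry a <table_constraint>. The following sketch declares a primary key on an aggregating query that keeps a running total per customer. It assumes that materialized tables accept the same PRIMARY KEY ... NOT ENFORCED form as CREATE TABLE. The table name customer_spend is hypothetical.

-- Hypothetical table: one continuously updated total per customer.
CREATE MATERIALIZED TABLE customer_spend (
  `customer_id` INT,
  `total_spend` DOUBLE,
  -- Assumed to follow the CREATE TABLE constraint syntax.
  PRIMARY KEY (`customer_id`) NOT ENFORCED
) AS
SELECT customer_id, SUM(price) AS total_spend
FROM examples.marketplace.orders
GROUP BY customer_id;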

START_MODE

The optional START_MODE clause controls how much historical data is processed when the materialized table is first created. If omitted, the default is RESUME_OR_FROM_BEGINNING, which processes all available historical data on initial creation.

For the full list of START_MODE values and detailed behavior, see START_MODE.
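The following sketch sets the default start mode explicitly. Following the Syntax section, START_MODE appears after any WITH clause and before AS. It assumes that the value is written as a bare keyword. The table name all_orders_from_start is hypothetical.

-- Equivalent to omitting START_MODE, because RESUME_OR_FROM_BEGINNING is the default.
CREATE MATERIALIZED TABLE all_orders_from_start
START_MODE = RESUME_OR_FROM_BEGINNING
AS
SELECT * FROM examples.marketplace.orders;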

Usage

Create with explicit schema

CREATE MATERIALIZED TABLE high_value_orders (
  `order_id` STRING,
  `customer_id` INT,
  `price` DOUBLE
) AS
SELECT order_id, customer_id, price
FROM examples.marketplace.orders
WHERE price > 50.00;

Create with inferred schema

CREATE MATERIALIZED TABLE all_orders
AS
SELECT * FROM examples.marketplace.orders;

Create using WITH properties

CREATE MATERIALIZED TABLE orders_avro (
  `order_id` STRING,
  `customer_id` INT,
  `price` DOUBLE
)
WITH (
  'value.format' = 'avro-registry'
)
AS
SELECT order_id, customer_id, price
FROM examples.marketplace.orders;

Create with a JOIN

CREATE MATERIALIZED TABLE customer_orders (
  `order_id` STRING,
  `customer_name` STRING,
  `price` DOUBLE
) AS
SELECT o.order_id, c.name AS customer_name, o.price
FROM examples.marketplace.orders o
JOIN examples.marketplace.customers c ON o.customer_id = c.customer_id;
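Create with a comment and distribution

The following sketch combines the optional COMMENT and DISTRIBUTED BY clauses in the order shown in Syntax. The table name orders_by_customer, the comment text, and the bucket count of 4 are hypothetical values chosen for illustration.

-- Hypothetical table and bucket count; clause order follows the Syntax section.
CREATE MATERIALIZED TABLE orders_by_customer (
  `customer_id` INT,
  `order_id` STRING,
  `price` DOUBLE
)
COMMENT 'Orders distributed by customer_id'
DISTRIBUTED BY (customer_id) INTO 4 BUCKETS
AS
SELECT customer_id, order_id, price
FROM examples.marketplace.orders;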

WITH options

Materialized tables support the same WITH options as CREATE TABLE for configuring the backing Kafka topic. In each key1=value1 pair, both the key and the value are string literals, for example 'value.format' = 'avro-registry'.

You can change an existing materialized table’s property values by using ALTER MATERIALIZED TABLE.
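The following sketch changes a property of the orders_avro table from the Usage section. It assumes that ALTER MATERIALIZED TABLE uses the same SET ('key' = 'value') form as ALTER TABLE. 'latest-offset' is used as an example value for scan.startup.mode.

-- Assumption: SET mirrors ALTER TABLE ... SET in Confluent Cloud for Apache Flink.
ALTER MATERIALIZED TABLE orders_avro SET ('scan.startup.mode' = 'latest-offset');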

The following WITH options are supported:

  • changelog.mode

  • connector

  • error-handling.log.target

  • error-handling.mode

  • kafka.cleanup-policy

  • kafka.compaction.time

  • kafka.max-message-size

  • kafka.retention.size

  • kafka.retention.time

  • key.fields-prefix

  • key.format

  • key.format.schema-context

  • scan.bounded.mode

  • scan.bounded.timestamp-millis

  • scan.startup.mode

  • value.fields-include

  • value.format

  • value.format.schema-context
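The following sketch sets several of these options together. The option names come from the list above. The values 'append', 'delete', and 'json-registry' are assumed to be valid for changelog.mode, kafka.cleanup-policy, and value.format respectively. The table name orders_json is hypothetical.

-- Hypothetical table; option values are assumptions for illustration.
CREATE MATERIALIZED TABLE orders_json (
  `order_id` STRING,
  `customer_id` INT,
  `price` DOUBLE
)
WITH (
  'changelog.mode' = 'append',          -- the filter-free query only appends rows
  'kafka.cleanup-policy' = 'delete',    -- time/size-based cleanup of the backing topic
  'value.format' = 'json-registry'      -- value schema registered in Schema Registry
)
AS
SELECT order_id, customer_id, price
FROM examples.marketplace.orders;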

Limitations

For the full list of current limitations, see Materialized Tables limitations.