CREATE MATERIALIZED TABLE Statement in Confluent Cloud for Apache Flink
Confluent Cloud for Apache Flink® enables creating materialized tables that combine a table definition with a continuous query in a single persistent object. Use materialized tables for long-running streaming queries that act as incremental materialized views. For more information, see Materialized Tables.
Syntax
CREATE MATERIALIZED TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
[(
{ <physical_column_definition> |
<metadata_column_definition> |
<computed_column_definition> }[ , ...n]
[ <watermark_definition> ]
[ <table_constraint> ][ , ...n]
)]
[COMMENT table_comment]
[DISTRIBUTED BY (column_name1, column_name2, ...) INTO n BUCKETS]
[WITH (key1=value1, key2=value2, ...)]
[START_MODE = <start_mode_value>]
AS <select_query>
Description
The CREATE MATERIALIZED TABLE statement creates a new persistent materialized table in the current or specified catalog. When a materialized table is created, Flink performs the following steps:
Creates a backing Apache Kafka® topic for storing query results.
Registers the output schema in Schema Registry.
Starts a continuous query that processes data from the source tables and writes results to the backing topic.
Unlike a regular CREATE TABLE combined with an INSERT INTO statement, a materialized table is a single declarative object that owns both the table definition and the continuous query. This makes it possible to evolve the pipeline in place using CREATE OR ALTER MATERIALIZED TABLE.
If a materialized table with the same name already exists, the statement fails unless you specify IF NOT EXISTS.
Note
Materialized tables can only be created as new tables. Existing tables cannot be converted to materialized tables.
Explicit and inferred schemas
You can define the schema explicitly by listing column definitions, or let Flink infer the schema from the SELECT query.
Explicit schema: Specify columns, types, and optional constraints.
Inferred schema: Omit the column list and let Flink derive the schema from the query output.
START_MODE
The optional START_MODE clause controls how much historical data is processed when the materialized table is first created. If omitted, the default is RESUME_OR_FROM_BEGINNING, which processes all available historical data on initial creation.
For the full list of START_MODE values and detailed behavior, see START_MODE.
Usage
Create with explicit schema
CREATE MATERIALIZED TABLE high_value_orders (
`order_id` STRING,
`customer_id` INT,
`price` DOUBLE
) AS
SELECT order_id, customer_id, price
FROM examples.marketplace.orders
WHERE price > 50.00;
Create with inferred schema
CREATE MATERIALIZED TABLE all_orders
AS
SELECT * FROM examples.marketplace.orders;
Create using WITH properties
CREATE MATERIALIZED TABLE orders_avro (
`order_id` STRING,
`customer_id` INT,
`price` DOUBLE
)
WITH (
'value.format' = 'avro-registry'
)
AS
SELECT order_id, customer_id, price
FROM examples.marketplace.orders;
Create with a JOIN
CREATE MATERIALIZED TABLE customer_orders (
`order_id` STRING,
`customer_name` STRING,
`price` DOUBLE
) AS
SELECT o.order_id, c.name AS customer_name, o.price
FROM examples.marketplace.orders o
JOIN examples.marketplace.customers c ON o.customer_id = c.customer_id;
WITH options
Materialized tables support the same WITH options as CREATE TABLE for configuring the backing Kafka topic. Both the key and value of the expression key1=val1 are string literals.
You can change an existing materialized table’s property values by using ALTER MATERIALIZED TABLE.
Limitations
For the full list of current limitations, see Materialized Tables limitations.