Table Operations in Confluent Manager for Apache Flink
Confluent Manager for Apache Flink supports the following table operation statements:
- CREATE TABLE: Creates a table and a backing topic in the database’s Kafka cluster.
- ALTER TABLE: Customizes a table (inferred or manually created) backed by a Kafka topic.
- DROP TABLE: Drops a table (inferred or manually created) and deletes the backing topic in the database’s Kafka cluster.
Prerequisites
Before you can run table operation statements against a database, the following prerequisites must be met:
- The environment from which you submit a table operation statement must have edit permissions for the database. For that, the environment name must be configured in the database’s ddlEnvironments list. For details on how to create and configure databases, see Manage Flink Catalogs and Databases for Confluent Manager for Apache Flink.
- The database Kafka credentials mapped to the environment must allow creating and deleting topics on the Kafka cluster. The credentials must also allow reading from and writing to the metadata topic (_confluent_sr_catalog).
- The catalog Schema Registry credentials mapped to the environment must allow reading, creating, and deleting subjects and schemas.
Tables
Tables in Confluent Manager for Apache Flink are backed by Kafka topics. The schema of a table’s physical columns is stored in Schema Registry as key and value schemas. The key and value schemas are linked to the topic using the TopicNameStrategy, which derives the Schema Registry subject names from the topic name:
- Value schema subject: {topic-name}-value
- Key schema subject: {topic-name}-key
Tables can be configured with table options to control the behavior of Flink sources and sinks, such as how to read from or write to the Kafka topic.
Each table has a changelog mode that controls how Flink interprets the records in the Kafka topic.
Inferred Tables
All Kafka topics are automatically exposed as inferred tables. CMF implements the same inference mechanism as Flink SQL on Confluent Cloud. The key and value schemas are resolved using the TopicNameStrategy (subject names {topic-name}-key and {topic-name}-value). If no schema information can be derived from Schema Registry, the inferred table is configured with raw key format, value format, or both, which exposes the raw bytes as a single column of type VARBINARY.
Inferred tables have a default configuration that can be adjusted using ALTER TABLE statements. By default, each inferred table includes:
- A $rowtime metadata column of type TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL that exposes the Kafka record timestamp.
- A default watermark strategy, WATERMARK FOR $rowtime AS $rowtime, which uses the Kafka record timestamp directly with no lateness allowance.
- A changelog mode based on the topic’s cleanup policy:
  - Non-compacted topics (cleanup.policy=delete): changelog.mode = 'append'
  - Compacted topics (cleanup.policy=compact): changelog.mode = 'upsert', with a primary key automatically derived from the key schema columns.
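To make these defaults concrete, an inferred table over a hypothetical non-compacted topic named clicks behaves roughly as if it had been declared as follows. This is a sketch, not a statement you need to run: the clicks, user_id, and url names are illustrative, the real physical columns are resolved from the registered schemas, and TIMESTAMP_LTZ(3) is the DDL shorthand for TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).

```sql
-- Sketch of the effective definition of an inferred table (illustrative names)
CREATE TABLE clicks (
  user_id STRING,                                          -- from the value schema
  url STRING,                                              -- from the value schema
  `$rowtime` TIMESTAMP_LTZ(3) NOT NULL METADATA VIRTUAL,   -- Kafka record timestamp
  WATERMARK FOR `$rowtime` AS `$rowtime`                   -- no lateness allowance
) WITH (
  'changelog.mode' = 'append'                              -- cleanup.policy=delete
);
```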
Changelog Modes
The changelog mode controls how Flink interprets records in a Kafka topic. You can configure the changelog mode using the changelog.mode table option.
- append: Treats the topic as an append-only log. Each record represents a new row in the table. This is the default mode for non-compacted topics.
- upsert: Treats the topic as an upsert log for a table with a primary key. Each record represents either a new row (the record has a new key), an updated row (the record key is not new), or a deleted row (the record’s value is null) in the table. This is the default mode for compacted topics.
- retract: Treats the topic as a changelog for a table. Each record is annotated with a flag marking the record as an insert, update, or delete operation that is applied to the table.
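For example, to reinterpret a table’s topic as a retract changelog, you can set the option with ALTER TABLE (order_changes is a hypothetical table name used for illustration):

```sql
-- Reinterpret the records of the backing topic as a retract changelog
ALTER TABLE order_changes SET ('changelog.mode' = 'retract');
```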
Table Schema
A table schema consists of physical columns, computed columns, metadata columns, a watermark strategy, and an optional primary key.
Physical columns
Physical columns represent the data stored in the Kafka topic. They are defined by the key and value schemas in Schema Registry. Physical columns cannot be added, modified, or dropped using ALTER TABLE. Schema evolution for physical columns must be performed through Schema Registry.
Computed columns
Computed columns are virtual columns that derive their values from expressions over other columns. They are evaluated at query time and are not persisted to the Kafka topic. Computed columns can be added using ALTER TABLE.
Metadata columns
Metadata columns expose Kafka record metadata as table columns. Metadata columns can be read-only or read-write. Read-only metadata columns must be declared as VIRTUAL to exclude them during INSERT INTO operations.
Every inferred table includes a default $rowtime metadata column of type TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL that exposes the Kafka record timestamp.
The following additional metadata columns can be added using ALTER TABLE:
| Metadata Key | SQL Type | Description |
|---|---|---|
| topic | STRING | The name of the Kafka topic. |
| partition | INT | The partition number of the record. |
| offset | BIGINT | The offset of the record in its partition. |
| timestamp | TIMESTAMP_LTZ(3) | The timestamp of the Kafka record. |
| timestamp-type | STRING | The timestamp type of the record. |
| leader-epoch | INT | The leader epoch of the record (nullable). |
| headers | MAP<STRING, BYTES> | The headers of the Kafka record. |
Watermark strategy
Watermarks define how Flink tracks event time for time-based operations such as windowed aggregations. By default, inferred tables use the watermark strategy WATERMARK FOR $rowtime AS $rowtime, which uses the Kafka record timestamp directly with no lateness allowance.
You can set a custom watermark strategy to control the trade-off between latency and completeness. For details, see Modify watermark.
Primary key
You can declare a primary key on one or more columns. Primary keys in Flink are NOT ENFORCED, meaning they serve as hints for query optimization but are not validated at runtime. For tables with changelog.mode = 'upsert', a primary key is required and is automatically derived from the key schema columns for inferred tables.
Table Options
The following table options can be configured using CREATE TABLE and ALTER TABLE statements.
Changelog mode option
changelog.mode configures the changelog mode of a table. Possible values are append, upsert, and retract.
Key and value options
value.format: The serialization format for the Kafka message value. Possible values are avro-registry, json-registry, proto-registry, and raw.
key.format: The serialization format for the Kafka message key. Possible values are avro-registry, json-registry, proto-registry, and raw.
key.fields-prefix: A prefix to prepend to key field names to avoid naming conflicts with value fields.
value.fields-include: Controls which fields are included in the message value. Supported values:
- ALL: All fields are included in the value (default).
- EXCEPT_KEY: All fields except key fields are included in the value.
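A sketch that combines these options (the table and column names are illustrative): key fields are exposed with a prefix to avoid clashing with value fields, and the key fields are excluded from the serialized value.

```sql
CREATE TABLE orders (
  key_order_id STRING NOT NULL,            -- key field, exposed with the configured prefix
  customer STRING,
  amount DOUBLE,
  PRIMARY KEY (key_order_id) NOT ENFORCED
)
WITH (
  'changelog.mode' = 'upsert',
  'key.format' = 'avro-registry',
  'value.format' = 'avro-registry',
  'key.fields-prefix' = 'key_',            -- key columns appear as key_*
  'value.fields-include' = 'EXCEPT_KEY'    -- do not duplicate key fields in the value
);
```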
Scan options
scan.startup.mode: Determines where to start reading from the Kafka topic. Supported values:
- earliest-offset: Start from the earliest available offset.
- latest-offset: Start from the latest offset.
- specific-offsets: Start from specific partition offsets.
- timestamp: Start from a specific timestamp.
scan.startup.specific-offsets: Specifies the starting offset for each partition when scan.startup.mode is specific-offsets. Format: partition:0,offset:42;partition:1,offset:300.
scan.startup.timestamp-millis: Specifies the starting timestamp in milliseconds when scan.startup.mode is timestamp.
scan.bounded.mode: Configures bounded reading from a Kafka topic. Supported values:
- unbounded: Read continuously (default).
- latest-offset: Read up to the latest offset at the time the query starts.
- specific-offsets: Read up to specific partition offsets.
- timestamp: Read up to a specific timestamp.
scan.bounded.specific-offsets: Specifies the ending offset for each partition when scan.bounded.mode is specific-offsets. Format: partition:0,offset:42;partition:1,offset:300.
scan.bounded.timestamp-millis: Specifies the ending timestamp in milliseconds when scan.bounded.mode is timestamp.
kafka.consumer.isolation-level: The isolation level for consuming messages. Supported values:
- read_committed: Only read committed transactional messages.
- read_uncommitted: Read all messages, including uncommitted transactional messages.
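As an illustration, a hypothetical table t could be configured to start from specific offsets and stop at a timestamp, using the offset and timestamp formats documented above (all values shown are examples):

```sql
ALTER TABLE t SET (
  'scan.startup.mode' = 'specific-offsets',
  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',
  'scan.bounded.mode' = 'timestamp',
  'scan.bounded.timestamp-millis' = '1700000000000'  -- epoch milliseconds
);
```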
Sink options
The following options control how messages are written to Kafka topics.
Note
The following sink options are specific to Confluent Platform and are not available in Confluent Cloud.
sink.buffer-flush.interval: The interval at which the sink flushes buffered records to Kafka.
sink.buffer-flush.max-rows: The maximum number of rows to buffer before flushing to Kafka.
kafka.producer.transaction.timeout.ms: The transaction timeout in milliseconds for the Kafka producer.
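A sketch setting these sink options on a hypothetical table t; the values are illustrative, and the interval/count formats are assumptions to adapt to your deployment:

```sql
ALTER TABLE t SET (
  'sink.buffer-flush.interval' = '1 s',                -- flush buffered rows every second
  'sink.buffer-flush.max-rows' = '1000',               -- or after 1000 buffered rows
  'kafka.producer.transaction.timeout.ms' = '900000'   -- 15-minute transaction timeout
);
```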
Create options
The following options configure the backing Kafka topic and can only be specified at table creation time.
Note
The following options can only be set during CREATE TABLE and cannot be modified with ALTER TABLE.
kafka.cleanup-policy: The cleanup policy for the Kafka topic. Possible values are delete, compact, and delete-compact.
kafka.compaction.time: The minimum compaction lag time for compacted topics.
kafka.max-message-size: The maximum message size for the Kafka topic.
kafka.producer.compression.type: The compression type for the Kafka producer. Possible values are gzip, snappy, lz4, zstd, and uncompressed.
kafka.replication-factor: The replication factor for the Kafka topic. A value of -1 uses the broker’s default replication factor.
kafka.retention.size: The retention size in bytes for the Kafka topic.
kafka.retention.time: The retention time for the Kafka topic.
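A sketch of a CREATE TABLE statement that sets several of these create options for the backing topic (the table name and option values are illustrative, and the duration format is an assumption):

```sql
CREATE TABLE audit_log (
  event_id STRING,
  payload STRING
)
DISTRIBUTED INTO 3 BUCKETS
WITH (
  'value.format' = 'json-registry',
  'kafka.cleanup-policy' = 'delete',
  'kafka.retention.time' = '7 d',      -- keep records for seven days
  'kafka.replication-factor' = '-1'    -- use the broker's default replication factor
);
```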
Table hints
Table hints allow you to override table options on a per-query basis without modifying the persisted table configuration. Hints are specified inline in a query using the /*+ OPTIONS(...) */ syntax after the table name.
Table hints are useful for options that are not persisted by the catalog, such as setting the Kafka consumer group using properties.group.id. They can also be used to temporarily override persisted options for a specific query.
SELECT /*+ OPTIONS('scan.startup.mode' = 'group-offsets', 'properties.group.id' = 'my-group') */ *
FROM orders;
INSERT INTO output_table
SELECT * FROM orders /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;
In the first example, the query reads from the orders table starting from the committed offsets of the consumer group my-group. In the second example, an INSERT INTO statement reads from orders starting from the earliest offset.
Note
The scan.startup.mode value group-offsets and the scan.bounded.mode value group-offsets require a consumer group ID. Because the catalog does not persist the properties.group.id option, you must set it using a table hint.
CREATE TABLE
CREATE TABLE creates a new table and a backing Kafka topic in the database’s Kafka cluster. The table’s physical column schemas are registered as key and value subjects in Schema Registry.
Important
A CREATE TABLE statement is only accepted if the statement’s environment has DDL permissions for the database. For details, see the Prerequisites section.
Syntax
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name (
column_name column_type [, ...]
[, PRIMARY KEY (column_name [, ...]) NOT ENFORCED]
[, WATERMARK FOR rowtime_column AS watermark_strategy_expression]
)
[DISTRIBUTED BY (column_name [, ...]) INTO n BUCKETS]
[DISTRIBUTED BY HASH(column_name [, ...]) INTO n BUCKETS]
[DISTRIBUTED INTO n BUCKETS]
[WITH (
key1='value1' [, key2='value2', ...]
)]
- IF NOT EXISTS: Prevents an error if a table with the same name already exists.
- PRIMARY KEY: Declares a primary key constraint. Required when using key.format or changelog.mode = 'upsert'.
- DISTRIBUTED BY (columns) INTO n BUCKETS: Distributes records across partitions based on the specified columns, using the Kafka producer’s default partitioning strategy.
- DISTRIBUTED BY HASH(columns) INTO n BUCKETS: Distributes records across partitions based on a hash of the specified columns.
- DISTRIBUTED INTO n BUCKETS: Sets the number of partitions for the backing Kafka topic without specifying a distribution strategy. If no bucket count is specified, the table defaults to 6 buckets.
- WITH: Specifies table options such as changelog.mode, key.format, and value.format.
Note
The only supported connector is confluent, which is used by default.
Example
Create a table with a primary key in upsert mode:
CREATE TABLE users (
user_id STRING NOT NULL,
name STRING,
email STRING,
PRIMARY KEY (user_id) NOT ENFORCED
)
DISTRIBUTED BY HASH(user_id) INTO 6 BUCKETS
WITH (
'changelog.mode' = 'upsert',
'key.format' = 'json-registry',
'value.format' = 'avro-registry'
);
Create a simple append-mode table:
CREATE TABLE events (
event_id STRING,
event_type STRING,
payload STRING
)
DISTRIBUTED INTO 12 BUCKETS
WITH (
'value.format' = 'json-registry'
);
ALTER TABLE
ALTER TABLE customizes a table’s schema, watermark strategy, primary key, or table options.
Important
An ALTER TABLE statement does not affect running statements. Modifications to a table only take effect for statements that are started or restarted after the ALTER TABLE statement is executed.
Syntax
ALTER TABLE [catalog_name.][db_name.]table_name {
ADD (column_name column_type METADATA [FROM metadata_key] VIRTUAL)
| ADD (column_name AS computed_column_expression)
| ADD PRIMARY KEY (column_name [, ...]) NOT ENFORCED
| MODIFY PRIMARY KEY (column_name [, ...]) NOT ENFORCED
| DROP PRIMARY KEY
| DROP (column_name [, ...])
| MODIFY WATERMARK FOR rowtime_column AS watermark_strategy_expression
| DROP WATERMARK
| SET (key1='value1' [, key2='value2', ...])
| RESET (key1 [, key2, ...])
}
Add computed columns
Computed columns are virtual columns that derive their values from expressions over existing columns. They are evaluated at query time and do not require additional storage.
ALTER TABLE orders ADD total_value AS quantity * price;
Add metadata columns
Metadata columns expose Kafka message properties, such as headers, timestamps, or offsets, as table columns. You must declare read-only metadata columns as VIRTUAL to exclude them during INSERT INTO operations.
ALTER TABLE t ADD headers MAP<STRING, BYTES> METADATA VIRTUAL;
ALTER TABLE t ADD `my_offset` BIGINT METADATA FROM 'offset' VIRTUAL;
Add primary key
You can declare a primary key on one or more columns. Primary keys in Flink are NOT ENFORCED, meaning they serve as hints for query optimization but are not validated at runtime.
ALTER TABLE t ADD PRIMARY KEY (k, v1) NOT ENFORCED;
Modify primary key
You can change the primary key of a table.
ALTER TABLE t MODIFY PRIMARY KEY (k, v2) NOT ENFORCED;
Drop primary key
Removes the primary key declaration from the table.
ALTER TABLE t DROP PRIMARY KEY;
Drop columns
You can drop computed columns and metadata columns that were previously added using ALTER TABLE. Physical columns cannot be dropped.
ALTER TABLE t DROP (total_value);
ALTER TABLE t DROP (headers, my_offset);
Modify watermark
Watermarks define how Flink tracks event time for time-based operations such as windowed aggregations. You can modify the watermark strategy to control the trade-off between latency and completeness.
ALTER TABLE clicks MODIFY WATERMARK FOR `$rowtime` AS `$rowtime` - INTERVAL '5' SECOND;
Drop watermark
Removes a custom watermark configuration and restores the default behavior.
ALTER TABLE t DROP WATERMARK;
Set table options
Sets one or more table options. For the full list of supported options, see Table Options.
ALTER TABLE t SET ('changelog.mode' = 'upsert');
Reset table options
Resets one or more table options to their default values.
ALTER TABLE t RESET ('changelog.mode');
Example
The following examples show how to create ALTER TABLE statements using the REST API and the Confluent CLI.
Create an ALTER TABLE statement using the REST API:
curl -v -H "Content-Type: application/json" \
-X POST http://cmf:8080/cmf/api/v1/environments/env-1/statements \
-d @/path/to/alter-stmt.json
The following is an example JSON document for an ALTER TABLE statement resource:
{
"apiVersion": "cmf.confluent.io/v1",
"kind": "Statement",
"metadata": {
"name": "alter-orders"
},
"spec": {
"statement": "ALTER TABLE orders ADD total_value AS quantity * price;",
"properties": {
"sql.current-catalog": "kafka-cat",
"sql.current-database": "kafka-db"
},
"computePoolName": "pool",
"stopped": false
}
}
Create an ALTER TABLE statement using the Confluent CLI:
confluent --environment env-1 --compute-pool pool flink statement create alter-orders \
--catalog kafka-cat --database kafka-db \
--sql "ALTER TABLE orders ADD computed_total AS quantity * price;"
DROP TABLE
DROP TABLE drops a table and deletes the backing Kafka topic and the associated Schema Registry subjects ({topic-name}-key and {topic-name}-value).
Important
A DROP TABLE statement is only accepted if the statement’s environment has DDL permissions for the database. For details, see the Prerequisites section.
Dropping a table is irreversible. This permanently deletes the Kafka topic and all its data.
Syntax
DROP TABLE [IF EXISTS] [catalog_name.][db_name.]table_name
IF EXISTS: Prevents an error if the table does not exist.
Example
DROP TABLE users;
DROP TABLE IF EXISTS events;