Table Operations in Confluent Manager for Apache Flink

Confluent Manager for Apache Flink supports the following table operation statements:

  • CREATE TABLE: Creates a table and a backing topic in the database’s Kafka cluster.

  • ALTER TABLE: Customizes a table (inferred or manually created) backed by an Kafka topic.

  • DROP TABLE: Drops a table (inferred or manually created) and deletes the backing topic in the database’s Kafka cluster.

Prerequisites

Before you can run table operation statements against a database, the following prerequisites must be met:

  • The environment from which you submit a table operation statement must have edit permissions for the database. For that, the environment name needs to be configured in the database’s ddlEnvironments list. For details on how to create and configure databases, see Manage Flink Catalogs and Databases for Confluent Manager for Apache Flink.

  • The database Kafka credentials mapped to the environment must allow creating and deleting topics on the Kafka cluster. The credentials also must allow reading from and writing to the metadata topic (_confluent_sr_catalog).

  • The catalog Schema Registry credentials mapped to the environment must allow reading, creating and deleting subjects and schemas.

Tables

Tables in Confluent Manager for Apache Flink are backed by Kafka topics. The schema of a table’s physical columns is stored in Schema Registry as key and value schemas. The key and value schemas are linked to the topic using the TopicNameStrategy, which derives the Schema Registry subject names from the topic name:

  • Value schema subject: {topic-name}-value

  • Key schema subject: {topic-name}-key

Tables can be configured with table options to control the behavior of Flink sources and sinks, such as how to read from or write to the Kafka topic.

Each table has a changelog mode that controls how Flink interprets the records in the Kafka topic.

Inferred Tables

All Kafka topics are automatically exposed as inferred tables. CMF implements the same inference mechanism as Flink SQL on Confluent Cloud. The key and value schemas are resolved using the TopicNameStrategy (subject names {topic-name}-key and {topic-name}-value). If no schema information can be derived from Schema Registry, the inferred table is configured with raw key format, value format, or both, which exposes the raw bytes as a single column of type VARBINARY.

Inferred tables have a default configuration that can be adjusted using ALTER TABLE statements. By default, each inferred table includes:

  • A $rowtime metadata column of type TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL that exposes the Kafka record timestamp.

  • A default watermark strategy: WATERMARK FOR $rowtime AS $rowtime, which uses the Kafka record timestamp directly with no lateness allowance.

  • A changelog mode based on the topic’s cleanup policy:

    • Non-compacted topics (cleanup.policy=delete): changelog.mode = 'append'

    • Compacted topics (cleanup.policy=compact): changelog.mode = 'upsert' with a primary key automatically derived from the key schema columns.

Changelog Modes

The changelog mode controls how Flink interprets records in an Kafka topic. You can configure the changelog mode using the changelog.mode table option.

  • append: Treats the topic as an append-only log. Each record represents a new row in the table. This is the default mode for non-compacted topics.

  • upsert: Treats the topic as an upsert log for a table with a primary key. Each record represents either a new row (the record has a new key), an updated row (the record key is not new), or a deleted row (the record’s value is null) in the table. This is the default mode for compacted topics.

  • retract: Treats the topic as a changelog for a table. Each record is annotated with a flag marking the record as an insert, update, or delete operation that is applied to the table.

Table Schema

A table schema consists of physical columns, computed columns, metadata columns, a watermark strategy, and an optional primary key.

Physical columns

Physical columns represent the data stored in the Kafka topic. They are defined by the key and value schemas in Schema Registry. Physical columns cannot be added, modified, or dropped using ALTER TABLE. Schema evolution for physical columns must be performed through Schema Registry.

Computed columns

Computed columns are virtual columns that derive their values from expressions over other columns. They are evaluated at query time and are not persisted to the Kafka topic. Computed columns can be added using ALTER TABLE.

Metadata columns

Metadata columns expose Kafka record metadata as table columns. Metadata columns can be read-only or read-write. Read-only metadata columns must be declared as VIRTUAL to exclude them during INSERT INTO operations.

Every inferred table includes a default $rowtime metadata column of type TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL that exposes the Kafka record timestamp.

The following additional metadata columns can be added using ALTER TABLE:

Metadata Key

SQL Type

Description

topic

STRING NOT NULL

The name of the Kafka topic.

partition

INT NOT NULL

The partition number of the record.

offset

BIGINT NOT NULL

The offset of the record in its partition.

timestamp

TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL

The timestamp of the Kafka record.

timestamp-type

STRING NOT NULL

The timestamp type of the record.

leader-epoch

INT

The leader epoch of the record (nullable).

headers

MAP<STRING, BYTES> NOT NULL

The headers of the Kafka record.

Watermark strategy

Watermarks define how Flink tracks event time for time-based operations such as windowed aggregations. By default, inferred tables use the watermark strategy WATERMARK FOR $rowtime AS $rowtime, which uses the Kafka record timestamp directly with no lateness allowance.

You can set a custom watermark strategy to control the trade-off between latency and completeness. For details, see Modify watermark.

Primary key

You can declare a primary key on one or more columns. Primary keys in Flink are NOT ENFORCED, meaning they serve as hints for query optimization but are not validated at runtime. For tables with changelog.mode = 'upsert', a primary key is required and is automatically derived from the key schema columns for inferred tables.

Table Options

The following table options can be configured using CREATE TABLE and ALTER TABLE statements.

Changelog mode option

changelog.mode configures the changelog mode of a table. Possible values are append, upsert, and retract.

Key and value options

value.format

The serialization format for the Kafka message value. Possible values are avro-registry, json-registry, proto-registry, and raw.

key.format

The serialization format for the Kafka message key. Possible values are avro-registry, json-registry, proto-registry, and raw.

key.fields-prefix

A prefix to prepend to key field names to avoid naming conflicts with value fields.

value.fields-include

Controls which fields are included in the message value. Supported values:

  • ALL: All fields are included in the value (default).

  • EXCEPT_KEY: All fields except key fields are included in the value.

Scan options

scan.startup.mode

Determines where to start reading from the Kafka topic. Supported values:

  • earliest-offset: Start from the earliest available offset.

  • latest-offset: Start from the latest offset.

  • specific-offsets: Start from specific partition offsets.

  • timestamp: Start from a specific timestamp.

scan.startup.specific-offsets

Specifies the starting offset for each partition when scan.startup.mode is specific-offsets. Format: partition:0,offset:42;partition:1,offset:300.

scan.startup.timestamp-millis

Specifies the starting timestamp in milliseconds when scan.startup.mode is timestamp.

scan.bounded.mode

Configures bounded reading from a Kafka topic. Supported values:

  • unbounded: Read continuously (default).

  • latest-offset: Read up to the latest offset at the time the query starts.

  • specific-offsets: Read up to specific partition offsets.

  • timestamp: Read up to a specific timestamp.

scan.bounded.specific-offsets

Specifies the ending offset for each partition when scan.bounded.mode is specific-offsets. Format: partition:0,offset:42;partition:1,offset:300.

scan.bounded.timestamp-millis

Specifies the ending timestamp in milliseconds when scan.bounded.mode is timestamp.

kafka.consumer.isolation-level

The isolation level for consuming messages. Supported values:

  • read_committed: Only read committed transactional messages.

  • read_uncommitted: Read all messages, including uncommitted transactional messages.

Sink options

The following options control how messages are written to Kafka topics.

Note

The following sink options are specific to Confluent Platform and are not available in Confluent Cloud.

sink.buffer-flush.interval

The interval at which the sink flushes buffered records to Kafka.

sink.buffer-flush.max-rows

The maximum number of rows to buffer before flushing to Kafka.

kafka.producer.transaction.timeout.ms

The transaction timeout in milliseconds for the Kafka producer.

Create options

The following options configure the backing Kafka topic and can only be specified at table creation time.

Note

The following options can only be set during CREATE TABLE and cannot be modified with ALTER TABLE.

kafka.cleanup-policy

The cleanup policy for the Kafka topic. Possible values are delete, compact, and delete-compact.

kafka.compaction.time

The minimum compaction lag time for compacted topics.

kafka.max-message-size

The maximum message size for the Kafka topic.

kafka.producer.compression.type

The compression type for the Kafka producer. Possible values are gzip, snappy, lz4, zstd, and uncompressed.

kafka.replication-factor

The replication factor for the Kafka topic. A value of -1 uses the broker’s default replication factor.

kafka.retention.size

The retention size in bytes for the Kafka topic.

kafka.retention.time

The retention time for the Kafka topic.

Table hints

Table hints allow you to override table options on a per-query basis without modifying the persisted table configuration. Hints are specified inline in a query using the /*+ OPTIONS(...) */ syntax after the table name.

Table hints are useful for options that are not persisted by the catalog, such as setting the Kafka consumer group using properties.group.id. They can also be used to temporarily override persisted options for a specific query.

SELECT /*+ OPTIONS('scan.startup.mode' = 'group-offsets', 'properties.group.id' = 'my-group') */ *
FROM orders;

INSERT INTO output_table
SELECT * FROM orders /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;

In the first example, the query reads from the orders table starting from the committed offsets of the consumer group my-group. In the second example, an INSERT INTO statement reads from orders starting from the earliest offset.

Note

The scan.startup.mode value group-offsets and the scan.bounded.mode value group-offsets require a consumer group ID. Because the catalog does not persist the properties.group.id option, you must set it using a table hint.

CREATE TABLE

CREATE TABLE creates a new table and a backing Kafka topic in the database’s Kafka cluster. The table’s physical column schemas are registered as key and value subjects in Schema Registry.

Important

A CREATE TABLE statement is only accepted if the statement’s environment has DDL permissions for the database. For details, see the Prerequisites section.

Syntax

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name (
  column_name column_type [, ...]
  [, PRIMARY KEY (column_name [, ...]) NOT ENFORCED]
  [, WATERMARK FOR rowtime_column AS watermark_strategy_expression]
)
[DISTRIBUTED BY (column_name [, ...]) INTO n BUCKETS]
[DISTRIBUTED BY HASH(column_name [, ...]) INTO n BUCKETS]
[DISTRIBUTED INTO n BUCKETS]
[WITH (
  key1='value1' [, key2='value2', ...]
)]
  • IF NOT EXISTS: Prevents an error if a table with the same name already exists.

  • PRIMARY KEY: Declares a primary key constraint. Required when using key.format or changelog.mode = 'upsert'.

  • DISTRIBUTED BY (columns) INTO n BUCKETS: Distributes records across partitions based on the specified columns, using the Kafka producer’s default partitioning strategy.

  • DISTRIBUTED BY HASH(columns) INTO n BUCKETS: Distributes records across partitions based on a hash of the specified columns.

  • DISTRIBUTED INTO n BUCKETS: Sets the number of partitions for the backing Kafka topic without specifying a distribution strategy. If the partition is not specified, it defaults to 6 buckets.

  • WITH: Specifies table options such as changelog.mode, key.format, and value.format.

Note

The only supported connector is confluent, which is used by default.

Example

Create a table with a primary key in upsert mode:

CREATE TABLE users (
  user_id STRING NOT NULL,
  name STRING,
  email STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
)
DISTRIBUTED BY HASH(user_id) INTO 6 BUCKETS
WITH (
  'changelog.mode' = 'upsert',
  'key.format' = 'json-registry',
  'value.format' = 'avro-registry'
);

Create a simple append-mode table:

CREATE TABLE events (
  event_id STRING,
  event_type STRING,
  payload STRING
)
DISTRIBUTED INTO 12 BUCKETS
WITH (
  'value.format' = 'json-registry'
);

ALTER TABLE

ALTER TABLE customizes a table’s schema, watermark strategy, primary key, or table options.

Important

An ALTER TABLE statement does not affect running statements. Modifications to a table only take effect for statements that are started or restarted after the ALTER TABLE statement is executed.

Syntax

ALTER TABLE [catalog_name.][db_name.]table_name {
   ADD (column_name column_type METADATA [FROM metadata_key] VIRTUAL)
 | ADD (column_name AS computed_column_expression)
 | ADD PRIMARY KEY (column_name [, ...]) NOT ENFORCED
 | MODIFY PRIMARY KEY (column_name [, ...]) NOT ENFORCED
 | DROP PRIMARY KEY
 | DROP (column_name [, ...])
 | MODIFY WATERMARK FOR rowtime_column AS watermark_strategy_expression
 | DROP WATERMARK
 | SET (key1='value1' [, key2='value2', ...])
 | RESET (key1 [, key2, ...])
}

Add computed columns

Computed columns are virtual columns that derive their values from expressions over existing columns. They are evaluated at query time and do not require additional storage.

ALTER TABLE orders ADD total_value AS quantity * price;

Add metadata columns

Metadata columns expose Kafka message properties, such as headers, timestamps, or offsets, as table columns. You must declare read-only metadata columns as VIRTUAL to exclude them during INSERT INTO operations.

ALTER TABLE t ADD headers MAP<STRING, BYTES> METADATA VIRTUAL;
ALTER TABLE t ADD `my_offset` BIGINT METADATA FROM 'offset' VIRTUAL;

Add primary key

You can declare a primary key on one or more columns. Primary keys in Flink are NOT ENFORCED. It means they serve as hints for query optimization but are not validated at runtime.

ALTER TABLE t ADD PRIMARY KEY (k, v1) NOT ENFORCED;

Modify primary key

You can change the primary key of a table.

ALTER TABLE t MODIFY PRIMARY KEY (k, v2) NOT ENFORCED;

Drop primary key

Removes the primary key declaration from the table.

ALTER TABLE t DROP PRIMARY KEY;

Drop columns

You can drop computed columns and metadata columns that were previously added using ALTER TABLE. Physical columns cannot be dropped.

ALTER TABLE t DROP (total_value);
ALTER TABLE t DROP (headers, my_offset);

Modify watermark

Watermarks define how Flink tracks event time for time-based operations such as windowed aggregations. You can modify the watermark strategy to control the trade-off between latency and completeness.

ALTER TABLE clicks MODIFY WATERMARK FOR `$rowtime` AS `$rowtime` - INTERVAL '5' SECOND;

Drop watermark

Removes a custom watermark configuration, and restores the default behavior.

ALTER TABLE t DROP WATERMARK;

Set table options

Sets one or more table options. For the full list of supported options, see Table Options.

ALTER TABLE t SET ('changelog.mode' = 'upsert');

Reset table options

Resets one or more table options to their default values.

ALTER TABLE t RESET ('changelog.mode');

Example

The following examples show how to create ALTER TABLE statements using the REST API and the Confluent CLI.

Create an ALTER TABLE statement using the REST API:

curl -v -H "Content-Type: application/json" \
 -X POST http://cmf:8080/cmf/api/v1/environments/env-1/statements \
 -d @/path/to/alter-stmt.json

The following is an example JSON document for an ALTER TABLE statement resource:

{
  "apiVersion": "cmf.confluent.io/v1",
  "kind": "Statement",
  "metadata": {
    "name": "alter-orders"
  },
  "spec": {
    "statement": "ALTER TABLE orders ADD total_value AS quantity * price;",
    "properties": {
      "sql.current-catalog": "kafka-cat",
      "sql.current-database": "kafka-db"
    },
    "computePoolName": "pool",
    "stopped": false
  }
}

Create an ALTER TABLE statement using the Confluent CLI:

confluent --environment env-1 --compute-pool pool flink statement create alter-orders \
  --catalog kafka-cat --database kafka-db \
  --sql "ALTER TABLE orders ADD computed_total AS quantity * price;"

DROP TABLE

DROP TABLE drops a table and deletes the backing Kafka topic and the associated Schema Registry subjects ({topic-name}-key and {topic-name}-value).

Important

A DROP TABLE statement is only accepted if the statement’s environment has DDL permissions for the database. For details, see the Prerequisites section.

Dropping a table is irreversible. This permanently deletes the Kafka topic and all its data.

Syntax

DROP TABLE [IF EXISTS] [catalog_name.][db_name.]table_name
  • IF EXISTS: Prevents an error if the table does not exist.

Example

DROP TABLE users;

DROP TABLE IF EXISTS events;