ALTER TABLE Statement in Confluent Cloud for Apache Flink¶
Confluent Cloud for Apache Flink®️ enables changing some properties of an existing table.
Syntax¶
-- Modify the default watermark Add a column to the existing table
ALTER TABLE [catalog_name].[db_name].[table_name] MODIFY WATERMARK FOR
column-name1 AS column-name1;
-- Set a property on the table
ALTER TABLE [catalog_name].[db_name].[table_name] SET ('key1'='value1', 'key2'='value2', ...)
-- Add a metadata column
ALTER TABLE [catalog_name].[db_name].[table_name] ADD
(metadata_column_name metadata_column_type METADATA VIRTUAL)
Description¶
Change or remove the watermark, or change the properties of a table.
ALTER TABLE is used to modify the structure or properties of an existing table.
Modify watermark¶
In the Confluent CLI or in a Cloud Console workspace, run the following commands to modify the default watermark strategy.
Create a table.
CREATE TABLE orders ( `user` BIGINT NOT NULL, product STRING, amount INT, ts TIMESTAMP(3), PRIMARY KEY(`user`) NOT ENFORCED );
Your output should resemble:
[INFO] Execute statement succeed.
View the current table schema and metadata.
DESCRIBE `orders`;
Your output should resemble:
+-------------+--------------+----------+-------------+ | Column Name | Data Type | Nullable | Extras | +-------------+--------------+----------+-------------+ | user | BIGINT | NOT NULL | PRIMARY KEY | | product | STRING | NULL | | | amount | INT | NULL | | | ts | TIMESTAMP(3) | NULL | | +-------------+--------------+----------+-------------+
Change the watermark strategy of the table.
ALTER TABLE `orders` MODIFY WATERMARK FOR `ts` AS `ts`;
Your output should resemble:
Statement phase is COMPLETED.
Check the new table schema and metadata.
DESCRIBE `orders`;
Your output should resemble:
+-------------+------------------------+----------+-------------------+ | Column Name | Data Type | Nullable | Extras | +-------------+------------------------+----------+-------------------+ | user | BIGINT | NOT NULL | PRIMARY KEY | | product | STRING | NULL | | | amount | INT | NULL | | | ts | TIMESTAMP(3) *ROWTIME* | NULL | WATERMARK AS `ts` | +-------------+------------------------+----------+-------------------+
Drop watermark¶
In the Confluent CLI, or in a Cloud Console workspace, run the following commands to remove your custom watermark. This restores the default watermark strategy.
View the current table schema and metadata.
DESCRIBE `orders`;
Your output should resemble:
+-------------+------------------------+----------+-------------------+ | Column Name | Data Type | Nullable | Extras | +-------------+------------------------+----------+-------------------+ | user | BIGINT | NOT NULL | PRIMARY KEY | | product | STRING | NULL | | | amount | INT | NULL | | | ts | TIMESTAMP(3) *ROWTIME* | NULL | WATERMARK AS `ts` | +-------------+------------------------+----------+-------------------+
Remove the watermark strategy of the table.
ALTER TABLE `orders` DROP WATERMARK;
Your output should resemble:
Statement phase is COMPLETED.
Check the new table schema and metadata.
DESCRIBE `orders`;
Your output should resemble:
+-------------+--------------+----------+-------------+ | Column Name | Data Type | Nullable | Extras | +-------------+--------------+----------+-------------+ | user | BIGINT | NOT NULL | PRIMARY KEY | | product | STRING | NULL | | | amount | INT | NULL | | | ts | TIMESTAMP(3) | NULL | | +-------------+--------------+----------+-------------+
Set properties¶
Set one or more properties in the specified table. If a property has been set previously, it overrides the previous value with the new value.
In the Confluent CLI, run the following commands to set properties for your table.
View the current table properties.
SHOW CREATE TABLE `orders`;
Your output should resemble:
+----------------------------------------------------------------------+ | SHOW CREATE TABLE | +----------------------------------------------------------------------+ | CREATE TABLE `catalog`.`database`.`orders` ( | | `user` BIGINT NOT NULL, | | `product` VARCHAR(2147483647), | | `amount` INT, | | `ts` TIMESTAMP(3), | | CONSTRAINT `PK_3599338` PRIMARY KEY (`user`) NOT ENFORCED | | ) WITH ( | | 'changelog.mode' = 'upsert', | | 'connector' = 'confluent', | | 'kafka.cleanup-policy' = 'delete', | | 'kafka.max-message-size' = '2097164 bytes', | | 'kafka.partitions' = '6', | | 'kafka.retention.size' = '0 bytes', | | 'kafka.retention.time' = '604800000 ms', | | 'key.format' = 'avro-registry', | | 'scan.bounded.mode' = 'unbounded', | | 'scan.startup.mode' = 'earliest-offset', | | 'value.format' = 'avro-registry' | | ) | | | +----------------------------------------------------------------------+
Set the startup mode to “latest-offset”
ALTER TABLE `orders` SET ('scan.startup.mode' = 'latest-offset');
Your output should resemble:
Statement phase is COMPLETED.
Check the new table properties.
SHOW CREATE TABLE `orders`;
Your output should resemble:
+----------------------------------------------------------------------+ | SHOW CREATE TABLE | +----------------------------------------------------------------------+ | CREATE TABLE `catalog`.`database`.`orders` ( | | `user` BIGINT NOT NULL, | | `product` VARCHAR(2147483647), | | `amount` INT, | | `ts` TIMESTAMP(3), | | CONSTRAINT `PK_3599338` PRIMARY KEY (`user`) NOT ENFORCED | | ) WITH ( | | 'changelog.mode' = 'upsert', | | 'connector' = 'confluent', | | 'kafka.cleanup-policy' = 'delete', | | 'kafka.max-message-size' = '2097164 bytes', | | 'kafka.partitions' = '6', | | 'kafka.retention.size' = '0 bytes', | | 'kafka.retention.time' = '604800000 ms', | | 'key.format' = 'avro-registry', | | 'scan.bounded.mode' = 'unbounded', | | 'scan.startup.mode' = 'latest-offset', | | 'value.format' = 'avro-registry' | | ) | | | +----------------------------------------------------------------------+
Add metadata columns¶
You can use ALTER TABLE to add additional metadata columns to your table schema. For a full list of supported metadata columns for Apache Kafka®, see Metadata columns.
View the current schema.
DESCRIBE TABLE `orders`;
Your output should resemble:
+-------------+--------------+----------+-------------+ | Column Name | Data Type | Nullable | Extras | +-------------+--------------+----------+-------------+ | user | BIGINT | NOT NULL | PRIMARY KEY | | product | STRING | NULL | | | amount | INT | NULL | | | ts | TIMESTAMP(3) | NULL | | +-------------+--------------+----------+-------------+
Run the following statement to add the Kafka partition as a metadata column:
ALTER TABLE `orders` ADD ( `partition` BIGINT METADATA VIRTUAL);
View the new schema.
DESCRIBE TABLE `orders`;
Your output should resemble:
+-------------+--------------+----------+------------------+ | Column Name | Data Type | Nullable | Extras | +-------------+--------------+----------+------------------+ | user | BIGINT | NOT NULL | PRIMARY KEY | | product | STRING | NULL | | | amount | INT | NULL | | | ts | TIMESTAMP(3) | NULL | | | partition | BIGINT | NULL | METADATA VIRTUAL | +-------------+--------------+----------+------------------+
Add headers as a metadata column¶
You can get the headers of a Kafka record as a map of raw bytes by adding a
headers
virtual metadata column.
Run the following statement to add the Kafka partition as a metadata column:
ALTER TABLE `orders` ADD ( `headers` MAP<BYTES,BYTES> METADATA VIRTUAL);
View the new schema.
DESCRIBE `orders`;
Your output should resemble:
+-------------+-------------------+----------+-------------------------+ | Column Name | Data Type | Nullable | Extras | +-------------+-------------------+----------+-------------------------+ | user | BIGINT | NOT NULL | PRIMARY KEY, BUCKET KEY | | product | STRING | NULL | | | amount | INT | NULL | | | ts | TIMESTAMP(3) | NULL | | | headers | MAP<BYTES, BYTES> | NULL | METADATA VIRTUAL | +-------------+-------------------+----------+-------------------------+
Examples¶
The following examples show frequently encountered scenarios with ALTER TABLE.
Define a watermark for perfectly ordered data¶
Flink guarantees that rows are always emitted before the watermark is generated. The following statements ensure that for perfectly ordered events, meaning events without time-skew, a watermark can be equal to the timestamp or 1 ms less than the timestamp.
CREATE TABLE t_perfect_watermark (i INT);
-- If multiple events can have the same timestamp.
ALTER TABLE t_perfect_watermark
MODIFY WATERMARK FOR $rowtime AS $rowtime - INTERVAL '0.001' SECOND;
-- If a single event can have the timestamp.
ALTER TABLE t_perfect_watermark
MODIFY WATERMARK FOR $rowtime AS $rowtime;
Read and/or write Kafka headers¶
-- Create example topic
CREATE TABLE t_headers (i INT);
-- For read-only (virtual)
ALTER TABLE t_headers ADD headers MAP<BYTES, BYTES> METADATA VIRTUAL;
-- For read and write (persisted). Column becomes mandatory in INSERT INTO.
ALTER TABLE t_headers MODIFY headers MAP<BYTES, BYTES> METADATA;
-- Use implicit casting (origin is always MAP<BYTES, BYTES>)
ALTER TABLE t_headers MODIFY headers MAP<STRING, STRING> METADATA;
-- Insert and read
INSERT INTO t_headers SELECT 42, MAP['k1', 'v1', 'k2', 'v2'];
SELECT * FROM t_headers;
- Properties
- The metadata key is
headers
. If you don’t want to name the column this way, use:other_name MAP<BYTES, BYTES> METADATA FROM 'headers' VIRTUAL
. - Keys of headers must be unique. Currently, multi-key headers are not supported.
- The metadata key is
Read topic from specific offsets¶
-- Create example topic with 1 partition filled with values
CREATE TABLE t_specific_offsets (i INT) DISTRIBUTED INTO 1 BUCKETS;
INSERT INTO t_specific_offsets VALUES (1), (2), (3), (4), (5);
-- Returns 1, 2, 3, 4, 5
SELECT * FROM t_specific_offsets;
-- Changes the scan range
ALTER TABLE t_specific_offsets SET (
'scan.startup.mode' = 'specific-offsets',
'scan.startup.specific-offsets' = 'partition:0,offset:3'
);
-- Returns 4, 5
SELECT * FROM t_specific_offsets;
- Properties
scan.startup.mode
andscan.bounded.mode
control which range in the changelog (Kafka topic) to read.scan.startup.specific-offsets
andscan.bounded.specific-offsets
define offsets per partition.- In the example, only 1 partition is used. For multiple partitions, use the following syntax:
'scan.startup.specific-offsets' = 'partition:0,offset:3; partition:1,offset:42; partition:2,offset:0'
Debug “no output” and no watermark cases¶
The root cause for most “no output” cases is that a time-based operation, for example, TUMBLE, MATCH_RECOGNIZE, and FOR SYSTEM_TIME AS OF, did not receive recent enough watermarks.
The current time of an operator is calculated by the minimum watermark of all inputs, meaning across all tables/topics and their partitions.
If one partition does not emit a watermark, it can affect the entire pipeline.
The following statements may be helpful for debugging issues related to watermarks.
-- example table
CREATE TABLE t_watermark_debugging (k INT, s STRING)
DISTRIBUTED BY (k) INTO 4 BUCKETS;
-- Each value lands in a separate Kafka partition (out of 4).
-- Leave out values to see missing watermarks.
INSERT INTO t_watermark_debugging
VALUES (1, 'Bob'), (2, 'Alice'), (8, 'John'), (15, 'David');
-- If ROW_NUMBER doesn't show results, it's clearly a watermark issue.
SELECT ROW_NUMBER() OVER (ORDER BY $rowtime ASC) AS `number`, *
FROM t_watermark_debugging;
-- Add partition information as metadata column
ALTER TABLE t_watermark_debugging ADD part INT METADATA FROM 'partition' VIRTUAL;
-- Use the CURRENT_WATERMARK() function to check which watermark is calculated
SELECT
*,
part AS `Row Partition`,
$rowtime AS `Row Timestamp`,
CURRENT_WATERMARK($rowtime) AS `Operator Watermark`
FROM t_watermark_debugging;
-- Visualize the highest timestamp per Kafka partition
-- Due to the table declaration (with 4 buckets), this query should show 4 rows.
-- If not, the missing partitions might be the cause for watermark issues.
SELECT part AS `Partition`, MAX($rowtime) AS `Max Timestamp in Partition`
FROM t_watermark_debugging
GROUP BY part;
-- A workaround could be to not use the system watermark:
ALTER TABLE t_watermark_debugging
MODIFY WATERMARK FOR $rowtime AS $rowtime - INTERVAL '2' SECOND;
-- Or for perfect input data:
ALTER TABLE t_watermark_debugging
MODIFY WATERMARK FOR $rowtime AS $rowtime - INTERVAL '0.001' SECOND;
-- Add "fresh" data while the above statements with
-- ROW_NUMBER() or CURRENT_WATERMARK() are running.
INSERT INTO t_watermark_debugging VALUES
(1, 'Fresh Bob'),
(2, 'Fresh Alice'),
(8, 'Fresh John'),
(15, 'Fresh David');
The debugging examples above won’t solve everything but may help in finding the root cause.
The system watermark strategy is smart and excludes idle Kafka partitions from the watermark calculation after some time, but at least one partition must produce new data for the “logical clock” with watermarks.
Typically, root causes are:
- Idle Kafka partitions
- No data in Kafka partitions
- Not enough data in Kafka partitions
- Watermark strategy is too conservative
- No fresh data after warm up with historical data for progressing the logical clock
Handle idle partitions for missing watermarks¶
Idle partitions often cause missing watermarks. Also, no data in a partition or infrequent data can be a root cause.
-- Create a topic with 4 partitions.
CREATE TABLE t_watermark_idle (k INT, s STRING)
DISTRIBUTED BY (k) INTO 4 BUCKETS;
-- Avoid the "not enough data" problem by using a custom watermark.
-- The watermark strategy is still coarse-grained enough for this example.
ALTER TABLE t_watermark_idle
MODIFY WATERMARK FOR $rowtime AS $rowtime - INTERVAL '2' SECONDS;
-- Each value lands in a separate Kafka partition, and partition 1 is empty.
INSERT INTO t_watermark_idle
VALUES
(1, 'Bob in partition 0'),
(2, 'Alice in partition 3'),
(8, 'John in partition 2');
-- Thread 1: Start a streaming job.
SELECT ROW_NUMBER() OVER (ORDER BY $rowtime ASC) AS `number`, *
FROM t_watermark_idle;
-- Thread 2: Insert some data immediately -> Thread 1 still without results.
INSERT INTO t_watermark_idle
VALUES (1, 'Another Bob in partition 0 shortly after');
-- Thread 2: Insert some data after 15s -> Thread 1 should show results.
INSERT INTO t_watermark_idle
VALUES (1, 'Another Bob in partition 0 after 15s')
Within the first 15 seconds, all partitions contribute to the watermark calculation, so the first INSERT INTO has no effect because partition 1 is still empty.
After 15 seconds, all partitions are marked as idle. No partition contributes to the watermark calculation. But when the second INSERT INTO is executed, it becomes the main driving partition for the logical clock.
The global watermark jumps to “second INSERT INTO - 2 seconds”.
In the following code, the sql.tables.scan.idle-timeout
configuration overrides
the default idle-detection algorithm, so even an immediate INSERT INTO can be
the main driving partition for the logical clock, because all other partitions
are marked as idle after 1 second.
-- Thread 1: Start a streaming job.
-- Lower the idle timeout further.
SET 'sql.tables.scan.idle-timeout' = '1s';
SELECT ROW_NUMBER() OVER (ORDER BY $rowtime ASC) AS `number`, *
FROM t_watermark_idle;
-- Thread 2: Insert some data immediately -> Thread 1 should show results.
INSERT INTO t_watermark_idle
VALUES (1, 'Another Bob in partition 0 shortly after');
Inferred tables schema evolution¶
You can use the ALTER TABLE statement to evolve schemas for inferred tables.
The following examples show output from the SHOW CREATE TABLE statement called on the resulting table.
Schema Registry columns overlap with computed/metadata columns¶
For the following value schema in Schema Registry:
{
"type": "record",
"name": "TestRecord",
"fields": [
{
"name": "uid",
"type": "int"
}
]
}
Evolve a table by adding metadata:
ALTER TABLE t_metadata_overlap ADD `timestamp` TIMESTAMP_LTZ(3) NOT NULL METADATA;
SHOW CREATE TABLE returns the following output:
CREATE TABLE t_metadata_overlap` (
`key` VARBINARY(2147483647),
`uid` INT NOT NULL,
`timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA
) DISTRIBUTED BY HASH(`key`) INTO 6 BUCKETS
WITH (
...
)
- Properties
Schema Registry says there is a timestamp physical column, but Flink says there is timestamp metadata column.
In this case, metadata columns and computed columns have precedence, and Confluent Cloud for Apache Flink removes the physical column from the schema.
Because Confluent Cloud for Apache Flink advertises FULL_TRANSITIVE mode, queries still work, and the physical column is set to NULL in the payload:
INSERT INTO t_metadata_overlap SELECT CAST(NULL AS BYTES), 42, TO_TIMESTAMP_LTZ(0, 3);
Evolve the table by renaming metadata:
ALTER TABLE t_metadata_overlap DROP `timestamp`;
ALTER TABLE t_metadata_overlap
ADD message_timestamp TIMESTAMP_LTZ(3) METADATA FROM 'timestamp';
SELECT * FROM t_metadata_overlap;
SHOW CREATE TABLE returns the following output:
CREATE TABLE `t_metadata_overlap` (
`key` VARBINARY(2147483647),
`uid` INT NOT NULL,
`timestamp` VARCHAR(2147483647),
`message_timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp'
) DISTRIBUTED BY HASH(`key`) INTO 6 BUCKETS
WITH (
...
)
- Properties
- Now, both physical and metadata columns appear and can be accessed for reading and writing.
Enrich a column that has no Schema Registry information¶
For the following value schema in Schema Registry:
{
"type": "record",
"name": "TestRecord",
"fields": [
{
"name": "uid",
"type": "int"
}
]
}
SHOW CREATE TABLE returns the following output:
CREATE TABLE `t_enrich_raw_key` (
`key` VARBINARY(2147483647),
`uid` INT NOT NULL
) DISTRIBUTED BY HASH(`key`) INTO 6 BUCKETS
WITH (
'changelog.mode' = 'append',
'connector' = 'confluent',
'key.format' = 'raw',
'value.format' = 'avro-registry'
...
)
- Properties
- Schema Registry provides only information for the value part.
- Because the
key
part is not backed by Schema Registry, thekey.format
israw
. - The default data type of
raw
is BYTES, but you can change this by using the ALTER TABLE statement.
Evolve the table by giving a raw format column a specific type:
ALTER TABLE t_enrich_raw_key MODIFY key STRING;
SHOW CREATE TABLE returns the following output:
CREATE TABLE `t_enrich_raw_key` (
`key` STRING,
`uid` INT NOT NULL
) DISTRIBUTED BY HASH(`key`) INTO 6 BUCKETS
WITH (
'changelog.mode' = 'append',
'connector' = 'confluent',
'key.format' = 'raw',
'value.format' = 'avro-registry'
...
)
- Properties
- Only changes to simple, atomic types, like INT, BYTES, and STRING are supported, where the binary representation is clear.
- For more complex modifications, use Schema Registry.
- In multi-cluster scenarios, the ALTER TABLE statement must be executed for
every cluster, because the data type for
key
is stored in the Flink regional metastore.