ALTER TABLE Statement

Important

Confluent Cloud for Apache Flink®️ is currently available for Preview. A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing Preview releases of the Preview features at any time in Confluent’s sole discretion. Check out Getting Help for questions, feedback and requests.

For Flink SQL features and limitations in the preview program, see Notable Limitations in Public Preview.

Confluent Cloud for Apache Flink®️ enables changing some properties of an existing table.

Syntax

-- Change the watermark strategy of an existing table
ALTER TABLE [catalog_name].[db_name].[table_name] MODIFY WATERMARK FOR
  column_name AS watermark_expression;

-- Remove a custom watermark and restore the default watermark strategy
ALTER TABLE [catalog_name].[db_name].[table_name] DROP WATERMARK;

-- Set a property on the table
ALTER TABLE [catalog_name].[db_name].[table_name] SET ('key1'='value1', 'key2'='value2', ...);

-- Add a metadata column
ALTER TABLE [catalog_name].[db_name].[table_name] ADD
  (metadata_column_name metadata_column_type METADATA VIRTUAL);

Description

ALTER TABLE modifies the structure or properties of an existing table. You can change or remove the watermark strategy, set table properties, and add metadata columns.

Modify watermark

In the Confluent CLI, run the following commands to modify the default watermark strategy.

  1. Create a table.

    CREATE TABLE orders (
      `user` BIGINT NOT NULL,
      product STRING,
      amount INT,
      ts TIMESTAMP(3),
      PRIMARY KEY(`user`) NOT ENFORCED
    );
    

    Your output should resemble:

    [INFO] Execute statement succeed.
    
  2. View the current table schema and metadata.

    DESCRIBE `orders`;
    

    Your output should resemble:

    +-------------+--------------+----------+-------------+
    | Column Name |  Data Type   | Nullable |   Extras    |
    +-------------+--------------+----------+-------------+
    | user        | BIGINT       | NOT NULL | PRIMARY KEY |
    | product     | STRING       | NULL     |             |
    | amount      | INT          | NULL     |             |
    | ts          | TIMESTAMP(3) | NULL     |             |
    +-------------+--------------+----------+-------------+
    
  3. Change the watermark strategy of the table.

    ALTER TABLE `orders` ADD WATERMARK FOR `ts` AS `ts`;
    

    Your output should resemble:

    Statement phase is COMPLETED.
    
  4. Check the new table schema and metadata.

    DESCRIBE `orders`;
    

    Your output should resemble:

    +-------------+------------------------+----------+-------------------+
    | Column Name |       Data Type        | Nullable |      Extras       |
    +-------------+------------------------+----------+-------------------+
    | user        | BIGINT                 | NOT NULL | PRIMARY KEY       |
    | product     | STRING                 | NULL     |                   |
    | amount      | INT                    | NULL     |                   |
    | ts          | TIMESTAMP(3) *ROWTIME* | NULL     | WATERMARK AS `ts` |
    +-------------+------------------------+----------+-------------------+
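The statement above uses `ts` itself as the watermark expression, so watermarks advance with no lateness allowance. When events can arrive out of order, a bounded-out-of-orderness expression is the common pattern in Flink SQL. As a sketch, the following subtracts a tolerance interval from the timestamp column; the 5-second value is illustrative, so pick one that matches your data:

```sql
-- Tolerate events that arrive up to 5 seconds late.
-- The interval is an illustrative assumption, not a recommended value.
ALTER TABLE `orders` MODIFY WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND;
```

Because a watermark strategy already exists after the previous step, this sketch uses MODIFY WATERMARK rather than ADD WATERMARK.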
    

Drop watermark

In the Confluent CLI, run the following commands to remove your custom watermark and restore the default watermark strategy.

  1. View the current table schema and metadata.

    DESCRIBE `orders`;
    

    Your output should resemble:

    +-------------+------------------------+----------+-------------------+
    | Column Name |       Data Type        | Nullable |      Extras       |
    +-------------+------------------------+----------+-------------------+
    | user        | BIGINT                 | NOT NULL | PRIMARY KEY       |
    | product     | STRING                 | NULL     |                   |
    | amount      | INT                    | NULL     |                   |
    | ts          | TIMESTAMP(3) *ROWTIME* | NULL     | WATERMARK AS `ts` |
    +-------------+------------------------+----------+-------------------+
    
  2. Remove the watermark strategy of the table.

    ALTER TABLE `orders` DROP WATERMARK;
    

    Your output should resemble:

    Statement phase is COMPLETED.
    
  3. Check the new table schema and metadata.

    DESCRIBE `orders`;
    

    Your output should resemble:

    +-------------+--------------+----------+-------------+
    | Column Name |  Data Type   | Nullable |   Extras    |
    +-------------+--------------+----------+-------------+
    | user        | BIGINT       | NOT NULL | PRIMARY KEY |
    | product     | STRING       | NULL     |             |
    | amount      | INT          | NULL     |             |
    | ts          | TIMESTAMP(3) | NULL     |             |
    +-------------+--------------+----------+-------------+
    

Set properties

Set one or more properties on the specified table. If a property was set previously, the new value overrides the previous one.

In the Confluent CLI, run the following commands to set properties for your table.

  1. View the current table properties.

    SHOW CREATE TABLE `orders`;
    

    Your output should resemble:

    +----------------------------------------------------------------------+
    |                          SHOW CREATE TABLE                           |
    +----------------------------------------------------------------------+
    | CREATE TABLE `catalog`.`database`.`orders` (                         |
    |   `user` BIGINT NOT NULL,                                            |
    |   `product` VARCHAR(2147483647),                                     |
    |   `amount` INT,                                                      |
    |   `ts` TIMESTAMP(3),                                                 |
    |   CONSTRAINT `PK_3599338` PRIMARY KEY (`user`) NOT ENFORCED          |
    | ) WITH (                                                             |
    |   'changelog.mode' = 'upsert',                                       |
    |   'connector' = 'confluent',                                         |
    |   'kafka.cleanup-policy' = 'delete',                                 |
    |   'kafka.max-message-size' = '2097164 bytes',                        |
    |   'kafka.partitions' = '6',                                          |
    |   'kafka.retention.size' = '0 bytes',                                |
    |   'kafka.retention.time' = '604800000 ms',                           |
    |   'key.format' = 'avro-registry',                                    |
    |   'scan.bounded.mode' = 'unbounded',                                 |
    |   'scan.startup.mode' = 'earliest-offset',                           |
    |   'value.format' = 'avro-registry'                                   |
    | )                                                                    |
    |                                                                      |
    +----------------------------------------------------------------------+
    
  2. Set the startup mode to "latest-offset".

    ALTER TABLE `orders` SET ('scan.startup.mode' = 'latest-offset');
    

    Your output should resemble:

    Statement phase is COMPLETED.
    
  3. Check the new table properties.

    SHOW CREATE TABLE `orders`;
    

    Your output should resemble:

    +----------------------------------------------------------------------+
    |                          SHOW CREATE TABLE                           |
    +----------------------------------------------------------------------+
    | CREATE TABLE `catalog`.`database`.`orders` (                         |
    |   `user` BIGINT NOT NULL,                                            |
    |   `product` VARCHAR(2147483647),                                     |
    |   `amount` INT,                                                      |
    |   `ts` TIMESTAMP(3),                                                 |
    |   CONSTRAINT `PK_3599338` PRIMARY KEY (`user`) NOT ENFORCED          |
    | ) WITH (                                                             |
    |   'changelog.mode' = 'upsert',                                       |
    |   'connector' = 'confluent',                                         |
    |   'kafka.cleanup-policy' = 'delete',                                 |
    |   'kafka.max-message-size' = '2097164 bytes',                        |
    |   'kafka.partitions' = '6',                                          |
    |   'kafka.retention.size' = '0 bytes',                                |
    |   'kafka.retention.time' = '604800000 ms',                           |
    |   'key.format' = 'avro-registry',                                    |
    |   'scan.bounded.mode' = 'unbounded',                                 |
    |   'scan.startup.mode' = 'latest-offset',                             |
    |   'value.format' = 'avro-registry'                                   |
    | )                                                                    |
    |                                                                      |
    +----------------------------------------------------------------------+
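SET also accepts multiple key/value pairs in a single statement, as shown in the syntax section. The following sketch changes two of the scan properties that appear in the SHOW CREATE TABLE output above; the values are illustrative, not recommendations:

```sql
-- Set several properties at once. Any property set previously is
-- overridden by the new value. Values here are illustrative.
ALTER TABLE `orders` SET (
  'scan.startup.mode' = 'earliest-offset',
  'scan.bounded.mode' = 'unbounded'
);
```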
    

Add metadata column

You can use ALTER TABLE to add metadata columns to your table schema. See METADATA columns for the full list of supported metadata columns for Kafka.

  1. View the current schema.

DESCRIBE `orders`;
    

    Your output should resemble:

    +-------------+--------------+----------+-------------+
    | Column Name |  Data Type   | Nullable |   Extras    |
    +-------------+--------------+----------+-------------+
    | user        | BIGINT       | NOT NULL | PRIMARY KEY |
    | product     | STRING       | NULL     |             |
    | amount      | INT          | NULL     |             |
    | ts          | TIMESTAMP(3) | NULL     |             |
    +-------------+--------------+----------+-------------+
    
  2. Add the Kafka partition as a metadata column.

    ALTER TABLE `orders` ADD (
       `partition` BIGINT METADATA VIRTUAL
    );
    
  3. View the new schema.

DESCRIBE `orders`;
    

    Your output should resemble:

    +-------------+--------------+----------+------------------+
    | Column Name |  Data Type   | Nullable |      Extras      |
    +-------------+--------------+----------+------------------+
    | user        | BIGINT       | NOT NULL | PRIMARY KEY      |
    | product     | STRING       | NULL     |                  |
    | amount      | INT          | NULL     |                  |
    | ts          | TIMESTAMP(3) | NULL     |                  |
    | partition   | BIGINT       | NULL     | METADATA VIRTUAL |
    +-------------+--------------+----------+------------------+
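
Because the new `partition` column is declared METADATA VIRTUAL, it is read-only: queries can select it, but it is excluded from the schema used when writing to the table. As a sketch, a query like the following could count rows per Kafka partition, assuming the `orders` table from the steps above:

```sql
-- Query the virtual metadata column added above.
SELECT `partition`, COUNT(*) AS row_cnt
FROM `orders`
GROUP BY `partition`;
```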