ALTER TABLE Statement

Confluent Cloud for Apache Flink® enables changing some properties of an existing table.

Syntax

-- Modify the default watermark strategy.
ALTER TABLE [catalog_name].[db_name].[table_name] MODIFY WATERMARK FOR
  column_name AS watermark_expression;

-- Drop a custom watermark strategy, restoring the default.
ALTER TABLE [catalog_name].[db_name].[table_name] DROP WATERMARK;

-- Set one or more properties on the table.
ALTER TABLE [catalog_name].[db_name].[table_name] SET ('key1'='value1', 'key2'='value2', ...);

-- Add a metadata column.
ALTER TABLE [catalog_name].[db_name].[table_name] ADD
  (metadata_column_name metadata_column_type METADATA VIRTUAL);

Description

ALTER TABLE modifies the structure or properties of an existing table. Use it to change or remove the watermark strategy, set table properties, or add metadata columns.

Modify watermark

In the Confluent CLI, run the following commands to modify the default watermark strategy.

  1. Create a table.

    CREATE TABLE orders (
      `user` BIGINT NOT NULL,
      product STRING,
      amount INT,
      ts TIMESTAMP(3),
      PRIMARY KEY(`user`) NOT ENFORCED
    );
    

    Your output should resemble:

    [INFO] Execute statement succeed.
    
  2. View the current table schema and metadata.

    DESCRIBE `orders`;
    

    Your output should resemble:

    +-------------+--------------+----------+-------------+
    | Column Name |  Data Type   | Nullable |   Extras    |
    +-------------+--------------+----------+-------------+
    | user        | BIGINT       | NOT NULL | PRIMARY KEY |
    | product     | STRING       | NULL     |             |
    | amount      | INT          | NULL     |             |
    | ts          | TIMESTAMP(3) | NULL     |             |
    +-------------+--------------+----------+-------------+
    
  3. Change the watermark strategy of the table.

    ALTER TABLE `orders` MODIFY WATERMARK FOR `ts` AS `ts`;
    

    Your output should resemble:

    Statement phase is COMPLETED.
    
  4. Check the new table schema and metadata.

    DESCRIBE `orders`;
    

    Your output should resemble:

    +-------------+------------------------+----------+-------------------+
    | Column Name |       Data Type        | Nullable |      Extras       |
    +-------------+------------------------+----------+-------------------+
    | user        | BIGINT                 | NOT NULL | PRIMARY KEY       |
    | product     | STRING                 | NULL     |                   |
    | amount      | INT                    | NULL     |                   |
    | ts          | TIMESTAMP(3) *ROWTIME* | NULL     | WATERMARK AS `ts` |
    +-------------+------------------------+----------+-------------------+
    
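The watermark expression isn't limited to the column itself. As a sketch of a bounded-out-of-orderness strategy, you can subtract a fixed delay from the timestamp column (the five-second interval here is an illustrative value, not a recommendation):

    ALTER TABLE `orders` MODIFY WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND;

With this strategy, watermarks trail the maximum observed `ts` by five seconds, so rows arriving up to five seconds late are still included in time-windowed results.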

Drop watermark

In the Confluent CLI, run the following commands to remove your custom watermark, which restores the default watermark strategy.

  1. View the current table schema and metadata.

    DESCRIBE `orders`;
    

    Your output should resemble:

    +-------------+------------------------+----------+-------------------+
    | Column Name |       Data Type        | Nullable |      Extras       |
    +-------------+------------------------+----------+-------------------+
    | user        | BIGINT                 | NOT NULL | PRIMARY KEY       |
    | product     | STRING                 | NULL     |                   |
    | amount      | INT                    | NULL     |                   |
    | ts          | TIMESTAMP(3) *ROWTIME* | NULL     | WATERMARK AS `ts` |
    +-------------+------------------------+----------+-------------------+
    
  2. Remove the watermark strategy of the table.

    ALTER TABLE `orders` DROP WATERMARK;
    

    Your output should resemble:

    Statement phase is COMPLETED.
    
  3. Check the new table schema and metadata.

    DESCRIBE `orders`;
    

    Your output should resemble:

    +-------------+--------------+----------+-------------+
    | Column Name |  Data Type   | Nullable |   Extras    |
    +-------------+--------------+----------+-------------+
    | user        | BIGINT       | NOT NULL | PRIMARY KEY |
    | product     | STRING       | NULL     |             |
    | amount      | INT          | NULL     |             |
    | ts          | TIMESTAMP(3) | NULL     |             |
    +-------------+--------------+----------+-------------+
    

Set properties

Set one or more properties on the specified table. If a property was set previously, the new value overrides the previous value.

In the Confluent CLI, run the following commands to set properties for your table.

  1. View the current table properties.

    SHOW CREATE TABLE `orders`;
    

    Your output should resemble:

    +----------------------------------------------------------------------+
    |                          SHOW CREATE TABLE                           |
    +----------------------------------------------------------------------+
    | CREATE TABLE `catalog`.`database`.`orders` (                         |
    |   `user` BIGINT NOT NULL,                                            |
    |   `product` VARCHAR(2147483647),                                     |
    |   `amount` INT,                                                      |
    |   `ts` TIMESTAMP(3),                                                 |
    |   CONSTRAINT `PK_3599338` PRIMARY KEY (`user`) NOT ENFORCED          |
    | ) WITH (                                                             |
    |   'changelog.mode' = 'upsert',                                       |
    |   'connector' = 'confluent',                                         |
    |   'kafka.cleanup-policy' = 'delete',                                 |
    |   'kafka.max-message-size' = '2097164 bytes',                        |
    |   'kafka.partitions' = '6',                                          |
    |   'kafka.retention.size' = '0 bytes',                                |
    |   'kafka.retention.time' = '604800000 ms',                           |
    |   'key.format' = 'avro-registry',                                    |
    |   'scan.bounded.mode' = 'unbounded',                                 |
    |   'scan.startup.mode' = 'earliest-offset',                           |
    |   'value.format' = 'avro-registry'                                   |
    | )                                                                    |
    |                                                                      |
    +----------------------------------------------------------------------+
    
  2. Set the startup mode to “latest-offset”.

    ALTER TABLE `orders` SET ('scan.startup.mode' = 'latest-offset');
    

    Your output should resemble:

    Statement phase is COMPLETED.
    
  3. Check the new table properties.

    SHOW CREATE TABLE `orders`;
    

    Your output should resemble:

    +----------------------------------------------------------------------+
    |                          SHOW CREATE TABLE                           |
    +----------------------------------------------------------------------+
    | CREATE TABLE `catalog`.`database`.`orders` (                         |
    |   `user` BIGINT NOT NULL,                                            |
    |   `product` VARCHAR(2147483647),                                     |
    |   `amount` INT,                                                      |
    |   `ts` TIMESTAMP(3),                                                 |
    |   CONSTRAINT `PK_3599338` PRIMARY KEY (`user`) NOT ENFORCED          |
    | ) WITH (                                                             |
    |   'changelog.mode' = 'upsert',                                       |
    |   'connector' = 'confluent',                                         |
    |   'kafka.cleanup-policy' = 'delete',                                 |
    |   'kafka.max-message-size' = '2097164 bytes',                        |
    |   'kafka.partitions' = '6',                                          |
    |   'kafka.retention.size' = '0 bytes',                                |
    |   'kafka.retention.time' = '604800000 ms',                           |
    |   'key.format' = 'avro-registry',                                    |
    |   'scan.bounded.mode' = 'unbounded',                                 |
    |   'scan.startup.mode' = 'latest-offset',                             |
    |   'value.format' = 'avro-registry'                                   |
    | )                                                                    |
    |                                                                      |
    +----------------------------------------------------------------------+
    
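Per the SET syntax above, several properties can be changed in a single statement. For example, the following hypothetical statement updates both scan-related properties shown in the output above:

    ALTER TABLE `orders` SET (
      'scan.startup.mode' = 'earliest-offset',
      'scan.bounded.mode' = 'unbounded'
    );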

Add metadata column

You can use ALTER TABLE to add additional metadata columns to your table schema. See Metadata columns for a full list of supported metadata columns for Apache Kafka®.

  1. View the current schema.

    DESCRIBE TABLE `orders`;
    

    Your output should resemble:

    +-------------+--------------+----------+-------------+
    | Column Name |  Data Type   | Nullable |   Extras    |
    +-------------+--------------+----------+-------------+
    | user        | BIGINT       | NOT NULL | PRIMARY KEY |
    | product     | STRING       | NULL     |             |
    | amount      | INT          | NULL     |             |
    | ts          | TIMESTAMP(3) | NULL     |             |
    +-------------+--------------+----------+-------------+
    
  2. Run the following statement to add the Kafka partition as a metadata column:

    ALTER TABLE `orders` ADD (
       `partition` BIGINT METADATA VIRTUAL);
    
  3. View the new schema.

    DESCRIBE TABLE `orders`;
    

    Your output should resemble:

    +-------------+--------------+----------+------------------+
    | Column Name |  Data Type   | Nullable |      Extras      |
    +-------------+--------------+----------+------------------+
    | user        | BIGINT       | NOT NULL | PRIMARY KEY      |
    | product     | STRING       | NULL     |                  |
    | amount      | INT          | NULL     |                  |
    | ts          | TIMESTAMP(3) | NULL     |                  |
    | partition   | BIGINT       | NULL     | METADATA VIRTUAL |
    +-------------+--------------+----------+------------------+
    
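Once added, the virtual column is readable like any other column, although it is not persisted to the underlying Kafka topic. As a hypothetical example, this query counts records per Kafka partition (note that `partition` must be quoted with backticks because it is a reserved word):

    SELECT `partition`, COUNT(*) AS record_count
    FROM `orders`
    GROUP BY `partition`;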

Add headers as a metadata column

You can read the headers of a Kafka record as a key-value map by adding a headers virtual metadata column.

  1. Run the following statement to add the Kafka record headers as a metadata column:

    ALTER TABLE `orders` ADD (
      `headers` MAP<STRING,STRING> METADATA VIRTUAL);
    
  2. View the new schema.

    DESCRIBE TABLE `orders`;
    

    Your output should resemble:

    +-------------+---------------------+----------+-------------------------+
    | Column Name |      Data Type      | Nullable |         Extras          |
    +-------------+---------------------+----------+-------------------------+
    | user        | BIGINT              | NOT NULL | PRIMARY KEY, BUCKET KEY |
    | product     | STRING              | NULL     |                         |
    | amount      | INT                 | NULL     |                         |
    | ts          | TIMESTAMP(3)        | NULL     |                         |
    | headers     | MAP<STRING, STRING> | NULL     | METADATA VIRTUAL        |
    +-------------+---------------------+----------+-------------------------+
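
Individual header values can then be read with map access. In this sketch, `trace-id` is a hypothetical header key, not one your records necessarily carry:

    SELECT `user`, `headers`['trace-id'] AS trace_id
    FROM `orders`;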