CREATE TABLE Statement

Important

Confluent Cloud for Apache Flink®️ is currently available for Preview. A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing Preview releases of the Preview features at any time in Confluent’s sole discretion. Check out Getting Help for questions, feedback and requests.

For SQL features and limitations in the preview program, see Notable Limitations in Public Preview.

Confluent Cloud for Apache Flink®️ enables registering SQL tables on Apache Kafka® topics.

Syntax

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  (
    { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=value1, key2=value2, ...)

<physical_column_definition>:
  column_name column_type [ <column_constraint> ] [COMMENT column_comment]

<column_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<metadata_column_definition>:
  column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]

<watermark_definition>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

Description

Register a table into the current or specified catalog. When a table is registered, you can use it in SQL queries.

The CREATE TABLE statement always creates a backing Kafka topic as well as the corresponding schema subjects for key and value.

Trying to create a table with a name that exists in the catalog causes an exception.

The table name can be in these formats:

  1. catalog_name.db_name.table_name: The table is registered into the metastore with the catalog named “catalog_name” and the database named “db_name”.
  2. db_name.table_name: The table is registered into the current catalog of the execution table environment and the database named “db_name”.
  3. table_name: The table is registered into the current catalog and the database of the execution table environment.
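
For example, assuming that the current catalog is named my_catalog and the current database is named my_db (hypothetical names), each of the following forms registers a table with the same fully qualified name, my_catalog.my_db.orders.

-- Fully qualified name.
CREATE TABLE my_catalog.my_db.orders (order_id INT);

-- Relative to the current catalog.
CREATE TABLE my_db.orders (order_id INT);

-- Relative to the current catalog and database.
CREATE TABLE orders (order_id INT);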

A table registered with the CREATE TABLE statement can be used as both table source and table sink. Apache Flink® can’t determine whether the table is used as a source or a sink until it’s referenced in a DML query.

The following sections show the options and clauses that are available with the CREATE TABLE statement.

System columns

Confluent Cloud introduces the concept of system columns for Flink tables. System columns build on the METADATA VIRTUAL columns.

METADATA VIRTUAL columns can only be read and are not part of the query-to-sink schema.

System columns aren’t selected in a SELECT * statement, and they’re not shown in DESCRIBE or SHOW CREATE TABLE statements. The result from the DESCRIBE EXTENDED statement does include system columns.

Both inferred and manually created tables are provisioned with a set of default system columns.

$rowtime

Currently, only $rowtime TIMESTAMP_LTZ(3) NOT NULL is provided as a system column.

You can use the $rowtime system column to get the timestamp from a Kafka record, because $rowtime is exactly the Kafka record timestamp. If you want to write out $rowtime, you must use the timestamp metadata key.
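
For example, the following query reads the Kafka record timestamp by selecting $rowtime explicitly. The table name t and the user_id column are assumed for illustration.

SELECT `$rowtime`, user_id FROM t;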

METADATA columns

You can access the following table metadata as metadata columns in a table definition.

Metadata fields are readable or readable/writable. Read-only columns must be declared VIRTUAL to exclude them during INSERT INTO operations.

Example

The following CREATE TABLE example shows the syntax for exposing metadata fields.

CREATE TABLE t (
  `event_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
);

headers

  • Type: MAP<STRING, BYTES> NOT NULL
  • Access: readable/writable

Headers of the Kafka record as a map of raw bytes.

leader-epoch

  • Type: INT NULL
  • Access: readable

Leader epoch of the Kafka record, if available.

offset

  • Type: BIGINT NOT NULL
  • Access: readable

Offset of the Kafka record in the partition.

partition

  • Type: INT NOT NULL
  • Access: readable

Partition ID of the Kafka record.

timestamp

  • Type: TIMESTAMP_LTZ(3) NOT NULL
  • Access: readable/writable

Timestamp of the Kafka record.

With timestamp, you can pass event time end-to-end. Otherwise, the sink uses the ingestion time by default.
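
The following sketch shows one way to pass event time end-to-end, assuming a source table t with a user_id column and a sink table t_sink (hypothetical names). The writable timestamp metadata column carries the source record's $rowtime into the Kafka record timestamp of the sink.

CREATE TABLE t_sink (
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  `user_id` BIGINT
);

INSERT INTO t_sink SELECT `$rowtime`, user_id FROM t;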

timestamp-type

  • Type: STRING NOT NULL
  • Access: readable

Timestamp type of the Kafka record.

Valid values are:

  • “NoTimestampType”
  • “CreateTime” (also set when writing metadata)
  • “LogAppendTime”

topic

  • Type: STRING NOT NULL
  • Access: readable

Topic name of the Kafka record.

WATERMARK clause

The WATERMARK clause defines the event-time attributes of a table.

WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

By default, every table has a watermark strategy applied.

The rowtime_column_name defines an existing column that is marked as the event-time attribute of the table. The column must be of type TIMESTAMP(3) or TIMESTAMP_LTZ(3), and it must be a top-level column in the schema.

The watermark_strategy_expression defines the watermark generation strategy. It allows an arbitrary non-query expression, including computed columns, to calculate the watermark. The return type of the expression must be TIMESTAMP(3) or TIMESTAMP_LTZ(3), which represents the timestamp since the Unix Epoch.

The returned watermark is emitted only if it’s non-null and its value is larger than the previously emitted local watermark, to respect the contract of ascending watermarks.

The watermark generation expression is evaluated by SQL for every record. The framework emits the largest generated watermark periodically.

No new watermark is emitted if any of the following conditions apply.

  • The current watermark is null.
  • The current watermark is identical to the previous watermark.
  • The value of the returned watermark is smaller than the value of the last emitted watermark.

A watermark is emitted at the interval defined by the pipeline.auto-watermark-interval configuration setting. If the watermark interval is 0 ms, a generated watermark is emitted per record, if it's non-null and greater than the last emitted watermark.

When you use event-time semantics, your tables must contain an event-time attribute and watermarking strategy.

SQL provides these watermark strategies.

  • Strictly ascending timestamps: Emit a watermark of the maximum observed timestamp so far. Rows that have a timestamp larger than the max timestamp are not late.

    WATERMARK FOR rowtime_column AS rowtime_column
    
  • Ascending timestamps: Emit a watermark of the maximum observed timestamp so far, minus 1. Rows that have a timestamp larger than or equal to the max timestamp are not late.

    WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND
    
  • Bounded out-of-orderness timestamps: Emit watermarks which are the maximum observed timestamp minus the specified delay.

    WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit
    

    The following example shows a “5-seconds delayed” watermark strategy.

    WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND
    

The following example shows a CREATE TABLE statement that defines an orders table that has a rowtime column named order_time with a watermark strategy with a 5-second delay.

CREATE TABLE orders (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
);

Default watermark strategy

Confluent Cloud provides a default watermark strategy for all tables, whether created automatically from a Kafka topic or from a CREATE TABLE statement.

The default watermark strategy is applied on the $rowtime system column. Watermarks are calculated per Kafka partition.

The default watermark strategy emits watermarks with an interval between the minimum of 50 milliseconds and the maximum of 7 days, based on a moving histogram of observed delays from the maximum seen timestamp.

Because the concrete implementation is provided by Confluent, you see only WATERMARK FOR $rowtime AS SOURCE_WATERMARK() in the declaration.

For more information, see SupportsSourceWatermark.java.

Custom watermark strategies

You can replace the default strategy with a custom strategy at any time via ALTER TABLE.
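
For example, a statement like the following replaces the default strategy with a 5-second bounded out-of-orderness strategy on the $rowtime column. This is a sketch; see the ALTER TABLE Statement documentation for the exact supported syntax.

ALTER TABLE orders MODIFY WATERMARK FOR `$rowtime` AS `$rowtime` - INTERVAL '5' SECOND;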

PRIMARY KEY constraint

A primary key constraint is a hint that SQL can use for optimizations. It specifies that a column or a set of columns in a table or a view is unique and does not contain nulls. No column in a primary key can be nullable. A primary key uniquely identifies a row in a table.

You can declare a primary key constraint together with a column definition (a column constraint) or as a single line (a table constraint). For both cases, it must be declared as a singleton. If you define more than one primary key constraint in the same statement, SQL throws an exception.

The SQL standard specifies that a constraint can be ENFORCED or NOT ENFORCED, which controls whether the constraint checks are performed on the incoming/outgoing data. SQL doesn’t own the data, so the only mode it supports is NOT ENFORCED. It’s your responsibility to ensure that the query enforces key integrity.

SQL assumes that the primary key is correct by assuming that the nullability of the columns is aligned with the columns in the primary key. Connectors must ensure that these are aligned.

The PRIMARY KEY constraint partitions the table implicitly by the key column. A Kafka message key is defined either by an implicit PARTITIONED BY clause derived from a PRIMARY KEY constraint or by an explicit PARTITIONED BY clause.

Note

In a CREATE TABLE statement, a primary key constraint alters the column’s nullability, which means that a column with a primary key constraint isn’t nullable.
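
The following example declares a primary key as a table constraint. Because of the constraint, the user_id column is implicitly NOT NULL, and rows are implicitly partitioned by user_id. The table and column names are illustrative.

CREATE TABLE customers (
  user_id STRING,
  name STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
);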

PARTITIONED BY clause

The PARTITIONED BY clause partitions the created table by the specified columns.

Use PARTITIONED BY to declare key columns in a table explicitly. A Kafka message key is defined either by an explicit PARTITIONED BY clause or an implicit PARTITIONED BY clause from a PRIMARY KEY constraint.

The following SQL declares a table named t that has one key column named key of type INT.

CREATE TABLE t (key INT, value STRING) PARTITIONED BY (key);

If compaction is enabled, the Kafka message key is also used for compaction, which influences the constraints on the Kafka message key for partitioning.

Important

Open source Flink uses the key.fields option in the WITH clause to define the message key. For the Preview release, SQL in Confluent Cloud doesn’t support this option.

WITH options

Table properties used to create a table source or sink.

Both the key and value of the expression key1=value1 are string literals.

You can change an existing table’s property values by using the ALTER TABLE Statement.

You can set the following properties when you create a table.

  • changelog.mode
  • kafka.cleanup-policy
  • kafka.max-message-size
  • kafka.partitions
  • kafka.retention.size
  • kafka.retention.time
  • key.fields-prefix
  • key.format
  • scan.bounded.mode
  • scan.bounded.timestamp-millis
  • scan.startup.mode
  • scan.startup.timestamp-millis
  • value.fields-include
  • value.format
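
For example, the following statement sets a few of these properties at creation time. The table name and property values are illustrative.

CREATE TABLE clicks (
  user_id BIGINT,
  url STRING
) WITH (
  'kafka.partitions' = '3',
  'scan.startup.mode' = 'latest-offset',
  'value.format' = 'json-registry'
);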

changelog.mode

Set the changelog mode of the connector. For a deep dive into changelog modes, see dynamic tables.

changelog.mode = [append | upsert | retract]

These are the changelog modes for an inferred table:

  • append (if uncompacted)
  • upsert (if compacted)

These are the changelog modes for a manually created table:

  • retract (without primary key)
  • upsert (with primary key and/or compaction)

Primary key interaction

With a primary key declared, the changelog modes have these properties:

  • append means that every row can be treated as an independent fact.
  • retract means that the combination of +X and -X are related and must be partitioned together.
  • upsert means that all rows with the same primary key are related and must be partitioned together.

To build indices, primary keys must be partitioned together.

The following list shows how changes are encoded in each changelog mode and how rows are partitioned by default and with custom partitioning, with and without a primary key (PK).

  • append: Each value is an insertion (+I).
      • Default partitioning without PK: round robin
      • Default partitioning with PK: hash by PK
      • Custom partitioning without PK: hash by specified column(s)
      • Custom partitioning with PK: hash by subset of PK
  • retract: A special op header represents the change (+I, -U, +U, -D). The header is omitted for insertions, so the encoding of append queries is the same in all modes.
      • Default partitioning without PK: hash by entire value
      • Default partitioning with PK: hash by PK
      • Custom partitioning without PK: hash by specified column(s)
      • Custom partitioning with PK: hash by subset of PK
  • upsert: If the value is null, it represents a deletion (-D). Other values are +U, and the engine normalizes the changelog internally.
      • Default partitioning without PK: unsupported; PK is mandatory
      • Default partitioning with PK: hash by PK
      • Custom partitioning without PK: unsupported; PK is mandatory
      • Custom partitioning with PK: if uncompacted, hash by subset of PK; if compacted, unsupported
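
For example, the following sketch shows a manually created table that uses upsert mode together with a primary key. The table and column names are illustrative.

CREATE TABLE product_prices (
  product_id STRING,
  price DECIMAL(10, 2),
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'changelog.mode' = 'upsert'
);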

kafka.cleanup-policy

kafka.cleanup-policy = [delete | compact | delete-compact]

Translates to the Kafka log.cleanup.policy property.

The default is delete.

kafka.max-message-size

kafka.max-message-size = MemorySize

Translates to the Kafka max.message.bytes property.

The default is 2097164 bytes.

kafka.partitions

kafka.partitions = Integer

Translates to the Kafka num.partitions property.

The default is 6.

kafka.retention.size

kafka.retention.size = MemorySize

Translates to the Kafka log.retention.bytes property.

The default is 0 bytes.

kafka.retention.time

kafka.retention.time = Duration

Translates to the Kafka log.retention.ms property.

The default is 7 days.

key.fields-prefix

Specify a custom prefix for all fields of the key format.

key.fields-prefix = String

The key.fields-prefix property defines a custom prefix for all fields of the key format, which avoids name clashes with fields of the value format.

By default, the prefix is empty. If a custom prefix is defined, the table schema works with the prefixed names.

When constructing the data type of the key format, the prefix is removed, and the non-prefixed names are used within the key format.

This option requires that the value.fields-include property is set to except-key.

For an inferred table, the prefix is key_ for non-atomic Schema Registry types and fields that have a name.
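
The following sketch shows how a prefix can avoid a name clash between a key column and a value field. The table and column names are illustrative: the physical column key_id designates the message key, the prefix is removed when the key format is constructed, and the value format keeps its own id field because value.fields-include is except-key.

CREATE TABLE t_prefixed (
  key_id INT,
  id INT,
  name STRING
) PARTITIONED BY (key_id)
WITH (
  'key.format' = 'json-registry',
  'key.fields-prefix' = 'key_',
  'value.fields-include' = 'except-key'
);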

key.format

Specify the serialization format of the table’s key fields.

key.format = String

These are the key formats for an inferred table:

  • raw (if no Schema Registry entry)
  • avro-registry (for AVRO Schema Registry entry)
  • json-registry (for JSON Schema Registry entry)
  • proto-registry (for Protobuf Schema Registry entry)

These are the key formats for a manually created table:

  • avro-registry (for Avro Schema Registry entry)
  • json-registry (for JSON Schema Registry entry)
  • proto-registry (for Protobuf Schema Registry entry)

If no format is specified, Avro Schema Registry is used by default. This applies only if a primary key or partition key is defined.

scan.bounded.mode

Specify the bounded mode for the Kafka consumer.

scan.bounded.mode = [latest-offset | group-offsets | timestamp | unbounded]

The following list shows the valid bounded mode values.

  • group-offsets: bounded by committed offsets in Kafka brokers of a specific consumer group. This is evaluated at the start of consumption from a given partition.
  • latest-offset: bounded by latest offsets. This is evaluated at the start of consumption from a given partition.
  • timestamp: bounded by a user-supplied timestamp.

If scan.bounded.mode isn’t set, the default is an unbounded table.

If timestamp is specified, the scan.bounded.timestamp-millis config option is required to specify a specific bounded timestamp in milliseconds since the Unix epoch, January 1, 1970 00:00:00.000 GMT.
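
For example, the following hypothetical table stops reading each partition at a fixed point in time.

CREATE TABLE bounded_orders (
  order_id BIGINT,
  amount DOUBLE
) WITH (
  'scan.bounded.mode' = 'timestamp',
  'scan.bounded.timestamp-millis' = '1700000000000'
);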

scan.bounded.timestamp-millis

End at the specified epoch timestamp (milliseconds) when the timestamp bounded mode is set in the scan.bounded.mode property.

scan.bounded.timestamp-millis = Long

scan.startup.mode

The startup mode for Kafka consumers.

scan.startup.mode = [earliest-offset | latest-offset | group-offsets | timestamp]

The following list shows the valid startup mode values.

  • group-offsets: start from committed offsets in Kafka brokers of a specific consumer group.
  • earliest-offset: start from the earliest offset possible.
  • latest-offset: start from the latest offset.
  • timestamp: start from user-supplied timestamp for each partition.

The default is earliest-offset. This differs from open-source Flink, where the default is group-offsets.

If timestamp is specified, the scan.startup.timestamp-millis config option is required to define a specific startup timestamp in milliseconds since the Unix epoch, January 1, 1970 00:00:00.000 GMT.
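
For example, the following hypothetical table starts reading each partition from a fixed point in time.

CREATE TABLE recent_orders (
  order_id BIGINT,
  amount DOUBLE
) WITH (
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1700000000000'
);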

scan.startup.timestamp-millis

Start from the specified Unix epoch timestamp (milliseconds) when the timestamp startup mode is set in the scan.startup.mode property.

scan.startup.timestamp-millis = Long

value.fields-include

Specify a strategy for handling key columns in the data type of the value format.

value.fields-include = [all | except-key]

The default is except-key.

If all is specified, all physical columns of the table schema are included in the value format, which means that key columns appear in the data type for both the key and value format.
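
For example, in the following hypothetical table, the key column id appears in both the key format and the value format.

CREATE TABLE t_full_value (
  id INT,
  name STRING
) PARTITIONED BY (id)
WITH (
  'value.fields-include' = 'all'
);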

value.format

Specify the format for serializing and deserializing the value part of Kafka messages.

value.format = String

These are the value formats for an inferred table:

  • raw (if no Schema Registry entry)
  • avro-registry (for Avro Schema Registry entry)
  • json-registry (for JSON Schema Registry entry)
  • proto-registry (for Protobuf Schema Registry entry)

These are the value formats for a manually created table:

  • avro-registry (for Avro Schema Registry entry)
  • json-registry (for JSON Schema Registry entry)
  • proto-registry (for Protobuf Schema Registry entry)

If no format is specified, Avro Schema Registry is used by default.

Examples

In the SQL shell or in a Cloud Console workspace, run the following commands to see an example of the CREATE TABLE statement.

  1. Create a table.

    -- Create a users table.
    CREATE TABLE users (
      user_id STRING,
      registertime BIGINT,
      gender STRING,
      regionid STRING
    );
    
    -- Populate the table with mock users data.
    INSERT INTO users VALUES
      ('Thomas A. Anderson', 1677260724, 'male', 'Region_4'),
      ('Trinity', 1677260733, 'female', 'Region_4'),
      ('Morpheus', 1677260742, 'male', 'Region_8'),
      ('Dozer', 1677260823, 'male', 'Region_1'),
      ('Agent Smith', 1677260955, 'male', 'Region_0'),
      ('Persephone', 1677260901, 'female', 'Region_2'),
      ('Niobe', 1677260921, 'female', 'Region_3'),
      ('Zee', 1677260922, 'female', 'Region_5');
    

    Press ENTER to return to the SQL shell. Because INSERT INTO VALUES is a point-in-time statement, it exits after it completes inserting records.