CREATE TABLE Statement¶
Confluent Cloud for Apache Flink®️ enables registering SQL tables on Apache Kafka® topics by using the CREATE TABLE statement. With Flink tables, you can run SQL queries on streaming data in Kafka topics.
Syntax¶
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
(
{ <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
[ <watermark_definition> ]
[ <table_constraint> ][ , ...n]
)
[COMMENT table_comment]
[DISTRIBUTED BY (partition_column_name1, partition_column_name2, ...) INTO n BUCKETS]
WITH (key1=value1, key2=value2, ...)
[LIKE source_table ( <like_options> )]
<physical_column_definition>:
column_name column_type [ <column_constraint> ] [COMMENT column_comment]
<metadata_column_definition>:
column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]
<computed_column_definition>:
column_name AS computed_column_expression [COMMENT column_comment]
<watermark_definition>:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
<table_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
<like_options>:
{
{ INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS } |
{ INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
}
Description¶
Register a table into the current or specified catalog. When a table is registered, you can use it in SQL queries.
The CREATE TABLE statement always creates a backing Kafka topic as well as the corresponding schema subjects for key and value.
Trying to create a table with a name that exists in the catalog causes an exception.
The table name can be in these formats:
catalog_name.db_name.table_name
: The table is registered with the catalog named “catalog_name” and the database named “db_name”.
db_name.table_name
: The table is registered into the current catalog of the execution table environment and the database named “db_name”.
table_name
: The table is registered into the current catalog and the database of the execution table environment.
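As a brief illustration of the three name formats, the following sketch registers one table per format. The catalog name my_env, the database name my_cluster, and the table names are hypothetical placeholders, not names from this page.

```sql
-- Fully qualified: registers in catalog `my_env`, database `my_cluster`.
CREATE TABLE `my_env`.`my_cluster`.`clicks_a` (`url` STRING);

-- Database-qualified: uses the current catalog.
CREATE TABLE `my_cluster`.`clicks_b` (`url` STRING);

-- Bare name: uses the current catalog and the current database.
CREATE TABLE `clicks_c` (`url` STRING);
```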
A table registered with the CREATE TABLE statement can be used as both table source and table sink. Flink can’t determine whether the table is used as a source or a sink until it’s referenced in a DML query.
The following sections show the options and clauses that are available with the CREATE TABLE statement.
- Physical / Regular Columns
- Metadata columns
- Computed columns
- System columns
- Watermark clause
- PRIMARY KEY constraint
- DISTRIBUTED BY clause
- LIKE
- WITH options
For examples of creating tables with various combinations of properties, see CREATE TABLE examples.
Example¶
The following CREATE TABLE statement registers a table named t1 in the current catalog. It also creates a backing Kafka topic and the corresponding value-schema. By default, the table is registered as append-only, uses Avro serializers, and reads from the earliest offset.
CREATE TABLE t1 (
`id` BIGINT,
`name` STRING,
`age` INT,
`salary` DECIMAL(10,2),
`active` BOOLEAN,
`created_at` TIMESTAMP_LTZ(3)
);
You can override defaults by specifying WITH options. The following SQL registers the table in retraction mode, so you can use the table to sink the results of a streaming join.
CREATE TABLE t2 (
`id` BIGINT,
`name` STRING,
`age` INT,
`salary` DECIMAL(10,2),
`active` BOOLEAN,
`created_at` TIMESTAMP_LTZ(3)
) WITH (
'changelog.mode' = 'retract'
);
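Because a registered table can act as both a source and a sink, the two example tables can be connected with a DML statement. The following is a minimal sketch that assumes t1 and t2 were created as shown above; the role of each table is decided only by how the statement references it.

```sql
-- t1 is read as a source, and t2 is written as a sink.
INSERT INTO t2
SELECT `id`, `name`, `age`, `salary`, `active`, `created_at`
FROM t1;

-- In a separate statement, t2 can be read as a source.
SELECT `id`, `name` FROM t2 WHERE `active`;
```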
Physical / Regular Columns¶
Physical or regular columns are the columns that define the structure of the table and the data types of its fields.
Each physical column is defined by a name and a data type, and optionally, a column constraint. You can use the column constraint to specify additional properties of the column, such as whether it is a unique key.
Example¶
The following SQL shows how to declare physical columns of various types in a table named t1. For available column types, see Data Types.
CREATE TABLE t1 (
`id` BIGINT,
`name` STRING,
`age` INT,
`salary` DECIMAL(10,2),
`active` BOOLEAN,
`created_at` TIMESTAMP_LTZ(3)
);
Metadata columns¶
You can access the following table metadata as metadata columns in a table definition.
Use the METADATA keyword to declare a metadata column.
Metadata fields are readable or readable/writable. Read-only columns must be declared VIRTUAL to exclude them during INSERT INTO operations.
Metadata columns are not registered in Schema Registry.
Example¶
The following CREATE TABLE statement shows the syntax for exposing metadata fields.
CREATE TABLE t (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`event_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
`partition` BIGINT METADATA VIRTUAL,
`offset` BIGINT METADATA VIRTUAL
);
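To illustrate the VIRTUAL rule, the following sketch writes to and reads from the table t defined above. It assumes that the sink schema consists of the physical columns plus the writable event_time metadata column, so the virtual partition and offset columns are left out of the column list. The literal values are made up for illustration.

```sql
-- `partition` and `offset` are VIRTUAL (read-only), so they are not written.
INSERT INTO t (`user_id`, `item_id`, `behavior`, `event_time`)
SELECT 1, 100, 'click', TO_TIMESTAMP_LTZ(0, 3);

-- Virtual metadata columns are still available when reading.
SELECT `user_id`, `partition`, `offset` FROM t;
```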
Available metadata¶
headers¶
- Type: MAP NOT NULL
- Access: readable/writable
Headers of the Kafka record as a map of raw bytes.
timestamp¶
- Type: TIMESTAMP_LTZ(3) NOT NULL
- Access: readable/writable
Timestamp of the Kafka record.
With timestamp, you can pass event time end-to-end. Otherwise, the sink uses the ingestion time by default.
timestamp-type¶
- Type: STRING NOT NULL
- Access: readable
Timestamp type of the Kafka record.
Valid values are:
- “NoTimestampType”
- “CreateTime” (also set when writing metadata)
- “LogAppendTime”
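The access modes listed above determine whether VIRTUAL is needed. The following sketch, which uses a hypothetical table named t_ts_type, declares the writable timestamp key without VIRTUAL and the read-only timestamp-type key with VIRTUAL.

```sql
CREATE TABLE t_ts_type (
  `payload` STRING,
  -- Readable and writable, so VIRTUAL is not required.
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  -- Read-only, so the column is declared VIRTUAL.
  `ts_type` STRING METADATA FROM 'timestamp-type' VIRTUAL
);
```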
Computed columns¶
Computed columns are virtual columns that are not stored in the table but are computed on the fly based on the values of other columns. These virtual columns are not registered in Schema Registry.
A computed column is defined by using an expression that references one or more physical or metadata columns in the table. The expression can use arithmetic operators, functions, and other SQL constructs to manipulate the values of the physical and metadata columns and compute the value of the computed column.
Example¶
The following CREATE TABLE statement shows the syntax for declaring a full_name computed column by concatenating a first_name column and a last_name column.
CREATE TABLE t (
`id` BIGINT,
`first_name` STRING,
`last_name` STRING,
`full_name` AS CONCAT(first_name, ' ', last_name)
);
System columns¶
Confluent Cloud for Apache Flink introduces system columns for Flink tables. System columns build on the metadata columns.
System columns can only be read and are not part of the query-to-sink schema.
System columns aren’t selected in a SELECT * statement, and they’re not shown in DESCRIBE or SHOW CREATE TABLE statements. The result from the DESCRIBE EXTENDED statement does include system columns.
Both inferred and manual tables are provisioned with a set of default system columns.
$rowtime¶
Currently, $rowtime TIMESTAMP_LTZ(3) NOT NULL is provided as a system column.
You can use the $rowtime system column to get the timestamp from a Kafka record, because $rowtime is exactly the Kafka record timestamp. If you want to write out $rowtime, you must use the timestamp metadata key.
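A short sketch of how the system column surfaces in practice, assuming the t1 table from the earlier example exists:

```sql
-- $rowtime is not part of SELECT *, so it must be selected explicitly.
SELECT $rowtime, * FROM t1;

-- DESCRIBE EXTENDED lists system columns; plain DESCRIBE does not.
DESCRIBE EXTENDED t1;
```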
PRIMARY KEY constraint¶
A primary key constraint is a hint that Flink SQL can use for optimizations. It specifies that a column or a set of columns in a table or a view is unique and doesn’t contain null values.
A primary key uniquely identifies a row in a table. No columns in a primary key can be nullable.
You can declare a primary key constraint together with a column definition (a column constraint) or as a single line (a table constraint). In both cases, it must be declared as a singleton. If you define more than one primary key constraint in the same statement, Flink SQL throws an exception.
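The two declaration styles can be sketched as follows. The table and column names are hypothetical and chosen only for illustration.

```sql
-- Column constraint: the key is declared together with the column definition.
CREATE TABLE users_by_id (
  `user_id` BIGINT PRIMARY KEY NOT ENFORCED,
  `name` STRING
);

-- Table constraint: the key is declared on its own line and can span several columns.
CREATE TABLE page_views_by_user_and_page (
  `user_id` BIGINT,
  `page_id` BIGINT,
  `views` BIGINT,
  PRIMARY KEY (`user_id`, `page_id`) NOT ENFORCED
);
```

Each statement contains exactly one primary key constraint, which satisfies the singleton requirement.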
The SQL standard specifies that a constraint can be ENFORCED or NOT ENFORCED, which controls whether the constraint checks are performed on the incoming/outgoing data. Flink SQL doesn’t own the data, so the only mode it supports is NOT ENFORCED. It’s your responsibility to ensure that the query enforces key integrity.
Flink SQL assumes that the primary key is correct, which means it assumes that the nullability of the key columns is aligned with the primary key definition. Connectors must ensure that these are aligned.
The PRIMARY KEY constraint partitions the table implicitly by the key column. A Kafka message key is defined either by an implicit DISTRIBUTED BY clause from a PRIMARY KEY constraint or by an explicit DISTRIBUTED BY clause.
Note
In a CREATE TABLE statement, a primary key constraint alters the column’s nullability, which means that a column with a primary key constraint isn’t nullable.
Example¶
The following SQL statement creates a table named latest_page_per_ip with a primary key defined on ip. This statement creates a Kafka topic, a value-schema, and a key-schema. The value-schema contains the definitions for page_url and ts, while the key-schema contains the definition for ip.
CREATE TABLE latest_page_per_ip (
`ip` STRING,
`page_url` STRING,
`ts` TIMESTAMP_LTZ(3),
PRIMARY KEY(`ip`) NOT ENFORCED
);
DISTRIBUTED BY clause¶
The DISTRIBUTED BY clause buckets the created table by the specified columns.
Bucketing enables a file-like structure with a small, human-enumerable key space. It groups rows that have “infinite” key space, like user_id, usually by using a hash function, for example:
bucket = hash(user_id) % number_of_buckets
Kafka partitions map 1:1 to SQL buckets. The n BUCKETS are used for the number of partitions when creating a topic.
If n is not defined, the default is 6.
- The number of buckets is fixed.
- A bucket is identifiable regardless of partition.
- Bucketing is good in long-term storage for reading across partitions based on a large key space, for example, user_id.
- Also, bucketing is good for short-term storage for load balancing.
Every mode comes with a default distribution, so DISTRIBUTED BY is required only by power users. In most cases, a simple CREATE TABLE t (schema); is sufficient.
- For upsert mode, the bucket key must be equal to primary key.
- For append/retract mode, the bucket key can be a subset of the primary key.
- The bucket key can be undefined, which corresponds to a “connector defined” distribution: round robin for append, and hash-by-row for retract.
Custom distributions are possible, but currently only custom hash distributions are supported.
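The bucket-key rules above can be sketched with two hypothetical tables that share a composite primary key. In upsert mode the bucket key matches the primary key, while in append mode it uses only a subset.

```sql
-- Upsert mode: the bucket key equals the primary key (k1, k2).
CREATE TABLE t_upsert_dist (
  k1 INT,
  k2 INT,
  s STRING,
  PRIMARY KEY (k1, k2) NOT ENFORCED
) DISTRIBUTED BY (k1, k2) INTO 4 BUCKETS
WITH ('changelog.mode' = 'upsert');

-- Append mode: the bucket key (k1) is a subset of the primary key.
CREATE TABLE t_append_dist (
  k1 INT,
  k2 INT,
  s STRING,
  PRIMARY KEY (k1, k2) NOT ENFORCED
) DISTRIBUTED BY (k1) INTO 4 BUCKETS
WITH ('changelog.mode' = 'append');
```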
Example¶
The following SQL declares a table named t_dist that has one key column named k and 4 Kafka partitions.
CREATE TABLE t_dist (k INT, s STRING) DISTRIBUTED BY (k) INTO 4 BUCKETS;
PARTITIONED BY clause¶
Deprecated: Use the DISTRIBUTED BY clause instead.
The PARTITIONED BY clause partitions the created table by the specified columns.
Use PARTITIONED BY to declare key columns in a table explicitly. A Kafka message key is defined either by an explicit PARTITIONED BY clause or an implicit PARTITIONED BY clause from a PRIMARY KEY constraint.
Example¶
The following SQL declares a table named t that has one key column named partition_key of type INT.
CREATE TABLE t (partition_key INT, example_value STRING) PARTITIONED BY (partition_key);
If compaction is enabled, the Kafka message key is overloaded with another semantic used for compaction, which influences constraints on the Kafka message key for partitioning.
Watermark clause¶
The WATERMARK clause defines the event-time attributes of a table.
Watermarks in Flink are used to track the progress of event time and provide a way to trigger time-based operations. In other words, a watermark means, “I have seen all records until this point in time”.
Default watermark strategy¶
Confluent Cloud for Apache Flink provides a default watermark strategy for all tables, whether created automatically from a Kafka topic or from a CREATE TABLE statement.
The default watermark strategy is applied on the $rowtime system column.
Watermarks are calculated per Kafka partition, and at least 250 events are required per partition.
If a delay of longer than 7 days can occur, choose a custom watermark strategy.
Because the concrete implementation is provided by Confluent, you see only WATERMARK FOR $rowtime AS SOURCE_WATERMARK() in the declaration.
Custom watermark strategies¶
You can replace the default strategy with a custom strategy at any time by using ALTER TABLE.
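A minimal sketch of switching strategies on an existing table follows. It assumes a table named orders exists and that ALTER TABLE accepts the MODIFY WATERMARK and DROP WATERMARK clauses; confirm the exact syntax in the ALTER TABLE reference before relying on it.

```sql
-- Assumed syntax: replace the default strategy with a 5-second bounded delay on $rowtime.
ALTER TABLE orders MODIFY WATERMARK FOR $rowtime AS $rowtime - INTERVAL '5' SECOND;

-- Assumed syntax: remove the custom strategy again.
ALTER TABLE orders DROP WATERMARK;
```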
Watermark strategy reference¶
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
The rowtime_column_name defines an existing column that is marked as the event-time attribute of the table. The column must be of type TIMESTAMP(3), and it must be a top-level column in the schema.
The watermark_strategy_expression defines the watermark generation strategy. It allows arbitrary non-query expressions, including computed columns, to calculate the watermark. The expression return type must be TIMESTAMP(3), which represents the timestamp since the Unix Epoch.
The returned watermark is emitted only if it’s non-null and its value is larger than the previously emitted local watermark, to respect the contract of ascending watermarks.
The watermark generation expression is evaluated by Flink SQL for every record. The framework emits the largest generated watermark periodically.
No new watermark is emitted if any of the following conditions apply.
- The current watermark is null.
- The current watermark is identical to the previous watermark.
- The value of the returned watermark is smaller than the value of the last emitted watermark.
When you use event-time semantics, your tables must contain an event-time attribute and watermarking strategy.
Flink SQL provides these watermark strategies.
Strictly ascending timestamps: Emit a watermark of the maximum observed timestamp so far. Rows that have a timestamp larger than the max timestamp are not late.
WATERMARK FOR rowtime_column AS rowtime_column
Ascending timestamps: Emit a watermark of the maximum observed timestamp so far, minus 1. Rows that have a timestamp larger than or equal to the max timestamp are not late.
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND
Bounded out-of-orderness timestamps: Emit watermarks which are the maximum observed timestamp minus the specified delay.
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit
The following example shows a “5-seconds delayed” watermark strategy.
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND
Example¶
The following CREATE TABLE statement defines an orders table that has a rowtime column named order_time with a watermark strategy with a 5-second delay.
CREATE TABLE orders (
`user` BIGINT,
`product` STRING,
`order_time` TIMESTAMP(3),
WATERMARK FOR `order_time` AS `order_time` - INTERVAL '5' SECOND
);
LIKE¶
The CREATE TABLE LIKE clause enables creating a new table with the same schema as an existing table. It is a combination of SQL features and can be used to extend or exclude certain parts of the original table. The clause must be defined at the top-level of a CREATE statement and applies to multiple parts of the table definition.
Use the LIKE options to control the merging logic of table features. You can control the merging behavior of:
- CONSTRAINTS - Constraints, such as primary keys and unique keys.
- GENERATED - Computed columns.
- METADATA - Metadata columns.
- OPTIONS - Table options.
- PARTITIONS - Partition options.
- WATERMARKS - Watermark strategies.
with three different merging strategies:
- INCLUDING - Includes the feature of the source table and fails on duplicate entries, for example, if an option with the same key exists in both tables.
- EXCLUDING - Does not include the given feature of the source table.
- OVERWRITING - Includes the feature of the source table, overwrites duplicate entries of the source table with properties of the new table. For example, if an option with the same key exists in both tables, the option from the current statement is used.
Additionally, you can use the INCLUDING/EXCLUDING ALL option to specify what should be the strategy if no specific strategy is defined. For example, if you use EXCLUDING ALL INCLUDING WATERMARKS, only the watermarks are included from the source table.
If you provide no LIKE options, INCLUDING ALL OVERWRITING OPTIONS is used as a default.
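The default can also be written out explicitly. The following sketch assumes a source table named orders exists; the derived table name and the extra note column are hypothetical. Because OVERWRITING OPTIONS is in effect, the changelog.mode given in this statement takes precedence over the same option on the source table.

```sql
CREATE TABLE orders_retract (
  -- Hypothetical extra column that is added to the columns copied from orders.
  `note` STRING
) WITH (
  'changelog.mode' = 'retract'
)
LIKE orders (
  INCLUDING ALL
  OVERWRITING OPTIONS
);
```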
Example¶
The following CREATE TABLE statement defines a table named t that has five physical columns, one computed column, and three metadata columns.
CREATE TABLE t (
`user_id` BIGINT,
`item_id` BIGINT,
`price` DOUBLE,
`behavior` STRING,
`created_at` TIMESTAMP(3),
`price_with_tax` AS `price` * 1.19,
`event_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
`partition` BIGINT METADATA VIRTUAL,
`offset` BIGINT METADATA VIRTUAL
);
You can run the following CREATE TABLE LIKE statement to define table t_derived, which contains the physical and computed columns of t, drops the metadata and default watermark strategy, and applies a custom watermark strategy on created_at.
CREATE TABLE t_derived (
WATERMARK FOR `created_at` AS `created_at` - INTERVAL '5' SECOND
)
LIKE t (
EXCLUDING WATERMARKS
EXCLUDING METADATA
);
WITH options¶
Table properties used to create a table source or sink.
Both the key and value of the expression key1=val1 are string literals.
You can change an existing table’s property values by using the ALTER TABLE Statement.
You can set the following properties when you create a table.
changelog.mode¶
Set the changelog mode of the connector. For a deep dive into changelog modes, see dynamic tables.
changelog.mode = [append | upsert | retract]
These are the changelog modes for an inferred table:
- append (if uncompacted)
- upsert (if compacted)
These are the changelog modes for a manually created table:
- retract (without primary key)
- upsert (with primary key and/or compaction)
Primary key interaction¶
With a primary key declared, the changelog modes have these properties:
- append means that every row can be treated as an independent fact.
- retract means that the combination of +X and -X are related and must be partitioned together.
- upsert means that all rows with same primary key are related and must be partitioned together.
To build indices, primary keys must be partitioned together.
Changelog mode | Encoding of changes | Default Partitioning without PK | Default Partitioning with PK | Custom Partitioning without PK | Custom Partitioning with PK |
---|---|---|---|---|---|
append | Each value is an insertion (+I). | round robin | hash by PK | hash by specified column(s) | hash by subset of PK |
retract | A special op header represents the change (+I, -U, +U, -D). The header is omitted for insertions. Append queries encoding is the same for all modes. | hash by entire value | hash by PK | hash by specified column(s) | hash by subset of PK |
upsert | If value is null, it represents a deletion (-D). Other values are +U and the engine will normalize the changelog internally. | unsupported, PK is mandatory | hash by PK | unsupported, PK is mandatory | unsupported |
kafka.cleanup-policy¶
kafka.cleanup-policy = [delete | compact | delete-compact]
Translates to the Kafka log.cleanup.policy property.
The default is delete.
kafka.max-message-size¶
kafka.max-message-size = MemorySize
Translates to the Kafka max.message.bytes property.
The default is 2097164 bytes.
kafka.partitions¶
kafka.partitions = Integer
Deprecated: Use the DISTRIBUTED BY clause to set Kafka partitions.
Translates to the Kafka num.partitions property.
The default is 6.
kafka.retention.size¶
kafka.retention.size = MemorySize
Translates to the Kafka log.retention.bytes property.
The default is 0 bytes.
kafka.retention.time¶
kafka.retention.time = Duration
Translates to the Kafka log.retention.ms property.
The default is 7 days.
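The kafka.* options above are set like any other WITH option. The following sketch uses a hypothetical table named t_kafka_opts. The value literals '1 d' and '1 mb' are assumptions about how Duration and MemorySize strings are written, so verify the accepted formats before using them.

```sql
CREATE TABLE t_kafka_opts (
  s STRING
) WITH (
  -- Delete old segments instead of compacting them (this is also the default).
  'kafka.cleanup-policy' = 'delete',
  -- Assumed Duration literal: keep records for one day.
  'kafka.retention.time' = '1 d',
  -- Assumed MemorySize literal: cap messages at one megabyte.
  'kafka.max-message-size' = '1 mb'
);
```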
key.fields-prefix¶
Specify a custom prefix for all fields of the key format.
key.fields-prefix = String
The key.fields-prefix property defines a custom prefix for all fields of the key format, which avoids name clashes with fields of the value format.
By default, the prefix is empty. If a custom prefix is defined, the table schema property works with prefixed names.
When constructing the data type of the key format, the prefix is removed, and the non-prefixed names are used within the key format.
This option requires that the value.fields-include property is set to except-key.
The prefix for an inferred table is key_, for non-atomic Schema Registry types and fields that have a name.
key.format¶
Specify the serialization format of the table’s key fields.
key.format = String
These are the key formats for an inferred table:
- raw (if no Schema Registry entry)
- avro-registry (for Avro Schema Registry entry)
- json-registry (for JSON Schema Registry entry)
- proto-registry (for Protobuf Schema Registry entry)
These are the key formats for a manually created table:
- avro-registry (for Avro Schema Registry entry)
- json-registry (for JSON Schema Registry entry)
- proto-registry (for Protobuf Schema Registry entry)
If no format is specified, Avro Schema Registry is used by default. This applies only if a primary key or partition key is defined.
scan.bounded.mode¶
Specify the bounded mode for the Kafka consumer.
scan.bounded.mode = [latest-offset | group-offsets | timestamp | unbounded]
The following list shows the valid bounded mode values.
- group-offsets: bounded by committed offsets in Kafka brokers of a specific consumer group. This is evaluated at the start of consumption from a given partition.
- latest-offset: bounded by latest offsets. This is evaluated at the start of consumption from a given partition.
- timestamp: bounded by a user-supplied timestamp.
If scan.bounded.mode isn’t set, the default is an unbounded table.
If timestamp is specified, the scan.bounded.timestamp-millis config option is required to specify a specific bounded timestamp in milliseconds since the Unix epoch, January 1, 1970 00:00:00.000 GMT.
scan.bounded.timestamp-millis¶
End at the specified epoch timestamp (milliseconds) when the timestamp
bounded mode is set in the scan.bounded.mode property.
scan.bounded.timestamp-millis = Long
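The two bounded-scan options are used together when the timestamp mode is chosen. The following sketch uses a hypothetical table named t_bounded and an arbitrary example timestamp.

```sql
CREATE TABLE t_bounded (
  s STRING
) WITH (
  'scan.bounded.mode' = 'timestamp',
  -- 1704067200000 ms since the Unix epoch is 2024-01-01 00:00:00.000 GMT.
  'scan.bounded.timestamp-millis' = '1704067200000'
);
```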
scan.startup.mode¶
The startup mode for Kafka consumers.
scan.startup.mode = [earliest-offset | latest-offset | group-offsets | timestamp]
The following list shows the valid startup mode values.
- group-offsets: start from committed offsets in Kafka brokers of a specific consumer group.
- earliest-offset: start from the earliest offset possible.
- latest-offset: start from the latest offset.
- timestamp: start from user-supplied timestamp for each partition.
The default is earliest-offset, which differs from the default in open-source Flink, which is group-offsets.
If timestamp is specified, the scan.startup.timestamp-millis config option is required, to define a specific startup timestamp in milliseconds since the Unix epoch, January 1, 1970 00:00:00.000 GMT.
scan.startup.timestamp-millis¶
Start from the specified Unix epoch timestamp (milliseconds) when the timestamp startup mode is set in the scan.startup.mode property.
scan.startup.timestamp-millis = Long
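As a sketch of the startup options, the following hypothetical tables override the earliest-offset default. The first reads only newly arriving records, and the second starts from an arbitrary example timestamp.

```sql
-- Start from the latest offset instead of the earliest.
CREATE TABLE t_latest (
  s STRING
) WITH (
  'scan.startup.mode' = 'latest-offset'
);

-- Start from a timestamp; the timestamp-millis option is required in this mode.
CREATE TABLE t_from_ts (
  s STRING
) WITH (
  'scan.startup.mode' = 'timestamp',
  -- 1704067200000 ms since the Unix epoch is 2024-01-01 00:00:00.000 GMT.
  'scan.startup.timestamp-millis' = '1704067200000'
);
```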
value.fields-include¶
Specify a strategy for handling key columns in the data type of the value format.
value.fields-include = [all | except-key]
The default is except-key.
If all is specified, all physical columns of the table schema are included in the value format, which means that key columns appear in the data type for both the key and value format.
value.format¶
Specify the format for serializing and deserializing the value part of Kafka messages.
value.format = String
These are the value formats for an inferred table:
- raw (if no Schema Registry entry)
- avro-registry (for Avro Schema Registry entry)
- json-registry (for JSON Schema Registry entry)
- proto-registry (for Protobuf Schema Registry entry)
These are the value formats for a manually created table:
- avro-registry (for Avro Schema Registry entry)
- json-registry (for JSON Schema Registry entry)
- proto-registry (for Protobuf Schema Registry entry)
If no format is specified, Avro Schema Registry is used by default.
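To override the Avro default, both formats can be set explicitly. The following sketch uses a hypothetical table named t_json. It declares a key column through DISTRIBUTED BY so that the key format applies.

```sql
CREATE TABLE t_json (
  k INT,
  s STRING
) DISTRIBUTED BY (k) INTO 2 BUCKETS
WITH (
  -- Serialize the key (k) and the value (s) with JSON Schema instead of Avro.
  'key.format' = 'json-registry',
  'value.format' = 'json-registry'
);
```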
CREATE TABLE examples¶
The following examples show how to create Flink tables with various options.
Minimal table¶
- Syntax
CREATE TABLE t_minimal (s STRING);
- Properties
  - Append changelog mode.
  - No Schema Registry key.
  - Round robin distribution.
  - 6 Kafka partitions.
  - The $rowtime column and system watermark are added implicitly.
Table with a primary key¶
- Syntax
CREATE TABLE t_pk (k INT PRIMARY KEY NOT ENFORCED, s STRING);
- Properties
  - Upsert changelog mode.
  - The primary key defines an implicit DISTRIBUTED BY(k).
  - k is the Schema Registry key.
  - Hash distribution on k.
  - The table has 6 Kafka partitions.
  - k is declared as being unique, meaning no duplicate rows.
  - k must not contain NULLs, meaning implicit NOT NULL.
  - The $rowtime column and system watermark are added implicitly.
Table with a primary key in append mode¶
- Syntax
CREATE TABLE t_pk_append (k INT PRIMARY KEY NOT ENFORCED, s STRING) DISTRIBUTED INTO 4 BUCKETS WITH ('changelog.mode' = 'append');
- Properties
  - Append changelog mode.
  - k is the Schema Registry key.
  - Hash distribution on k.
  - The table has 4 Kafka partitions.
  - k is declared as being unique, meaning no duplicate rows.
  - k must not contain NULLs, meaning implicit NOT NULL.
  - The $rowtime column and system watermark are added implicitly.
Table with hash distribution¶
- Syntax
CREATE TABLE t_dist (k INT, s STRING) DISTRIBUTED BY (k) INTO 4 BUCKETS;
- Properties
  - Append changelog mode.
  - k is the Schema Registry key.
  - Hash distribution on k.
  - The table has 4 Kafka partitions.
  - The $rowtime column and system watermark are added implicitly.
Complex table with all concepts combined¶
- Syntax
CREATE TABLE t_complex (k1 INT, k2 INT, PRIMARY KEY (k1, k2) NOT ENFORCED, s STRING) COMMENT 'My complex table' DISTRIBUTED BY HASH(k1) INTO 4 BUCKETS WITH ('changelog.mode' = 'append');
- Properties
  - Append changelog mode.
  - k1 is the Schema Registry key.
  - Hash distribution on k1.
  - k2 is treated as a value column and is stored in the value part of Schema Registry.
  - The table has 4 Kafka partitions.
  - k1 and k2 are declared as being unique, meaning no duplicates.
  - k1 and k2 must not contain NULLs, meaning implicit NOT NULL.
  - The $rowtime column and system watermark are added implicitly.
  - An additional comment is added.
Table with legacy syntax for specifying a distribution¶
- Syntax
CREATE TABLE t_partitioned_by (k INT, s STRING) PARTITIONED BY (k) WITH ('kafka.partitions' = '4');
- Properties
  - Do not use this syntax or the deprecated kafka.partitions option.
  - This syntax existed before the General Availability release of Confluent Cloud for Apache Flink.
  - The number of Kafka partitions was hidden in options but is crucial for the semantics of the table.
Table with overlapping names in key/value of Schema Registry but disjoint data¶
- Syntax
CREATE TABLE t_disjoint (from_key_k INT, k STRING) DISTRIBUTED BY (from_key_k) WITH ('key.fields-prefix' = 'from_key_');
- Properties
  - Append changelog mode.
  - Hash distribution on from_key_k.
  - The key prefix from_key_ is defined and is stripped before storing the schema in Schema Registry.
    - Therefore, k is the Schema Registry key of type INT.
    - Also, k is the Schema Registry value of type STRING.
  - Both key and value store disjoint data, so they can have different data types.
Table with overlapping names in key/value of Schema Registry but joint data¶
- Syntax
CREATE TABLE t_joint (k INT, v STRING) DISTRIBUTED BY (k) WITH ('value.fields-include' = 'all');
- Properties
  - Append changelog mode.
  - Hash distribution on k.
  - By default, the key is never included in the value in Schema Registry.
  - By setting 'value.fields-include' = 'all', the value contains the full table schema.
    - Therefore, k is the Schema Registry key.
    - Also, k, v is the Schema Registry value.
  - The payload of k is stored twice in the Kafka message, because key and value store joint data and they have the same data type for k.
Table with metadata columns for writing a Kafka message timestamp¶
- Syntax
CREATE TABLE t_metadata_write (name STRING, ts TIMESTAMP_LTZ(3) NOT NULL METADATA FROM 'timestamp') DISTRIBUTED INTO 1 BUCKETS;
- Properties
  - Adds the ts metadata column, which isn’t part of Schema Registry but instead is a pure Flink concept.
  - In contrast with $rowtime, which is declared as a METADATA VIRTUAL column, ts is selected in a SELECT * statement and is writable.
The following examples show how to fill Kafka messages with an instant.
INSERT INTO t_metadata_write (ts, name) SELECT NOW(), 'Alice';
INSERT INTO t_metadata_write (ts, name) SELECT TO_TIMESTAMP_LTZ(0, 3), 'Bob';
SELECT $rowtime, * FROM t_metadata_write;