CREATE TABLE statement in ksqlDB for Confluent Platform

Synopsis

CREATE [OR REPLACE] [SOURCE] TABLE [IF NOT EXISTS] table_name
  ( { column_name data_type [PRIMARY KEY] } [, ...] )
  WITH ( property_name = expression [, ...] );

Description

Create a new table with the specified columns and properties.

Creating a table registers it on an underlying Apache Kafka® topic, so you can use SQL statements to perform operations like joins and aggregations on the topic’s data. The table is said to be backed by the topic.

Important

Registering a table on a topic by using the CREATE TABLE statement is distinct from using the CREATE TABLE AS SELECT statement, which creates a table from the result of a SELECT query.

Specify CREATE OR REPLACE to replace an existing table with a new query that resumes from the same processing point as the previously existing query.

If you provide the IF NOT EXISTS clause, the statement won’t fail if a table with the same name already exists. Instead, ksqlDB returns the warning "A table with the same name already exists."
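As a minimal sketch of the IF NOT EXISTS clause, the following statement can be run repeatedly without failing. The table name, columns, and topic name are illustrative assumptions borrowed from the examples at the end of this page.

```sql
-- Assumed names: users, my-users-topic.
-- If a table named users already exists, ksqlDB returns a warning
-- instead of an error and leaves the existing table unchanged.
CREATE TABLE IF NOT EXISTS users (
  id BIGINT PRIMARY KEY,
  gender VARCHAR,
  region_id VARCHAR
) WITH (
  KAFKA_TOPIC = 'my-users-topic',
  VALUE_FORMAT = 'JSON'
);
```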

For more information, see Stream Processing.

PRIMARY KEY

A ksqlDB table works much like tables in other SQL systems. A table has zero or more rows. Each row is identified by its PRIMARY KEY. A row’s PRIMARY KEY can’t be NULL.

Important

You must declare a PRIMARY KEY when you create a table on a Kafka topic.

If an incoming message in the underlying Kafka topic has the same key as an existing row, it replaces the existing row in the table. But if the message’s value is NULL, it deletes the row.

Tip

A message that has a NULL value is known as a tombstone, because it causes the existing row to be deleted.

This situation is handled differently by a ksqlDB STREAM, as shown in the following summary.

|  | STREAM | TABLE |
|---|---|---|
| Key column type | KEY | PRIMARY KEY |
| NON NULL key constraint | No | Yes: A message in the Kafka topic with a NULL PRIMARY KEY is ignored. |
| Unique key constraint | No: A message with the same key as another has no special meaning. | Yes: A later message with the same key replaces earlier messages in the table. |
| Tombstones | No: A message with a NULL value is ignored. | Yes: A NULL message value is treated as a tombstone. Any existing row with a matching key is deleted from the table. |
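To make the comparison concrete, here is a sketch that registers the same topic once as a stream and once as a table. The names user_events, users_latest, and my-users-topic are assumptions chosen for illustration.

```sql
-- Stream: every message is an independent event. Repeated keys are all
-- retained, and a message with a NULL value is ignored.
CREATE STREAM user_events (
  id BIGINT KEY,
  region_id VARCHAR
) WITH (
  KAFKA_TOPIC = 'my-users-topic',
  VALUE_FORMAT = 'JSON'
);

-- Table: a later message with the same key replaces the existing row,
-- and a message with a NULL value (a tombstone) deletes the row.
CREATE TABLE users_latest (
  id BIGINT PRIMARY KEY,
  region_id VARCHAR
) WITH (
  KAFKA_TOPIC = 'my-users-topic',
  VALUE_FORMAT = 'JSON'
);
```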

Columns

Each column in a table is defined by the following syntax:

  • column_name: the name of the column. If unquoted, the name must be a valid SQL identifier and ksqlDB converts it to uppercase. The name can be quoted if case needs to be preserved or if the name is not a valid SQL identifier, for example `mixedCaseId` or `$with@invalid!chars`.
  • data_type: the SQL type of the column. Columns can be any of the data types supported by ksqlDB.
  • HEADERS or HEADER('<key>'): columns that are populated by the Kafka message’s header should be marked as HEADERS or HEADER('<key>') columns. If a column is marked by HEADERS, it contains the full list of header keys and values. If a column is marked by HEADER('<key>'), it contains the last header that matches the key, or NULL if that key is not in the list of headers.
  • PRIMARY KEY: columns that are stored in the Kafka message’s key should be marked as PRIMARY KEY columns. If a column is unmarked, ksqlDB loads it from the Kafka message’s value. Unlike a stream’s KEY column, a table’s PRIMARY KEY column(s) are NOT NULL. Any records in the Kafka topic with NULL key columns are dropped.
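The column options above might be combined as in the following sketch. The table, column, topic, and header key names are hypothetical. The column types shown for the HEADERS and HEADER('<key>') columns are the ones ksqlDB is understood to require, so check them against the data types reference for your version.

```sql
CREATE TABLE orders_with_headers (
  -- Quoted with backticks, so the mixed-case name is preserved.
  `orderId` BIGINT PRIMARY KEY,
  -- Unquoted, so ksqlDB converts the name to AMOUNT. Loaded from the value.
  amount DOUBLE,
  -- Full list of header keys and values.
  all_headers ARRAY<STRUCT<key STRING, value BYTES>> HEADERS,
  -- Last header whose key is 'trace-id', or NULL if that key is absent.
  trace_id BYTES HEADER('trace-id')
) WITH (
  KAFKA_TOPIC = 'my-orders-topic',
  VALUE_FORMAT = 'JSON'
);
```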

Serialization

For supported Serialization Formats, ksqlDB can integrate with Confluent Schema Registry to help ensure the correct message format for a table.

ksqlDB can use Schema Inference to define columns automatically in your CREATE TABLE statements, so you don’t need to declare them manually. Also, ksqlDB can use Schema Inference With ID to define columns automatically and enable using a physical schema for data serialization.

Note

  • To use the Avro, Protobuf, or JSON_SR formats, you must enable Schema Registry and set ksql.schema.registry.url in the ksqlDB Server configuration file. For more information, see Configure ksqlDB for Avro, Protobuf, and JSON schemas.
  • The JSON format doesn’t require Schema Registry to be enabled.
  • Avro and Protobuf field names are not case sensitive in ksqlDB. This matches the ksqlDB column name behavior.

ROWTIME

Each row within the table has a ROWTIME pseudo column, which represents the last modified time of the row. The timestamp is used by ksqlDB during windowing operations and during joins, where data from each side of a join is processed in time order.

The ROWTIME timestamp has an accuracy of milliseconds.

By default, ROWTIME is populated from the corresponding Kafka message timestamp. Set TIMESTAMP in the WITH clause to populate ROWTIME from a column in the Kafka message key or value.

For more information, see Time and Windows in ksqlDB Queries.

Partitioning

Assign the PARTITIONS property in the WITH clause to specify the number of partitions in the table’s backing topic.

Partitioning tables is especially important for stateful or otherwise intensive queries. For more information, see Parallelization.

ROWPARTITION and ROWOFFSET

Like ROWTIME, ROWPARTITION and ROWOFFSET are pseudo columns. They represent the partition and offset of the message in the source topic that a row was read from.

For example, if you issue a push query on a table backed by topic x that specifies ROWPARTITION or ROWOFFSET in the SELECT clause, the push query’s projection contains the partition and offset information of the underlying messages in topic x.
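A push query of this kind could look like the following sketch, which assumes a table named users with id and region_id columns.

```sql
-- Each emitted row carries the partition and offset of the
-- underlying message in the table's backing topic.
SELECT id, region_id, ROWPARTITION, ROWOFFSET
  FROM users
  EMIT CHANGES;
```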

Source tables

Provide the SOURCE clause to enable running pull queries on the table.

The SOURCE clause runs an internal query for the table to create a materialized state that’s used by pull queries. You can’t terminate this query manually. Terminate it by dropping the table with the DROP TABLE statement.

When you create a SOURCE table, the table is created as read-only. For a read-only table, INSERT, DELETE TOPIC, and DROP TABLE statements aren’t permitted.

To disable the SOURCE table feature, set ksql.source.table.materialization.enabled to false in the ksqlDB Server properties file.
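As an illustration, the following sketch creates a SOURCE table and then runs a pull query against it. The name users_source, the topic name, and the key value 42 are assumptions.

```sql
-- Creates a read-only table and starts the internal query that
-- materializes its state for pull queries.
CREATE SOURCE TABLE users_source (
  id BIGINT PRIMARY KEY,
  gender VARCHAR,
  region_id VARCHAR
) WITH (
  KAFKA_TOPIC = 'my-users-topic',
  VALUE_FORMAT = 'JSON'
);

-- Pull query: returns the current row for the given key and terminates.
SELECT * FROM users_source WHERE id = 42;
```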

Table properties

Use the WITH clause to specify details about your table. The WITH clause supports the following properties.

FORMAT

The serialization format of both the message key and value in the topic. For supported formats, see Serialization Formats.

You can’t use the FORMAT property with the KEY_FORMAT or VALUE_FORMAT properties in the same CREATE TABLE statement.
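For instance, a table whose keys and values are both serialized as JSON might be declared as follows. The table and topic names are placeholders.

```sql
-- FORMAT sets the key and value formats together, so neither
-- KEY_FORMAT nor VALUE_FORMAT appears in this statement.
CREATE TABLE users_json (
  id BIGINT PRIMARY KEY,
  region_id VARCHAR
) WITH (
  KAFKA_TOPIC = 'my-users-topic',
  FORMAT = 'JSON'
);
```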

KAFKA_TOPIC (required)

The name of the Kafka topic that backs the table.

The topic must already exist in Kafka, or you must specify PARTITIONS so that ksqlDB creates the topic. The statement fails if the topic already exists with a different partition or replica count.

KEY_FORMAT

The serialization format of the message key in the topic. For supported formats, see Serialization Formats.

If not supplied, the system default is used, defined by ksql.persistence.default.format.key. If the default is also not set, the statement is rejected as invalid.

You can’t use the KEY_FORMAT property with the FORMAT property in the same CREATE TABLE statement.
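When the key and value use different formats, the two properties can be set separately, as in this sketch. The names are illustrative, and KAFKA is assumed here as the key format.

```sql
-- Key and value formats are set independently, so FORMAT is omitted.
CREATE TABLE users_mixed (
  id BIGINT PRIMARY KEY,
  region_id VARCHAR
) WITH (
  KAFKA_TOPIC = 'my-users-topic',
  KEY_FORMAT = 'KAFKA',
  VALUE_FORMAT = 'JSON'
);
```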

KEY_PROTOBUF_NULLABLE_REPRESENTATION

In the default configuration, primitive fields in Protobuf do not distinguish NULL from default values, such as zero or an empty string. To use a Protobuf schema that can make this distinction, set KEY_PROTOBUF_NULLABLE_REPRESENTATION to either OPTIONAL or WRAPPER. The resulting schema is used to serialize keys for the table created by this CREATE statement. For more details, see the corresponding section in the Serialization Formats documentation.

KEY_SCHEMA_ID

The schema ID of the key schema in Schema Registry.

The schema is used for schema inference and data serialization.

For more information, see Schema Inference With Schema ID.

PARTITIONS

The number of partitions in the backing topic. You must set this property if you create a table without an existing topic. If you omit it and the topic doesn’t exist, the statement fails.

You can’t change the number of partitions on an existing table. To change the partition count, you must drop the table and create it again.

REPLICAS

The number of replicas in the backing topic. If this property isn’t set, but PARTITIONS is set, the default Kafka cluster configuration for replicas is used for creating a new topic.
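The following sketch shows PARTITIONS and REPLICAS used together to create the backing topic along with the table. The topic name and the counts are arbitrary assumptions, and the example presumes the topic doesn’t exist yet.

```sql
-- ksqlDB creates my-new-users-topic with 6 partitions and 3 replicas.
-- If REPLICAS were omitted, the Kafka cluster default would apply.
CREATE TABLE new_users (
  id BIGINT PRIMARY KEY,
  region_id VARCHAR
) WITH (
  KAFKA_TOPIC = 'my-new-users-topic',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6,
  REPLICAS = 3
);
```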

TIMESTAMP

By default, the ROWTIME pseudo column is the timestamp of the message in the Kafka topic.

You can use the TIMESTAMP property to override ROWTIME with the contents of the specified column within the Kafka message, similar to timestamp extractors in the Kafka Streams API.

Time-based operations, like windowing, process a record according to the timestamp in ROWTIME.

Timestamps have an accuracy of milliseconds.
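As a sketch, the statement below populates ROWTIME from a BIGINT column instead of the Kafka message timestamp. The names are assumptions, and the column is presumed to hold epoch milliseconds.

```sql
-- ROWTIME is taken from the usertimestamp column of each message.
CREATE TABLE users_by_event_time (
  id BIGINT PRIMARY KEY,
  usertimestamp BIGINT,
  region_id VARCHAR
) WITH (
  KAFKA_TOPIC = 'my-users-topic',
  VALUE_FORMAT = 'JSON',
  TIMESTAMP = 'usertimestamp'
);
```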

TIMESTAMP_FORMAT

Use with the TIMESTAMP property to specify the type and format of the timestamp column.

  • If set, the TIMESTAMP column must be of type varchar and have a format that can be parsed with the Java DateTimeFormatter.
  • If not set, the ksqlDB timestamp column must be of type bigint or timestamp.

If your timestamp format has characters that require single quotes, escape them with successive single quotes, '', for example: 'yyyy-MM-dd''T''HH:mm:ssX'.

For more information, see Timestamp formats.
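To illustrate, this sketch reads ROWTIME from a string column that holds ISO-8601 style values such as 2024-01-15T10:30:00Z. The table, column, and topic names are hypothetical.

```sql
-- updated_at is a VARCHAR, so TIMESTAMP_FORMAT tells ksqlDB how to parse it.
-- The literal T in the pattern is escaped with doubled single quotes.
CREATE TABLE users_iso_time (
  id BIGINT PRIMARY KEY,
  updated_at VARCHAR,
  region_id VARCHAR
) WITH (
  KAFKA_TOPIC = 'my-users-topic',
  VALUE_FORMAT = 'JSON',
  TIMESTAMP = 'updated_at',
  TIMESTAMP_FORMAT = 'yyyy-MM-dd''T''HH:mm:ssX'
);
```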

VALUE_DELIMITER

Set the delimiter string to use when VALUE_FORMAT is set to DELIMITED.

You can use a single character as a delimiter. The default is ','.

For space-delimited and tab-delimited values, use the special values SPACE or TAB instead of the actual space or tab characters.
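A tab-delimited topic might be registered as in the following sketch. The names are placeholders, and the value columns are assumed to appear in the message in the declared order.

```sql
-- Values look like: female<TAB>region-1
CREATE TABLE users_tsv (
  id BIGINT PRIMARY KEY,
  gender VARCHAR,
  region_id VARCHAR
) WITH (
  KAFKA_TOPIC = 'my-users-tsv-topic',
  VALUE_FORMAT = 'DELIMITED',
  VALUE_DELIMITER = 'TAB'
);
```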

VALUE_FORMAT

The serialization format of the message value in the topic. For supported formats, see Serialization Formats.

If VALUE_FORMAT isn’t provided, the system default is used, defined by ksql.persistence.default.format.value. If the default is also not set, the statement is rejected as invalid.

You can’t use the VALUE_FORMAT property with the FORMAT property in the same CREATE TABLE statement.

VALUE_PROTOBUF_NULLABLE_REPRESENTATION

In the default configuration, primitive fields in Protobuf do not distinguish NULL from default values, such as zero or an empty string. To use a Protobuf schema that can make this distinction, set VALUE_PROTOBUF_NULLABLE_REPRESENTATION to either OPTIONAL or WRAPPER. The resulting schema is used to serialize values for the table created by this CREATE statement. For more details, see the corresponding section in the Serialization Formats documentation.
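A sketch of this property follows, assuming Schema Registry is configured and using illustrative table and topic names. The same pattern would apply to KEY_PROTOBUF_NULLABLE_REPRESENTATION when the key format is Protobuf.

```sql
-- With WRAPPER, the generated Protobuf schema can represent a NULL
-- login_count as distinct from a login_count of zero.
CREATE TABLE users_proto (
  id BIGINT PRIMARY KEY,
  login_count INT,
  region_id VARCHAR
) WITH (
  KAFKA_TOPIC = 'my-users-proto-topic',
  KEY_FORMAT = 'KAFKA',
  VALUE_FORMAT = 'PROTOBUF',
  VALUE_PROTOBUF_NULLABLE_REPRESENTATION = 'WRAPPER'
);
```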

VALUE_SCHEMA_ID

The schema ID of the value schema in Schema Registry. The schema is used for schema inference and data serialization. For more information, see Schema Inference With Schema ID.

WRAP_SINGLE_VALUE

Specifies how ksqlDB deserializes the value of messages in the backing topic that contain only a single column.

  • If set to true, ksqlDB expects the column to have been serialized as a named column within a record.
  • If set to false, ksqlDB expects the column to have been serialized as an anonymous value.
  • If not supplied, the system default is used, defined by the ksql.persistence.wrap.single.values configuration property and defaulting to true.

Note

  • Be careful when you have a single-column schema where the value can be NULL, because NULL values have a special meaning in ksqlDB.
  • Supplying this property for formats that don’t support wrapping, for example DELIMITED, or when the value schema has multiple columns, causes an error.

For more information, see Single field unwrapping.
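For example, a topic whose JSON values are bare strings, such as "region-1", rather than records, such as {"REGION_ID": "region-1"}, could be registered with the sketch below. The table and topic names are assumptions.

```sql
-- The value schema has a single column, and each message value is
-- expected to be an anonymous JSON value rather than a JSON object.
CREATE TABLE user_regions (
  id BIGINT PRIMARY KEY,
  region_id VARCHAR
) WITH (
  KAFKA_TOPIC = 'my-user-regions-topic',
  VALUE_FORMAT = 'JSON',
  WRAP_SINGLE_VALUE = false
);
```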

Examples

-- table with declared columns:
CREATE TABLE users (
  id BIGINT PRIMARY KEY,
  usertimestamp BIGINT,
  gender VARCHAR,
  region_id VARCHAR
) WITH (
  KAFKA_TOPIC = 'my-users-topic',
  VALUE_FORMAT = 'JSON'
);

-- table with value columns loaded from Schema Registry:
CREATE TABLE users (
  id BIGINT PRIMARY KEY
) WITH (
  KAFKA_TOPIC = 'my-users-topic',
  VALUE_FORMAT = 'JSON_SR'
);

-- table with value columns loaded from Schema Registry with VALUE_SCHEMA_ID:
CREATE TABLE users (
  id BIGINT PRIMARY KEY
) WITH (
  KAFKA_TOPIC = 'my-users-topic',
  VALUE_FORMAT = 'JSON_SR',
  VALUE_SCHEMA_ID = 2
);

-- table with key columns loaded from Schema Registry with KEY_SCHEMA_ID:
CREATE TABLE users (
  usertimestamp BIGINT,
  gender VARCHAR,
  region_id VARCHAR
) WITH (
  KAFKA_TOPIC = 'my-users-topic',
  KEY_FORMAT = 'AVRO',
  KEY_SCHEMA_ID = 1,
  VALUE_FORMAT = 'JSON_SR'
);