Important

You are viewing documentation for an older version of Confluent Platform. For current behavior, see the latest Confluent Platform documentation.

Create a KSQL Table

In KSQL, you create tables from Apache Kafka® topics, and you create tables of query results from other tables or streams.

  • Use the CREATE TABLE statement to create a table from a Kafka topic.
  • Use the CREATE TABLE AS SELECT statement to create a table with query results from an existing table or stream.

Note

Creating streams is similar to creating tables. For more information, see Create a KSQL Stream.

Create a Table from a Kafka Topic

Use the CREATE TABLE statement to create a table from an underlying Kafka topic. The Kafka topic must exist already in your Kafka cluster.

The following examples show how to create tables from a Kafka topic named users. To see these examples in action, create the users topic by following the procedure in Writing Streaming Queries Against Apache Kafka® Using KSQL (Docker).
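
Before creating a table, it can help to confirm that the topic exists and holds data. A minimal sketch, assuming the users topic from the linked tutorial is already being populated (the `--` lines are annotations, not part of the statements):

```sql
-- List the topics that the KSQL server can see in the Kafka cluster.
SHOW TOPICS;

-- Print raw records from the users topic, starting at the earliest offset.
-- Topic names are case-sensitive, so the name is quoted.
PRINT 'users' FROM BEGINNING;
```

Press CTRL+C to stop printing the topic.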

Create a Table with Selected Columns

The following example creates a table that has four columns from the users topic: registertime, userid, gender, and regionid. The table’s KEY property is set to userid, which identifies the column in the message value that corresponds to the Kafka message key.

Note

The KEY field is required for all KSQL tables. For more information, see Key Requirements.

KSQL can’t infer the topic’s data format, so you must provide the format of the values that are stored in the topic. In this example, the data format is JSON. Other options are AVRO and DELIMITED.

In the KSQL CLI, paste the following CREATE TABLE statement:

CREATE TABLE users \
  (registertime BIGINT, \
   userid VARCHAR, \
   gender VARCHAR, \
   regionid VARCHAR) \
  WITH (KAFKA_TOPIC = 'users', \
        VALUE_FORMAT='JSON', \
        KEY = 'userid');

Your output should resemble:

 Message
---------------
 Table created
---------------
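
For comparison, the same declaration against a comma-separated topic might look like the following sketch. The users_csv topic and the users_delimited table name are hypothetical and aren't part of the tutorial. With the DELIMITED format, KSQL maps columns by position, so the column order must match the field order in each record.

```sql
-- Hypothetical: assumes a users_csv topic whose values look like
-- 1498028899054,User_2,MALE,Region_1
CREATE TABLE users_delimited \
  (registertime BIGINT, \
   userid VARCHAR, \
   gender VARCHAR, \
   regionid VARCHAR) \
  WITH (KAFKA_TOPIC = 'users_csv', \
        VALUE_FORMAT = 'DELIMITED', \
        KEY = 'userid');
```

The rest of this section continues with the JSON-backed users table.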

Inspect the table by using the SHOW TABLES and DESCRIBE statements:

SHOW TABLES;

Your output should resemble:

 Table Name | Kafka Topic | Format | Windowed
----------------------------------------------
 USERS      | users       | JSON   | false
----------------------------------------------

Get the schema for the table:

DESCRIBE users;

Your output should resemble:

Name                 : USERS
 Field        | Type
------------------------------------------
 ROWTIME      | BIGINT           (system)
 ROWKEY       | VARCHAR(STRING)  (system)
 REGISTERTIME | BIGINT
 USERID       | VARCHAR(STRING)
 GENDER       | VARCHAR(STRING)
 REGIONID     | VARCHAR(STRING)
------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
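
As the last line of the output suggests, DESCRIBE EXTENDED adds runtime details to the schema. A short sketch; the exact fields in the output vary by KSQL version, so no sample output is shown here:

```sql
-- Show the schema plus details such as the key field, value format,
-- underlying topic, and message statistics for the users table.
DESCRIBE EXTENDED users;
```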

Create a continuous streaming query on the users table by using the SELECT statement:

SELECT * FROM users;

Your output should resemble:

1541439611069 | User_2 | 1498028899054 | User_2 | MALE | Region_1
1541439611320 | User_6 | 1505677113995 | User_6 | FEMALE | Region_7
1541439611396 | User_5 | 1491338621627 | User_5 | OTHER | Region_2
1541439611536 | User_9 | 1492621173463 | User_9 | FEMALE | Region_3
^CQuery terminated

Press CTRL+C to stop printing the query results.

The table values update continuously with the most recent records, because the underlying users topic receives new messages continuously.
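
By default, a new query reads only records that arrive after it starts. The following sketch, which assumes the users table defined above, tells the current CLI session to read the topic from the beginning and uses LIMIT so that the query ends on its own instead of waiting for CTRL+C:

```sql
-- Read from the earliest offset for queries started in this CLI session.
SET 'auto.offset.reset' = 'earliest';

-- Return three rows, then terminate the query automatically.
SELECT userid, gender, regionid FROM users LIMIT 3;
```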

Create a KSQL Table with Streaming Query Results

Use the CREATE TABLE AS SELECT statement to create a KSQL table that contains the results of a SELECT query from another table or stream.

CREATE TABLE AS SELECT creates a new KSQL table with a corresponding Kafka topic and streams the result of the SELECT query as a changelog into the topic. KSQL creates a persistent query that runs continuously until you terminate it explicitly.
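
The name and partition count of the topic that backs the new table can be set in a WITH clause. A sketch, assuming the users table from the previous section; the users_male table and users_male_topic topic names are hypothetical:

```sql
-- Write the query results to an explicitly named topic with one partition.
CREATE TABLE users_male \
  WITH (KAFKA_TOPIC = 'users_male_topic', \
        PARTITIONS = 1) AS \
  SELECT userid, gender, regionid FROM users \
  WHERE gender='MALE';
```

Without the WITH clause, KSQL names the topic after the table, as the SHOW TABLES output later in this section shows for USERS_FEMALE.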

To stream the result of a SELECT query into an existing table and its underlying topic, use the INSERT INTO statement.

The following KSQL statement creates a users_female table that contains results from a persistent query for users that have gender set to FEMALE:

CREATE TABLE users_female AS \
  SELECT userid, gender, regionid FROM users \
  WHERE gender='FEMALE';

Your output should resemble:

 Message
---------------------------
 Table created and running
---------------------------

Inspect the table by using the SHOW TABLES and PRINT statements:

SHOW TABLES;

Your output should resemble:

 Table Name   | Kafka Topic  | Format | Windowed
-------------------------------------------------
 USERS        | users        | JSON   | false
 USERS_FEMALE | USERS_FEMALE | JSON   | false
-------------------------------------------------

Print some records from the table’s underlying topic:

PRINT users_female;

Your output should resemble:

Format:JSON
{"ROWTIME":1541458112639,"ROWKEY":"User_5","USERID":"User_5","GENDER":"FEMALE","REGIONID":"Region_4"}
{"ROWTIME":1541458112857,"ROWKEY":"User_2","USERID":"User_2","GENDER":"FEMALE","REGIONID":"Region_7"}
{"ROWTIME":1541458112838,"ROWKEY":"User_9","USERID":"User_9","GENDER":"FEMALE","REGIONID":"Region_4"}
^CTopic printing ceased

Press CTRL+C to stop printing the table.

Note

The query continues to run after you stop printing the table.

Use the SHOW QUERIES statement to view the query that KSQL created for the users_female table:

SHOW QUERIES;

Your output should resemble:

 Query ID            | Kafka Topic  | Query String
-----------------------------------------------------------------------------------------------------------------------------------------
 CTAS_USERS_FEMALE_0 | USERS_FEMALE | CREATE TABLE users_female AS   SELECT userid, gender, regionid FROM users   WHERE gender='FEMALE';
-----------------------------------------------------------------------------------------------------------------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;

A persistent query that’s created by the CREATE TABLE AS SELECT statement has the string CTAS in its ID, for example, CTAS_USERS_FEMALE_0.
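
The query ID is also what the EXPLAIN statement expects, as the last line of the SHOW QUERIES output indicates. A minimal sketch, assuming the CTAS_USERS_FEMALE_0 query is still running; the output is omitted because its layout depends on the KSQL version:

```sql
-- Show the execution plan and runtime details for the persistent query.
EXPLAIN CTAS_USERS_FEMALE_0;
```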

Create a KSQL Table from a KSQL Stream

Use the CREATE TABLE AS SELECT statement to create a table from a stream. Creating a table from a stream requires aggregation, so the SELECT clause must include an aggregate function like COUNT(*), and the query must include a GROUP BY clause.

CREATE TABLE pageviews_table AS                            \
  SELECT viewtime, userid, pageid, COUNT(*) AS TOTAL       \
  FROM pageviews_original WINDOW TUMBLING (SIZE 1 MINUTES) \
  GROUP BY viewtime, userid, pageid;

Your output should resemble:

 Message
---------------------------
 Table created and running
---------------------------

Inspect the table by using a SELECT statement:

SELECT * FROM pageviews_table;

Your output should resemble:

1557183929488 | 1557183929488|+|User_9|+|Page_39 : Window{start=1557183900000 end=-} | 1557183929488 | User_9 | Page_39 | 1
1557183930211 | 1557183930211|+|User_1|+|Page_79 : Window{start=1557183900000 end=-} | 1557183930211 | User_1 | Page_79 | 1
1557183930687 | 1557183930687|+|User_9|+|Page_34 : Window{start=1557183900000 end=-} | 1557183930687 | User_9 | Page_34 | 1
1557183929786 | 1557183929786|+|User_5|+|Page_12 : Window{start=1557183900000 end=-} | 1557183929786 | User_5 | Page_12 | 1
1557183931095 | 1557183931095|+|User_3|+|Page_43 : Window{start=1557183900000 end=-} | 1557183931095 | User_3 | Page_43 | 1
1557183930184 | 1557183930184|+|User_1|+|Page_29 : Window{start=1557183900000 end=-} | 1557183930184 | User_1 | Page_29 | 1
1557183930727 | 1557183930726|+|User_6|+|Page_93 : Window{start=1557183900000 end=-} | 1557183930726 | User_6 | Page_93 | 1
^CQuery terminated
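
A window isn't required for the aggregation. A sketch of a non-windowed variant, assuming the same pageviews_original stream; the pageviews_per_user table name is hypothetical:

```sql
-- Keep one running count per user, updated as each page view arrives.
CREATE TABLE pageviews_per_user AS \
  SELECT userid, COUNT(*) AS total \
  FROM pageviews_original \
  GROUP BY userid;
```

Each row in this table holds the latest count for one userid, rather than one count per window.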

Delete a KSQL Table

Use the DROP TABLE statement to delete a table. If you created the table by using CREATE TABLE AS SELECT, you must first terminate the corresponding persistent query.

Use the TERMINATE statement to stop the CTAS_USERS_FEMALE_0 query:

TERMINATE CTAS_USERS_FEMALE_0;

Your output should resemble:

 Message
-------------------
 Query terminated.
-------------------

Use the DROP TABLE statement to delete the users_female table:

DROP TABLE users_female;

Your output should resemble:

 Message
-----------------------------------
 Source USERS_FEMALE was dropped.
-----------------------------------
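
Dropping a table leaves its Kafka topic in place. KSQL releases that support the DELETE TOPIC clause can remove the topic in the same step. A sketch, assuming your release supports the clause; check the DROP TABLE syntax reference for your version before relying on it:

```sql
-- Drop the table and mark its backing USERS_FEMALE topic for deletion.
-- Run this instead of the plain DROP TABLE statement above, not after it.
DROP TABLE users_female DELETE TOPIC;
```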