Quick Reference for ksqlDB for Confluent Platform
For detailed descriptions of ksqlDB SQL statements and keywords, see the ksqlDB API reference.
For details on SQL syntax, see the ksqlDB syntax reference.
ADVANCE BY
Specify the duration of a “hop” in a HOPPING window. For more information, see Hopping window.
SELECT [...], aggregate_function
WINDOW HOPPING (SIZE <time_span> <time_units>, ADVANCE BY <time_span> <time_units>) [...]
ALTER SYSTEM
Change a system-level property value (only available in Confluent Cloud).
ALTER SYSTEM 'auto.offset.reset'='earliest';
ALTER STREAM
Add new columns to a stream. This is not supported for streams defined
using queries (CREATE STREAM ... AS).
ALTER STREAM stream_name
ADD [COLUMN] column_name data_type
[, ADD [COLUMN] column_name data_type [, ...]];
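For example, assuming a hypothetical orders stream that was registered with CREATE STREAM (not CREATE STREAM AS SELECT), the following sketch adds two columns in one statement, with the ADD clauses separated by commas:

```sql
-- Hypothetical stream "orders"; both new columns are appended to its schema.
ALTER STREAM orders
  ADD COLUMN discount_code VARCHAR,
  ADD COLUMN sales_channel VARCHAR;
```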
ALTER TABLE
Add new columns to a table. This is not supported for tables defined
using queries (CREATE TABLE ... AS).
ALTER TABLE table_name
ADD [COLUMN] column_name data_type
[, ADD [COLUMN] column_name data_type [, ...]];
AND / OR
Logical AND/OR operators in a WHERE clause. For more information, see SELECT.
SELECT column_name(s)
FROM stream_name | table_name
WHERE condition
AND|OR condition
AS
Alias a column, expression, or type. For more information, see Create a table.
SELECT column_name AS column_alias
FROM stream_name | table_name
ASSERT
Assert values, streams, tables, or tombstones.
ASSERT NULL VALUES sourceName (columns)? KEY values
ASSERT SCHEMA
Assert the existence or non-existence of a schema.
ASSERT (NOT EXISTS)? SCHEMA (SUBJECT subjectName)? (ID id)? (TIMEOUT timeout)?;
ASSERT TOPIC
Assert the existence or non-existence of a topic.
ASSERT (NOT EXISTS)? TOPIC topicName (WITH properties)? (TIMEOUT timeout)?;
BETWEEN
Constrain a value to a specified range in a WHERE clause.
WHERE expression [NOT] BETWEEN start_expression AND end_expression;
The BETWEEN operator tests whether a value falls within a specified range, including both boundaries. ksqlDB supports any expression that resolves to a numeric or string value for comparison.
The following push query uses the BETWEEN clause to select only records
that have an event_id between 10 and 20, inclusive.
SELECT event
FROM events
WHERE event_id BETWEEN 10 AND 20
EMIT CHANGES;
CASE
Return a result based on the first of one or more conditions that evaluates to true.
SELECT
CASE
WHEN condition THEN result
[ WHEN … THEN … ]
…
[ WHEN … THEN … ]
[ ELSE result ]
END
FROM stream_name | table_name;
ksqlDB supports a searched form of CASE expression. In this form,
CASE evaluates each boolean condition in WHEN clauses, from left to
right. If a condition is true, CASE returns the corresponding result. If
none of the conditions is true, CASE returns the result from the ELSE
clause. If none of the conditions is true and there is no ELSE clause,
CASE returns null.
The schema for all results must be the same, otherwise ksqlDB rejects the statement.
The following example push query uses a CASE expression.
SELECT
CASE
WHEN orderunits < 2.0 THEN 'small'
WHEN orderunits < 4.0 THEN 'medium'
ELSE 'large'
END AS case_result
FROM orders
EMIT CHANGES;
CAST
Change the type of an expression to a different type.
CAST (expression AS data_type);
You can cast an expression’s type to a new type by using CAST.
The following example query converts a numerical count, which is a
BIGINT, into a suffixed string, which is a VARCHAR. For example, the
integer 5 becomes 5_HELLO.
SELECT page_id, CONCAT(CAST(COUNT(*) AS VARCHAR), '_HELLO')
FROM pageviews_enriched
WINDOW TUMBLING (SIZE 20 SECONDS)
GROUP BY page_id
EMIT CHANGES;
CREATE CONNECTOR
Create a new connector in the Kafka Connect cluster. For more information, see CREATE CONNECTOR.
CREATE SOURCE | SINK CONNECTOR connector_name
WITH( property_name = expression [, ...]);
CREATE STREAM
Register a stream on a Kafka topic. For more information, see CREATE STREAM.
CREATE STREAM stream_name ( { column_name data_type [KEY] } [, ...] )
WITH ( property_name = expression [, ...] );
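A minimal example, assuming a hypothetical pageviews topic with JSON-encoded values. The page_id column is read from the record key and the other columns from the record value; PARTITIONS tells ksqlDB to create the topic if it does not exist yet.

```sql
-- Hypothetical stream over the "pageviews" topic.
CREATE STREAM pageviews (
    page_id VARCHAR KEY,   -- read from the Kafka message key
    user_id VARCHAR,       -- read from the JSON value
    viewtime BIGINT
) WITH (
    KAFKA_TOPIC = 'pageviews',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 1         -- used only if the topic must be created
);
```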
CREATE STREAM AS SELECT
Create a new materialized stream and corresponding Kafka topic, and stream the result of the query into the topic. For more information, see CREATE STREAM AS SELECT.
CREATE STREAM stream_name
[WITH ( property_name = expression [, ...] )]
AS SELECT select_expr [, ...]
FROM from_stream
[[ LEFT | FULL | INNER ]
JOIN [join_table | join_stream]
[WITHIN [<size> <timeunit> | (<before_size> <timeunit>, <after_size> <timeunit>)] [GRACE PERIOD <grace_size> <timeunit>]]
ON join_criteria]*
[ WHERE condition ]
[PARTITION BY new_key_expr [, ...]]
EMIT CHANGES;
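As a sketch, assuming a hypothetical pageviews stream with page_id (key), user_id, and viewtime columns, the following statement starts a persistent query that writes only the views of one page into a new stream and topic:

```sql
-- Hypothetical derived stream; the persistent query keeps running until terminated.
CREATE STREAM home_pageviews
  WITH (KAFKA_TOPIC = 'home_pageviews', VALUE_FORMAT = 'JSON') AS
  SELECT page_id, user_id, viewtime
  FROM pageviews
  WHERE page_id = 'home'
  EMIT CHANGES;
```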
CREATE TABLE
Register a table on a Kafka topic. For more information, see CREATE TABLE.
CREATE TABLE table_name ( { column_name data_type [PRIMARY KEY] } [, ...] )
WITH ( property_name = expression [, ...] );
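A minimal example, assuming a hypothetical users topic with JSON-encoded values. Each message key is a user_id, and a later record with the same key replaces the earlier one in the table.

```sql
-- Hypothetical table over the "users" topic, keyed by user_id.
CREATE TABLE users (
    user_id VARCHAR PRIMARY KEY,
    region VARCHAR,
    gender VARCHAR
) WITH (
    KAFKA_TOPIC = 'users',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 1
);
```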
CREATE TABLE AS SELECT
Create a new materialized table and corresponding Kafka topic, and stream the result of the query as a changelog into the topic. For more information, see CREATE TABLE AS SELECT.
CREATE TABLE table_name
[WITH ( property_name = expression [, ...] )]
AS SELECT select_expr [, ...]
FROM from_stream | from_table
[[ LEFT | FULL | INNER ] JOIN [join_table | join_stream] ON join_criteria]*
[ WINDOW window_expression ]
[ WHERE condition ]
[ GROUP BY grouping_expression ]
[ HAVING having_expression ]
EMIT CHANGES;
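As a sketch, assuming a hypothetical pageviews stream with a user_id column, the following statement maintains a per-user count for each one-hour tumbling window and writes every update to the new table's changelog topic:

```sql
-- Hypothetical windowed aggregate table, keyed by user_id and window.
CREATE TABLE pageviews_per_user AS
  SELECT user_id, COUNT(*) AS view_count
  FROM pageviews
  WINDOW TUMBLING (SIZE 1 HOUR)
  GROUP BY user_id
  EMIT CHANGES;
```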
CREATE TYPE
Alias a complex type declaration. For more information, see CREATE TYPE.
CREATE TYPE <type_name> AS <type>;
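For example, the following sketch defines a hypothetical address type once and then uses it as a column type in a hypothetical customer_signups stream:

```sql
-- Alias a STRUCT once, then reuse it by name.
CREATE TYPE address AS STRUCT<street VARCHAR, city VARCHAR, zip VARCHAR>;

CREATE STREAM customer_signups (
    customer_id VARCHAR KEY,
    home_address ADDRESS
) WITH (KAFKA_TOPIC = 'customer_signups', VALUE_FORMAT = 'JSON', PARTITIONS = 1);
```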
DEFINE
Define a variable. For more information, see DEFINE.
DEFINE <name> = '<value>';
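As a sketch, a variable defined this way can be referenced as ${name} in later statements in the same session. The variable, stream, and topic names below are hypothetical:

```sql
DEFINE source_topic = 'pageviews';
DEFINE value_format = 'JSON';

-- ${...} is replaced with the variable's value before the statement runs.
CREATE STREAM pageviews_raw (page_id VARCHAR KEY, user_id VARCHAR)
  WITH (KAFKA_TOPIC = '${source_topic}', VALUE_FORMAT = '${value_format}', PARTITIONS = 1);
```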
DESCRIBE
List columns in a stream or table along with their data types and other attributes. For more information, see DESCRIBE.
DESCRIBE [EXTENDED] (stream_name | table_name);
DESCRIBE CONNECTOR
List details about a connector. For more information, see DESCRIBE CONNECTOR.
DESCRIBE CONNECTOR connector_name;
DESCRIBE FUNCTION
List details about a function, including input parameters and return type. For more information, see DESCRIBE FUNCTION.
DESCRIBE FUNCTION function_name;
DROP CONNECTOR
Delete a connector from the Connect cluster. For more information, see DROP CONNECTOR.
DROP CONNECTOR connector_name;
DROP STREAM
Drop an existing stream and optionally mark the stream’s source topic for deletion. For more information, see DROP STREAM.
DROP STREAM [IF EXISTS] stream_name [DELETE TOPIC];
DROP TABLE
Drop an existing table and optionally mark the table’s source topic for deletion. For more information, see DROP TABLE.
DROP TABLE [IF EXISTS] table_name [DELETE TOPIC];
DROP TYPE
Remove a type alias from ksqlDB. For more information, see DROP TYPE.
DROP TYPE [IF EXISTS] <type_name>;
EMIT CHANGES
Specify a push query with a continuous output refinement in a SELECT statement. For more information, see Push Queries.
CREATE STREAM stream_name
AS SELECT select_expr [, ...]
FROM from_stream
EMIT CHANGES;
EMIT FINAL
Specify a push query with a suppressed output refinement in a SELECT statement on a windowed aggregation. For more information, see Push Queries.
CREATE TABLE table_name
AS SELECT select_expr_with_aggregation [, ...]
FROM from_stream
WINDOW window_expression
GROUP BY grouping_expression
EMIT FINAL;
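A sketch, assuming a hypothetical pageviews stream with a user_id column. With EMIT FINAL, the query is intended to output one row per user and window once the window has closed (window end plus the grace period, measured in stream time), rather than an update for every input record.

```sql
-- Hypothetical table that receives only final per-window counts.
CREATE TABLE final_pageviews_per_user AS
  SELECT user_id, COUNT(*) AS view_count
  FROM pageviews
  WINDOW TUMBLING (SIZE 1 HOUR, GRACE PERIOD 10 MINUTES)
  GROUP BY user_id
  EMIT FINAL;
```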
EXPLAIN
Show the execution plan for a SQL expression or running query. For more information, see EXPLAIN.
EXPLAIN (sql_expression | query_id);
FULL JOIN
Select all records from both the left and the right stream/table, combining records that match and filling the columns of the missing side with NULL for records that have no match. Equivalent to FULL OUTER JOIN. For more information, see Join streams and tables.
SELECT column_name(s)
FROM stream_name1 | table_name1
FULL JOIN stream_name2 | table_name2
ON <stream_name1|table_name1>.column_name=<stream_name2|table_name2>.column_name
GRACE PERIOD
Allow events to be accepted for a time period after a window ends. For more information, see Out-of-order events.
SELECT order_zipcode, TOPK(order_total, 5) FROM orders
WINDOW TUMBLING (SIZE 1 HOUR, GRACE PERIOD 2 HOURS)
GROUP BY order_zipcode
EMIT CHANGES;
GROUP BY
Group records that have the same values in the specified columns, typically for an aggregation. Required by the WINDOW clause: windowing queries must group by the keys that are selected in the query. For more information, see Time and Windows in ksqlDB.
SELECT column_name, aggregate_function(column_name)
FROM table_name
WHERE column_name operator value
GROUP BY column_name
HAVING
Extract records from an aggregation that fulfill a specified condition.
SELECT column_name, aggregate_function(column_name)
FROM table_name
WHERE column_name operator value
GROUP BY column_name
HAVING aggregate_function(column_name) operator value
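For example, assuming a hypothetical pageviews stream with a user_id column, this push query reports only users with more than 10 views in a one-hour window:

```sql
SELECT user_id, COUNT(*) AS view_count
FROM pageviews
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY user_id
HAVING COUNT(*) > 10   -- filters aggregated rows, unlike WHERE
EMIT CHANGES;
```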
HEADER
Populate a column with the Kafka record’s last header that matches the key.
CREATE STREAM S (column_name BYTES HEADER('key'))
WITH (kafka_topic='s', format='json');
HEADERS
Populate a column with the full list of the Kafka record’s headers.
CREATE STREAM S (column_name ARRAY<STRUCT<key STRING, value BYTES>> HEADERS)
WITH (kafka_topic='s', format='json');
HOPPING
Group input records into fixed-sized, possibly overlapping windows, based on the timestamps of the records. For more information, see HOPPING.
SELECT WINDOWSTART, WINDOWEND, aggregate_function
FROM from_stream
WINDOW HOPPING window_expression
EMIT CHANGES;
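For example, assuming a hypothetical pageviews stream with a user_id column, the following query uses 5-minute windows that advance by 1 minute. Because the size is five times the advance, each record is counted in five overlapping windows.

```sql
SELECT user_id, WINDOWSTART, WINDOWEND, COUNT(*) AS view_count
FROM pageviews
WINDOW HOPPING (SIZE 5 MINUTES, ADVANCE BY 1 MINUTE)
GROUP BY user_id
EMIT CHANGES;
```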
IF EXISTS
Test whether a stream or table is present in ksqlDB.
DROP STREAM [IF EXISTS] stream_name [DELETE TOPIC];
DROP TABLE [IF EXISTS] table_name [DELETE TOPIC];
IN
Match a value against a list of values, as a shorthand for multiple OR conditions.
SELECT select_expr [, ...]
FROM from_stream | from_table
WHERE exp IN (exp0, exp1, exp2);
The above is equivalent to:
SELECT select_expr [, ...]
FROM from_stream | from_table
WHERE exp = exp0 OR exp = exp1 OR exp = exp2;
INNER JOIN
Select records in a stream or table that have matching values in another stream or table. For more information, see Join streams and tables.
SELECT column_name(s)
FROM stream_name1 | table_name1
INNER JOIN stream_name2 | table_name2
ON <stream_name1|table_name1>.column_name=<stream_name2|table_name2>.column_name
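A sketch, assuming hypothetical orders and shipments streams that share an order_id column. A stream-stream join also needs a WITHIN clause, which bounds how far apart the timestamps of two matching records may be:

```sql
-- Only orders with a shipment record within 2 hours of each other are emitted.
SELECT o.order_id, o.order_total, s.warehouse
FROM orders o
INNER JOIN shipments s WITHIN 2 HOURS ON o.order_id = s.order_id
EMIT CHANGES;
```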
INSERT INTO
Stream the result of a SELECT query into an existing stream and its underlying Kafka topic. For more information, see INSERT INTO.
INSERT INTO stream_name
SELECT select_expr [, ...]
FROM from_stream
[[ LEFT | FULL | INNER ]
JOIN [join_table | join_stream]
[WITHIN [<size> <timeunit> | (<before_size> <timeunit>, <after_size> <timeunit>)] [GRACE PERIOD <grace_size> <timeunit>]]
ON join_criteria]*
[ WHERE condition ]
[ PARTITION BY new_key_expr [, ...] ]
EMIT CHANGES;
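As a sketch, assuming two hypothetical streams with identical schemas, pageviews and mobile_pageviews, the following statement continuously copies the rows of mobile_pageviews into pageviews:

```sql
-- The selected columns must match the schema of the target stream.
INSERT INTO pageviews
  SELECT page_id, user_id, viewtime
  FROM mobile_pageviews
  EMIT CHANGES;
```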
INSERT VALUES
Produce a row into an existing stream or table and its underlying Kafka topic based on explicitly specified values. For more information, see INSERT VALUES.
INSERT INTO stream_name|table_name [(column_name [, ...])]
VALUES (value [,...]);
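For example, assuming a hypothetical pageviews stream with page_id, user_id, and viewtime columns, this statement produces a single record to the stream's topic:

```sql
-- viewtime is an epoch-millisecond value in this hypothetical schema.
INSERT INTO pageviews (page_id, user_id, viewtime)
  VALUES ('home', 'user_42', 1700000000000);
```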
LEFT JOIN
Select all records from the left stream/table and the matched records from the right stream/table. For more information, see Join streams and tables.
SELECT column_name(s)
FROM stream_name1 | table_name1
LEFT JOIN stream_name2 | table_name2
ON <stream_name1|table_name1>.column_name=<stream_name2|table_name2>.column_name
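For example, assuming a hypothetical pageviews stream and a hypothetical users table keyed by user_id, this push query enriches each pageview with the viewer's region. Pageviews with no matching user are still emitted, with region set to NULL:

```sql
-- Stream-table join: the stream column is matched against the table's PRIMARY KEY.
SELECT p.user_id, p.page_id, u.region
FROM pageviews p
LEFT JOIN users u ON p.user_id = u.user_id
EMIT CHANGES;
```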
LIKE
Match a string with the specified pattern.
SELECT select_expr [, ...]
FROM from_stream | from_table
WHERE exp LIKE pattern_string;
The LIKE operator is used for prefix or suffix matching. ksqlDB supports
the % wildcard, which represents zero or more characters.
The following push query uses the % wildcard to match any
user_id that starts with “santa”.
SELECT user_id
FROM users
WHERE user_id LIKE 'santa%'
EMIT CHANGES;
NULLIF
Returns NULL if two expressions are equal, otherwise it returns the first expression.
SELECT NULLIF(col1, col2)
FROM from_stream | from_table
EMIT CHANGES;
PARTITION BY
Repartition a stream. For more information, see Partition Data to Enable Joins.
CREATE STREAM stream_name
WITH ([...,]
PARTITIONS=number_of_partitions)
AS SELECT select_expr [, ...]
FROM from_stream
PARTITION BY new_key_expr [, ...]
EMIT CHANGES;
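As a sketch, assuming a hypothetical pageviews stream keyed by page_id, the following statement writes a copy of the stream re-keyed by user_id, for example to prepare it for a join with a table keyed by user_id. For such a join, PARTITIONS should match the partition count of the table; the value 1 here is an assumption.

```sql
-- user_id becomes the new key; it must appear in the projection.
CREATE STREAM pageviews_by_user
  WITH (KAFKA_TOPIC = 'pageviews_by_user', PARTITIONS = 1) AS
  SELECT user_id, page_id, viewtime
  FROM pageviews
  PARTITION BY user_id
  EMIT CHANGES;
```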
PAUSE
Pause a persistent query. For more information, see PAUSE.
PAUSE query_id;
PRINT
Print the contents of Kafka topics to the ksqlDB CLI. For more information, see PRINT.
PRINT topicName [FROM BEGINNING] [INTERVAL interval] [LIMIT limit]
RESUME
Resume a paused persistent query. For more information, see RESUME.
RESUME query_id;
RIGHT JOIN
Select all records from the right stream/table and the matched records from the left stream/table. For more information, see Join streams and tables.
SELECT column_name(s)
FROM stream_name1 | table_name1
RIGHT JOIN stream_name2 | table_name2
ON <stream_name1|table_name1>.column_name=<stream_name2|table_name2>.column_name
RUN SCRIPT
Execute predefined queries and commands from a file. For more information, see RUN SCRIPT.
RUN SCRIPT <path-to-query-file>;
SELECT (Pull Query)
Pull the current value from a materialized table and terminate. For more information, see SELECT (Pull Query).
SELECT select_expr [, ...]
FROM from_item
[ WHERE where_condition ]
[ AND window_bounds ]
[ LIMIT count ];
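A sketch, assuming a hypothetical windowed table pageviews_per_user, created with CREATE TABLE AS SELECT and keyed by user_id. The query looks up one key, restricts the windows by their start time in epoch milliseconds, and returns once the current rows have been read:

```sql
SELECT user_id, WINDOWSTART, WINDOWEND, view_count
FROM pageviews_per_user
WHERE user_id = 'user_42'
  AND WINDOWSTART >= 1704067200000;  -- 2024-01-01T00:00:00Z
```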
SELECT (Push Query)
Subscribe to a continuous stream of updates from a stream or table. For more information, see SELECT (Push Query).
SELECT select_expr [, ...]
FROM from_item
[[ LEFT | FULL | INNER ]
JOIN join_item
[WITHIN [<size> <timeunit> | (<before_size> <timeunit>, <after_size> <timeunit>)] [GRACE PERIOD <grace_size> <timeunit>]]
ON join_criteria]*
[ WINDOW window_expression ]
[ WHERE condition ]
[ GROUP BY grouping_expression [, ...] ]
[ HAVING having_expression ]
EMIT CHANGES
[ LIMIT count ];
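For example, assuming a hypothetical pageviews stream, this push query streams matching rows to the client and terminates after it has returned five rows:

```sql
SELECT page_id, user_id
FROM pageviews
WHERE page_id = 'home'
EMIT CHANGES
LIMIT 5;
```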
SESSION
Group input records into a session window. For more information, see SELECT (Push Query).
SELECT WINDOWSTART, WINDOWEND, aggregate_function
FROM from_stream
WINDOW SESSION window_expression
EMIT CHANGES;
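For example, assuming a hypothetical pageviews stream with a user_id column, the following query closes a user's session after 30 minutes without activity and counts the views in each session:

```sql
SELECT user_id, WINDOWSTART, WINDOWEND, COUNT(*) AS views_in_session
FROM pageviews
WINDOW SESSION (30 MINUTES)
GROUP BY user_id
EMIT CHANGES;
```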
SET property
Assign a property value.
SET 'auto.offset.reset'='earliest';
SHOW CONNECTORS
List all connectors in the Connect cluster. For more information, see SHOW CONNECTORS.
SHOW | LIST CONNECTORS;
SHOW FUNCTIONS
List the available scalar and aggregate functions. For more information, see SHOW FUNCTIONS.
SHOW | LIST FUNCTIONS;
SHOW PROPERTIES
List the configuration settings that are currently in effect. For more information, see SHOW PROPERTIES.
SHOW PROPERTIES;
SHOW QUERIES
List queries that are currently running in the cluster. For more information, see SHOW QUERIES.
SHOW | LIST QUERIES [EXTENDED];
SHOW STREAMS
List the currently defined streams. For more information, see SHOW STREAMS.
SHOW | LIST STREAMS [EXTENDED];
SHOW TABLES
List the currently defined tables. For more information, see SHOW TABLES.
SHOW | LIST TABLES [EXTENDED];
SHOW TOPICS
List the available topics in the Kafka cluster that ksqlDB is configured to connect to. For more information, see SHOW TOPICS.
SHOW | LIST [ALL] TOPICS [EXTENDED];
SHOW TYPES
List all custom types and their type definitions. For more information, see SHOW TYPES.
SHOW | LIST TYPES;
SHOW VARIABLES
List all defined variables.
SHOW VARIABLES;
SIZE
Specify the duration of a HOPPING or TUMBLING window. For more information, see Time and Windows in ksqlDB.
SELECT WINDOWSTART, WINDOWEND, aggregate_function
FROM from_stream
WINDOW TUMBLING (SIZE <time_span> <time_units>)
EMIT CHANGES;
SPOOL
Store issued commands and their results in a file. For more information, see SPOOL.
SPOOL <file_name|OFF>
TERMINATE
End a query. For more information, see TERMINATE.
TERMINATE query_id;
TUMBLING
Group input records into fixed-sized, non-overlapping windows based on the timestamps of the records. For more information, see TUMBLING.
SELECT WINDOWSTART, WINDOWEND, aggregate_function
FROM from_stream
WINDOW TUMBLING window_expression
EMIT CHANGES;
UNDEFINE
Undefine a variable.
UNDEFINE name;
UNSET property
Unassign a property value.
UNSET 'auto.offset.reset';
WHERE
Extract records that fulfill a specified condition. For more information, see SELECT.
SELECT column_name(s)
FROM from_stream | from_table
WHERE column_name operator value
WINDOW
Group input records that have the same key into a window, for operations like aggregations and joins. For more information, see WINDOW.
SELECT WINDOWSTART, WINDOWEND, aggregate_function
FROM from_stream
WINDOW window_expression
EMIT CHANGES;
WINDOWSTART / WINDOWEND
Specify the beginning and end bounds of a window. For more information, see WINDOW.
SELECT WINDOWSTART, WINDOWEND, aggregate_function
FROM from_stream
WINDOW window_expression
EMIT CHANGES;
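WINDOWSTART and WINDOWEND are BIGINT values in epoch milliseconds. As a sketch, assuming a hypothetical pageviews stream with a page_id column, TIMESTAMPTOSTRING can render them as readable timestamps:

```sql
SELECT page_id,
       TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss', 'UTC') AS window_start,
       TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss', 'UTC') AS window_end,
       COUNT(*) AS view_count
FROM pageviews
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY page_id
EMIT CHANGES;
```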