Quick Reference for ksqlDB for Confluent Platform¶
For detailed descriptions of ksqlDB SQL statements and keywords, see the ksqlDB API reference.
For details on SQL syntax, see the ksqlDB syntax reference.
ADVANCE BY¶
Specify the duration of a “hop” in a HOPPING window. For more information, see Hopping window.
SELECT [...], aggregate_function
WINDOW HOPPING (SIZE <time_span> <time_units>, ADVANCE BY <time_span> <time_units>) [...]
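As a concrete sketch of the syntax above, the following push query counts records in 30-second windows that advance every 10 seconds, so each record falls into three overlapping windows. The stream `pageviews` and the column `page_id` are assumed names used only for illustration.

```sql
-- Assumed names: pageviews (stream), page_id (column).
-- SIZE is the window length; ADVANCE BY is the hop between window starts.
SELECT page_id, COUNT(*) AS view_count
  FROM pageviews
  WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 10 SECONDS)
  GROUP BY page_id
  EMIT CHANGES;
```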
ALTER SYSTEM¶
Change a system-level property value (only available in Confluent Cloud).
ALTER SYSTEM 'auto.offset.reset'='earliest';
ALTER STREAM¶
Add new columns to a stream. This is not supported for streams defined using queries (CREATE STREAM ... AS).
ALTER STREAM stream_name
ADD [COLUMN] column_name data_type
ADD [COLUMN] ... ...
...
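For illustration, a minimal statement that adds a single column might look like the following. The stream `orders` and the column `discount_code` are hypothetical, and the stream is assumed to have been registered with CREATE STREAM rather than created from a query.

```sql
-- Assumed names: orders (stream), discount_code (new column).
ALTER STREAM orders
  ADD COLUMN discount_code VARCHAR;
```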
ALTER TABLE¶
Add new columns to a table. This is not supported for tables defined using queries (CREATE TABLE ... AS).
ALTER TABLE table_name
ADD [COLUMN] column_name data_type
ADD [COLUMN] ... ...
...
AND / OR¶
Logical AND/OR operators in a WHERE clause. For more information, see SELECT.
SELECT column_name(s)
FROM stream_name | table_name
WHERE condition
AND|OR condition
AS¶
Alias a column, expression, or type. For more information, see Create a table.
SELECT column_name AS column_alias
FROM stream_name | table_name
ASSERT¶
Assert values, stream, table, or tombstones.
ASSERT NULL VALUES sourceName (columns)? KEY values
ASSERT SCHEMA¶
Assert the existence or non-existence of a schema.
ASSERT (NOT EXISTS)? SCHEMA (SUBJECT subjectName)? (ID id)? (TIMEOUT timeout)?;
ASSERT TOPIC¶
Assert the existence or non-existence of a topic.
ASSERT (NOT EXISTS)? TOPIC topicName (WITH properties)? (TIMEOUT timeout)?;
BETWEEN¶
Constrain a value to a specified range in a WHERE clause.
WHERE expression [NOT] BETWEEN start_expression AND end_expression;
The BETWEEN operator is used to indicate that a certain value must be within a specified range, including boundaries. ksqlDB supports any expression that resolves to a numeric or string value for comparison.
The following push query uses the BETWEEN clause to select only records that have an event_id between 10 and 20.
SELECT event
FROM events
WHERE event_id BETWEEN 10 AND 20
EMIT CHANGES;
CASE¶
Select a condition from one or more expressions.
SELECT
CASE
WHEN condition THEN result
[ WHEN … THEN … ]
…
[ WHEN … THEN … ]
[ ELSE result ]
END
FROM stream_name | table_name;
ksqlDB supports a searched form of CASE expression. In this form, CASE evaluates each boolean condition in WHEN clauses, from left to right. If a condition is true, CASE returns the corresponding result. If none of the conditions is true, CASE returns the result from the ELSE clause. If none of the conditions is true and there is no ELSE clause, CASE returns null.
The schema for all results must be the same; otherwise, ksqlDB rejects the statement.
The following example push query uses a CASE expression.
SELECT
CASE
WHEN orderunits < 2.0 THEN 'small'
WHEN orderunits < 4.0 THEN 'medium'
ELSE 'large'
END AS case_result
FROM orders
EMIT CHANGES;
CAST¶
Change the type of an expression to a different type.
CAST (expression AS data_type);
You can cast an expression’s type to a new type by using CAST.
The following example query converts a numerical count, which is a BIGINT, into a suffixed string, which is a VARCHAR. For example, the integer 5 becomes 5_HELLO.
SELECT page_id, CONCAT(CAST(COUNT(*) AS VARCHAR), '_HELLO')
FROM pageviews_enriched
WINDOW TUMBLING (SIZE 20 SECONDS)
GROUP BY page_id;
CREATE CONNECTOR¶
Create a new connector in the Kafka Connect cluster. For more information, see CREATE CONNECTOR.
CREATE SOURCE | SINK CONNECTOR connector_name
WITH( property_name = expression [, ...]);
CREATE STREAM¶
Register a stream on a Kafka topic. For more information, see CREATE STREAM.
CREATE STREAM stream_name ( { column_name data_type [KEY] } [, ...] )
WITH ( property_name = expression [, ...] );
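A minimal sketch of a concrete statement, assuming a Kafka topic named `pageviews` already exists and carries JSON values. The stream name, column names, and topic name are all illustrative.

```sql
-- Assumed: an existing topic 'pageviews' with JSON-encoded values.
CREATE STREAM pageviews (
    user_id VARCHAR KEY,   -- read from the Kafka record key
    page_id VARCHAR,
    view_time BIGINT
  ) WITH (
    KAFKA_TOPIC = 'pageviews',
    VALUE_FORMAT = 'JSON'
  );
```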
CREATE STREAM AS SELECT¶
Create a new materialized stream and corresponding Kafka topic, and stream the result of the query into the topic. For more information, see CREATE STREAM AS SELECT.
CREATE STREAM stream_name
[WITH ( property_name = expression [, ...] )]
AS SELECT select_expr [, ...]
FROM from_stream
[[ LEFT | FULL | INNER ]
JOIN [join_table | join_stream]
[WITHIN [<size> <timeunit> | (<before_size> <timeunit>, <after_size> <timeunit>)] [GRACE PERIOD <grace_size> <timeunit>]]
ON join_criteria]*
[ WHERE condition ]
[PARTITION BY new_key_expr [, ...]]
EMIT CHANGES;
CREATE TABLE¶
Register a table on a Kafka topic. For more information, see CREATE TABLE.
CREATE TABLE table_name ( { column_name data_type [PRIMARY KEY] } [, ...] )
WITH ( property_name = expression [, ...] );
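As an illustration, the following statement registers a table over an assumed, pre-existing topic named `users` with JSON values. The table and column names are hypothetical.

```sql
-- Assumed: an existing topic 'users' with JSON-encoded values.
CREATE TABLE users (
    user_id VARCHAR PRIMARY KEY,   -- the table's primary key column
    region VARCHAR,
    registered_at BIGINT
  ) WITH (
    KAFKA_TOPIC = 'users',
    VALUE_FORMAT = 'JSON'
  );
```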
CREATE TABLE AS SELECT¶
Create a new materialized table and corresponding Kafka topic, and stream the result of the query as a changelog into the topic. For more information, see CREATE TABLE AS SELECT.
CREATE TABLE table_name
[WITH ( property_name = expression [, ...] )]
AS SELECT select_expr [, ...]
FROM from_stream | from_table
[[ LEFT | FULL | INNER ] JOIN [join_table | join_stream] ON join_criteria]*
[ WINDOW window_expression ]
[ WHERE condition ]
[ GROUP BY grouping_expression ]
[ HAVING having_expression ]
EMIT CHANGES;
CREATE TYPE¶
Alias a complex type declaration. For more information, see CREATE TYPE.
CREATE TYPE <type_name> AS <type>;
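For example, a struct can be aliased once and then reused in column definitions. The type name `ADDRESS`, the stream `customers`, and the topic name are assumptions made for this sketch.

```sql
-- Alias a struct type under an assumed name.
CREATE TYPE ADDRESS AS STRUCT<number INTEGER, street VARCHAR, city VARCHAR>;

-- Use the alias as a column type; 'customers' is an assumed existing topic.
CREATE STREAM customers (
    customer_id VARCHAR KEY,
    home_address ADDRESS
  ) WITH (
    KAFKA_TOPIC = 'customers',
    VALUE_FORMAT = 'JSON'
  );
```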
DESCRIBE¶
List columns in a stream or table along with their data types and other attributes. For more information, see DESCRIBE.
DESCRIBE [EXTENDED] (stream_name | table_name);
DESCRIBE CONNECTOR¶
List details about a connector. For more information, see DESCRIBE CONNECTOR.
DESCRIBE CONNECTOR connector_name;
DESCRIBE FUNCTION¶
List details about a function, including input parameters and return type. For more information, see DESCRIBE FUNCTION.
DESCRIBE FUNCTION function_name;
DROP CONNECTOR¶
Delete a connector from the Connect cluster. For more information, see DROP CONNECTOR.
DROP CONNECTOR connector_name;
DROP STREAM¶
Drop an existing stream and optionally mark the stream’s source topic for deletion. For more information, see DROP STREAM.
DROP STREAM [IF EXISTS] stream_name [DELETE TOPIC];
DROP TABLE¶
Drop an existing table and optionally mark the table’s source topic for deletion. For more information, see DROP TABLE.
DROP TABLE [IF EXISTS] table_name [DELETE TOPIC];
DROP TYPE¶
Remove a type alias from ksqlDB. For more information, see DROP TYPE.
DROP TYPE [IF EXISTS] <type_name>;
EMIT CHANGES¶
Specify a push query with a continuous output refinement in a SELECT statement. For more information, see Push Queries.
CREATE STREAM stream_name
AS SELECT select_expr [, ...]
FROM from_stream
EMIT CHANGES;
EMIT FINAL¶
Specify a push query with a suppressed output refinement in a SELECT statement on a windowed aggregation. For more information, see Push Queries.
CREATE TABLE table_name
AS SELECT select_expr_with_aggregation [, ...]
FROM from_stream
[ WINDOW window_expression ]
[ GROUP BY grouping_expression ]
EMIT FINAL;
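A hedged example of the pattern above: the following statement emits one result per key and window only after the window has closed, rather than an update for every input record. The stream `orders` and the columns `order_zipcode` and `order_total` are assumed names.

```sql
-- Assumed names: orders (stream), order_zipcode and order_total (columns).
CREATE TABLE hourly_order_totals AS
  SELECT order_zipcode, SUM(order_total) AS total
  FROM orders
  WINDOW TUMBLING (SIZE 1 HOUR, GRACE PERIOD 10 MINUTES)
  GROUP BY order_zipcode
  EMIT FINAL;
```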
EXPLAIN¶
Show the execution plan for a SQL expression or running query. For more information, see EXPLAIN.
EXPLAIN (sql_expression | query_id);
FULL JOIN¶
Select all records when there is a match in the left stream/table or the right stream/table records. Equivalent to FULL OUTER JOIN. For more information, see Join streams and tables.
SELECT column_name(s)
FROM stream_name1 | table_name1
FULL JOIN stream_name2 | table_name2
ON <stream_name1|table_name1>.column_name=<stream_name2|table_name2>.column_name
GRACE PERIOD¶
Allow events to be accepted for a time period after a window ends. For more information, see Out-of-order events.
SELECT order_zipcode, TOPK(order_total, 5) FROM orders
WINDOW TUMBLING (SIZE 1 HOUR, GRACE PERIOD 2 HOURS)
GROUP BY order_zipcode
EMIT CHANGES;
GROUP BY¶
Group records in a window. Required by the WINDOW clause. Windowing queries must group by the keys that are selected in the query. For more information, see Time and Windows in ksqlDB.
SELECT column_name, aggregate_function(column_name)
FROM table_name
WHERE column_name operator value
GROUP BY column_name
HAVING¶
Extract records from an aggregation that fulfill a specified condition.
SELECT column_name, aggregate_function(column_name)
FROM table_name
WHERE column_name operator value
GROUP BY column_name
HAVING aggregate_function(column_name) operator value
HEADER¶
Populate a column with the Kafka record’s last header that matches the key.
CREATE STREAM S (column_name BYTES HEADER('key'))
WITH (kafka_topic='s', format='json');
HEADERS¶
Populate a column with the full list of the Kafka record’s headers.
CREATE STREAM S (column_name ARRAY<STRUCT<key STRING, value BYTES>> HEADERS)
WITH (kafka_topic='s', format='json');
HOPPING¶
Group input records into fixed-sized, possibly overlapping windows, based on the timestamps of the records. For more information, see HOPPING.
SELECT WINDOWSTART, WINDOWEND, aggregate_function
FROM from_stream
WINDOW HOPPING window_expression
EMIT CHANGES;
IF EXISTS¶
Test whether a stream or table is present in ksqlDB.
DROP STREAM [IF EXISTS] stream_name [DELETE TOPIC];
DROP TABLE [IF EXISTS] table_name [DELETE TOPIC];
IN¶
Specify multiple OR conditions.
SELECT select_expr [, ...]
FROM from_stream | from_table
WHERE exp IN (exp0, exp1, exp2);
The above is equivalent to:
SELECT select_expr [, ...]
FROM from_stream | from_table
WHERE exp = exp0 OR exp = exp1 OR exp = exp2;
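As a concrete illustration, the following push query keeps only records whose region matches one of three values. The stream `user_events` and its columns are hypothetical.

```sql
-- Assumed names: user_events (stream), user_id and region (columns).
SELECT user_id, region
  FROM user_events
  WHERE region IN ('us-east', 'us-west', 'eu-central')
  EMIT CHANGES;
```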
INNER JOIN¶
Select records in a stream or table that have matching values in another stream or table. For more information, see Join streams and tables.
SELECT column_name(s)
FROM stream_name1 | table_name1
INNER JOIN stream_name2 | table_name2
ON <stream_name1|table_name1>.column_name=<stream_name2|table_name2>.column_name
INSERT INTO¶
Stream the result of a SELECT query into an existing stream and its underlying Kafka topic. For more information, see INSERT INTO.
INSERT INTO stream_name
SELECT select_expr [, ...]
FROM from_stream
[[ LEFT | FULL | INNER ]
JOIN [join_table | join_stream]
[WITHIN [<size> <timeunit> | (<before_size> <timeunit>, <after_size> <timeunit>)] [GRACE PERIOD <grace_size> <timeunit>]]
ON join_criteria]*
[ WHERE condition ]
[ PARTITION BY new_key_expr [, ...] ]
EMIT CHANGES;
INSERT VALUES¶
Produce a row into an existing stream or table and its underlying Kafka topic based on explicitly specified values. For more information, see INSERT VALUES.
INSERT INTO stream_name|table_name [(column_name [, ...])]
VALUES (value [,...]);
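For instance, assuming a stream named `pageviews` with the columns `user_id`, `page_id`, and `view_time` (all hypothetical here), a single row could be produced like this:

```sql
-- Assumed names: pageviews (stream); user_id, page_id, view_time (columns).
INSERT INTO pageviews (user_id, page_id, view_time)
  VALUES ('user_42', 'page_7', 1510923225000);
```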
LEFT JOIN¶
Select all records from the left stream/table and the matched records from the right stream/table. For more information, see Join streams and tables.
SELECT column_name(s)
FROM stream_name1 | table_name1
LEFT JOIN stream_name2 | table_name2
ON <stream_name1|table_name1>.column_name=<stream_name2|table_name2>.column_name
LIKE¶
Match a string with the specified pattern.
SELECT select_expr [, ...]
FROM from_stream | from_table
WHERE exp LIKE pattern_string;
The LIKE operator is used for prefix or suffix matching. ksqlDB supports the % wildcard, which represents zero or more characters.
The following push query uses the % wildcard to match any user_id that starts with “santa”.
SELECT user_id
FROM users
WHERE user_id LIKE 'santa%'
EMIT CHANGES;
NULLIF¶
Return NULL if two expressions are equal; otherwise, return the first expression.
SELECT NULLIF(col1, col2)
FROM from_stream | from_table
EMIT CHANGES;
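One common use is turning a sentinel value into NULL. In this sketch, the stream `orders` and the column `coupon_code` are assumed names, and 'NONE' is an assumed placeholder value in the data.

```sql
-- Assumed names: orders (stream), order_id and coupon_code (columns).
-- Rows whose coupon_code equals 'NONE' yield NULL; all others pass through.
SELECT order_id, NULLIF(coupon_code, 'NONE') AS coupon_code
  FROM orders
  EMIT CHANGES;
```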
PARTITION BY¶
Repartition a stream. For more information, see Partition Data to Enable Joins.
CREATE STREAM stream_name
WITH ([...,]
PARTITIONS=number_of_partitions)
AS SELECT select_expr [, ...]
FROM from_stream
PARTITION BY new_key_expr [, ...]
EMIT CHANGES;
PRINT¶
Print the contents of Kafka topics to the ksqlDB CLI. For more information, see PRINT.
PRINT topicName [FROM BEGINNING] [INTERVAL interval] [LIMIT limit]
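As a brief illustration, the following command prints the first three records of a topic, starting from the earliest offset. The topic name `pageviews` is an assumption.

```sql
-- Assumed topic name: pageviews.
PRINT 'pageviews' FROM BEGINNING LIMIT 3;
```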
RIGHT JOIN¶
Select all records from the right stream/table and the matched records from the left stream/table. For more information, see Join streams and tables.
SELECT column_name(s)
FROM stream_name1 | table_name1
RIGHT JOIN stream_name2 | table_name2
ON <stream_name1|table_name1>.column_name=<stream_name2|table_name2>.column_name
RUN SCRIPT¶
Execute predefined queries and commands from a file. For more information, see RUN SCRIPT.
RUN SCRIPT <path-to-query-file>;
SELECT (Pull Query)¶
Pull the current value from a materialized table and terminate. For more information, see SELECT (Pull Query).
SELECT select_expr [, ...]
FROM from_item
[ WHERE where_condition ]
[ AND window_bounds ]
[ LIMIT count ];
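The following sketch shows two pull queries against an assumed materialized table named `pageviews_by_region` with a key column `region_id`. The second form applies only if the table is windowed, and the timestamps are arbitrary example values.

```sql
-- Assumed names: pageviews_by_region (materialized table), region_id (key).
-- Look up the current value for one key, then terminate.
SELECT * FROM pageviews_by_region
  WHERE region_id = 'Region_1';

-- For a windowed table, restrict the lookup with window bounds.
SELECT * FROM pageviews_by_region
  WHERE region_id = 'Region_1'
    AND 1570051876000 <= WINDOWSTART AND WINDOWEND <= 1570138276000;
```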
SELECT (Push Query)¶
Push a continuous stream of updates to a stream or table. For more information, see SELECT (Push Query).
SELECT select_expr [, ...]
FROM from_item
[[ LEFT | FULL | INNER ]
JOIN join_item
[WITHIN [<size> <timeunit> | (<before_size> <timeunit>, <after_size> <timeunit>)] [GRACE PERIOD <grace_size> <timeunit>]]
ON join_criteria]*
[ WINDOW window_expression ]
[ WHERE condition ]
[ GROUP BY grouping_expression [, ...] ]
[ HAVING having_expression ]
EMIT CHANGES
[ LIMIT count ];
SESSION¶
Group input records into a session window. For more information, see SELECT (Push Query).
SELECT WINDOWSTART, WINDOWEND, aggregate_function
FROM from_stream
WINDOW SESSION window_expression
EMIT CHANGES;
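As an example of a concrete window expression, the following push query groups each user's records into sessions separated by at least five minutes of inactivity. The stream `click_events` and the column `user_id` are hypothetical.

```sql
-- Assumed names: click_events (stream), user_id (column).
-- The value in parentheses is the inactivity gap that ends a session.
SELECT user_id, WINDOWSTART, WINDOWEND, COUNT(*) AS click_count
  FROM click_events
  WINDOW SESSION (5 MINUTES)
  GROUP BY user_id
  EMIT CHANGES;
```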
SHOW CONNECTORS¶
List all connectors in the Connect cluster. For more information, see SHOW CONNECTORS.
SHOW | LIST CONNECTORS;
SHOW FUNCTIONS¶
List the available scalar and aggregate functions. For more information, see SHOW FUNCTIONS.
SHOW | LIST FUNCTIONS;
SHOW PROPERTIES¶
List the configuration settings that are currently in effect. For more information, see SHOW PROPERTIES.
SHOW PROPERTIES;
SHOW QUERIES¶
List queries that are currently running in the cluster. For more information, see SHOW QUERIES.
SHOW | LIST QUERIES [EXTENDED];
SHOW STREAMS¶
List the currently defined streams. For more information, see SHOW STREAMS.
SHOW | LIST STREAMS [EXTENDED];
SHOW TABLES¶
List the currently defined tables. For more information, see SHOW TABLES.
SHOW | LIST TABLES [EXTENDED];
SHOW TOPICS¶
List the available topics in the Kafka cluster that ksqlDB is configured to connect to. For more information, see SHOW TOPICS.
SHOW | LIST [ALL] TOPICS [EXTENDED];
SHOW TYPES¶
List all custom types and their type definitions. For more information, see SHOW TYPES.
SHOW | LIST TYPES;
SIZE¶
Specify the duration of a HOPPING or TUMBLING window. For more information, see Time and Windows in ksqlDB.
SELECT WINDOWSTART, WINDOWEND, aggregate_function
FROM from_stream
WINDOW TUMBLING (SIZE <time_span> <time_units>)
EMIT CHANGES;
SPOOL¶
Store issued commands and their results in a file. For more information, see SPOOL.
SPOOL <file_name|OFF>
TUMBLING¶
Group input records into fixed-sized, non-overlapping windows based on the timestamps of the records. For more information, see TUMBLING.
SELECT WINDOWSTART, WINDOWEND, aggregate_function
FROM from_stream
WINDOW TUMBLING window_expression
EMIT CHANGES;
WHERE¶
Extract records that fulfill a specified condition. For more information, see SELECT.
SELECT column_name(s)
FROM from_stream | from_table
WHERE column_name operator value
WINDOW¶
Group input records that have the same key into a window, for operations like aggregations and joins. For more information, see WINDOW.
SELECT WINDOWSTART, WINDOWEND, aggregate_function
FROM from_stream
WINDOW window_expression
EMIT CHANGES;