SELECT (Pull Query) statement in ksqlDB for Confluent Platform¶
Synopsis¶
SELECT select_expr [, ...]
FROM from_item
[ WHERE where_condition ]
[ AND window_bounds ]
[ LIMIT count ];
Description¶
Pulls the current value from the from_item
and terminates. The
from_item
can be a materialized view, a table, or a stream. The
result of this statement is not persisted in a Kafka topic and is
printed out only in the console. Pull queries run with predictably low
latency because materialized views are incrementally updated as new
events arrive. They are a great match for request/response flows. For
asynchronous application flows, see
Push Queries.
You can execute a pull query by sending an HTTP request to the ksqlDB REST API, and the API responds with a single response.
- Pull queries are expressed using a strict subset of ANSI SQL.
- You can issue a pull query against any table that was created by a CREATE TABLE AS SELECT statement.
- You can issue a pull query against any stream.
- Currently, we do not support pull queries against tables created by using a CREATE TABLE statement.
- Pull queries do not support
JOIN
,PARTITION BY
,GROUP BY
andWINDOW
clauses (but can query materialized tables that contain those clauses). LIMIT
clause supports non-negative integers.
Important
ksqlDB may experience a deadlock if you run multiple pull queries concurrently.
If you experience hanging pull queries, the following mitigations may help.
- Rewrite your persistent queries or add a new persistent query to enable efficient pull queries, ideally key lookups, but at least short-range scans, or decrease state store size.
- Rewrite your table scans as individual key lookups.
- Continue issuing table scans, but issue fewer of them at once.
- Adjust your pull query load to avoid sharp spikes in the pull query request rate all at once.
- Implement timeouts from your client so that if pull queries take too long to run, your client terminates the connection to free up resources and unblock other queries.
WHERE
Clause Guidelines¶
By default, only key lookups are enabled. They have the following requirements:
- Key column(s) must use an equality comparison to a literal
(e.g.
KEY = 'abc'
). - On windowed tables,
WINDOWSTART
andWINDOWEND
can be optionally compared to literals. For more information on windowed tables, see Time and Windows in ksqlDB.
You can loosen the restrictions on the WHERE
clause, or eliminate
the WHERE
clause altogether, by enabling table scans in your current
CLI session with the command
SET 'ksql.query.pull.table.scan.enabled'='true';
. Table scans can
also be enabled by default by setting a server configuration property
with ksql.query.pull.table.scan.enabled=true
. Once table scans are
enabled, the following additional expressions are allowed:
- Key column(s) using range comparisons to literals.
- Non-key columns to be used alone, without key references.
- Columns to be compared to other columns.
- References to subsets of columns from a multi-column key.
- Complex expressions without direct column references using UDFs and
function calls (e.g.
instr(NAME_COL, 'hello') > 0
).
Examples¶
Pull queries¶
The following examples show pull queries against a table named
TOP_TEN_RANKS
created by using a
CREATE TABLE AS SELECT
statement.
First, create a table named GRADES
by using a
CREATE TABLE statement:
CREATE TABLE GRADES (ID INT PRIMARY KEY, GRADE STRING, RANK INT)
WITH (kafka_topic = 'test_topic', value_format = 'JSON', partitions = 4);
Then, create a derived table named TOP_TEN_RANKS
by using a
CREATE TABLE AS SELECT
statement:
CREATE TABLE TOP_TEN_RANKS
AS SELECT ID, RANK
FROM GRADES
WHERE RANK <= 10;
If you want to look up only the student with ID = 5
in the
TOP_TEN_RANKS
table using a pull query:
SELECT * FROM TOP_TEN_RANKS
WHERE ID = 5;
After enabling table scans, you can fetch the current state of your
TOP_TEN_RANKS
table using a pull query:
SELECT * FROM TOP_TEN_RANKS;
If you want to look up the students whose ranks lie in the range
(4, 8)
:
SELECT * FROM TOP_TEN_RANKS
WHERE RANK > 4 AND RANK < 8;
STREAM¶
Pull queries against a stream.
First, create a stream named STUDENTS
by using a
CREATE STREAM statement:
CREATE STREAM STUDENTS (ID STRING KEY, SCORE INT)
WITH (kafka_topic='students_topic', value_format='JSON', partitions=4);
If you want to look up students with ranks greater than 5
you can
issue the query:
SELECT * FROM STUDENTS
WHERE rank > 5;
INNER JOIN¶
Pull queries against a table INNER_JOIN
that is created by joining
multiple tables:
CREATE TABLE LEFT_TABLE (ID BIGINT PRIMARY KEY, NAME varchar, VALUE bigint)
WITH (kafka_topic='left_topic', value_format='JSON', partitions=4);
CREATE TABLE RIGHT_TABLE (ID BIGINT PRIMARY KEY, F1 varchar, F2 bigint)
WITH (kafka_topic='right_topic', value_format='JSON', partitions=4);
CREATE TABLE INNER_JOIN AS SELECT L.ID, NAME, VALUE, F1, F2 FROM LEFT_TABLE L JOIN RIGHT_TABLE R ON L.ID = R.ID;
You can fetch the current state of your table INNER_JOIN
by using a
pull query:
SELECT * FROM INNER_JOIN [ WHERE where_condition ];
Tip
See INNER_JOIN in action:
WINDOW¶
Pull queries against a windowed table NUMBER_OF_TESTS
created by
aggregating the stream STUDENTS
created above:
CREATE TABLE NUMBER_OF_TESTS AS
SELECT ID, COUNT(1) AS COUNT
FROM STUDENTS
WINDOW TUMBLING(SIZE 1 SECOND)
GROUP BY ID;
Look up the number of tests taken by a student with ID='10'
:
SELECT *
FROM NUMBER_OF_TESTS
WHERE ID='10';
Look up the number of tests taken by a student with ID='10'
in the
window range 100 <= WindowStart AND WindowEnd <= 16000
:
SELECT *
FROM NUMBER_OF_TESTS
WHERE ID='10' AND 100 <= WindowStart AND WindowEnd <= 16000;
STRUCT output¶
You can output a struct from a
query by using a SELECT statement. The following example creates a
struct from a stream named s1
.
SELECT STRUCT(f1 := v1, f2 := v2) FROM s1;
Pull queries with pseudo columns¶
You can use the ROWTIME
pseudo column within pull queries. Below is
an example of issuing a pull query with ROWTIME
in both the
SELECT
and WHERE
clauses.
SELECT name, ROWTIME FROM users WHERE ROWTIME > 50000;
However, this is disallowed for ROWPARTITION
and ROWOFFSET
.