SELECT Statement in Confluent Cloud for Apache Flink¶
Confluent Cloud for Apache Flink®️ enables querying the content of your tables by using familiar SELECT syntax.
Syntax¶
SELECT [DISTINCT] select_list FROM table_expression [ WHERE boolean_expression ] [ LIMIT row_limit ]
Description¶
The SELECT statement in Flink does what the SQL standard says it must do. You needn’t look further than standard SQL itself to understand the behavior.
For example, UNION without ALL means that duplicate rows must be removed.
Flink maintains the relation, called a dynamic table, specified by the SQL query. Its behavior is always the same as if you ran the SQL query again, over the current snapshot of the data, each time a new row arrives for any table in the relation.
This formalism is what enables you to reason about exactly what Flink will do just by understanding what any SQL system, like MySQL, Snowflake, or Oracle, would do.
Another way to understand what Flink SQL does is to consider the following statement:
SELECT * FROM clicks ORDER BY clickTime LIMIT 10;
This statement doesn’t only look at 10 rows, sort them, and terminate. It
maintains this relation, and as new orders arrive, the relation changes, always
showing the top 10 most recent orders. This is exactly as if you re-ran the
query each time a new row was written to the clicks
table. You’ll get the
same result.
Select list¶
The select_list
specification *
means the query resolves all
columns. But in production, using *
is not recommended, because it
makes queries less robust to catalog changes. Instead, use a select_list
to specify a subset of available columns or make calculations using
the columns. For example, if an orders
table has columns named
order_id
, price
, and tax
you could write the following query:
SELECT order_id, price + tax FROM orders
Table expression¶
The table_expression
can be any source of data, including a table, view,
or VALUES
clause, the joined results of multiple existing tables, or a
subquery.
Assuming that an orders
table is available in the catalog, the
following would read all rows from .
SELECT * FROM orders;
VALUES clause¶
Queries can consume from inline data by using the VALUES
clause. Each tuple
corresponds to one row. You can provide an alias to assign a name to each column.
SELECT order_id, price
FROM (VALUES (1, 2.0), (2, 3.1))
AS t (order_id, price);
Your output should resemble:
order_id price
1 2.0
2 3.1
Functions¶
You can invoke built-in scalar functions on the columns of a single row.
SELECT PRETTY_PRINT(order_id) FROM orders;
DISTINCT¶
If SELECT DISTINCT
is specified, all duplicate rows are removed from
the result set, which means that one row is kept from each group of duplicates.
For streaming queries, the required state for computing the query result might grow infinitely. State size depends on the number of distinct rows.
SELECT DISTINCT id FROM orders;
Usage¶
In the Flink SQL shell or in a Cloud Console workspace, run the following commands to see examples of the SELECT statement.
Create a table for web page click events.
-- Create a table for web page click events. CREATE TABLE clicks ( ip_address VARCHAR, url VARCHAR, click_ts_raw BIGINT );
Populate the table with mock clickstream data.
-- Populate the table with mock clickstream data. INSERT INTO clicks VALUES( '10.0.0.1', 'https://acme.com/index.html', 1692812175), ( '10.0.0.12', 'https://apache.org/index.html', 1692826575), ( '10.0.0.13', 'https://confluent.io/index.html', 1692826575), ( '10.0.0.1', 'https://acme.com/index.html', 1692812175), ( '10.0.0.12', 'https://apache.org/index.html', 1692819375), ( '10.0.0.13', 'https://confluent.io/index.html', 1692826575);
Press ENTER to return to the SQL shell. Because INSERT INTO VALUES is a point-in-time statement, it exits after it completes inserting records.
View all rows in the
clicks
table by using a SELECT statement.SELECT * FROM clicks;
Your output should resemble:
ip_address url click_ts_raw 10.0.0.1 https://acme.com/index.html 1692812175 10.0.0.12 https://apache.org/index.html 1692826575 10.0.0.13 https://confluent.io/index.html 1692826575 10.0.0.1 https://acme.com/index.html 1692812175 10.0.0.12 https://apache.org/index.html 1692819375 10.0.0.13 https://confluent.io/index.html 1692826575
View only unique rows in the
clicks
table by using a SELECT DISTINCT statement.SELECT DISTINCT * FROM clicks;
Your output should resemble:
ip_address url click_ts_raw 10.0.0.1 https://acme.com/index.html 1692812175 10.0.0.12 https://apache.org/index.html 1692826575 10.0.0.13 https://confluent.io/index.html 1692826575 10.0.0.12 https://apache.org/index.html 1692819375
View only records that have the ip_address of
10.0.0.1
by using a SELECT WHERE statement.SELECT * FROM clicks WHERE ip_address='10.0.0.1';
Your output should resemble:
ip_address url click_ts_raw 10.0.0.1 https://acme.com/index.html 1692812175 10.0.0.1 https://acme.com/index.html 1692812175
Examples¶
The following examples show frequently encountered scenarios with SELECT.
Most minimal statement¶
- Syntax
SELECT 1;
- Properties
- Statement is bounded
Check local time zone is configured correctly¶
- Syntax
SELECT NOW();
- Properties
- Statement is bounded
- NOW() returns a TIMSTAMP_LTZ(3), so if the client is configured correctly, it should show a timestamp in your local time zone.
Combine multiple tables into one¶
- Syntax
CREATE TABLE t_union_1 (i INT); CREATE TABLE t_union_2 (i INT); TABLE t_union_1 UNION ALL TABLE t_union_2; -- alternate syntax SELECT * FROM t_union_1 UNION ALL SELECT * FROM t_union_2;
Get insights into the current watermark¶
- Syntax
CREATE TABLE t_watermarked_insight (s STRING) DISTRIBUTED INTO 1 BUCKETS; INSERT INTO t_watermarked_insight VALUES ('Bob'), ('Alice'), ('Charly'); SELECT $rowtime, CURRENT_WATERMARK($rowtime) FROM t_watermarked_insight;
The output resembles:
$rowtime EXPR$1 2024-04-29 11:59:01.080 NULL 2024-04-29 11:59:01.093 2024-04-04 15:27:37.433 2024-04-29 11:59:01.094 2024-04-04 15:27:37.433
- Properties
- The CURRENT_WATERMARK function returns the watermark that arrived at the operator evaluating the SELECT statement.
- The returned watermark is the minimum of all inputs, across all tables/topics and their partitions.
- If a common watermark was not received from all inputs, the function returns NULL.
- The CURRENT_WATERMARK function takes a time attribute, which is a column that has WATERMARK FOR defined.
A watermark is always emitted after the row has been processed, so the first row always has a NULL watermark.
Because the default watermark algorithm requires at least 250 records, initially it assumes the maximum lag of 7 days plus a safety margin of 7 days.
The watermark quickly (exponentially) goes down as more data arrives.
Sources emit watermarks every 200 ms, but within the first 200 ms they emit per row for powering examples like this.
Flatten fields into columns¶
- Syntax
CREATE TABLE t_flattening (i INT, r1 ROW<i INT, s STRING>, r2 ROW<other INT>); SELECT r1.*, r2.* FROM t_flattening;
- Properties
- You can apply the
*
operator on nested data, which enables flattening fields into columns of the table.