Flink SQL Queries in Confluent Cloud¶
In Confluent Cloud for Apache Flink®️, Data Manipulation Language (DML) statements, also known as queries, are declarative verbs that read and modify data in Flink tables.
Unlike Data Definition Language (DDL) statements, DML statements modify only data and don’t change metadata. When you want to change metadata, use DDL statements.
Important
Confluent Cloud for Apache Flink®️ is currently available for Preview. A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing Preview releases of the Preview features at any time in Confluent’s sole discretion. Check out Getting Help for questions, feedback and requests.
For Flink SQL features and limitations in the preview program, see Notable Limitations in Public Preview.
The following sections show the DML statements that are available in Confluent Cloud for Flink SQL.
Prerequisites¶
You only need to have basic knowledge of SQL to follow along. No other programming experience is assumed.
Running the HELP command lists the full set of supported SQL statements.
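For example, run the following command.
HELP;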
Run the following command to set session properties.
SET 'sql-client.execution.result-mode' = 'tableau';
Your output should resemble:
[INFO] Session property has been set.
In this example, the result-mode is set to tableau, which causes query results to display in a simple static table. Setting the value to table causes query results to display in a dynamically updating table.
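For example, to switch to the dynamically updating view, run the following command.
SET 'sql-client.execution.result-mode' = 'table';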
You’re ready to run your first Flink SQL query.
Hello Flink SQL¶
Run the following simple query to print “Hello Flink SQL”.
SELECT 'Hello Flink SQL';
Your output should resemble:
+----+--------------------------------+
| op | EXPR$0 |
+----+--------------------------------+
| +I | Hello Flink SQL |
+----+--------------------------------+
Received a total of 1 row
Run the following query to aggregate values in a table.
SELECT Name, COUNT(*) AS Num
FROM
(VALUES ('Neo'), ('Trinity'), ('Morpheus'), ('Trinity')) AS NameTable(Name)
GROUP BY Name;
Your output should resemble:
+----------+-----+
| Name | Num |
+----------+-----+
| Morpheus | 1 |
| Neo | 1 |
| Trinity | 2 |
+----------+-----+
3 rows in set
Functions¶
Flink SQL supports many built-in functions that help you build sophisticated SQL queries.
Run the SHOW FUNCTIONS
command to see the full list of built-in functions.
SHOW FUNCTIONS;
Your output should resemble:
+-------------------------------+
| function name |
+-------------------------------+
| AGG_DECIMAL_MINUS |
| AGG_DECIMAL_PLUS |
| COALESCE |
| CURRENT_WATERMARK |
| GREATEST |
| IFNULL |
| IS_JSON |
| JSON_ARRAY |
| ...
Run the following command to execute the built-in CURRENT_TIMESTAMP
function, which returns the local machine’s current system time.
SELECT CURRENT_TIMESTAMP;
Your output should resemble:
+-------------------------+
| CURRENT_TIMESTAMP |
+-------------------------+
| 2023-02-28 23:34:26.106 |
+-------------------------+
1 row in set
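Run the following command to execute the built-in COS function, which returns the cosine of an angle given in radians.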
SELECT COS(0) AS cosine;
Your output should resemble:
+--------+
| cosine |
+--------+
| 1.0 |
+--------+
1 row in set
Source Tables¶
As with all SQL engines, Flink SQL queries operate on rows in tables. But unlike traditional databases, Flink SQL doesn’t manage data-at-rest in a local store. Instead, Flink SQL queries operate continuously over external tables.
Flink data processing pipelines begin with source tables. Source tables produce the rows that are operated on during the query’s execution; they are the tables referenced in the FROM clause of a query.
In Confluent Cloud, tables are created automatically from all of your Kafka topics. You can also create tables by using the SQL client.
The SQL client supports SQL DDL commands similar to traditional SQL. Standard SQL DDL is used to create and alter tables.
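For example, the following is a minimal sketch of a DDL statement that defines the employee_information table used in the queries below. The emp_id and name columns are hypothetical, and a real table may require additional connector or format options depending on your environment.
CREATE TABLE employee_information (
  emp_id  INT,
  name    STRING,
  dept_id INT
);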
Continuous Queries¶
While not initially designed with streaming semantics in mind, SQL is a powerful tool for building continuous data pipelines. Where Flink SQL differs from traditional database queries is that it continuously consumes rows as they arrive and produces updates to its results.
A continuous query never terminates and produces a dynamic table as a result. Dynamic tables are the core concept of Flink’s SQL support for streaming data.
Aggregations on continuous streams must store aggregated results continuously during the execution of the query. For example, suppose you need to count the number of employees for each department from an incoming data stream. To output timely results as new rows are processed, the query must maintain the most up-to-date count for each department.
SELECT
dept_id,
COUNT(*) as emp_count
FROM employee_information
GROUP BY dept_id;
Such queries are considered stateful. Flink’s advanced fault-tolerance mechanism maintains internal state and consistency, so queries always return the correct result, even in the face of hardware failure.
Sink Tables¶
When running the previous query, the SQL client provides output in real time, but in a read-only fashion. Storing results, for example, to power a report or dashboard, requires writing out to another table. You can achieve this by using an INSERT INTO statement. The table referenced in this clause is known as a sink table. An INSERT INTO statement is submitted as a detached query to the Flink cluster.
INSERT INTO department_counts
SELECT
dept_id,
COUNT(*) as emp_count
FROM employee_information;
Once submitted, this query runs and stores its results directly in the sink table, instead of loading the results into system memory.
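The sink table must exist before you can insert into it. The following is a minimal sketch of a matching definition for the hypothetical department_counts table; COUNT(*) produces a BIGINT value.
CREATE TABLE department_counts (
  dept_id   INT,
  emp_count BIGINT
);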
Syntax¶
Flink parses SQL using Apache Calcite, which supports standard ANSI SQL.
The following BNF-grammar describes the superset of supported SQL features in batch and streaming queries. The Operations section shows examples for the supported features and indicates which features are only supported for batch or streaming queries.
query:
values
| WITH withItem [ , withItem ]* query
| {
select
| selectWithoutFrom
| query UNION [ ALL ] query
| query EXCEPT query
| query INTERSECT query
}
[ ORDER BY orderItem [, orderItem ]* ]
[ LIMIT { count | ALL } ]
[ OFFSET start { ROW | ROWS } ]
[ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]
withItem:
name
[ '(' column [, column ]* ')' ]
AS '(' query ')'
orderItem:
expression [ ASC | DESC ]
select:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
FROM tableExpression
[ WHERE booleanExpression ]
[ GROUP BY { groupItem [, groupItem ]* } ]
[ HAVING booleanExpression ]
[ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
selectWithoutFrom:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
projectItem:
expression [ [ AS ] columnAlias ]
| tableAlias . *
tableExpression:
tableReference [, tableReference ]*
| tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]
joinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'
tableReference:
tablePrimary
[ matchRecognize ]
[ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]
tablePrimary:
[ TABLE ] tablePath [ dynamicTableOptions ] [systemTimePeriod] [[AS] correlationName]
| LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
| [ LATERAL ] '(' query ')'
| UNNEST '(' expression ')'
tablePath:
[ [ catalogName . ] databaseName . ] tableName
systemTimePeriod:
FOR SYSTEM_TIME AS OF dateTimeExpression
dynamicTableOptions:
/*+ OPTIONS(key=val [, key=val]*) */
key:
stringLiteral
val:
stringLiteral
values:
VALUES expression [, expression ]*
groupItem:
expression
| '(' ')'
| '(' expression [, expression ]* ')'
| CUBE '(' expression [, expression ]* ')'
| ROLLUP '(' expression [, expression ]* ')'
| GROUPING SETS '(' groupItem [, groupItem ]* ')'
windowRef:
windowName
| windowSpec
windowSpec:
[ windowName ]
'('
[ ORDER BY orderItem [, orderItem ]* ]
[ PARTITION BY expression [, expression ]* ]
[
RANGE numericOrIntervalExpression {PRECEDING}
| ROWS numericExpression {PRECEDING}
]
')'
matchRecognize:
MATCH_RECOGNIZE '('
[ PARTITION BY expression [, expression ]* ]
[ ORDER BY orderItem [, orderItem ]* ]
[ MEASURES measureColumn [, measureColumn ]* ]
[ ONE ROW PER MATCH ]
[ AFTER MATCH
( SKIP TO NEXT ROW
| SKIP PAST LAST ROW
| SKIP TO FIRST variable
| SKIP TO LAST variable
| SKIP TO variable )
]
PATTERN '(' pattern ')'
[ WITHIN intervalLiteral ]
DEFINE variable AS condition [, variable AS condition ]*
')'
measureColumn:
expression AS alias
pattern:
patternTerm [ '|' patternTerm ]*
patternTerm:
patternFactor [ patternFactor ]*
patternFactor:
variable [ patternQuantifier ]
patternQuantifier:
'*'
| '*?'
| '+'
| '+?'
| '?'
| '??'
| '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
| '{' repeat '}'
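As a sketch of the dynamicTableOptions rule, the following query overrides a table option for a single statement by using a SQL hint. The orders table name and the scan.startup.mode option are assumptions; the options that are available depend on the table’s connector.
SELECT * FROM orders /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;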
Flink SQL uses a lexical policy for identifiers (table, attribute, and function names) that’s similar to Java:
- The case of identifiers is preserved, whether or not they are quoted, and identifiers are matched case-sensitively.
- Unlike Java, back-ticks enable identifiers to contain non-alphanumeric characters, for example:
SELECT a AS `my field` FROM t;
String literals must be enclosed in single quotes, for example, SELECT 'Hello World'. Duplicate a single quote to escape it, for example, SELECT 'It''s me'.
SELECT 'Hello World', 'It''s me';
Your output should resemble:
+-------------+---------+
| EXPR$0 | EXPR$1 |
+-------------+---------+
| Hello World | It's me |
+-------------+---------+
1 row in set
Unicode characters are supported in string literals. If explicit Unicode code points are required, use the following syntax:
- Use the backslash (\) as the escaping character (default), for example, SELECT U&'\263A'.
- Use a custom escaping character, for example, SELECT U&'#263A' UESCAPE '#'.
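For example, the following query, a minimal sketch that combines both escape styles, selects the same Unicode character (code point 263A) using the default and a custom escape character.
SELECT U&'\263A' AS default_escape, U&'#263A' UESCAPE '#' AS custom_escape;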