Flink SQL Queries in Confluent Cloud for Apache Flink
In Confluent Cloud for Apache Flink®, Data Manipulation Language (DML) statements, also known as queries, are declarative verbs that read and modify data in Apache Flink® tables.
Unlike Data Definition Language (DDL) statements, DML statements modify only data and don’t change metadata. When you want to change metadata, use DDL statements.
The following sections show how to run DML statements in Confluent Cloud for Flink SQL.
Prerequisites
You need the following prerequisites to use Confluent Cloud for Apache Flink.
Access to Confluent Cloud.
The organization ID, environment ID, and compute pool ID for your organization.
The OrganizationAdmin, EnvironmentAdmin, or FlinkAdmin role for creating compute pools, or the FlinkDeveloper role if you already have a compute pool. If you don’t have the appropriate role, reach out to your OrganizationAdmin or EnvironmentAdmin.
The Confluent CLI. To use the Flink SQL shell, update to the latest version of the Confluent CLI by running the following command:
confluent update --yes
If you used Homebrew to install the Confluent CLI, update the CLI by using the brew upgrade command, instead of confluent update. For more information, see Confluent CLI.
Use a workspace or the Flink SQL shell
You can run queries and statements either in a Confluent Cloud Console workspace or in the Flink SQL shell.
To run queries in the Confluent Cloud Console, follow these steps.
Log in to the Confluent Cloud Console.
Navigate to the Environments page.
Click the tile that has the environment where your Flink compute pools are provisioned.
Click Flink. The Compute Pools list opens.
In the compute pool where you want to run statements, click Open SQL workspace.
The workspace opens with a cell for editing SQL statements.
To run queries in the Flink SQL shell, run the following command:
confluent flink shell --compute-pool <compute-pool-id> --environment <env-id>
You’re ready to run your first Flink SQL query.
Hello SQL
Run the following simple query to print “Hello SQL”.
SELECT 'Hello SQL';
Your output should resemble:
EXPR$0
Hello SQL
Run the following query to aggregate values in a table.
SELECT Name, COUNT(*) AS Num
FROM
(VALUES ('Neo'), ('Trinity'), ('Morpheus'), ('Trinity')) AS NameTable(Name)
GROUP BY Name;
Your output should resemble:
Name Num
Neo 1
Morpheus 1
Trinity 2
Functions
Flink supports many built-in functions that help you build sophisticated SQL queries.
Run the SHOW FUNCTIONS statement to see the full list of built-in functions.
SHOW FUNCTIONS;
Your output should resemble:
+------------------------+
| function name |
+------------------------+
| % |
| * |
| + |
| - |
| / |
| < |
| <= |
| <> |
| = |
| > |
| >= |
| ABS |
| ACOS |
| AND |
| ARRAY |
| ARRAY_CONTAINS |
| ...
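Many of these functions can be called directly in a SELECT statement. For example, run the following statement to call the ARRAY_CONTAINS function from the list, which checks whether an array contains a given value.
SELECT ARRAY_CONTAINS(ARRAY[1, 2, 3], 2) AS contains_two;
Your output should resemble:
contains_two
TRUE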
Run the following statement to execute the built-in CURRENT_TIMESTAMP function, which returns the current timestamp in the local time zone.
SELECT CURRENT_TIMESTAMP;
Your output should resemble:
CURRENT_TIMESTAMP
2024-01-17 13:07:43.537
Run the following statement to compute the cosine of 0.
SELECT COS(0) AS cosine;
Your output should resemble:
cosine
1.0
Escape characters
The following table shows the C-style escape sequences available in Flink SQL.
| Backslash Escape Sequence | Interpretation |
|---|---|
| \b | backspace |
| \f | form feed |
| \n | newline |
| \r | carriage return |
| \t | tab |
| \o, \oo, \ooo (o = 0-7) | octal byte value |
| \xh, \xhh (h = 0-9, A-F) | hexadecimal byte value |
| \uxxxx, \Uxxxxxxxx (x = 0-9, A-F) | 16-bit or 32-bit hexadecimal Unicode character value |
Example:
-- both return 'aaa'
SELECT e'\u0061\x61\141' AS c;
SELECT E'\u0061\x61\141' AS c;
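As another illustration, the following statement uses the tab and newline escapes from the table. How the control characters render depends on your client.
-- returns a string that contains a tab and a newline
SELECT e'a\tb\nc' AS s;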
Source Tables
As with all SQL engines, Flink SQL queries operate on rows in tables. But unlike traditional databases, Flink doesn’t manage data-at-rest in a local store. Instead, Flink SQL queries operate continuously over external tables.
Flink data processing pipelines begin with source tables. Source tables produce the rows that are processed during the query’s execution; they are the tables referenced in the FROM clause of a query.
In Confluent Cloud, tables are created automatically for all of your Apache Kafka® topics. Also, you can create tables by using the SQL shell.
The Flink SQL shell supports standard SQL DDL commands, similar to a traditional database, for creating and altering tables.
The following statement creates an employee_information table.
CREATE TABLE employee_information(
emp_id INT,
name VARCHAR,
dept_id INT);
Confluent Cloud creates the corresponding employee_information topic automatically.
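To try the queries in the following sections, you can populate the table with a few rows. The names and department IDs here are only illustrative sample data.
INSERT INTO employee_information VALUES
  (1, 'Neo', 1),
  (2, 'Trinity', 1),
  (3, 'Morpheus', 2);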
Continuous Queries
You can define a continuous foreground query from the employee_information table that reads new rows as they are made available and immediately outputs their results. For example, you can filter for the employees who work in department 1.
SELECT * FROM employee_information WHERE dept_id = 1;
Although SQL wasn’t designed initially with streaming semantics in mind, it’s a powerful tool for building continuous data pipelines. A Flink query differs from a traditional database query by consuming rows continuously as they arrive and producing updates to the query results.
A continuous query never terminates and produces a dynamic table as a result. Dynamic tables are the core concept of Flink’s SQL support for streaming data.
Aggregations on continuous streams must store aggregated results continuously during the execution of the query. For example, suppose you need to count the number of employees for each department from an incoming data stream. To output timely results as new rows are processed, the query must maintain the most up-to-date count for each department.
SELECT
dept_id,
COUNT(*) AS emp_count
FROM employee_information
GROUP BY dept_id;
Such queries are considered stateful. Flink’s advanced fault-tolerance mechanism maintains internal state and consistency, so queries always return the correct result, even in the face of hardware failure.
Foreground and background queries
The core difference between foreground and background queries is how the query runs and where its results go:
Foreground queries are interactive and short-lived, with results buffered and pulled by the client.
Background queries are long-running infrastructure queries, with results written to Kafka topics.
Foreground queries
A foreground query is a SQL statement without an INSERT INTO clause. Results are not written to a Kafka topic but are buffered in Flink until the client fetches them.
Foreground queries are designed for interactive and exploratory use cases. Use foreground queries to quickly iterate on SQL in the Confluent Cloud Console or tools, similar to running SELECT in a transactional database.
Execution and lifecycle
Foreground queries have these characteristics:
Tied to user sessions: When the session or token expires, the foreground query stops.
Session-scoped and interactive: Foreground queries are typically run temporarily during an interactive session and are not intended to be persistent infrastructure.
Limited recovery: Foreground queries have no retry policy. If errors occur, the statement fails. This is true even for transient errors, like when Kubernetes kills a pod because a node is under pressure. Foreground queries typically run with parallelism set to 1, have minimal recovery semantics, and are not designed to survive failures or restarts.
No autoscaling: Autopilot doesn’t scale or restart foreground queries, because scaling risks duplicate or incorrect results.
Identity and authentication
Foreground queries often run with the human user’s identity, so the user experience is frictionless without requiring service account setup. Audit logs show the user action. However, foreground queries can run only while the user’s session is valid.
Use cases
Use foreground queries for these scenarios:
Ad hoc SELECT queries on topics or tables for inspection
User-defined function (UDF) or system tests
Quick experiments where correctness matters more than autoscaling and you’re okay with no recovery
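For example, the following foreground query interactively inspects a few rows of the employee_information table created earlier in this topic; the LIMIT clause stops the query after ten rows.
SELECT * FROM employee_information LIMIT 10;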
Background queries
A background query contains an INSERT INTO clause and writes results into a Kafka topic. Results are not buffered for direct retrieval in the Confluent Cloud Console.
Background queries are designed as long-running streaming jobs (pipelines) that operate continuously, independent of any user’s session.
Execution and lifecycle
Background queries have these characteristics:
Run indefinitely: Background queries process data from one topic and place results into another. They behave like deployed services rather than interactive queries.
Support recovery and retries: Background queries are configured to restart on failures and survive upgrades. They can restart and keep running across system failures.
Autoscaling allowed: Autopilot can scale background jobs vertically or horizontally. Correctness is handled by normal streaming semantics, not by “no restart” guarantees.
Identity and authentication
Background queries are intended to run with a service account, because they’re long-lived infrastructure and must keep working even when the original user logs out or leaves the company.
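As a sketch, you might deploy a background statement under a service account by using the Confluent CLI. The statement name and IDs below are placeholders, the department_counts sink table is described in the Sink Tables section, and the exact flags can vary by CLI version, so check confluent flink statement create --help before running it.
confluent flink statement create my-pipeline \
  --sql "INSERT INTO department_counts SELECT dept_id, COUNT(*) AS emp_count FROM employee_information GROUP BY dept_id;" \
  --compute-pool <compute-pool-id> \
  --service-account <service-account-id>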
Use cases
Use background queries for these scenarios:
Continuous pipelines that enrich, aggregate, or transform streams from input topics to output topics
Production workloads that must be durable, scalable, and recoverable
Sink Tables
When you run the previous query, Flink SQL provides output in real time, but in a read-only fashion. Storing results, for example to power a report or dashboard, requires writing out to another table. You can achieve this by using an INSERT INTO statement. The table referenced in this clause is known as a sink table. An INSERT INTO statement is submitted as a detached (background) query to Flink.
INSERT INTO department_counts
SELECT
dept_id,
COUNT(*) AS emp_count
FROM employee_information;
Once submitted, this query runs continuously and writes its results directly into the sink table, instead of buffering them in memory for the client.
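You can verify the results at any time by running a foreground query against the sink table.
SELECT * FROM department_counts;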
Syntax
Flink parses SQL using Apache Calcite, which supports standard ANSI SQL.
The following BNF grammar describes the superset of supported SQL features.
query:
values
| WITH withItem [ , withItem ]* query
| {
select
| selectWithoutFrom
| query UNION [ ALL ] query
| query EXCEPT query
| query INTERSECT query
}
[ ORDER BY orderItem [, orderItem ]* ]
[ LIMIT { count | ALL } ]
[ OFFSET start { ROW | ROWS } ]
[ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]
withItem:
name
[ '(' column [, column ]* ')' ]
AS '(' query ')'
orderItem:
expression [ ASC | DESC ]
select:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
FROM tableExpression
[ WHERE booleanExpression ]
[ GROUP BY { groupItem [, groupItem ]* } ]
[ HAVING booleanExpression ]
[ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
selectWithoutFrom:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
projectItem:
expression [ [ AS ] columnAlias ]
| tableAlias . *
tableExpression:
tableReference [, tableReference ]*
| tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]
joinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'
tableReference:
tablePrimary
[ matchRecognize ]
[ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]
tablePrimary:
[ TABLE ] tablePath [ dynamicTableOptions ] [systemTimePeriod] [[AS] correlationName]
| LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
| [ LATERAL ] '(' query ')'
| UNNEST '(' expression ')'
tablePath:
[ [ catalogName . ] databaseName . ] tableName
systemTimePeriod:
FOR SYSTEM_TIME AS OF dateTimeExpression
dynamicTableOptions:
/*+ OPTIONS(key=val [, key=val]*) */
key:
stringLiteral
val:
stringLiteral
values:
VALUES expression [, expression ]*
groupItem:
expression
| '(' ')'
| '(' expression [, expression ]* ')'
| CUBE '(' expression [, expression ]* ')'
| ROLLUP '(' expression [, expression ]* ')'
| GROUPING SETS '(' groupItem [, groupItem ]* ')'
windowRef:
windowName
| windowSpec
windowSpec:
[ windowName ]
'('
[ ORDER BY orderItem [, orderItem ]* ]
[ PARTITION BY expression [, expression ]* ]
[
RANGE numericOrIntervalExpression {PRECEDING}
| ROWS numericExpression {PRECEDING}
]
')'
matchRecognize:
MATCH_RECOGNIZE '('
[ PARTITION BY expression [, expression ]* ]
[ ORDER BY orderItem [, orderItem ]* ]
[ MEASURES measureColumn [, measureColumn ]* ]
[ ONE ROW PER MATCH ]
[ AFTER MATCH
( SKIP TO NEXT ROW
| SKIP PAST LAST ROW
| SKIP TO FIRST variable
| SKIP TO LAST variable
| SKIP TO variable )
]
PATTERN '(' pattern ')'
[ WITHIN intervalLiteral ]
DEFINE variable AS condition [, variable AS condition ]*
')'
measureColumn:
expression AS alias
pattern:
patternTerm [ '|' patternTerm ]*
patternTerm:
patternFactor [ patternFactor ]*
patternFactor:
variable [ patternQuantifier ]
patternQuantifier:
'*'
| '*?'
| '+'
| '+?'
| '?'
| '??'
| '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
| '{' repeat '}'
statementSet:
EXECUTE STATEMENT SET
BEGIN
{ insertStatement ';' }+
END ';'
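For example, the statementSet production enables submitting multiple INSERT INTO statements together, so related pipelines can run as a single statement. The employee_names sink table in this sketch is hypothetical.
EXECUTE STATEMENT SET
BEGIN
  INSERT INTO department_counts
    SELECT dept_id, COUNT(*) AS emp_count
    FROM employee_information
    GROUP BY dept_id;
  INSERT INTO employee_names
    SELECT emp_id, name
    FROM employee_information;
END;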
Flink uses a lexical policy for identifiers (table, attribute, and function names) that is similar to Java:
The case of identifiers is preserved, whether or not they are quoted.
Afterward, identifiers are matched case-sensitively.
Unlike Java, backticks enable identifiers to contain non-alphanumeric characters, for example:
SELECT a AS `my field` FROM t;
String literals must be enclosed in single quotes, for example, SELECT 'Hello World'. Duplicate a single quote for escaping, for example, SELECT 'It''s me'.
SELECT 'Hello World', 'It''s me';
Your output should resemble:
EXPR$0 EXPR$1
Hello World It's me
Unicode characters are supported in string literals. If explicit Unicode code points are required, use the following syntax.
Use the backslash (\) as the escaping character (default), for example, SELECT U&'\263A':
SELECT U&'\263A';
Your output should resemble:
EXPR$0
☺
Also, you can use a custom escaping character with UESCAPE, for example, SELECT U&'#2713' UESCAPE '#':
SELECT U&'#2713' UESCAPE '#';
Your output should resemble:
EXPR$0
✓