Flink SQL Statements in Confluent Cloud for Apache Flink
In Confluent Cloud for Apache Flink®, a statement represents a high-level resource that’s created when you enter a SQL query.
Each statement has a property that holds the SQL query that you entered. Based on the SQL query, the statement can be one of these kinds:
A metadata operation, or DDL statement.
A background statement, which writes data back to a table/topic while running in the background.
A foreground statement, which writes data back to the UI or a client.
In each of these cases, the statement can hold any SQL statement from the Data Definition Language (DDL), Data Manipulation Language (DML), or Data Query Language (DQL) categories.
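To make the three kinds concrete, the following is a minimal sketch of a client-side heuristic that guesses the kind from the leading SQL keyword. The function name `classify_statement` and the keyword rules are assumptions made for illustration; they are not how Confluent Cloud classifies statements. The DDL keyword set is taken from the list of available DDL statements on this page.

```python
import re

# Leading keywords that this page lists as DDL statements.
DDL_KEYWORDS = {
    "ALTER", "CREATE", "DESCRIBE", "DROP", "EXPLAIN",
    "RESET", "SET", "SHOW", "USE",
}


def classify_statement(sql: str) -> str:
    """Guess the statement kind from the first keyword (illustrative heuristic only)."""
    match = re.match(r"\s*([A-Za-z]+)", sql)
    if match is None:
        raise ValueError("no leading SQL keyword found")
    keyword = match.group(1).upper()
    if keyword in DDL_KEYWORDS:
        return "metadata"
    if keyword == "INSERT":
        return "background"  # writes results back to a table/topic
    if keyword == "SELECT":
        return "foreground"  # returns results to the UI or a client
    return "unknown"


for sql in ("CREATE TABLE t (id INT)",
            "INSERT INTO sink SELECT * FROM src",
            "SELECT * FROM src"):
    print(classify_statement(sql), "<-", sql)
```

A first-keyword check is deliberately simple. For example, it labels CREATE TABLE ... AS SELECT as a metadata operation even though such a statement also writes data, so treat the result as a hint rather than a definitive answer.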
When you submit a SQL query, Confluent Cloud creates a statement resource. You can create a statement resource from any Confluent-supported interface, including the SQL shell, Confluent CLI, Cloud Console, the REST API, and Terraform.
The SQL query within a statement is immutable, which means that you can’t make changes to the SQL query after you submit it. If you need to edit a statement, stop the running statement and create a new statement.
You can change the security principal for the statement. If a statement is running under a user account, you can change it to run under a service account by using the Confluent Cloud Console, Confluent CLI, the REST API, or the Terraform provider. Running a statement under a service account provides better security and stability, ensuring that changes in user status or authorization don’t affect your statements.
Also, you can change the compute pool that runs a statement. This can be useful if you’re close to maxing out the resources in one pool.
You must stop the statement before changing the principal or compute pool, then restart the statement after the change.
Confluent Cloud for Apache Flink enforces a 30-day retention for statements in terminal states. For example, after a statement transitions to the STOPPED state, it no longer consumes compute, and Confluent Cloud for Apache Flink deletes it after 30 days.
If there is no consumer for the results of a foreground statement for five minutes or longer, Confluent Cloud moves the statement to the STOPPED state.
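The two time-based rules above reduce to simple date arithmetic. The following sketch assumes hypothetical helper names (`earliest_deletion_time`, `is_idle_stop_due`) and treats the 30-day and five-minute figures as exact durations. The actual moment at which Confluent Cloud deletes or stops a statement may differ.

```python
from datetime import datetime, timedelta, timezone

RETENTION = timedelta(days=30)       # retention for statements in terminal states
IDLE_TIMEOUT = timedelta(minutes=5)  # foreground statement with no result consumer


def earliest_deletion_time(entered_terminal_state_at: datetime) -> datetime:
    """Earliest time at which a statement in a terminal state may be deleted."""
    return entered_terminal_state_at + RETENTION


def is_idle_stop_due(last_consumed_at: datetime, now: datetime) -> bool:
    """True if results have gone unconsumed for five minutes or longer."""
    return now - last_consumed_at >= IDLE_TIMEOUT


stopped_at = datetime(2025, 1, 1, tzinfo=timezone.utc)
print(earliest_deletion_time(stopped_at).isoformat())
```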
Statements and materialized tables
Confluent Cloud for Apache Flink offers two approaches for running continuous queries:
Materialized tables: the preferred approach for long-running streaming queries that act as incremental materialized views or production pipelines. Materialized tables are persistent objects that you can evolve in place by using CREATE OR ALTER MATERIALIZED TABLE. For more information, see Materialized Tables.
Statements: for exploratory queries, snapshot (batch) processing, one-time data analysis, and interactive SQL in the workspace. Statements are the direct SQL execution model described on this page.
If you are building a streaming pipeline that runs continuously and might need updates over time, use a materialized table. If you are running ad hoc queries, exploring data, or performing batch analysis, use statements directly.
Limit on query text size
Confluent Cloud for Apache Flink has a limit of 4 MB on the size of query text. This limit includes string and binary literals that are part of the query.
The maximum length of a statement name is 72 characters.
If you combine multiple SQL statements into a single semicolon-separated string, the length limit applies to the entire string.
If the query size is greater than the 4 MB limit, you receive the following error:
This query is too large to process (exceeds 4194304 bytes).
This can happen due to:
* Complex query structure.
* Too many columns selected or expanded by SELECT *.
* Multiple table joins.
* Large number of conditions.
Try simplifying your query or breaking it into smaller parts.
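You can check these limits on the client before you submit a statement. The following is a minimal sketch. The function name `check_statement` is hypothetical, and measuring the query text as UTF-8 bytes is an assumption, because this page gives the limit in bytes but does not name the encoding.

```python
MAX_QUERY_BYTES = 4 * 1024 * 1024  # 4194304, the figure shown in the error message
MAX_NAME_LENGTH = 72               # maximum length of a statement name


def check_statement(name: str, sql: str) -> list[str]:
    """Return a list of limit violations; an empty list means none were found."""
    problems = []
    size = len(sql.encode("utf-8"))  # assumption: size is counted in UTF-8 bytes
    if size > MAX_QUERY_BYTES:
        problems.append(f"query text is {size} bytes, limit is {MAX_QUERY_BYTES}")
    if len(name) > MAX_NAME_LENGTH:
        problems.append(f"name is {len(name)} characters, limit is {MAX_NAME_LENGTH}")
    return problems


print(check_statement("my-statement", "SELECT 1;"))
```

If you combine several statements into one semicolon-separated string, pass the whole string as `sql`, because the limit applies to the entire string.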
Limits on state size
The amount of state that a statement stores when it performs stateful calculations, such as windowed aggregations, has the potential to grow indefinitely. Confluent Cloud for Apache Flink implements a two-step approach to help you manage Flink applications that have extremely large state:
Soft limit: sets an upper boundary of 500 GB per statement on the amount of state that Confluent Cloud for Apache Flink can support.
Hard limit: sets an absolute upper limit of 1000 GB (1 TB) per statement on the amount of state that a Flink application can accumulate.
Confluent Cloud for Apache Flink warns you proactively when your Flink applications are within 80% of reaching their soft or hard limits. If a Flink application hits the soft limit, Confluent Cloud for Apache Flink stops the statement. You can then decide whether to allow the application to keep running or to resubmit it with State TTL to reduce the amount of state.
These limits are absolute, not dynamic. They are fixed global values for all individual statements. They don’t scale up or down based on the compute pool size (CFUs).
Confluent Cloud for Apache Flink does not stop a statement that runs in a small pool, for example, a pool with 5 CFUs, when its state exceeds the theoretical “per-CFU” capacity of 50 GB. Instead, it triggers a POOL_EXHAUSTED status, so that the pool can autoscale or you can increase its CFUs.
The “Statement Stopped” mechanism triggers only when a statement reaches the absolute global limit of 500 GB (the soft limit), regardless of the compute pool configuration.
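The thresholds above can be sketched as a small classification function. This is an illustration only: the function name and the status strings are hypothetical, and the code assumes that the warning fires once state size reaches 80% of a limit, which is one reading of "within 80%".

```python
SOFT_LIMIT_GB = 500   # per statement
HARD_LIMIT_GB = 1000  # per statement (1 TB)
WARN_PERCENT = 80     # assumption: warn at or above 80% of a limit


def state_size_status(state_gb: float) -> str:
    """Classify a statement's state size against the fixed global limits."""
    if state_gb >= HARD_LIMIT_GB:
        return "hard limit reached"
    if state_gb >= HARD_LIMIT_GB * WARN_PERCENT / 100:
        return "approaching hard limit"
    if state_gb >= SOFT_LIMIT_GB:
        return "soft limit reached"
    if state_gb >= SOFT_LIMIT_GB * WARN_PERCENT / 100:
        return "approaching soft limit"
    return "ok"


for size in (100, 400, 500, 800, 1000):
    print(size, "GB:", state_size_status(size))
```

The function takes no compute pool size as input, which mirrors the point that the limits do not scale with CFUs.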
Lifecycle operations for statements
Statements have a lifecycle that includes the following states:
Pending: The statement has been submitted and Flink is preparing to start running the statement.
Running: Flink is actively running the statement.
Completed: The statement has completed all of its work.
Deleting: The statement is being deleted.
Failed: The statement has encountered an error and is no longer running.
Degraded: The statement appears unhealthy, for example, no transactions have been committed for a long time, or the statement has been restarting frequently.
Stopping: The statement is about to be stopped.
Stopped: The statement has been stopped and is no longer running.
These are the supported lifecycle operations for a statement.
Submit a statement
List running statements
Describe a statement
Delete a statement
List statement exceptions
Stop and resume a statement
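If you track statements in your own tooling, the lifecycle states listed above map naturally onto an enumeration. The following is a minimal sketch. The set of terminal states is an assumption: this page names STOPPED as one example of a terminal state, and Completed and Failed are included here only because they describe statements that are no longer running.

```python
from enum import Enum


class StatementState(Enum):
    PENDING = "Pending"
    RUNNING = "Running"
    COMPLETED = "Completed"
    DELETING = "Deleting"
    FAILED = "Failed"
    DEGRADED = "Degraded"
    STOPPING = "Stopping"
    STOPPED = "Stopped"


# Assumption: states in which a statement no longer runs.
TERMINAL_STATES = frozenset({
    StatementState.COMPLETED,
    StatementState.FAILED,
    StatementState.STOPPED,
})


def is_terminal(state: StatementState) -> bool:
    """True if the state is in the assumed set of terminal states."""
    return state in TERMINAL_STATES


print(is_terminal(StatementState("Stopped")))
```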
Queries in Flink
Flink enables you to issue queries with ANSI-standard SQL on data at rest (batch) and data in motion (streams).
These are the queries that are possible with Flink SQL.
- Metadata queries
CRUD on catalogs, databases, tables, and so on. Because Flink implements ANSI-standard SQL, it uses a database analogy with the concepts of catalogs, databases, and tables. In Confluent Cloud, these concepts map to environments, Apache Kafka® clusters, and topics, respectively.
- Ad hoc / exploratory queries
You can issue queries on a topic and see the results immediately. A query can be a batch query (“show me what happened up to now”), or a transient streaming query (“show me what happened up to now and give me updates for the near future”). In this case, when the query or the session ends, no more compute is needed.
- Streaming queries
These queries run continuously and read data from one or more tables/topics and write results of the queries to one table/topic.
In general, Flink supports both batch and stream processing, but the exact subset of allowed operations differs slightly depending on the type of query. For more information, see Flink SQL Queries.
Flink executes all queries in streaming execution mode, whether the sources are bounded or unbounded.
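The mapping of catalogs, databases, and tables to environments, Kafka clusters, and topics means that a topic can be addressed with a three-part table name. The following sketch builds such a name with backtick-quoted identifiers. The helper name and the example values are hypothetical, and the function rejects names that contain a backtick instead of trying to escape them.

```python
def qualified_table_name(environment: str, cluster: str, topic: str) -> str:
    """Build a `catalog`.`database`.`table` identifier for a topic."""
    parts = (environment, cluster, topic)
    for part in parts:
        if not part or "`" in part:
            raise ValueError(f"unsupported identifier: {part!r}")
    return ".".join(f"`{part}`" for part in parts)


table = qualified_table_name("my-env", "my-cluster", "orders")
print(f"SELECT * FROM {table};")
```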
Data lifecycle
Broadly speaking, the Flink SQL lifecycle is:
1. Flink reads data into a Flink table from Kafka through the Flink connector for Kafka.
2. Flink processes data by using SQL statements.
3. Flink processes data by using Flink task managers (managed by Confluent and not exposed to users), which are part of the Flink runtime. Flink can store some data temporarily as state while it processes the data.
4. Flink returns data to the user as a result-set.
   - The result-set can be bounded, in which case the query terminates.
   - The result-set can be unbounded, in which case the query runs until canceled manually.
   OR
   Flink writes data back out to one or more tables.
   - Flink stores data in Kafka topics.
   - Flink stores the schema for the table in the Flink Metastore and synchronizes it to Schema Registry.
Flink SQL Data Definition Language (DDL) statements
Data Definition Language (DDL) statements are imperative verbs that define metadata in Flink SQL by adding, changing, or deleting tables. Data Definition Language statements modify metadata only and don’t operate on data. Use these statements with declarative Flink SQL Queries to create your Flink SQL applications.
Flink SQL makes it simple to develop streaming applications using standard SQL. It’s easy to learn Flink SQL if you’ve ever worked with a database or SQL-like system that’s ANSI-SQL 2011 compliant.
Available DDL statements
These are the available DDL statements in Confluent Cloud for Apache Flink.
- ALTER
- CREATE
- DESCRIBE
- DROP
- EXPLAIN
- RESET
- SET
- SHOW
- USE