Determinism in Continuous Queries on Confluent Cloud for Apache Flink

In Flink SQL, an operation is deterministic if it reliably computes identical results when repeated with identical input values. Batch queries are generally deterministic, but streaming queries introduce additional sources of non-determinism — including non-deterministic functions, source connector behavior, and state time to live (TTL) — that can cause incorrect results or runtime errors.

What is determinism?

An operation is deterministic if it reliably computes identical results when repeated with identical input values. This definition is based on the SQL standard’s description of determinism.

Determinism: batch versus streaming

+------------------------+----------------------------+-------------------------------------+
| Factor                 | Batch                      | Streaming                           |
+------------------------+----------------------------+-------------------------------------+
| Data boundedness       | Bounded (fixed dataset)    | Unbounded (continuous)              |
| Dynamic functions      | Single value per execution | Evaluated per record                |
| Source connector reads | Consistent (snapshot)      | Can vary across reads               |
| State management       | No internal state          | State TTL can cause non-determinism |
| Update messages        | Not applicable             | NDU can cause errors                |
+------------------------+----------------------------+-------------------------------------+

Are batch queries deterministic?

Batch queries are generally deterministic: repeated execution of the same query on a given bounded data set yields consistent results. However, non-deterministic functions such as CURRENT_TIMESTAMP and UUID() can produce different results across executions, as shown by the following example queries.

Two examples of batch queries with non-deterministic results

For example, consider a newly created website click log table:

CREATE TABLE clicks (
    uid VARCHAR(128),
    cTime TIMESTAMP(3),
    url VARCHAR(256)
);

Some new records are written to the table:

+------+---------------------+------------+
|  uid |               cTime |        url |
+------+---------------------+------------+
| Mary | 2023-08-22 12:00:01 |      /home |
|  Bob | 2023-08-22 12:00:01 |      /home |
| Mary | 2023-08-22 12:00:05 | /prod?id=1 |
|  Liz | 2023-08-22 12:01:00 |      /home |
| Mary | 2023-08-22 12:01:30 |      /cart |
|  Bob | 2023-08-22 12:01:35 | /prod?id=3 |
+------+---------------------+------------+

The following query applies a time filter to the click log table, intending to return the click records from the last two minutes:

SELECT * FROM clicks
WHERE cTime BETWEEN TIMESTAMPADD(MINUTE, -2, CURRENT_TIMESTAMP) AND CURRENT_TIMESTAMP;

When the query was submitted at “2023-08-22 12:02:00”, it returned all 6 rows in the table. When it was executed again a minute later, at “2023-08-22 12:03:00”, it returned only 3 rows:

+------+---------------------+------------+
|  uid |               cTime |        url |
+------+---------------------+------------+
|  Liz | 2023-08-22 12:01:00 |      /home |
| Mary | 2023-08-22 12:01:30 |      /cart |
|  Bob | 2023-08-22 12:01:35 | /prod?id=3 |
+------+---------------------+------------+

Another query adds a unique identifier to each returned record, because the clicks table doesn’t have a primary key.

SELECT UUID() AS uuid, * FROM clicks LIMIT 3;

Executing this query twice in a row generates a different uuid identifier for each row:

-- first execution
+--------------------------------+------+---------------------+------------+
|                           uuid |  uid |               cTime |        url |
+--------------------------------+------+---------------------+------------+
| aaaa4894-16d4-44d0-a763-03f... | Mary | 2023-08-22 12:00:01 |      /home |
| ed26fd46-960e-4228-aaf2-0aa... |  Bob | 2023-08-22 12:00:01 |      /home |
| 1886afc7-dfc6-4b20-a881-b0e... | Mary | 2023-08-22 12:00:05 | /prod?id=1 |
+--------------------------------+------+---------------------+------------+

-- second execution
+--------------------------------+------+---------------------+------------+
|                           uuid |  uid |               cTime |        url |
+--------------------------------+------+---------------------+------------+
| 95f7301f-bcf2-4b6f-9cf3-1ea... | Mary | 2023-08-22 12:00:01 |      /home |
| 63301e2d-d180-4089-876f-683... |  Bob | 2023-08-22 12:00:01 |      /home |
| f24456d3-e942-43d1-a00f-fdb... | Mary | 2023-08-22 12:00:05 | /prod?id=1 |
+--------------------------------+------+---------------------+------------+

Non-determinism in batch processing

Non-deterministic functions cause most of the non-determinism in batch processing. As the previous examples show, the two built-in functions CURRENT_TIMESTAMP and UUID() behave differently from each other in batch mode. Compare with the following query:

SELECT CURRENT_TIMESTAMP, * FROM clicks;

CURRENT_TIMESTAMP returns the same value for all records in the result.

+-------------------------+------+---------------------+------------+
|       CURRENT_TIMESTAMP |  uid |               cTime |        url |
+-------------------------+------+---------------------+------------+
| 2023-08-23 17:25:46.831 | Mary | 2023-08-22 12:00:01 |      /home |
| 2023-08-23 17:25:46.831 |  Bob | 2023-08-22 12:00:01 |      /home |
| 2023-08-23 17:25:46.831 | Mary | 2023-08-22 12:00:05 | /prod?id=1 |
| 2023-08-23 17:25:46.831 |  Liz | 2023-08-22 12:01:00 |      /home |
| 2023-08-23 17:25:46.831 | Mary | 2023-08-22 12:01:30 |      /cart |
| 2023-08-23 17:25:46.831 |  Bob | 2023-08-22 12:01:35 | /prod?id=3 |
+-------------------------+------+---------------------+------------+

This difference exists because Flink SQL inherits its definition of functions from Apache Calcite, which distinguishes two categories of functions besides deterministic functions: non-deterministic functions and dynamic functions.

  • Non-deterministic functions are evaluated at runtime in the cluster and can return a different value for each record.

  • Dynamic functions determine their values when Flink generates the query plan. They aren’t evaluated at runtime: different executions return different values, but all records within the same execution see the same value. Built-in dynamic functions are mainly temporal functions. The sketch after this list contrasts the two categories.
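
As a minimal illustration, the following batch query combines RAND(), a built-in non-deterministic function, with CURRENT_TIMESTAMP, a dynamic function, over the clicks table from the earlier example:

-- In batch mode, RAND() is evaluated at runtime and yields a new value per row,
-- while CURRENT_TIMESTAMP is resolved once when the query plan is generated,
-- so every row carries the same timestamp.
SELECT RAND() AS random_value, CURRENT_TIMESTAMP AS plan_time, uid, url
FROM clicks;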

How does determinism differ in stream processing?

Streaming queries are harder to make deterministic than batch queries because the input data is unbounded. Flink SQL abstracts stream processing as a continuous query on dynamic tables, where logically every change in the base table triggers the query to run. As a result, dynamic functions that return a single value per batch execution become non-deterministic in streaming, because Flink evaluates them continuously over time.

If the clicks log table in the previous example comes from an Apache Kafka® topic that receives continuous writes, the same query in stream mode returns a CURRENT_TIMESTAMP that changes over time.

SELECT CURRENT_TIMESTAMP, * FROM clicks;

For example:

+-------------------------+------+---------------------+------------+
|       CURRENT_TIMESTAMP |  uid |               cTime |        url |
+-------------------------+------+---------------------+------------+
| 2023-08-23 17:25:46.831 | Mary | 2023-08-22 12:00:01 |      /home |
| 2023-08-23 17:25:47.001 |  Bob | 2023-08-22 12:00:01 |      /home |
| 2023-08-23 17:25:47.310 | Mary | 2023-08-22 12:00:05 | /prod?id=1 |
+-------------------------+------+---------------------+------------+

What causes non-determinism in streaming?

In addition to non-deterministic functions, the following factors can also generate non-determinism:

  1. Non-deterministic back read of a source connector.

  2. Querying based on processing time. Processing time is not supported in Confluent Cloud for Apache Flink.

  3. Clearing internal state data based on time to live (TTL). For more information, see Implement state time-to-live (TTL).

Non-deterministic back read of source connector

For Flink SQL, determinism applies only to the computation, because Flink SQL doesn’t store user data itself. Here, it’s necessary to distinguish between the managed internal state in streaming mode and the user data itself. If a source connector’s implementation can’t provide a deterministic back read, it introduces non-determinism into the input data, which can produce non-deterministic results.

Common examples are inconsistent data for multiple reads from the same offset, or requests for data that no longer exists because of the retention time, for example, when the requested data is beyond the configured TTL of a Kafka topic.

Clear internal state data based on TTL

Because of the unbounded nature of stream processing, the internal state data maintained by long-running streaming queries in operations like regular join and group aggregation (non-windowed aggregation) can continuously increase. Setting a state TTL to clean up internal state data is often a necessary compromise but can make the computation results non-deterministic.
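
As an illustrative sketch, consider a regular join between two hypothetical tables, orders and shipments, which are not part of the earlier example schema:

-- A regular join stores rows from both inputs in Flink state so that late-arriving
-- rows can still find their match. If state TTL expires an order row before its
-- matching shipment arrives, the join can no longer produce that match, so the
-- result depends on arrival timing rather than on the data alone.
SELECT o.order_id, o.amount, s.ship_date
FROM orders AS o
JOIN shipments AS s
  ON o.order_id = s.order_id;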

What is a non-deterministic update (NDU)?

A non-deterministic update (NDU) occurs when non-deterministic column values interfere with the update messages that operators use to maintain internal state. Unlike simple non-deterministic results — where the query works but produces inconsistent output across runs — NDU can cause incorrect results or runtime errors.

Flink implements a complete incremental update mechanism based on the continuous query on dynamic tables abstraction. All operations that need to generate incremental messages maintain complete internal state data, and the entire query pipeline, the complete directed acyclic graph (DAG) from source to sink operators, relies on correct delivery of update messages between operators. Non-determinism can break this guarantee, leading to errors.

Update messages (the changelog) can contain these message types:

  • Insert (I)

  • Delete (D)

  • Update_Before (UB)

  • Update_After (UA)
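
For example, a non-windowed aggregation over the clicks table emits such update messages as new rows arrive. The changelog shown in the comments is illustrative and uses the common +I/-U/+U shorthand for the message types listed above:

SELECT uid, COUNT(*) AS click_cnt
FROM clicks
GROUP BY uid;

-- Illustrative changelog as Mary's clicks arrive:
--   +I (Mary, 1)    first click: Insert
--   -U (Mary, 1)    second click: Update_Before retracts the old count
--   +U (Mary, 2)    Update_After emits the new count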

In an insert-only changelog pipeline, there’s no NDU problem. When the changelog contains at least one D, UB, or UA message in addition to I messages, Flink tries to deduce the update key of the messages, which serves as the primary key of the changelog, from the query.

  • When Flink can deduce the update key, the operators in the pipeline maintain the internal state by the update key.

  • When Flink can’t deduce the update key, it might be because the change data capture (CDC) source table or sink table doesn’t define a primary key, or because the semantics of some operations in the query prevent the key from being derived.

In that case, all operators maintaining internal state can process update (D/UB/UA) messages only by matching complete rows. Sink nodes work in retract mode when you don’t define a primary key, and delete operations apply to complete rows.
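
As a hedged sketch (the table definition below is hypothetical and omits any connector-specific options), declaring a primary key lets update and delete messages be keyed by that column instead of requiring complete rows:

-- With a declared primary key, update and delete messages for this table can be
-- keyed by uid.
CREATE TABLE clicks_per_user (
    uid VARCHAR(128) NOT NULL,
    click_cnt BIGINT,
    PRIMARY KEY (uid) NOT ENFORCED
);

-- Without the PRIMARY KEY clause, a sink works in retract mode, and delete
-- operations must match complete rows.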

This means that in update-by-row mode, non-deterministic column values must not affect the update messages received by operators that need to maintain state; otherwise, NDU problems and computation errors occur. For a query pipeline with update messages whose update key can’t be derived, the following points are the most important sources of NDU problems (illustrated by the sketch after this list):

  1. Non-deterministic functions, including scalar, table, and aggregate functions, both built-in and custom.

  2. Lookup join on an evolving source.

  3. CDC sources that carry metadata fields, such as system columns, that don’t belong to the row entity itself.
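
The following sketch illustrates the first source. It assumes a hypothetical CDC-backed table cdc_clicks and a sink table sink_without_pk that declares no primary key; neither is part of the earlier example:

-- CURRENT_TIMESTAMP is evaluated per record in streaming mode (as shown earlier),
-- so the retraction (Update_Before/Delete) messages carry a different timestamp
-- than the original insert. Because the sink has no primary key, the retraction
-- can't match the earlier row, which leads to an NDU error or an incorrect result.
INSERT INTO sink_without_pk
SELECT uid, url, CURRENT_TIMESTAMP AS processed_at
FROM cdc_clicks;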

Exceptions caused by clearing internal state data based on TTL are discussed separately as a runtime fault-tolerance strategy. For more information, see FLINK-24666.