Determinism in Continuous Queries on Confluent Cloud for Apache Flink

This topic answers the following questions about determinism in Confluent Cloud for Apache Flink®:

  • What is determinism?
  • Determinism for regular batch queries
    • Two examples of batch queries with non-deterministic results
    • Non-determinism in batch processing
  • Determinism in stream processing
    • Non-determinism in streaming
    • Non-deterministic update in streaming
    • How to eliminate the impact of non-deterministic update in streaming

What is determinism?

Paraphrasing the SQL standard’s description of determinism, an operation is deterministic if it reliably computes identical results when repeated with identical input values.
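
For example, compare a deterministic built-in function with a non-deterministic one:

-- Deterministic: the same input always produces the same output.
SELECT UPPER('home');  -- always returns 'HOME'

-- Non-deterministic: each evaluation may return a different value.
SELECT RAND();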

Determinism for regular batch queries

In a classic batch scenario, repeated execution of the same query over a given bounded data set yields consistent results, which is the most intuitive understanding of determinism.

In practice, however, the same query doesn’t always return consistent results in batch processing either, as the following example queries show.

Two examples of batch queries with non-deterministic results

For example, consider a newly created website click log table:

CREATE TABLE clicks (
    uid VARCHAR(128),
    cTime TIMESTAMP(3),
    url VARCHAR(256)
)

Some new records are written to the table:

+------+---------------------+------------+
|  uid |               cTime |        url |
+------+---------------------+------------+
| Mary | 2023-08-22 12:00:01 |      /home |
|  Bob | 2023-08-22 12:00:01 |      /home |
| Mary | 2023-08-22 12:00:05 | /prod?id=1 |
|  Liz | 2023-08-22 12:01:00 |      /home |
| Mary | 2023-08-22 12:01:30 |      /cart |
|  Bob | 2023-08-22 12:01:35 | /prod?id=3 |
+------+---------------------+------------+

The following query applies a time filter to the click log table and is intended to return the click records from the last two minutes:

SELECT * FROM clicks
WHERE cTime BETWEEN TIMESTAMPADD(MINUTE, -2, CURRENT_TIMESTAMP) AND CURRENT_TIMESTAMP;

When the query was submitted at “2023-08-22 12:02:00”, it returned all 6 rows in the table. When it was executed again a minute later, at “2023-08-22 12:03:00”, it returned only 3 rows:

+------+---------------------+------------+
|  uid |               cTime |        url |
+------+---------------------+------------+
|  Liz | 2023-08-22 12:01:00 |      /home |
| Mary | 2023-08-22 12:01:30 |      /cart |
|  Bob | 2023-08-22 12:01:35 | /prod?id=3 |
+------+---------------------+------------+

Another query adds a unique identifier to each returned record, because the clicks table doesn’t have a primary key.

SELECT UUID() AS uuid, * FROM clicks LIMIT 3;

Executing this query twice in a row generates a different uuid identifier for each row:

-- first execution
+--------------------------------+------+---------------------+------------+
|                           uuid |  uid |               cTime |        url |
+--------------------------------+------+---------------------+------------+
| aaaa4894-16d4-44d0-a763-03f... | Mary | 2023-08-22 12:00:01 |      /home |
| ed26fd46-960e-4228-aaf2-0aa... |  Bob | 2023-08-22 12:00:01 |      /home |
| 1886afc7-dfc6-4b20-a881-b0e... | Mary | 2023-08-22 12:00:05 | /prod?id=1 |
+--------------------------------+------+---------------------+------------+

-- second execution
+--------------------------------+------+---------------------+------------+
|                           uuid |  uid |               cTime |        url |
+--------------------------------+------+---------------------+------------+
| 95f7301f-bcf2-4b6f-9cf3-1ea... | Mary | 2023-08-22 12:00:01 |      /home |
| 63301e2d-d180-4089-876f-683... |  Bob | 2023-08-22 12:00:01 |      /home |
| f24456d3-e942-43d1-a00f-fdb... | Mary | 2023-08-22 12:00:05 | /prod?id=1 |
+--------------------------------+------+---------------------+------------+

Non-determinism in batch processing

The non-determinism in batch processing is caused mainly by non-deterministic functions, as shown in the previous query examples. But the built-in functions CURRENT_TIMESTAMP and UUID() actually behave differently from each other in batch processing. Compare with the following query:

SELECT CURRENT_TIMESTAMP, * FROM clicks;

CURRENT_TIMESTAMP has the same value on all returned records:

+-------------------------+------+---------------------+------------+
|       CURRENT_TIMESTAMP |  uid |               cTime |        url |
+-------------------------+------+---------------------+------------+
| 2023-08-23 17:25:46.831 | Mary | 2023-08-22 12:00:01 |      /home |
| 2023-08-23 17:25:46.831 |  Bob | 2023-08-22 12:00:01 |      /home |
| 2023-08-23 17:25:46.831 | Mary | 2023-08-22 12:00:05 | /prod?id=1 |
| 2023-08-23 17:25:46.831 |  Liz | 2023-08-22 12:01:00 |      /home |
| 2023-08-23 17:25:46.831 | Mary | 2023-08-22 12:01:30 |      /cart |
| 2023-08-23 17:25:46.831 |  Bob | 2023-08-22 12:01:35 | /prod?id=3 |
+-------------------------+------+---------------------+------------+

This difference occurs because Flink SQL inherits its definition of functions from Apache Calcite, which defines two categories of functions besides deterministic functions: non-deterministic functions and dynamic functions.

  • Non-deterministic functions are executed at runtime in the cluster and are evaluated per record.
  • Dynamic functions determine their values only when the query plan is generated. They’re not executed at runtime, so different executions obtain different values, but all records within the same execution obtain the same value. Built-in dynamic functions are mainly temporal functions.

Determinism in stream processing

A core difference between streaming and batch is the unboundedness of the data. Flink SQL abstracts stream processing as continuous queries on dynamic tables, so the dynamic function in the batch query example is equivalent to a non-deterministic function in stream processing, where logically every change in the base table triggers the query to be executed.

If the clicks log table in the previous example is from an Apache Kafka® topic that’s continuously written, the same query in stream mode returns a CURRENT_TIMESTAMP that changes over time.

SELECT CURRENT_TIMESTAMP, * FROM clicks;

For example:

+-------------------------+------+---------------------+------------+
|       CURRENT_TIMESTAMP |  uid |               cTime |        url |
+-------------------------+------+---------------------+------------+
| 2023-08-23 17:25:46.831 | Mary | 2023-08-22 12:00:01 |      /home |
| 2023-08-23 17:25:47.001 |  Bob | 2023-08-22 12:00:01 |      /home |
| 2023-08-23 17:25:47.310 | Mary | 2023-08-22 12:00:05 | /prod?id=1 |
+-------------------------+------+---------------------+------------+

Non-determinism in streaming

In addition to non-deterministic functions, there are other factors that may generate non-determinism:

  1. Non-deterministic back read of a source connector.
  2. Querying based on processing time.
  3. Clearing internal state data based on TTL.

Non-deterministic back read of source connector

For Flink SQL, the determinism provided is limited to the computation only, because Flink doesn’t store user data itself. Here, it’s necessary to distinguish between the managed internal state in streaming mode and the user data itself. If the source connector’s implementation can’t provide deterministic back reads, it brings non-determinism to the input data, which may produce non-deterministic results.

Common examples are reads from the same offset that return inconsistent data, or requests for data that no longer exists because of the retention time, for example, when the requested data has expired beyond the configured retention of a Kafka topic.

Query based on processing time

Unlike event time, processing time is based on the machine’s local time, so this processing doesn’t provide determinism. Related operations that rely on the time attribute include processing-time window aggregations, interval joins, and temporal joins.

Another typical operation is the lookup join, which is semantically similar to a temporal join based on processing time, where non-determinism arises when the accessed external table changes over time.
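
The following is a minimal sketch of a processing-time query, assuming a variant of the clicks table with a hypothetical computed proc_time column. Because rows are assigned to windows by arrival time, not by cTime, re-running the query over the same input can group the rows differently:

CREATE TABLE clicks_proc (
    uid VARCHAR(128),
    cTime TIMESTAMP(3),
    url VARCHAR(256),
    proc_time AS PROCTIME()  -- processing-time attribute
);

-- The counts depend on when records happen to arrive, not on cTime.
SELECT window_start, window_end, COUNT(*) AS clicks
FROM TABLE(
    TUMBLE(TABLE clicks_proc, DESCRIPTOR(proc_time), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end;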

Clear internal state data based on TTL

Because of the unbounded nature of stream processing, the internal state data maintained by long-running streaming queries in operations like regular join and group aggregation (non-windowed aggregation) may continuously increase. Setting a state TTL to clean up internal state data is often a necessary compromise but may make the computation results non-deterministic.
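
As a hedged sketch, in open-source Flink SQL the TTL is controlled by the table.exec.state.ttl option; Confluent Cloud exposes a comparable setting, so check your deployment for the exact key:

-- With a 1-hour TTL, state for keys that stay idle longer than an hour
-- is cleared, so a regular join may miss matches between rows that
-- arrive more than an hour apart.
SET 'table.exec.state.ttl' = '1 h';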

The impact of non-determinism differs from query to query. For some queries, it only produces non-deterministic results: the query works, but multiple runs fail to produce consistent results. Other queries can experience more serious effects, like incorrect results or runtime errors. The main cause of these failures is “non-deterministic update”.

Non-deterministic update in streaming

Flink implements a complete incremental update mechanism based on the abstraction of continuous queries on dynamic tables. All operations that need to generate incremental messages maintain complete internal state data, and the operation of the entire query pipeline, including the complete DAG from source to sink operators, relies on the guaranteed correct delivery of update messages between operators. Non-determinism can break this guarantee, leading to errors.

What is a “non-deterministic update” (NDU)? Update messages (the changelog) may contain these message types:

  • Insert (I)
  • Delete (D)
  • Update_Before (UB)
  • Update_After (UA)

In an insert-only changelog pipeline, there’s no NDU problem. When the changelog contains update messages, meaning at least one D, UB, or UA message in addition to I, the update key of the messages, which can be regarded as the primary key of the changelog, is deduced from the query.

  • When the update key can be deduced, the operators in the pipeline maintain their internal state by the update key.
  • When the update key can’t be deduced, the cause may be that no primary key is defined on the CDC source table or sink table, or that the update key can’t be derived from the semantics of the query.
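
For example, the following is a minimal sketch of a query whose update key can be deduced from its GROUP BY clause:

-- The update key of this changelog is uid, so downstream operators
-- can maintain their internal state keyed by uid.
SELECT uid, COUNT(*) AS cnt
FROM clicks
GROUP BY uid;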

When no update key can be deduced, all operators that maintain internal state can process update (D/UB/UA) messages only by complete rows. Sink nodes work in retract mode when no primary key is defined, and delete operations are performed by complete rows.

This means that in the update-by-row mode, the update messages received by the operators that maintain state must not be affected by non-deterministic column values; otherwise, NDU problems occur and cause computation errors. For a query pipeline whose update messages can’t derive an update key, the following are the most important sources of NDU problems:

  1. Non-deterministic functions, including scalar, table, and aggregate functions, whether built-in or user-defined
  2. A lookup join on an evolving source
  3. A CDC source that carries metadata fields, like system columns, that don’t belong to the row entity itself

Exceptions caused by cleaning internal state data based on TTL are discussed separately as a runtime fault-tolerant handling strategy. For more information, see FLINK-24666.

How to eliminate the impact of non-deterministic update in streaming

The NDU problem in streaming queries is usually not intuitive, and the risk of the problem may arise from a small change in a complex query.

Starting with version 1.16, Flink SQL introduces an experimental NDU-handling mechanism (FLINK-27849), controlled by the table.optimizer.non-deterministic-update.strategy option, which supports two modes: TRY_RESOLVE and IGNORE.

When TRY_RESOLVE mode is enabled, Flink checks whether there is an NDU problem in the streaming query and tries to eliminate the NDU problems generated by lookup joins by adding internal materialization. If factors from points 1 or 3 in the previous list remain that can’t be eliminated automatically, Flink SQL gives detailed error messages that prompt you to adjust the SQL to avoid introducing non-determinism. Considering the high cost and complexity of the operators introduced by materialization, there is no corresponding automatic resolution mechanism for these cases yet.

The following sections describe best practices for reducing the impact of non-deterministic updates.

Enable TRY_RESOLVE mode

Enable TRY_RESOLVE mode before running the streaming query. If the check finds an unsolvable NDU problem in the query, modify the SQL according to the error prompt to avoid the problem proactively.
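
The following is a minimal sketch of enabling the check in a SQL session. The option key and values come from FLINK-27849; availability may depend on your deployment:

SET 'table.optimizer.non-deterministic-update.strategy' = 'TRY_RESOLVE';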

The following code shows a real case from FLINK-27639.

INSERT INTO t_join_sink
SELECT o.order_id, o.order_name, l.logistics_id, l.logistics_target, l.logistics_source, now()
FROM t_order AS o
LEFT JOIN t_logistics AS l ON o.order_id = l.order_id

The execution plan generated by default produces a runtime error when TRY_RESOLVE mode is enabled:

org.apache.flink.table.api.TableException: The column(s): logistics_time(generated by non-deterministic function: NOW ) can not satisfy the determinism requirement for correctly processing update message(changelogMode contains not only insert 'I').... Please consider removing these non-deterministic columns or making them deterministic.

related rel plan:
Calc(select=[CAST(order_id AS INTEGER) AS order_id, order_name, logistics_id, logistics_target, logistics_source, CAST(NOW() AS TIMESTAMP(6)) AS logistics_time], changelogMode=[I,UB,UA,D], upsertKeys=[])
+- Join(joinType=[LeftOuterJoin], where=[=(order_id, order_id0)], select=[order_id, order_name, logistics_id, logistics_target, logistics_source, order_id0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[HasUniqueKey], changelogMode=[I,UB,UA,D], upsertKeys=[])
   :- Exchange(distribution=[hash[order_id]], changelogMode=[I,UB,UA,D], upsertKeys=[{0}])
   :  +- TableSourceScan(table=[[default_catalog, default_database, t_order, project=[order_id, order_name], metadata=[]]], fields=[order_id, order_name], changelogMode=[I,UB,UA,D], upsertKeys=[{0}])
   +- Exchange(distribution=[hash[order_id]], changelogMode=[I,UB,UA,D], upsertKeys=[])
      +- TableSourceScan(table=[[default_catalog, default_database, t_logistics, project=[logistics_id, logistics_target, logistics_source, order_id], metadata=[]]], fields=[logistics_id, logistics_target, logistics_source, order_id], changelogMode=[I,UB,UA,D], upsertKeys=[{0}])

Following the error prompt, remove the NOW() function and use a deterministic function instead, or use a time field from the order table, to eliminate this NDU problem and avoid runtime errors.
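
For example, the following is a hedged sketch of the corrected query, assuming the t_order table carries a hypothetical order_time column that can replace the non-deterministic now():

INSERT INTO t_join_sink
SELECT o.order_id, o.order_name, l.logistics_id, l.logistics_target, l.logistics_source, o.order_time
FROM t_order AS o
LEFT JOIN t_logistics AS l ON o.order_id = l.order_id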

Declare a primary key

When using a lookup join, try to declare the primary key on the lookup source table, if one exists. In many cases, a lookup source table with a primary key definition enables Flink SQL to derive and preserve the update key, which avoids the high materialization cost.
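
The following is a minimal sketch of such a declaration, using the table and column names from the examples below; the column types are illustrative assumptions, and Flink primary keys must be declared NOT ENFORCED:

CREATE TABLE dim_with_pk (
    a INT,
    c VARCHAR(256),
    PRIMARY KEY (a) NOT ENFORCED  -- lets the planner use a as the lookup key
);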

The following two examples show why you’re encouraged to declare primary keys for lookup source tables. The first example joins a lookup table that has a primary key definition:

INSERT INTO sink_with_pk
SELECT t1.a, t1.b, t2.c
FROM (
  SELECT *, PROCTIME() AS proctime FROM cdc
) t1
JOIN dim_with_pk FOR SYSTEM_TIME AS OF t1.proctime AS t2
   ON t1.a = t2.a

-- Plan: the upsertKey of the left stream is preserved when the lookup table
-- has a primary key definition that's used as the lookup key, so the
-- high-cost materialization can be omitted.
Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c])
+- Calc(select=[a, b, c])
   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, b, a, c])
      +- DropUpdateBefore
         +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b], metadata=[]]], fields=[a, b])

The second example joins a lookup table without a primary key definition:

INSERT INTO sink_with_pk
SELECT t1.a, t1.b, t2.c
FROM (
  SELECT *, PROCTIME() AS proctime FROM cdc
) t1
JOIN dim_without_pk FOR SYSTEM_TIME AS OF t1.proctime AS t2
   ON t1.a = t2.a

-- Execution plan when `TRY_RESOLVE` is enabled.
-- This may encounter errors at runtime when TRY_RESOLVE mode isn't enabled.
Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c], upsertMaterialize=[true])
+- Calc(select=[a, b, c])
   +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, b, a, c], upsertMaterialize=[true])
      +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b], metadata=[]]], fields=[a, b])

The second case can be solved by adding materialization when TRY_RESOLVE is enabled, but the cost is very high: the plan contains two costly materialization operators, on the LookupJoin and on the Sink, that the primary-key version avoids.

IGNORE mode

When the lookup source table accessed by the lookup join is static, TRY_RESOLVE mode doesn’t need to stay enabled. In this case, you can first turn on TRY_RESOLVE mode to check that there are no other NDU problems, and then restore IGNORE mode to avoid unnecessary materialization overhead.
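
The following is a sketch of restoring the default behavior after the check passes, using the same option key as before:

SET 'table.optimizer.non-deterministic-update.strategy' = 'IGNORE';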

Note

You must ensure that the lookup source table is purely static and never updated; otherwise, IGNORE mode isn’t safe.