Tables and Topics in Flink

Apache Flink® and the Table API use the concept of dynamic tables to facilitate the manipulation and processing of streaming data. Dynamic tables represent an abstraction for working with both batch and streaming data in a unified manner, offering a flexible and expressive way to define, modify, and query structured data. In contrast to the static tables that represent batch data, dynamic tables change over time. But like static batch tables, systems can execute queries over dynamic tables.

Confluent Cloud for Apache Flink® implements ANSI-standard SQL and has the familiar concepts of catalogs, databases, and tables. Confluent Cloud maps a Flink catalog to an environment and vice versa. Similarly, Flink databases and tables are mapped to Apache Kafka® clusters and topics. For more information, see Metadata mapping between Kafka cluster, topics, schemas, and Flink.
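To make the mapping concrete, the following Python sketch splits a fully qualified Flink table name into the Confluent Cloud environment, Kafka cluster, and topic it corresponds to. `KafkaLocation` and `resolve_table` are hypothetical helpers, not part of any Confluent or Flink API, and the sketch assumes plain dot-separated names without quoting or escaping.

```python
from typing import NamedTuple


class KafkaLocation(NamedTuple):
    """Where a Flink table lives in Confluent Cloud (hypothetical helper type)."""
    environment: str  # Flink catalog  <-> Confluent Cloud environment
    cluster: str      # Flink database <-> Kafka cluster
    topic: str        # Flink table    <-> Kafka topic


def resolve_table(qualified_name: str) -> KafkaLocation:
    """Split a `catalog.database.table` name into its Kafka-side parts.

    Assumes simple names that contain no dots, quotes, or backticks.
    """
    parts = qualified_name.split(".")
    if len(parts) != 3:
        raise ValueError(f"expected catalog.database.table, got {qualified_name!r}")
    catalog, database, table = parts
    return KafkaLocation(environment=catalog, cluster=database, topic=table)
```

For example, `resolve_table("prod.cluster_0.orders")` returns `KafkaLocation(environment='prod', cluster='cluster_0', topic='orders')`.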

Dynamic tables and continuous queries

Every table in Flink is equivalent to a stream of events describing the changes that are being made to that table. A stream of changes like this is called a changelog stream. Essentially, a stream is the changelog of a table, and every table is backed by a stream. This is also the case for regular database tables.
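The idea that a table is the result of replaying its changelog can be sketched in a few lines of Python. This is an illustration only, not how Flink stores state: the `materialize` function and the `(kind, row)` event tuples are hypothetical, and only the insert (`+I`) and delete (`-D`) entry types are handled here.

```python
from collections import Counter


def materialize(changelog):
    """Replay a changelog stream and return the table it describes.

    Each event is a (kind, row) tuple. '+I' adds a row and '-D' removes one
    matching row. The table is a multiset (Counter) because SQL tables can
    hold duplicate rows.
    """
    table = Counter()
    for kind, row in changelog:
        if kind == "+I":
            table[row] += 1
        elif kind == "-D":
            if table[row] == 0:  # Counter returns 0 for missing rows
                raise ValueError(f"cannot delete missing row: {row}")
            table[row] -= 1
            if table[row] == 0:
                del table[row]
        else:
            raise ValueError(f"unsupported change kind: {kind}")
    return table
```

Replaying any prefix of the changelog gives the table as it looked at that point in time, which is what it means for the stream to back the table.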

Querying a dynamic table yields a continuous query. A continuous query never terminates and produces dynamic results: another dynamic table. The query continuously updates its dynamic result table to reflect changes to its dynamic input tables. Essentially, a continuous query on a dynamic table is similar to a query that defines a materialized view.

The output of a continuous query is always equivalent to the result of the same query executed in batch mode on a snapshot of the input tables.
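The equivalence between continuous and batch results can be checked with a small Python sketch of a hypothetical `SELECT item, COUNT(*) FROM orders GROUP BY item` query. `batch_count` recomputes the result from scratch on a snapshot, while `ContinuousCount` updates it one row at a time; both names and the sample data are made up for illustration.

```python
from collections import defaultdict


def batch_count(snapshot):
    """Batch mode: run the query once over a fixed snapshot of the input."""
    counts = defaultdict(int)
    for item in snapshot:
        counts[item] += 1
    return dict(counts)


class ContinuousCount:
    """Continuous mode: the same query, updated incrementally per arriving row."""

    def __init__(self):
        self.counts = defaultdict(int)

    def on_insert(self, item):
        self.counts[item] += 1
        return dict(self.counts)


orders = ["hat", "shirt", "hat", "scarf", "hat"]
query = ContinuousCount()
for i, item in enumerate(orders):
    current = query.on_insert(item)
    # After each arrival, the continuous result equals a batch run
    # over the snapshot of all rows seen so far.
    assert current == batch_count(orders[: i + 1])
```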

Append-only table

Stream-table duality for an append-only table


In this animation, the only changes happening to the Orders table are new orders being appended to the end of the table. The corresponding changelog stream is just a stream of INSERT events. Adding another order to the table is the same as adding another INSERT event to the stream, as shown below the table. This is an example of an append-only, or insert-only, table.
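A minimal Python sketch of the append-only case, assuming a hypothetical `Order` row type, a hypothetical `append_order` helper, and made-up sample data: every appended row produces exactly one `+I` event, so the table and its changelog carry the same rows in the same order.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Order:
    order_id: int
    item: str
    quantity: int


def append_order(table, changelog, order):
    """Append a row to an insert-only table and record the matching +I event."""
    table.append(order)
    changelog.append(("+I", order))


orders_table = []
orders_changelog = []
for order in [Order(1, "hat", 20), Order(2, "shirt", 10), Order(3, "hat", 30)]:
    append_order(orders_table, orders_changelog, order)

# An append-only table only ever produces INSERT events ...
assert all(kind == "+I" for kind, _ in orders_changelog)
# ... and the table is exactly the sequence of rows those events carry.
assert orders_table == [row for _, row in orders_changelog]
```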

Updating table

Not all tables are append-only tables. Tables can also contain events that modify or delete existing rows. The changelog stream used by Flink SQL contains three additional event types to accommodate the different ways that tables can be updated. Besides the regular Insertion event, Update Before and Update After are a pair of events that work together to update an earlier result. The Delete event has the effect you would expect: it removes a record from the table.

Stream-table duality for an updating table


This animation has the same starting point as the previous example that showed the append-only table. But this time, an order has been cancelled, and the item in that order hasn't been sold. As a result, the Bestsellers table is updated rather than receiving another insert. The update starts with another order being appended to the append-only/insert-only Orders table, which is registered as an INSERT event in the changelog stream.

Because the SQL statement groups its results, the result is an updating table instead of an append-only/insert-only table. In this example, an order for 15 hats is cancelled. To process this cancellation, the query produces two update events:

  • The first is an UPDATE_BEFORE event that retracts the current result that showed 50 hats as the bestselling item.
  • The second is an UPDATE_AFTER event that replaces the old entry with a new one that shows 35 hats.
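The following Python sketch models the Bestsellers query as `SELECT item, SUM(quantity) FROM Orders GROUP BY item`. The query text, the `SumByItem` class, and the sample quantities (20 + 30 = 50 hats) are assumptions for illustration, and the cancellation is modeled as an appended order with a quantity of -15. The first row for an item produces `+I`; every later row produces a `-U` event that retracts the old total, followed by a `+U` event with the new one.

```python
class SumByItem:
    """Hypothetical grouping query that emits changelog events for its result."""

    def __init__(self):
        # The only state this query needs: one running total per item.
        self.totals = {}

    def on_insert(self, item, quantity):
        """Consume one +I row from Orders; return events for the result table."""
        if item not in self.totals:
            self.totals[item] = quantity
            return [("+I", (item, quantity))]
        old_total = self.totals[item]
        self.totals[item] = old_total + quantity
        return [
            ("-U", (item, old_total)),          # retract the previous result
            ("+U", (item, self.totals[item])),  # emit the replacement result
        ]


query = SumByItem()
query.on_insert("hat", 20)
query.on_insert("hat", 30)
cancellation_events = query.on_insert("hat", -15)  # cancelled 15-hat order
```

Feeding the three orders through the sketch yields the pair of events described above for the cancellation: `("-U", ("hat", 50))` followed by `("+U", ("hat", 35))`.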

Conceptually, the UPDATE_BEFORE event is processed first, which removes the old entry from the Bestsellers table. Then the sink processes the UPDATE_AFTER event, which inserts the updated result.
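On the receiving side, a sink can be sketched as a dictionary keyed by item that applies events in order. Keying the result by item, the `apply_to_sink` function, and the consistency check on retractions are assumptions of this sketch rather than documented sink behavior.

```python
def apply_to_sink(result_table, events):
    """Apply a query's changelog events, in order, to a materialized result.

    result_table maps item -> total. Retractions (-U, -D) remove the entry,
    and insertions (+I, +U) write the new one.
    """
    for kind, (item, total) in events:
        if kind in ("-U", "-D"):
            # A retraction must match what the sink currently holds.
            if result_table.get(item) != total:
                raise ValueError(f"{kind} {(item, total)} does not match sink state")
            del result_table[item]
        elif kind in ("+I", "+U"):
            result_table[item] = total
        else:
            raise ValueError(f"unknown change kind: {kind}")
    return result_table


bestsellers = {"hat": 50}
apply_to_sink(bestsellers, [("-U", ("hat", 50)), ("+U", ("hat", 35))])
```

Between the two events the hat entry is briefly absent, which is the intermediate state described in the paragraph above; after both, `bestsellers` is `{"hat": 35}`.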

The following figure visualizes the relationship of streams, dynamic tables, and continuous queries:

Relationship between streams, dynamic tables, and continuous queries
  1. A stream is converted into a dynamic table.
  2. A continuous query is evaluated on the dynamic table yielding a new dynamic table.
  3. The resulting dynamic table is converted back into a stream.

Dynamic tables are a logical concept. The only state that is actually materialized by the Flink SQL runtime is whatever is strictly necessary to produce correct results for the specific query being executed. For example, the figure above shows a query executing a simple filter. This requires no state, so nothing is materialized.
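A stateless filter can be sketched as a Python generator that decides each changelog event on its own. The `filter_changelog` name and the example predicate are hypothetical; the point is that the function keeps no table or other state between events and passes the change kind through unchanged.

```python
def filter_changelog(events, predicate):
    """Stateless filter over a changelog stream.

    Each (kind, row) event is forwarded if its row satisfies the predicate.
    Nothing is materialized: no rows are remembered between events, and the
    change kind (+I, -U, +U, -D) is passed through as-is.
    """
    for kind, row in events:
        if predicate(row):
            yield kind, row


events = [("+I", ("hat", 20)), ("+I", ("shirt", 5)), ("+I", ("hat", 30))]
big_orders = list(filter_changelog(events, lambda row: row[1] >= 10))
```

With the predicate `lambda row: row[1] >= 10`, the rows `("hat", 20)` and `("hat", 30)` pass and `("shirt", 5)` is dropped.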

Changelog entries

Flink provides four different types of changelog entries:

Short name   Long name       Semantics
+I           Insertion       Inserts a new row.
-U           Update Before   Retracts a previously emitted result.
+U           Update After    Updates a previously emitted result. If -U is omitted, a primary key is required so that updates are idempotent.
-D           Delete          Deletes a previously emitted result.
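The four entry types can be represented as a small Python enum keyed by their short names. Flink's Java API has a `RowKind` enum with the same four kinds; the Python class below is a standalone stand-in for illustration, not a binding to that API, and the `is_retraction` property is a hypothetical addition.

```python
from enum import Enum


class RowKind(Enum):
    """Changelog entry types, keyed by the short names in the table above."""
    INSERT = "+I"         # Insertion
    UPDATE_BEFORE = "-U"  # Update Before: retracts a previously emitted result
    UPDATE_AFTER = "+U"   # Update After: replaces a previously emitted result
    DELETE = "-D"         # Delete: removes a previously emitted result

    @property
    def is_retraction(self) -> bool:
        """True for entries that remove a previously emitted row."""
        return self in (RowKind.UPDATE_BEFORE, RowKind.DELETE)


# Looking an enum member up by value turns a short name into its kind.
kind = RowKind("+U")
```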

If the downstream system supports upserts, define a primary key on the table in Confluent Cloud for Apache Flink so that Update Before entries aren't needed.
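Why a primary key removes the need for Update Before can be sketched with a hypothetical upsert sink in Python: `+I` and `+U` overwrite whatever row is stored under the key, and `-D` removes it, so no retraction of the old value is needed. The `apply_upserts` function and its `key_of` parameter are illustrative assumptions, not a Confluent Cloud API; this sketch simply skips any `-U` entries it receives.

```python
def apply_upserts(sink, events, key_of):
    """Apply changelog events to a sink keyed by primary key (upsert semantics).

    +I/+U overwrite the row stored under the key and -D removes it. Because
    each write replaces the whole row for its key, replaying an event leaves
    the sink unchanged.
    """
    for kind, row in events:
        key = key_of(row)
        if kind in ("+I", "+U"):
            sink[key] = row
        elif kind == "-D":
            sink.pop(key, None)
        elif kind == "-U":
            continue  # not needed: the following +U overwrites by key
        else:
            raise ValueError(f"unknown change kind: {kind}")
    return sink


events = [("+I", ("hat", 50)), ("+U", ("hat", 35))]
sink = apply_upserts({}, events, key_of=lambda row: row[0])
replayed = apply_upserts(dict(sink), events, key_of=lambda row: row[0])
```

Applying the same events a second time leaves the sink unchanged (`replayed == sink`), which is the idempotence the Update After row in the table refers to.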

Depending on the combination of source, sink, and business logic applied, you can end up with the following types of changelog streams:

Changelog stream type   Stream category   Changelog entry types
Appending stream        Append stream     Contains only +I
Retracting stream       Update stream     Can contain -U and/or -D
Upserting stream        Update stream     Never contains -U but can contain +U and/or -D
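The table can be encoded as a mapping from stream type to the entry types that stream may contain, as in this Python sketch. `ALLOWED_KINDS` and `fits` are hypothetical names, and `+I` is included for the update streams on the assumption that they can also carry plain insertions. The entry types observed so far only show which stream types a stream is compatible with: a stream that has not yet emitted `-U` may still be a retracting stream.

```python
# Entry types each changelog stream type may contain, per the table above.
ALLOWED_KINDS = {
    "appending": frozenset({"+I"}),
    "retracting": frozenset({"+I", "-U", "+U", "-D"}),
    "upserting": frozenset({"+I", "+U", "-D"}),  # never -U
}


def fits(stream_type, events):
    """Return True if every (kind, row) event is allowed for the stream type."""
    allowed = ALLOWED_KINDS[stream_type]
    return all(kind in allowed for kind, _ in events)


retracting_events = [("+I", ("hat", 50)), ("-U", ("hat", 50)), ("+U", ("hat", 35))]
```

For `retracting_events`, only the retracting stream type fits, because the `-U` entry rules out both appending and upserting streams.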