Built-in Process Table Functions
A process table function (PTF) takes a table as input and returns a table as output, which lets you express logic that plain SQL can’t. For an overview of how PTFs work and how to write your own, see Process Table Functions.
Confluent Cloud for Apache Flink® also provides built-in PTFs for common stream processing patterns. These are ready to use directly in Flink SQL, with no custom code to write or deploy. The following built-in PTFs are available.
Changelog functions
The changelog PTFs give you explicit control over how Flink SQL interprets and produces changelog streams.
FROM_CHANGELOG: reads an append-only stream that carries an operation code and turns it into an updating table.
TO_CHANGELOG: turns an updating table into an append-only stream of change events.
For step-by-step examples with sample input and output, see Read and write custom changelog formats.
Note
In these function names, changelog refers to the external append-only stream that encodes each change with an explicit operation code. FROM_CHANGELOG reads that stream into a Flink table. TO_CHANGELOG writes a Flink table back out as that stream.
FROM_CHANGELOG
Reads an append-only stream that carries an operation code and turns it into an updating table that you can query, join, and aggregate over. Use it to interpret custom change data capture (CDC) or changelog formats that Flink doesn’t recognize natively.
Syntax
SELECT * FROM FROM_CHANGELOG(
  input => TABLE input_table [ PARTITION BY key ],
  op => DESCRIPTOR(op_column),
  op_mapping => MAP[ 'input_code', 'ROW_KIND', ... ],
  error_handling => 'FAIL' | 'SKIP'  -- optional, defaults to FAIL
)
Parameters
input: The append-only input table. To Flink, every record is an insert. Use PARTITION BY to route all events for a key to the same task.
op: Names the column that holds the operation code. The column must be of type STRING. Defaults to op.
op_mapping: Maps each operation code in your data to a Flink row kind: INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE. If you omit it, the operation codes must already match the row-kind names.
error_handling: Optional. Controls what happens when an input row’s operation code is NULL or isn’t present in op_mapping. FAIL, the default, throws a runtime exception. SKIP silently drops the row and continues.
Returns
An updating table whose row kinds come from your op_mapping. If the mapping emits both UPDATE_BEFORE and UPDATE_AFTER, the result is a retract stream. If it emits UPDATE_AFTER without UPDATE_BEFORE, the result is an upsert stream keyed on the PARTITION BY columns, which must match the unique key of your data.
Default op_mapping
If you omit op_mapping, the operation codes in your data must already be the Flink row-kind names, so a stream produced by TO_CHANGELOG reads back without extra configuration.
| Input code | Row kind |
|---|---|
| INSERT | INSERT |
| UPDATE_BEFORE | UPDATE_BEFORE |
| UPDATE_AFTER | UPDATE_AFTER |
| DELETE | DELETE |
You can map several input codes to one row kind by listing them in a single key, separated by commas, for example 'c, r' for INSERT. Each row kind can appear at most once across the mapping.
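As a sketch of the comma-separated form, the following statement maps two input codes, c and r, to INSERT. The codes u and d, the columns of cdc_stream, and the sample rows are assumptions made for illustration. Because this mapping emits UPDATE_AFTER without UPDATE_BEFORE, the sketch partitions by id so that the result is an upsert table:

```sql
-- Input (append-only), hypothetical sample rows:
-- +I[id:1, op:'r', name:'Alice']
-- +I[id:2, op:'c', name:'Bob']
-- +I[id:1, op:'u', name:'Alice2']
-- +I[id:2, op:'d', name:'Bob']
SELECT * FROM FROM_CHANGELOG(
  input => TABLE cdc_stream PARTITION BY id,
  op_mapping => MAP[
    'c, r', 'INSERT',        -- both codes are read as inserts
    'u',    'UPDATE_AFTER',
    'd',    'DELETE'
  ]
);
-- Expected output (upsert changelog, upsert key = id):
-- +I[id:1, name:'Alice']
-- +I[id:2, name:'Bob']
-- +U[id:1, name:'Alice2']
-- -D[id:2, name:'Bob']
```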
Output schema
The output has every input column except the operation-code column, which Flink interprets and removes. With PARTITION BY, the partition-key columns move to the front of the output.
Examples
Basic usage
Read a stream that already uses the Flink row-kind names, with the default mapping:
-- Input (append-only):
-- +I[id:1, op:'INSERT', name:'Alice']
-- +I[id:2, op:'INSERT', name:'Bob']
-- +I[id:1, op:'UPDATE_BEFORE', name:'Alice']
-- +I[id:1, op:'UPDATE_AFTER', name:'Alice2']
-- +I[id:2, op:'DELETE', name:'Bob']
SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream);
-- Output (updating table):
-- +I[id:1, name:'Alice']
-- +I[id:2, name:'Bob']
-- -U[id:1, name:'Alice']
-- +U[id:1, name:'Alice2']
-- -D[id:2, name:'Bob']
After all events, the table holds one row: id 1 with name 'Alice2'.
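Because the result is an updating table, a query on top of it sees the update and the delete, not five independent inserts. A minimal sketch, assuming the same cdc_stream input as in the example above; the alias current_rows is illustrative:

```sql
-- Counts the rows currently in the table, not the number of input events.
SELECT COUNT(*) AS current_rows
FROM FROM_CHANGELOG(input => TABLE cdc_stream);
-- Once all five events are processed, the latest result is current_rows = 1.
```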
Custom operation codes
Map your own codes to row kinds. Here c is a create, ub and ua are the before and after of an update, d is a delete, and the codes are read from a column named operation:
SELECT * FROM FROM_CHANGELOG(
  input => TABLE cdc_stream,
  op => DESCRIPTOR(operation),
  op_mapping => MAP[
    'c', 'INSERT',
    'ub', 'UPDATE_BEFORE',
    'ua', 'UPDATE_AFTER',
    'd', 'DELETE'
  ]
);
Partition by a key
Add PARTITION BY to route all events for a key to the same task. FROM_CHANGELOG assumes that events for the same key arrive in order. PARTITION BY also moves the key columns to the front of the output, so leave it off unless you need it, for example to produce an upsert table or because a downstream operator is keyed on that column:
-- Input columns (name, id, op, doc) become output columns (id, name, doc)
SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream PARTITION BY id);
Produce an upsert table
To produce an upsert table, partition by the unique key and use an op_mapping that emits UPDATE_AFTER but not UPDATE_BEFORE. Flink then treats the partition key as the upsert key:
-- Upsert input: INSERT / UPDATE_AFTER / DELETE only
SELECT * FROM FROM_CHANGELOG(
  input => TABLE cdc_stream PARTITION BY id,
  op_mapping => MAP[
    'INSERT', 'INSERT',
    'UPDATE_AFTER', 'UPDATE_AFTER',
    'DELETE', 'DELETE'
  ]
);
-- Output (upsert changelog, upsert key = id):
-- +I[id:1, name:'Alice']
-- +I[id:2, name:'Bob']
-- +U[id:1, name:'Alice2']
-- -D[id:2, name:'Bob']
Handle invalid operation codes
By default, a row whose operation code is NULL or not in op_mapping fails the statement. Set error_handling to SKIP to drop those rows and continue instead:
SELECT * FROM FROM_CHANGELOG(
  input => TABLE cdc_stream,
  error_handling => 'SKIP'
);
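To make the effect of SKIP concrete, here is a short trace under the default op_mapping. The sample rows, including the TRUNCATE code, are hypothetical and chosen only to show one NULL code and one unmapped code:

```sql
-- Input (append-only), hypothetical sample rows:
-- +I[id:1, op:'INSERT',   name:'Alice']
-- +I[id:2, op:NULL,       name:'Bob']     -- NULL operation code
-- +I[id:3, op:'TRUNCATE', name:'Carol']   -- not in the default op_mapping
-- +I[id:1, op:'DELETE',   name:'Alice']
SELECT * FROM FROM_CHANGELOG(
  input => TABLE cdc_stream,
  error_handling => 'SKIP'
);
-- Expected output: the rows for id 2 and id 3 are dropped.
-- +I[id:1, name:'Alice']
-- -D[id:1, name:'Alice']
```

With the default FAIL setting, the same input would stop the statement at the row for id 2.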
TO_CHANGELOG
Turns an updating table into an append-only stream of change events, each carrying an explicit operation code. This is the first Flink SQL operator that can turn an upsert or retract stream back into an append stream. Use it to feed a downstream consumer that doesn’t understand the Flink internal changelog (and would otherwise read the -U and +U records as duplicates) or a compacted topic that expects CDC.
Syntax
SELECT * FROM TO_CHANGELOG(
  input => TABLE input_table [ PARTITION BY key ],
  op => DESCRIPTOR(op_column),
  op_mapping => MAP[ 'ROW_KIND', 'output_code', ... ],
  produces_full_deletes => TRUE  -- optional, defaults to TRUE
)
Parameters
input: The updating input table to convert. Accepts insert-only, retract, and upsert tables. For an upsert input, the PARTITION BY key should match or be a subset of the table’s upsert key.
op: Names the operation-code column in the output. Defaults to op.
op_mapping: Maps each Flink row kind to the operation code you want in the output.
produces_full_deletes: Optional BOOLEAN. When TRUE, the default, each delete carries all columns, the full row image. When FALSE, only the key columns are kept and the rest are set to NULL.
Returns
An append-only table with the operation-code column added. Every output row is an INSERT, whatever the input row’s original change operation was.
Default op_mapping
If you omit op_mapping, each row kind maps to its standard name (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE), so FROM_CHANGELOG can read the result back without extra configuration. Provide an op_mapping to use your own codes. When you do, only the row kinds you map are forwarded and the rest are dropped, which is a simple way to filter. List several row kinds in one key, separated by commas, to map them to the same code.
Output schema
The output is the operation-code column followed by every input column. With PARTITION BY, the partition-key columns move to the front.
Examples
Basic usage
Convert the upsert result of an aggregation into an append stream that labels each change:
-- Input (upsert table from an aggregation):
-- +I[name:'Alice', cnt:1]
-- +U[name:'Alice', cnt:2]
-- -D[name:'Bob', cnt:1]
SELECT * FROM TO_CHANGELOG(input => TABLE my_aggregation);
-- Output (append-only):
-- +I[op:'INSERT', name:'Alice', cnt:1]
-- +I[op:'UPDATE_AFTER', name:'Alice', cnt:2]
-- +I[op:'DELETE', name:'Bob', cnt:1]
Custom operation codes and filtering
Name the output column and map row kinds to your own codes. Any row kind you leave out of the mapping is dropped, so this also filters, for example to forward only inserts and updates:
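-- Input (upsert table from an aggregation):
-- +I[name:'Alice', cnt:1]
-- +U[name:'Alice', cnt:2]
-- -D[name:'Bob', cnt:1]
SELECT * FROM TO_CHANGELOG(
  input => TABLE my_aggregation,
  op => DESCRIPTOR(op_code),
  op_mapping => MAP[ 'INSERT', 'I', 'UPDATE_AFTER', 'U' ]
);
-- Output (append-only); DELETE is not mapped, so the Bob row is dropped:
-- +I[op_code:'I', name:'Alice', cnt:1]
-- +I[op_code:'U', name:'Alice', cnt:2]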
Upsert stream
Map both INSERT and UPDATE_AFTER to one upsert code and DELETE to another, dropping UPDATE_BEFORE:
SELECT * FROM TO_CHANGELOG(
  input => TABLE upsert_source PARTITION BY id,
  op_mapping => MAP[ 'INSERT, UPDATE_AFTER', 'u', 'DELETE', 'd' ]
);
Deletion-flag pattern
Some consumers expect a boolean delete flag instead of an operation code. Map the row kinds to true and false in a column you name deleted:
SELECT * FROM TO_CHANGELOG(
  input => TABLE my_aggregation,
  op => DESCRIPTOR(deleted),
  op_mapping => MAP[ 'INSERT, UPDATE_AFTER', 'false', 'DELETE', 'true' ]
);
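The mapping values are string literals, so this sketch assumes that the deleted column is produced as a STRING holding 'true' or 'false'. If the consumer needs a BOOLEAN column, one option is to cast the flag in the outer query. The alias is_deleted is illustrative, and the name and cnt columns are assumed to match the aggregation shown in the basic usage example:

```sql
-- Assumption: TO_CHANGELOG emits the flag as a STRING ('true' / 'false').
SELECT
  CAST(deleted AS BOOLEAN) AS is_deleted,  -- convert the string flag to BOOLEAN
  `name`,
  cnt
FROM TO_CHANGELOG(
  input => TABLE my_aggregation,
  op => DESCRIPTOR(deleted),
  op_mapping => MAP[ 'INSERT, UPDATE_AFTER', 'false', 'DELETE', 'true' ]
);
```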
Full versus partial deletes
produces_full_deletes controls what a delete row carries. The upsert key is the column or columns that uniquely identify a row, taken from a declared primary key, a GROUP BY key, or the PARTITION BY key.
With produces_full_deletes => TRUE (the default), each delete carries the full row image. For an upsert source that emits key-only deletes, Flink inserts a ChangelogNormalize step to rebuild the full image from state:
-- Upsert source delete: -D[id:5, name:null] (key only)
-- Output: +I[op:'DELETE', id:5, name:'Alice']
SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source);
With produces_full_deletes => FALSE, Flink skips that step and emits deletes that carry only the key columns, with the rest set to null. This matches what an upsert sink such as a compacted topic expects, and it requires an upsert key or PARTITION BY:
-- Output: +I[id:5, op:'DELETE', name:null]
SELECT * FROM TO_CHANGELOG(
  input => TABLE upsert_source PARTITION BY id,
  produces_full_deletes => FALSE
);
Note
With produces_full_deletes => FALSE, non-key columns are nulled on delete rows, so the output widens those columns to nullable even when they are NOT NULL on the input. Use the default if the output must keep the input’s NOT NULL types.
Partition by a key
Use PARTITION BY when a downstream operator is keyed on that column. It moves the key columns to the front of the output:
-- Input columns (name, id, cnt) become output columns (id, op, name, cnt)
SELECT * FROM TO_CHANGELOG(input => TABLE my_aggregation PARTITION BY id);
The changelog functions work on flat records with a direct one-to-one operation mapping, where each input record maps to a single row kind. A single message that carries both the before and after image of a row isn’t supported.
Important
FROM_CHANGELOG is an advanced feature. When you use it, you tell Flink that your stream is a valid changelog, and Flink doesn’t validate that claim. An incorrect changelog can produce silently wrong results downstream and, in the worst case, leave a statement in an unrecoverable state. When the data handed to FROM_CHANGELOG isn’t the shape the function expects, it becomes hard to diagnose what actually went wrong, so Confluent is only able to provide limited support for statements that contain FROM_CHANGELOG.
