Process Table Functions in Confluent Cloud for Apache Flink
A process table function (PTF) is the most flexible type of user-defined function in Confluent Cloud for Apache Flink®. PTFs are the “escape hatch” to Flink’s lowest-level stream-processing primitives — managed state, event-time timers, and multi-table inputs — for cases where standard SQL and simple user-defined functions can’t express your business logic.
Conceptually, a PTF is itself a user-defined function that provides all the primitives to build functions ranging from a simple scalar function to advanced windowing and aggregation functions.
PTFs are modeled on the polymorphic table functions of the SQL:2016 standard and are sometimes called by that name.
Note
Process Table Functions are an Open Preview feature in Confluent Cloud.
A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing preview releases of the Preview features at any time in Confluent’s sole discretion.
The Open Preview release of PTFs includes:
Stateful PTFs with managed state, including state time-to-live (TTL), using @StateHint.
Event-time timers using onTimer(), with event-time arguments declared through the REQUIRE_ON_TIME trait and bound at SQL call time with DESCRIPTOR(...).
Multi-table inputs, with up to 20 SET_SEMANTIC_TABLE arguments per PTF.
Pass-through columns through the PASS_COLUMNS_THROUGH argument trait.
For information about limitations and unsupported features in the Open Preview release, see Process table function limitations.
Flink PTFs are available in all regions that support UDFs. For more information, see UDF regional availability.
What are process table functions?
Unlike traditional user-defined functions that process one row at a time (1-to-1) or aggregate functions that process many rows to produce one result (N-to-1), PTFs support N-to-M semantics. This means a PTF can:
Consume multiple input rows from one or more tables.
Emit multiple output rows.
Maintain persistent, per-partition state across events.
Schedule future actions using event-time timers.
PTFs combine the simplicity of serverless functions with the power of stateful stream processing. They provide an escape hatch when SQL alone cannot express your business logic.
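To make the N-to-M idea concrete, the following is a minimal plain-Java model, not a PTF and not the Flink API. The class name `MicroBatcherModel` and its `process` method are hypothetical. The `HashMap` stands in for managed per-partition state, and each call consumes one input row and returns zero or more output rows.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of N-to-M processing with per-partition state (not the Flink API). */
final class MicroBatcherModel {
    private final int batchSize;
    // One buffer per partition key, standing in for managed per-partition state.
    private final Map<String, List<String>> buffers = new HashMap<>();

    MicroBatcherModel(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Consumes one input row; emits nothing until the key's buffer is full, then emits every buffered row. */
    List<String> process(String key, String value) {
        List<String> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() < batchSize) {
            return List.of();
        }
        List<String> out = new ArrayList<>();
        for (String buffered : buffer) {
            out.add(key + ":" + buffered);
        }
        buffer.clear(); // the partition's state is reset after the flush
        return out;
    }

    public static void main(String[] args) {
        MicroBatcherModel batcher = new MicroBatcherModel(2);
        System.out.println(batcher.process("u1", "a")); // []
        System.out.println(batcher.process("u2", "x")); // []
        System.out.println(batcher.process("u1", "b")); // [u1:a, u1:b]
    }
}
```

Key `u2` never sees the rows buffered for `u1`, which mirrors how each partition keeps its own isolated state.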
Differences from Apache Flink
PTFs in Confluent Cloud for Apache Flink® are based on the Flink 2.2 ProcessTableFunction API. The 20-table limit on multi-table arguments, event-time timers, and the PASS_COLUMNS_THROUGH trait for single-table PTFs all behave the same as Flink open source. The table below lists the runtime differences to keep in mind when porting a PTF between the two.
Area | Flink open source | Confluent Cloud for Apache Flink® |
|---|---|---|
| Supported | Supported (Open Preview) |
| Recommended | Required |
| Supported | Not supported. Use a plain |
| Up to nanoseconds (precision 9) | Capped at microseconds (precision 6) |
State POJO field visibility | Public fields, or private fields with accessors | Public fields with default values required |
Why use process table functions?
Use PTFs when you need to implement complex, stateful logic that cannot be achieved with standard SQL or simple user-defined functions. Common use cases include:
Custom windowing logic: Build hybrid windows that emit data based on time intervals or event counts.
Deduplication with state: Remember when events were seen over a specific time period to remove duplicates.
Timer-based alerts: Trigger alerts when expected events do not occur within a time threshold.
Complex state machines: Track multi-step processes, such as user journeys through a checkout flow.
Control streams: Use a low-cardinality side input to mutate per-key behavior in a main stream.
Real-time analytics: Calculate custom metrics that require accessing historical state.
Note
Reach for built-in SQL operators first. PTFs give you direct access to state, timers, and side outputs — but with that power comes responsibility for managing each one correctly. If a TUMBLE, OVER, MATCH_RECOGNIZE, or built-in function already does what you need, use it instead.
Common PTF patterns
The following patterns recur often enough across customer workloads that they are worth recognizing as templates. Each is a thumbnail; full walkthroughs live in Create a Process Table Function and the Flink OSS documentation.
Pattern | Shape |
|---|---|
Time-bounded deduplication |
|
Event-absence / timeout detection | Store the triggering event in state, register a timer for the deadline, and |
Custom micro-batching |
|
Temporal stream join with TTL | Multi-table PTF that stores one side of the join in state and uses |
Threshold breaching with early reset | Counter in state + timer for the rolling window, plus an explicit |
Missing-heartbeat / liveness check | Named timer that’s replaced on each heartbeat; |
Sensor debouncing | Last-seen value in state; emit only when the new value differs by more than a threshold. |
Early-firing sessionization |
|
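As an illustration of the sensor debouncing row, here is a small plain-Java model of the logic only. It does not use the Flink API, and the class `DebounceModel`, its `process` method, and the threshold value are hypothetical. The map stands in for the per-partition state that a PTF would declare with @StateHint.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Plain-Java model of sensor debouncing (not the Flink API). */
final class DebounceModel {
    private final double threshold;
    // Last-seen reading per sensor, standing in for per-partition state.
    private final Map<String, Double> lastSeen = new HashMap<>();

    DebounceModel(double threshold) {
        this.threshold = threshold;
    }

    /** Returns the reading if it should be emitted, or empty if it is suppressed. */
    Optional<Double> process(String sensorId, double reading) {
        // put() stores the new reading and returns the previous one (null on first sight).
        Double previous = lastSeen.put(sensorId, reading);
        if (previous == null || Math.abs(reading - previous) > threshold) {
            return Optional.of(reading);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        DebounceModel debounce = new DebounceModel(0.5);
        System.out.println(debounce.process("s1", 20.0)); // Optional[20.0]
        System.out.println(debounce.process("s1", 20.3)); // Optional.empty
        System.out.println(debounce.process("s1", 21.0)); // Optional[21.0]
    }
}
```

This sketch compares each reading with the last-seen value, as the table row describes. Comparing with the last emitted value instead would also catch slow drift, at the cost of slightly different semantics.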
PTFs vs other function types
The following table compares PTFs with other function types available in Flink:
Feature | Scalar UDF | Table UDF | Aggregate UDF | Process table function |
|---|---|---|---|---|
Input rows | 1 | 1 | N | N |
Output rows | 1 | N | 1 | N |
Managed state | No | No | No | Yes |
Timers | No | No | No | Yes |
Multi-table input | No | No | No | Yes |
Use case | Transform single value | Split one row into many | Aggregate many rows | Stateful stream processing |
How do process table functions work?
PTFs operate on input tables and produce output tables. A PTF receives one or more table arguments, processes rows within each logical partition, and emits output rows. You call a PTF from SQL using named arguments:
SELECT user_id, click_id, running_count
FROM EventCounter(
  input => TABLE examples.marketplace.clicks PARTITION BY user_id,
  uid => 'event-counter-v1'
);
The key elements of a PTF invocation are:
- TABLE ... PARTITION BY
Passes an input table to the PTF and defines the row grouping. The PTF processes each unique user_id independently with its own isolated state. In Confluent Cloud for Apache Flink®, PARTITION BY is required on every SET_SEMANTIC_TABLE argument.
- uid => '...'
Assigns a stable identifier for state persistence across query restarts.
You can use PTFs in two ways:
- SQL-first approach
Register a PTF in Java and call it directly from a SQL SELECT statement to solve complex windowing or stateful processing problems, as the previous example shows.
- Table API approach
Build full applications using the Table API with PTFs. Define state using Java annotations and deploy to Confluent Cloud. The platform handles automatic scaling, exactly-once processing, and serverless operations.
Because PTFs are discovered through Java reflection, you must annotate the class, the eval method, and its parameters to tell the runtime how to call the function and what to return. The annotation surface is small:
Annotation | Purpose |
|---|---|
| Declares the PTF’s output schema. Required whenever the function extends |
| Marks a parameter as managed, per-partition state. The state class is a POJO with public fields. Add |
| Declares that the parameter is a table argument. Combine with other traits like |
| Declares the SQL type of a non-table argument. Useful when Java reflection can’t infer the precision you want (e.g., |
Table argument semantics
A PTF declares each table argument with one of two argument traits:
- SET_SEMANTIC_TABLE
The PTF processes a logical partition of rows, defined by the PARTITION BY clause on the SQL invocation. Set semantics give the PTF access to managed state, state TTL, and timers, all of which are scoped per partition. Every stateful example on this page uses set semantics. In Confluent Cloud for Apache Flink®, PARTITION BY is required on every SET_SEMANTIC_TABLE argument.
- ROW_SEMANTIC_TABLE
The PTF processes one row at a time, with no partition concept and no per-key state. Row semantics are useful for stateless enrichment, where the PTF acts as a 1:1 row transformer. Use ROW_SEMANTIC_TABLE when the PTF doesn’t need PARTITION BY, @StateHint, or timers.
Declare the trait on the table parameter with @ArgumentHint:
@ArgumentHint(SET_SEMANTIC_TABLE) Row input // partition-scoped
@ArgumentHint(ROW_SEMANTIC_TABLE) Row input // row-by-row
Either semantic can be combined with other traits — for example, @ArgumentHint({SET_SEMANTIC_TABLE, PASS_COLUMNS_THROUGH}) for pass-through enrichment within a partition, or @ArgumentHint({SET_SEMANTIC_TABLE, REQUIRE_ON_TIME}) when the PTF schedules event-time timers.
State TTL
Confluent Cloud for Apache Flink® supports automatic state expiration on PTF state entries through the ttl attribute of @StateHint. When a partition key receives no read or write for the configured duration, Flink garbage-collects the state entry. Use state TTL to bound the size of long-running stateful PTFs without writing manual cleanup logic.
Set ttl to a duration string on the @StateHint annotation:
import static org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

// Output excludes the partition key; Flink auto-prepends it from PARTITION BY.
@DataTypeHint("ROW<counter BIGINT>")
public class TtlCountdown extends ProcessTableFunction<Row> {

    public static class CountState {
        public long counter = 0L;
    }

    public void eval(
            @StateHint(ttl = "30 seconds") CountState state,
            @ArgumentHint(SET_SEMANTIC_TABLE) Row input
    ) {
        state.counter++;
        collect(Row.of(state.counter));
    }
}
In this example, state.counter resets to 0 if no event arrives for the partition key within 30 seconds. TTL applies independently to every partition key, so an active key is unaffected by other keys’ inactivity.
The ttl attribute accepts standard Flink duration syntax: "30 seconds", "5 minutes", "1 hour", "7 days". TTL is measured against state-backend access time, not event time.
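The expiry rule can be pictured with a simplified plain-Java model. This is a sketch under stated assumptions, not how the state backend is implemented: the class `TtlCounterModel` is hypothetical, the access time is passed in explicitly as `nowMillis` so the behavior is deterministic, and expired entries are dropped lazily on the next access.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of per-key state TTL measured against access time (not the Flink API). */
final class TtlCounterModel {
    private static final class Entry {
        long counter = 0L;
        long lastAccessMillis;
    }

    private final long ttlMillis;
    private final Map<String, Entry> state = new HashMap<>();

    TtlCounterModel(Duration ttl) {
        this.ttlMillis = ttl.toMillis();
    }

    /** Increments the key's counter, starting over if the entry was idle for the TTL or longer. */
    long process(String key, long nowMillis) {
        Entry entry = state.get(key);
        if (entry == null || nowMillis - entry.lastAccessMillis >= ttlMillis) {
            entry = new Entry(); // never seen or expired: fall back to the default value
            state.put(key, entry);
        }
        entry.counter++;
        entry.lastAccessMillis = nowMillis; // every access refreshes the TTL clock
        return entry.counter;
    }

    public static void main(String[] args) {
        TtlCounterModel model = new TtlCounterModel(Duration.ofSeconds(30));
        System.out.println(model.process("a", 0L));      // 1
        System.out.println(model.process("a", 10_000L)); // 2
        System.out.println(model.process("a", 45_000L)); // 1, idle for 35 seconds
    }
}
```

Because each key carries its own last-access timestamp, a busy key keeps its counter while an idle key silently starts over.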
Multi-table inputs
A PTF can declare multiple SET_SEMANTIC_TABLE arguments. Each input table is co-partitioned by its PARTITION BY key and delivered to the same PTF instance, so the function can join, enrich, or correlate streams across inputs with shared state. Use multi-table inputs for control streams that mutate per-key behavior in a main stream, side inputs that look up reference data, or self-joins that need two independent traversals of the same source.
import static org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

// Output excludes the partition key; Flink auto-prepends tenant_id from PARTITION BY.
@DataTypeHint("ROW<payload STRING, active_mode STRING>")
public class MtControlStream extends ProcessTableFunction<Row> {

    public static class TenantConfig {
        public String activeMode = "default";
    }

    public void eval(
            @StateHint TenantConfig state,
            @ArgumentHint(SET_SEMANTIC_TABLE) Row events,
            @ArgumentHint(SET_SEMANTIC_TABLE) Row control
    ) {
        // Control rows update the per-tenant active mode; no row is emitted.
        if (control != null) {
            String mode = control.getFieldAs("mode");
            if (mode != null) {
                state.activeMode = mode;
            }
            return;
        }
        // Event rows are tagged with the current mode.
        if (events != null) {
            String payload = events.getFieldAs("payload");
            collect(Row.of(payload, state.activeMode));
        }
    }
}
Call the PTF with both tables in a single SQL invocation:
SELECT *
FROM MtControlStream(
  events => TABLE events PARTITION BY tenant_id,
  control => TABLE control PARTITION BY tenant_id
);
Confluent Cloud for Apache Flink® supports up to 20 table arguments per PTF, the same as Flink open source. Every SET_SEMANTIC_TABLE argument must include a PARTITION BY clause; Confluent Cloud for Apache Flink® does not allow unpartitioned set-semantic tables.
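To show how rows from the two inputs interleave against shared per-key state, here is a plain-Java model of the control-stream logic. It does not use the Flink API. The class `ControlStreamModel` and the comma-separated output format are hypothetical, and the model assumes what the PTF above relies on: each call carries a row from exactly one input, with the other argument null.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a control stream sharing per-key state with a main stream (not the Flink API). */
final class ControlStreamModel {
    // Active mode per tenant, standing in for the TenantConfig state of each partition.
    private final Map<String, String> activeMode = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    /** Mirrors eval(): exactly one of eventPayload and controlMode is non-null per call. */
    void eval(String tenantId, String eventPayload, String controlMode) {
        if (controlMode != null) {
            activeMode.put(tenantId, controlMode); // control rows only mutate state
            return;
        }
        if (eventPayload != null) {
            String mode = activeMode.getOrDefault(tenantId, "default");
            output.add(tenantId + "," + eventPayload + "," + mode);
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        ControlStreamModel model = new ControlStreamModel();
        model.eval("t1", "p1", null);     // event before any control row
        model.eval("t1", null, "strict"); // control row switches tenant t1
        model.eval("t1", "p2", null);     // later events see the new mode
        model.eval("t2", "p3", null);     // other tenants are unaffected
        System.out.println(model.output()); // [t1,p1,default, t1,p2,strict, t2,p3,default]
    }
}
```

The mode change for `t1` affects only events that arrive after the control row, and only for that tenant.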
Pass-through columns
The PASS_COLUMNS_THROUGH trait copies every column of the input table into the output. Use it when a PTF enriches each row with one or more additional fields without re-emitting the input schema in Java.
import static org.apache.flink.table.annotation.ArgumentTrait.PASS_COLUMNS_THROUGH;
import static org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

public class PassThroughEnrich extends ProcessTableFunction<Long> {

    public static class CountState {
        public long count = 0L;
    }

    public void eval(
            @StateHint CountState state,
            @ArgumentHint({SET_SEMANTIC_TABLE, PASS_COLUMNS_THROUGH}) Row input
    ) {
        state.count++;
        collect(state.count);
    }
}
The output table contains every column of input followed by the single running_count value emitted by collect(). The PTF does not have to project or re-emit the input columns — Flink splices them in automatically. PASS_COLUMNS_THROUGH applies to single-table PTFs only; multi-table PTFs cannot use this trait, the same as in Flink open source.
Event-time timers
PTFs support event-time timers for scheduling callbacks at specific watermark positions. Timers enable time-based logic such as session timeouts, delayed processing, and alerting when expected events do not arrive.
How timers work
A timer fires when the current watermark advances past the timer’s registered timestamp. The virtual processor defined by PARTITION BY scopes the timers — each partition maintains its own independent set of timers. Flink persists and restores timers during failures and restarts.
To use timers, add REQUIRE_ON_TIME to the @ArgumentHint annotation on the table argument. This makes the on_time descriptor argument mandatory in the SQL call, which specifies which column provides the event-time timestamp.
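The firing rule can be modeled in a few lines of plain Java. This is a simplified sketch, not the Flink runtime: the class `TimerModel` is hypothetical, its method names only echo the TimeContext API, and it assumes a timer fires as soon as the watermark reaches its timestamp.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of named event-time timers scoped per partition key (not the Flink API). */
final class TimerModel {
    // partition key -> (timer name -> fire timestamp in epoch millis)
    private final Map<String, Map<String, Long>> timers = new TreeMap<>();

    /** Registers a named timer; re-registering the same name replaces the earlier timestamp. */
    void registerOnTime(String key, String name, long timestamp) {
        timers.computeIfAbsent(key, k -> new TreeMap<>()).put(name, timestamp);
    }

    /** Removes a named timer if it exists. */
    void clearTimer(String key, String name) {
        Map<String, Long> perKey = timers.get(key);
        if (perKey != null) {
            perKey.remove(name);
        }
    }

    /** Advances the watermark and returns "key/name" for every timer that fires. */
    List<String> advanceWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        for (Map.Entry<String, Map<String, Long>> partition : timers.entrySet()) {
            Iterator<Map.Entry<String, Long>> it = partition.getValue().entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> timer = it.next();
                // Assumption: fire once the watermark has reached the timer's timestamp.
                if (timer.getValue() <= watermark) {
                    fired.add(partition.getKey() + "/" + timer.getKey());
                    it.remove(); // a fired timer does not fire again
                }
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        TimerModel model = new TimerModel();
        model.registerOnTime("u1", "timeout", 60_000L);
        model.registerOnTime("u1", "timeout", 120_000L); // replaces the 60-second timer
        model.registerOnTime("u2", "timeout", 90_000L);
        System.out.println(model.advanceWatermark(100_000L)); // [u2/timeout]
        System.out.println(model.advanceWatermark(130_000L)); // [u1/timeout]
    }
}
```

Nothing in the model depends on wall-clock time: if `advanceWatermark` is never called, no timer fires, which is the same reason a stalled watermark keeps event-time timers from firing.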
Registering and clearing timers
Register timers inside eval() or onTimer() using the TimeContext API:
TimeContext<Instant> timeCtx = ctx.timeContext(Instant.class);
// Named timer — can be replaced or cleared by name
timeCtx.registerOnTime("timeout", timeCtx.time().plusSeconds(60));
// Unnamed timer — one per timestamp value
timeCtx.registerOnTime(timeCtx.time().plus(Duration.ofMinutes(5)));
To clear timers:
timeCtx.clearTimer("timeout"); // Clear a named timer
timeCtx.clearTimer(someTimestamp); // Clear an unnamed timer by timestamp
ctx.clearAllTimers(); // Clear all timers in the partition
ctx.clearAll(); // Clear all state and timers
The onTimer() method
When a timer fires, Flink calls the onTimer() method. This method has access to the same state entries declared in eval():
public void onTimer(OnTimerContext onTimerCtx, MyState state) {
    String timerName = onTimerCtx.currentTimer(); // null for unnamed timers
    collect(new Alert(state.userId, "Timeout reached"));
}
The onTimer() method signature supports:
An optional OnTimerContext as the first parameter.
Zero or more @StateHint parameters matching those in eval().
SQL invocation with timers
When calling a PTF with timers, use the on_time descriptor argument to specify the watermarked timestamp column:
SELECT *
FROM InactivityAlert(
  input => TABLE user_events PARTITION BY user_id,
  on_time => DESCRIPTOR(event_time),
  uid => 'inactivity-alert-v1'
);
The on_time column must have a watermark declaration on the source table.
Common timer pitfalls
A few timer mistakes appear repeatedly in PTF code reviews. Knowing them up front saves debugging time:
Duplicate registration. Re-registering a named timer silently replaces the previous one. This is usually what you want (it’s the “reset on activity” pattern). Re-registering an unnamed timer at the same timestamp inflates state for no benefit; prefer named timers when the registration is conditional.
Forgotten cancellation. A timer that’s no longer needed but still registered will still fire. onTimer() then runs against whatever state it finds, which may be empty, stale, or wrong for the original condition. Call clearTimer() as soon as the awaited event arrives.
Event-time vs. processing-time confusion. Event-time timers fire on watermark advance, not on wall-clock progress. A stream whose watermark stops advancing (for example, an idle partition with no source rows) will never fire any event-time timer, even after hours of real time. Inspect watermarks first when an alert “never fires”.
Timer best practices
Clean up timers that you no longer need. Use clearTimer() after the condition the timer was waiting for is satisfied.
Avoid registering an excessive number of timers per partition. Each timer consumes state, and large numbers of timers can affect performance and recovery time.
Use named timers when you need to replace or cancel a specific timer. Named timers use a string name as their identifier, and re-registering the same name replaces the previous timer.
Use unnamed timers when you need multiple independent timers at different timestamps within the same partition.
Process table function limitations
This section describes the limitations and unsupported features of process table functions (PTFs) in the Open Preview release of Confluent Cloud for Apache Flink.
State view limitations
ListView not supported
ListView state is not available in PTFs. ListView is a specialized state interface designed for extremely large state collections that support iteration but not random seeking.
For lists that fit in memory, use @StateHint with a state POJO containing a List field. Flink handles serialization automatically:
import static org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.List;

public class ListStatePTF extends ProcessTableFunction<Row> {

    public static class ListStateData {
        public List<String> items = new ArrayList<>();
    }

    public void eval(
            @StateHint ListStateData state,
            @ArgumentHint(SET_SEMANTIC_TABLE) Row input
    ) {
        state.items.add(input.getFieldAs(0).toString());
    }
}
MapView not supported
MapView state is not available in PTFs. MapView is a specialized state interface designed for extremely large state collections.
For maps that fit in memory, use @StateHint with a state POJO containing a Map field. Flink handles serialization automatically:
import static org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

import java.util.HashMap;
import java.util.Map;

public class MapStatePTF extends ProcessTableFunction<Row> {

    public static class MapStateData {
        public Map<String, Integer> counts = new HashMap<>();
    }

    public void eval(
            @StateHint MapStateData state,
            @ArgumentHint(SET_SEMANTIC_TABLE) Row input
    ) {
        String key = input.getFieldAs(0).toString();
        state.counts.put(key, state.counts.getOrDefault(key, 0) + 1);
    }
}
Important
ListView and MapView are designed for state too large to fit in task-manager memory. For most use cases, a plain List or Map field in a @StateHint POJO is sufficient and handles serialization automatically. Pair these fields with @StateHint(ttl=…) to bound growth in long-running statements.
Language support limitations
Python support for PTFs is not available. PTFs must be written in Java.
For information about Python user-defined functions (non-PTF), see Create a User-Defined Function with Confluent Cloud for Apache Flink.
Best practices
Keep state POJOs small. Avoid storing large collections in @StateHint fields. For normal-sized List or Map fields, Flink handles serialization automatically, but extremely large state can impact performance. ListView and MapView support for large state collections is planned for a future release.
Be aware that every read or write of a @StateHint POJO serializes the entire field. A growing List or Map field will get more expensive per event as it grows; bound the growth with @StateHint(ttl=…) or by partitioning more aggressively so each key holds less data. ListView/MapView will land in a future release for truly large collections.
Use @StateHint(ttl=…) to bound state growth in long-running stateful PTFs. See State TTL.
Choose partition keys carefully. The PARTITION BY clause determines the state distribution. Too few unique keys can create hot partitions, while extremely high cardinality keys increase the total amount of state Flink must manage.
Declare state POJO fields as public with default values. Flink requires public fields to serialize and deserialize state automatically. Flink doesn’t support private fields with getters and setters.
Test PTFs with realistic data volumes before production deployment.
Follow timer best practices when using event-time timers. Clean up timers after you no longer need them to avoid excessive state growth.