Process Table Functions in Confluent Cloud for Apache Flink
A process table function (PTF) is an advanced type of user-defined function that enables custom, stateful stream processing logic in Confluent Cloud for Apache Flink®. PTFs extend Flink capabilities beyond standard SQL and simple user-defined functions by providing access to managed state and timers.
PTFs are also known as polymorphic table functions, as defined in the SQL:2016 standard.
Note
Process Table Functions are an Early Access Program feature in Confluent Cloud.
An Early Access feature is a component of Confluent Cloud introduced to gain feedback. This feature should be used only for evaluation and non-production testing purposes or to provide feedback to Confluent, particularly as it becomes more widely available in follow-on preview editions.
Early Access Program features are intended for evaluation use in development and testing environments only, and not for production use. Early Access Program features are provided: (a) without support; (b) “AS IS”; and (c) without indemnification, warranty, or condition of any kind. No service level commitment will apply to Early Access Program features. Early Access Program features are considered to be a Proof of Concept as defined in the Confluent Cloud Terms of Service. Confluent may discontinue providing preview releases of the Early Access Program features at any time in Confluent’s sole discretion.
The Early Access release of PTFs includes support for stateful PTFs with managed state using @StateHint.
For information about limitations and unsupported features in the Early Access release, see Process table function limitations.
What are process table functions?
Unlike traditional user-defined functions that process one row at a time (1-to-1) or aggregate functions that process many rows to produce one result (N-to-1), PTFs support N-to-M semantics. This means a PTF can:
Consume multiple input rows
Emit multiple output rows
Maintain persistent state across events
Schedule future actions using timers
PTFs combine the simplicity of serverless functions with the power of stateful stream processing. They provide an escape hatch when SQL alone cannot express your business logic.
Why use process table functions?
Use PTFs when you need to implement complex, stateful logic that cannot be achieved with standard SQL or simple user-defined functions. Common use cases include:
Custom windowing logic: Build hybrid windows that emit data based on time intervals or event counts.
Deduplication with state: Remember when events were seen over a specific time period to remove duplicates.
Timer-based alerts: Trigger alerts when expected events do not occur within a time threshold.
Complex state machines: Track multi-step processes, such as user journeys through a checkout flow.
Real-time analytics: Calculate custom metrics that require accessing historical state.
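To make the deduplication use case concrete, the core pattern can be sketched in plain Java. The `DedupTracker` class below is a hypothetical helper, not part of the Flink API; inside a real PTF, its map would live in a `@StateHint` state POJO and the timestamp would come from the stream:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: remembers when each event ID was last seen and
// treats an ID as a duplicate if it reappears within the TTL window.
public class DedupTracker {
    private final long ttlMs;
    private final Map<String, Long> lastSeen = new HashMap<>();

    public DedupTracker(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    // Returns true if the event should be emitted: either it has not
    // been seen before, or its previous sighting has expired.
    public boolean shouldEmit(String eventId, long nowMs) {
        Long seenAt = lastSeen.get(eventId);
        boolean emit = (seenAt == null) || (nowMs - seenAt) > ttlMs;
        if (emit) {
            lastSeen.put(eventId, nowMs);
        }
        return emit;
    }
}
```

In a PTF, this logic runs per partition key, so each key's dedup window is tracked independently.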
PTFs vs other function types
The following table compares PTFs with other function types available in Flink:
| Feature | Scalar UDF | Table UDF | Aggregate UDF | Process table function |
|---|---|---|---|---|
| Input rows | 1 | 1 | N | N |
| Output rows | 1 | N | 1 | N |
| Managed state | No | No | No | Yes |
| Timers | No | No | No | Yes |
| Use case | Transform single value | Split one row into many | Aggregate many rows | Stateful stream processing |
How do process table functions work?
PTFs operate on input tables and produce output tables. A PTF receives one or more table arguments, processes rows within each logical partition, and emits output rows. You call a PTF from SQL using named arguments:
```sql
SELECT user_id, click_id, running_count
FROM EventCounter(
  input => TABLE examples.marketplace.clicks PARTITION BY user_id,
  uid => 'event-counter-v1'
);
```
The key elements of a PTF invocation are:
- `TABLE ... PARTITION BY`: Passes an input table to the PTF and defines how rows are grouped. Each unique `user_id` is processed independently with its own isolated state.
- `uid => '...'`: Assigns a stable identifier for state persistence across query restarts.
You can use PTFs in two ways:

- SQL-first approach: Register a PTF in Java and call it directly from a SQL `SELECT` statement to solve complex windowing or stateful processing problems, as shown above.
- Table API approach: Build full applications using the Table API with PTFs. Define state using Java annotations and deploy to Confluent Cloud. The platform handles automatic scaling, exactly-once processing, and serverless operations.
Process table function limitations
This section describes the limitations and unsupported features of process table functions (PTFs) in the Early Access release of Confluent Cloud for Apache Flink.
State management limitations
ListView not supported
`ListView` state is not available in PTFs. `ListView` is a specialized state interface designed for extremely large state collections that support iteration but not random seeking.
Alternative for normal-sized lists
For lists that fit in memory, use `@StateHint` with a state POJO containing a `List` field. Flink handles serialization automatically:

```java
import static org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.List;

public class ListStatePTF extends ProcessTableFunction<Row> {

  // State POJO with a List field
  public static class ListStateData {
    public List<String> items = new ArrayList<>();
  }

  public void eval(
      @StateHint ListStateData state,
      @ArgumentHint(SET_SEMANTIC_TABLE) Row input) {
    state.items.add(input.getFieldAs(0).toString());
  }
}
```
Important
The main limitation with `ListView` is that it is designed for extremely large state that supports iteration but not random seeking. For most use cases, storing a `List` in a `@StateHint` POJO as shown above is sufficient, and Flink handles serialization automatically.
MapView not supported
`MapView` state is not available in PTFs. `MapView` is a specialized state interface designed for extremely large state collections.
Alternative for normal-sized maps
For maps that fit in memory, use `@StateHint` with a state POJO containing a `Map` field. Flink handles serialization automatically:

```java
import static org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

import java.util.HashMap;
import java.util.Map;

public class MapStatePTF extends ProcessTableFunction<Row> {

  // State POJO with a Map field
  public static class MapStateData {
    public Map<String, Integer> counts = new HashMap<>();
  }

  public void eval(
      @StateHint MapStateData state,
      @ArgumentHint(SET_SEMANTIC_TABLE) Row input) {
    String key = input.getFieldAs(0).toString();
    state.counts.put(key, state.counts.getOrDefault(key, 0) + 1);
  }
}
```
Important
The main limitation with `MapView` is that it is designed for extremely large state. For most use cases, storing a `Map` in a `@StateHint` POJO as shown above is sufficient, and Flink handles serialization automatically.
Timer and state time-to-live (TTL) limitations
Event-time timers
Event-time timers are not available. Event-time timers let a PTF schedule callbacks that fire when the event-time watermark passes a specified time, supporting logic such as session timeouts or delayed processing.

As a workaround for simple timeout scenarios, use processing-time logic within the PTF's `eval()` method. Note that processing-time logic is not deterministic and does not provide exactly-once guarantees for time-based operations.
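The timeout check this workaround describes amounts to comparing the current processing time against a stored timestamp. It can be sketched as a plain-Java helper (`TimeoutChecker` is hypothetical, not a Flink class; in a PTF, `lastEventMs` would be a `@StateHint` POJO field and `nowMs` would come from `System.currentTimeMillis()`):

```java
// Hypothetical helper: flags a timeout when the gap between consecutive
// events exceeds a threshold, based on processing time.
public class TimeoutChecker {
    private final long thresholdMs;
    private long lastEventMs = -1; // -1 means no event seen yet

    public TimeoutChecker(long thresholdMs) {
        this.thresholdMs = thresholdMs;
    }

    // Call on every incoming event; returns true if the gap since the
    // previous event exceeded the threshold (i.e., a timeout occurred).
    public boolean recordEvent(long nowMs) {
        boolean timedOut = lastEventMs >= 0 && (nowMs - lastEventMs) > thresholdMs;
        lastEventMs = nowMs;
        return timedOut;
    }
}
```

Because this fires only when the next event arrives, a partition that stops receiving events entirely never triggers the check; that gap is exactly what event-time timers would close.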
State TTL configuration
Automatic state time-to-live (TTL) configuration is not available. State TTL allows automatic cleanup of state entries that have not been accessed for a specified period.
Implement manual state cleanup logic within the PTF. Track access timestamps and periodically remove expired entries:
```java
import static org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

public class TTLStatePTF extends ProcessTableFunction<Row> {

  private static final long TTL_MS = 3600000; // 1 hour

  public static class TTLState {
    public long lastAccessTime = 0;
    public String data = null;
  }

  public void eval(
      @StateHint TTLState state,
      @ArgumentHint(SET_SEMANTIC_TABLE) Row input) {
    long now = System.currentTimeMillis();

    // Check if state has expired; a real PTF would typically act on the
    // expired value here (for example, emit a row) before discarding it.
    if (state.lastAccessTime > 0 && (now - state.lastAccessTime) > TTL_MS) {
      state.data = null;
    }

    // Update state and access time
    state.data = input.getFieldAs(0).toString();
    state.lastAccessTime = now;
  }
}
```
Important
This workaround uses processing time, which is not deterministic. State cleanup may occur at different times during replay or recovery.
Language support limitations
Python support for PTFs is not available. PTFs must be written in Java.
For information about Python user-defined functions (non-PTF), see Create a User-Defined Function with Confluent Cloud for Apache Flink.
Async operation limitations
Asynchronous operations within PTFs have limited support.
Best practices for state management
- Keep state POJOs small. Avoid storing large collections in `@StateHint` fields. For normal-sized `List` or `Map` fields, Flink handles serialization automatically, but extremely large state can impact performance. `ListView` and `MapView` support for large state collections is planned for a future release.
- Choose partition keys carefully. The `PARTITION BY` clause determines how state is distributed. Too few unique keys can create hot partitions, while extremely high cardinality keys increase the total amount of state Flink must manage.
- Declare state POJO fields as `public` with default values. Flink requires public fields to serialize and deserialize state automatically. Private fields with getters and setters are not supported.
- Test PTFs with realistic data volumes before production deployment.
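A minimal state POJO that follows these field rules might look like the sketch below (the class and field names are illustrative, not part of any API):

```java
import java.util.ArrayList;
import java.util.List;

// State POJO suitable for @StateHint: every field is public and has a
// default value, so Flink can serialize and deserialize it automatically.
public class SessionState {
    public long eventCount = 0;
    public long lastEventTime = 0;
    public List<String> recentEventIds = new ArrayList<>();
}
```

A no-argument construction must leave every field in a valid default state, which is what the initializers above guarantee.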