Process Table Functions in Confluent Cloud for Apache Flink

A process table function (PTF) is an advanced type of user-defined function that enables custom, stateful stream processing logic in Confluent Cloud for Apache Flink®. PTFs extend Flink's capabilities beyond standard SQL and simple user-defined functions by providing access to managed state and timers.

PTFs are also known as polymorphic table functions, as defined in the SQL:2016 standard.

Note

Process Table Functions are an Early Access Program feature in Confluent Cloud.

An Early Access feature is a component of Confluent Cloud introduced to gain feedback. This feature should be used only for evaluation and non-production testing purposes or to provide feedback to Confluent, particularly as it becomes more widely available in follow-on preview editions.

Early Access Program features are intended for evaluation use in development and testing environments only, and not for production use. Early Access Program features are provided: (a) without support; (b) “AS IS”; and (c) without indemnification, warranty, or condition of any kind. No service level commitment will apply to Early Access Program features. Early Access Program features are considered to be a Proof of Concept as defined in the Confluent Cloud Terms of Service. Confluent may discontinue providing preview releases of the Early Access Program features at any time in Confluent’s sole discretion.

The Early Access release of PTFs includes support for stateful PTFs with managed state using @StateHint.

For information about limitations and unsupported features in the Early Access release, see Process table function limitations.

What are process table functions?

Unlike traditional user-defined functions that process one row at a time (1-to-1) or aggregate functions that process many rows to produce one result (N-to-1), PTFs support N-to-M semantics. This means a PTF can:

  • Consume multiple input rows

  • Emit multiple output rows

  • Maintain persistent state across events

  • Schedule future actions using timers

PTFs combine the simplicity of serverless functions with the power of stateful stream processing. They provide an escape hatch when SQL alone cannot express your business logic.

Why use process table functions?

Use PTFs when you need to implement complex, stateful logic that cannot be achieved with standard SQL or simple user-defined functions. Common use cases include:

  • Custom windowing logic: Build hybrid windows that emit data based on time intervals or event counts.

  • Deduplication with state: Remember when events were seen over a specific time period to remove duplicates.

  • Timer-based alerts: Trigger alerts when expected events do not occur within a time threshold.

  • Complex state machines: Track multi-step processes, such as user journeys through a checkout flow.

  • Real-time analytics: Calculate custom metrics that require accessing historical state.
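The deduplication use case above reduces to a simple stateful membership check: remember when each key was last seen, and drop repeats inside a time window. The following plain-Java sketch (no Flink dependencies; the class and method names are illustrative) shows the logic that a PTF's managed state would back:

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of time-bounded deduplication. In a real PTF, the
// lastSeen map would live in a @StateHint POJO scoped to a partition key.
public class DedupWindow {
    private final long windowMs;
    private final Map<String, Long> lastSeen = new HashMap<>();

    public DedupWindow(long windowMs) {
        this.windowMs = windowMs;
    }

    /** Returns true if the event is new (or outside the window) and records it. */
    public boolean accept(String key, long eventTimeMs) {
        Long prev = lastSeen.get(key);
        if (prev != null && eventTimeMs - prev < windowMs) {
            return false; // duplicate within the window
        }
        lastSeen.put(key, eventTimeMs);
        return true;
    }
}
```

Inside a PTF, `accept` would run per row in `eval()`, with the map persisted by Flink rather than held on the heap of a single process.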

PTFs vs other function types

The following table compares PTFs with other function types available in Flink:

Feature       | Scalar UDF             | Table UDF               | Aggregate UDF       | Process table function
------------- | ---------------------- | ----------------------- | ------------------- | --------------------------
Input rows    | 1                      | 1                       | N                   | N
Output rows   | 1                      | N                       | 1                   | N
Managed state | No                     | No                      | No                  | Yes
Timers        | No                     | No                      | No                  | Yes
Use case      | Transform single value | Split one row into many | Aggregate many rows | Stateful stream processing

How do process table functions work?

PTFs operate on input tables and produce output tables. A PTF receives one or more table arguments, processes rows within each logical partition, and emits output rows. You call a PTF from SQL using named arguments:

SELECT user_id, click_id, running_count
FROM EventCounter(
    input => TABLE examples.marketplace.clicks PARTITION BY user_id,
    uid => 'event-counter-v1'
);

The key elements of a PTF invocation are:

TABLE ... PARTITION BY

Passes an input table to the PTF and defines how rows are grouped. Each unique user_id is processed independently with its own isolated state.

uid => '...'

Assigns a stable identifier for state persistence across query restarts.

You can use PTFs in two ways:

SQL-first approach

Register a PTF in Java and call it directly from a SQL SELECT statement to solve complex windowing or stateful processing problems, as shown above.

Table API approach

Build full applications using the Table API with PTFs. Define state using Java annotations and deploy to Confluent Cloud. The platform handles automatic scaling, exactly-once processing, and serverless operations.

Process table function limitations

This section describes the limitations and unsupported features of process table functions (PTFs) in the Early Access release of Confluent Cloud for Apache Flink.

State management limitations

ListView not supported

ListView state is not available in PTFs. ListView is a specialized state interface designed for extremely large state collections that support iteration but not random seeking.

Alternative for normal-sized lists

For lists that fit in memory, use @StateHint with a state POJO containing a List field. Flink handles serialization automatically:

import static org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;
import java.util.ArrayList;
import java.util.List;

public class ListStatePTF extends ProcessTableFunction<Row> {

    // State POJO with a List field
    public static class ListStateData {
        public List<String> items = new ArrayList<>();
    }

    public void eval(
        @StateHint ListStateData state,
        @ArgumentHint(SET_SEMANTIC_TABLE) Row input
    ) {
        state.items.add(input.getFieldAs(0).toString());
    }
}

Important

ListView is designed for extremely large state that supports iteration but not random seeking. For most use cases, storing a List in a @StateHint POJO, as shown above, is sufficient, and Flink handles serialization automatically.

MapView not supported

MapView state is not available in PTFs. MapView is a specialized state interface designed for extremely large state collections.

Alternative for normal-sized maps

For maps that fit in memory, use @StateHint with a state POJO containing a Map field. Flink handles serialization automatically:

import static org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;
import java.util.HashMap;
import java.util.Map;

public class MapStatePTF extends ProcessTableFunction<Row> {

    // State POJO with a Map field
    public static class MapStateData {
        public Map<String, Integer> counts = new HashMap<>();
    }

    public void eval(
        @StateHint MapStateData state,
        @ArgumentHint(SET_SEMANTIC_TABLE) Row input
    ) {
        String key = input.getFieldAs(0).toString();
        state.counts.put(key, state.counts.getOrDefault(key, 0) + 1);
    }
}

Important

MapView is designed for extremely large state. For most use cases, storing a Map in a @StateHint POJO, as shown above, is sufficient, and Flink handles serialization automatically.

Timer and state time-to-live (TTL) limitations

Event-time timers

Event-time timers are not available. Event-time timers enable PTFs to schedule callbacks at specific event-time watermarks, enabling time-based logic such as session timeouts or delayed processing.

For simple timeout scenarios, use processing-time logic within the PTF’s eval() method. Processing-time logic is not deterministic and does not provide exactly-once guarantees for time-based operations.
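A processing-time timeout check can be reduced to comparing the current wall-clock time against the last event time held in state. The following plain-Java sketch (no Flink dependencies; names are illustrative) shows the gap check that a PTF's `eval()` method could run on each incoming row:

```java
// Plain-Java sketch of a processing-time timeout check. In a real PTF,
// lastEventMs would live in a @StateHint POJO, and onEvent() would be
// invoked from eval() for each row of the partition.
public class HeartbeatMonitor {
    private final long thresholdMs;
    private long lastEventMs = -1; // -1 means no event seen yet

    public HeartbeatMonitor(long thresholdMs) {
        this.thresholdMs = thresholdMs;
    }

    /** Returns true if the gap since the previous event exceeded the threshold. */
    public boolean onEvent(long nowMs) {
        boolean late = lastEventMs >= 0 && nowMs - lastEventMs > thresholdMs;
        lastEventMs = nowMs;
        return late;
    }
}
```

Note that this approach only detects a timeout when the next event arrives; without event-time timers there is no callback that fires while a partition stays silent.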

State TTL configuration

Automatic state time-to-live (TTL) configuration is not available. State TTL allows automatic cleanup of state entries that have not been accessed for a specified period.

Implement manual state cleanup logic within the PTF. Track access timestamps and periodically remove expired entries:

import static org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

public class TTLStatePTF extends ProcessTableFunction<Row> {
    private static final long TTL_MS = 3600000; // 1 hour

    public static class TTLState {
        public long lastAccessTime = 0;
        public String data = null;
    }

    public void eval(
        @StateHint TTLState state,
        @ArgumentHint(SET_SEMANTIC_TABLE) Row input
    ) {
        long now = System.currentTimeMillis();

        // Drop state that was last accessed more than TTL_MS ago
        if (state.lastAccessTime > 0 && (now - state.lastAccessTime) > TTL_MS) {
            state.data = null; // expired: treat as if no prior value exists
        }

        // Any logic that reads the previous value of state.data belongs here,
        // before the update below overwrites it.

        // Update state and the access time
        state.data = input.getFieldAs(0).toString();
        state.lastAccessTime = now;
    }
}

Important

This workaround uses processing time, which is not deterministic. State cleanup may occur at different times during replay or recovery.

Language support limitations

Python support for PTFs is not available. PTFs must be written in Java.

For information about Python user-defined functions (non-PTF), see Create a User-Defined Function with Confluent Cloud for Apache Flink.

Async operation limitations

Asynchronous operations within PTFs have limited support.

Best practices for state management

  • Keep state POJOs small. Avoid storing large collections in @StateHint fields. For normal-sized List or Map fields, Flink handles serialization automatically, but extremely large state can impact performance. ListView and MapView support for large state collections is planned for a future release.

  • Choose partition keys carefully. The PARTITION BY clause determines how state is distributed. Too few unique keys can create hot partitions, while extremely high cardinality keys increase the total amount of state Flink must manage.

  • Declare state POJO fields as public with default values. Flink requires public fields to serialize and deserialize state automatically. Private fields with getters and setters are not supported.

  • Test PTFs with realistic data volumes before production deployment.
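The third practice above can be summarized in a single class: every field public, every field initialized to a sensible default, no getters or setters. A minimal sketch (field names are illustrative, modeled on the checkout-flow use case mentioned earlier):

```java
// State POJO following the best practices above: public fields with
// default values, so Flink can serialize and deserialize it automatically.
public class CheckoutState {
    public int currentStep = 0;       // position in the multi-step flow
    public long lastEventTimeMs = 0L; // when the last step was observed
    public String lastAction = "";    // most recent user action
}
```

A POJO like this would be declared as a `@StateHint` parameter of the PTF's `eval()` method, in the same way as the state classes in the examples above.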