Create a Process Table Function in Confluent Cloud for Apache Flink

This guide shows you how to create custom process table functions (PTFs) in Java and deploy them to Confluent Cloud for Apache Flink®. You build a stateless PTF and a stateful PTF, then register and call them from SQL.

Note

Process Table Functions are an Early Access Program feature in Confluent Cloud.

An Early Access feature is a component of Confluent Cloud introduced to gain feedback. This feature should be used only for evaluation and non-production testing purposes or to provide feedback to Confluent, particularly as it becomes more widely available in follow-on preview editions.

Early Access Program features are intended for evaluation use in development and testing environments only, and not for production use. Early Access Program features are provided: (a) without support; (b) “AS IS”; and (c) without indemnification, warranty, or condition of any kind. No service level commitment will apply to Early Access Program features. Early Access Program features are considered to be a Proof of Concept as defined in the Confluent Cloud Terms of Service. Confluent may discontinue providing preview releases of the Early Access Program features at any time in Confluent’s sole discretion.

Prerequisites

Before you begin, ensure you have the following:

  • A Confluent Cloud account with an active organization.

  • A Kafka cluster with at least one topic containing streaming data.

  • Java 11 or later and a build tool like Maven or Gradle.

Example: Stateless threshold filter PTF

This example shows a PTF that filters rows where a numeric value exceeds a threshold:

import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.ArgumentTrait;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.types.Row;

public class ThresholdFilter extends ProcessTableFunction<Row> {

    public void eval(
        @ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row row,
        @DataTypeHint("DOUBLE") Double threshold
    ) {
        // Get the numeric value from the first column
        Double value = (Double) row.getField(0);

        // Only emit rows that exceed the threshold
        if (value != null && value > threshold) {
            collect(row);
        }
    }
}

This PTF:

  • Takes a table and a threshold value as parameters.

  • Examines each row’s first column value.

  • Only emits rows where the value exceeds the threshold.

To use this PTF in SQL, register it with your Flink environment, as shown in Create a UDF, then call it in a query:

SELECT *
FROM ThresholdFilter(TABLE sensor_readings, 100.0);
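Outside Flink, the PTF's per-row check is just a null-safe threshold filter. The following standalone sketch (a hypothetical class, not part of the PTF itself) shows the same logic over a plain list of values:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ThresholdFilterSketch {

    // Null-safe check mirroring the PTF's eval() logic
    static boolean exceeds(Double value, double threshold) {
        return value != null && value > threshold;
    }

    public static void main(String[] args) {
        List<Double> readings = Arrays.asList(98.5, null, 120.0, 101.3);
        List<Double> kept = readings.stream()
                .filter(v -> exceeds(v, 100.0))
                .collect(Collectors.toList());
        System.out.println(kept); // prints [120.0, 101.3]
    }
}
```

The null check matters in the PTF as well: a row whose first column is NULL would otherwise throw a NullPointerException instead of being dropped.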

Example: Stateful event counter PTF

The following example shows a stateful PTF that counts events per partition key and emits a running total with each event. This example uses the examples.marketplace.clicks sample data table available in Confluent Cloud.

Step 1: Write the PTF

import static org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

public class EventCounter extends ProcessTableFunction<EventCounter.EventCount> {

    public static class EventCount {
        public int user_id;
        public String click_id;
        public int running_count;
    }

    public static class CountState {
        public int count = 0;
    }

    public void eval(
        @StateHint CountState state,
        @ArgumentHint(SET_SEMANTIC_TABLE) Row input
    ) {
        state.count++;

        EventCount result = new EventCount();
        result.user_id = input.getFieldAs("user_id");
        result.click_id = input.getFieldAs("click_id");
        result.running_count = state.count;

        collect(result);
    }
}

This PTF:

  • Declares a CountState POJO with @StateHint. Flink automatically persists state per partition key.

  • Declares an EventCount output POJO so Flink can infer the output schema.

  • Increments the count each time a new event arrives for the same partition key.

  • Uses getFieldAs() for type-safe, named field access from the input row.

Step 2: Package and deploy

Package the PTF as a JAR, upload the artifact to Confluent Cloud, and register it:

CREATE FUNCTION EventCounter
AS 'com.example.EventCounter'
USING JAR '<your-artifact-uri>';

Step 3: Call the PTF from SQL

SELECT *
FROM EventCounter(
    input => TABLE examples.marketplace.clicks PARTITION BY user_id,
    uid => 'event-counter-v1'
);

PARTITION BY user_id tells Flink to maintain separate state for each user. The uid parameter assigns a stable identifier so state survives query restarts.

Step 4: View the results

Expected output:

user_id | click_id  | running_count
--------|-----------|---------------
1234    | click-42  | 1
1234    | click-55  | 2
5678    | click-87  | 1
1234    | click-91  | 3

Each user has an independent running count. When a new click arrives for user_id = 1234, the count increments from where it left off.
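The per-key counting that Flink performs here can be reproduced outside Flink with a map keyed by user_id. This standalone sketch (a hypothetical class, not part of the PTF) stands in for the CountState that Flink persists per partition:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RunningCountSketch {

    // Stands in for the per-partition CountState that Flink persists
    static final Map<Integer, Integer> countsByUser = new LinkedHashMap<>();

    static int nextCount(int userId) {
        // Start at 1 for a new key, otherwise increment the stored count
        return countsByUser.merge(userId, 1, Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(nextCount(1234)); // 1
        System.out.println(nextCount(1234)); // 2
        System.out.println(nextCount(5678)); // 1
        System.out.println(nextCount(1234)); // 3
    }
}
```

The four calls mirror the four clicks in the table above: counts for user 1234 and user 5678 advance independently, exactly as the PARTITION BY user_id clause isolates state per key.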

Note

For more PTF patterns, see the Apache Flink PTF documentation.

Troubleshooting

PTF not found

If you receive an error that the PTF is not found, ensure:

  • The function is registered in the catalog and database your statement is using.

  • The class name in the CREATE FUNCTION statement matches the fully qualified class name in the JAR.

  • The JAR artifact uploaded successfully and the USING JAR URI is correct.

No output produced

If the PTF query runs but produces no output:

  • Verify that data is flowing into the source table by querying examples.marketplace.clicks directly.

  • Check that watermarks are being generated. Add a SELECT query with a TUMBLE window to verify watermark progress.

High state size or degraded performance

If your PTF statement enters a DEGRADED state or accumulates excessive state:

  • Check the statement’s state size and CFU consumption in the Cloud Console or Query Profiler.

  • Reduce the number of unique partition keys to limit the total state entries.

  • Increase the compute pool’s MAX_CFU to provide more resources.

  • For long-running stateful PTFs, consider implementing manual state cleanup logic within the eval() method. See Process table function limitations.
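Flink persists whatever the @StateHint POJO holds, so one way to bound state growth is to carry a last-activity timestamp in the POJO and reset the fields when a key has been idle too long. The following is a minimal sketch of that idea under those assumptions; the field names and timeout value are illustrative, not a prescribed API:

```java
public class IdleResetSketch {

    // Mirrors a @StateHint POJO that carries its own last-activity timestamp
    static class CountState {
        int count = 0;
        long lastSeenMillis = 0L;
    }

    // Reset the counter if the key has been idle longer than timeoutMillis,
    // then record the new event
    static void touch(CountState state, long nowMillis, long timeoutMillis) {
        if (state.lastSeenMillis != 0L
                && nowMillis - state.lastSeenMillis > timeoutMillis) {
            state.count = 0; // start over for a long-idle key
        }
        state.count++;
        state.lastSeenMillis = nowMillis;
    }
}
```

Inside a PTF, the same check would run at the top of eval() against the event's timestamp, so counters for keys that stop producing events do not retain stale totals indefinitely.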

Next steps