Create a Process Table Function in Confluent Cloud for Apache Flink
This guide shows you how to create custom process table functions (PTFs) in Java and deploy them to Confluent Cloud for Apache Flink®. You build a stateless PTF and a stateful PTF, then register and call them from SQL.
Note
Process Table Functions are an Early Access Program feature in Confluent Cloud.
An Early Access feature is a component of Confluent Cloud introduced to gain feedback. This feature should be used only for evaluation and non-production testing purposes or to provide feedback to Confluent, particularly as it becomes more widely available in follow-on preview editions.
Early Access Program features are intended for evaluation use in development and testing environments only, and not for production use. Early Access Program features are provided: (a) without support; (b) “AS IS”; and (c) without indemnification, warranty, or condition of any kind. No service level commitment will apply to Early Access Program features. Early Access Program features are considered to be a Proof of Concept as defined in the Confluent Cloud Terms of Service. Confluent may discontinue providing preview releases of the Early Access Program features at any time in Confluent’s sole discretion.
Prerequisites
Before you begin, ensure you have the following:
A Confluent Cloud account with an active organization.
A Kafka cluster with at least one topic containing streaming data.
Java 11 or later and a build tool like Maven or Gradle.
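If you build with Maven, the Flink Table API dependency might look like the following. The version shown is illustrative; match it to the Flink version your compute pool runs, and use provided scope because the Flink runtime in Confluent Cloud supplies these classes:

```xml
<!-- Illustrative dependency; pin the version to your compute pool's Flink version -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>2.1.0</version>
    <scope>provided</scope>
</dependency>
```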
Example: Stateless threshold filter PTF
This example shows a PTF that filters rows where a numeric value exceeds a threshold:
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.ArgumentTrait;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.types.Row;
public class ThresholdFilter extends ProcessTableFunction<Row> {

    public void eval(
            @ArgumentHint(value = ArgumentTrait.SET_SEMANTIC_TABLE) Row row,
            @DataTypeHint("DOUBLE") Double threshold) {
        // Get the numeric value from the first column
        Double value = (Double) row.getField(0);
        // Only emit rows that exceed the threshold
        if (value != null && value > threshold) {
            collect(row);
        }
    }
}
This PTF:
Takes a table and a threshold value as parameters.
Examines each row’s first column value.
Only emits rows where the value exceeds the threshold.
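The core predicate can be exercised outside Flink. The following plain-Java sketch mirrors the null check and comparison in eval(); the class and method names here are illustrative and not part of the Flink API:

```java
// Mirrors the filtering logic of ThresholdFilter.eval() without the Flink runtime.
public class ThresholdFilterSketch {

    // Returns true when a row with this value would be emitted by the PTF.
    static boolean exceeds(Double value, double threshold) {
        return value != null && value > threshold;
    }

    public static void main(String[] args) {
        System.out.println(exceeds(120.5, 100.0)); // true: row is emitted
        System.out.println(exceeds(99.9, 100.0));  // false: row is filtered out
        System.out.println(exceeds(null, 100.0));  // false: null values are dropped
    }
}
```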
To use this PTF in SQL, register it with your Flink environment, as shown in Create a UDF, then call it in a query:
SELECT *
FROM ThresholdFilter(TABLE sensor_readings, 100.0);
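PTF arguments can also be passed by name, which reads more clearly when a function takes several parameters. Assuming the argument names follow the eval() parameter names (row and threshold in this example), a named-argument call might look like:

```sql
SELECT *
FROM ThresholdFilter(
  row => TABLE sensor_readings,
  threshold => 100.0
);
```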
Example: Stateful event counter PTF
The following example shows a stateful PTF that counts events per partition key and emits a running total with each event. This example uses the examples.marketplace.clicks sample data table available in Confluent Cloud.
Step 1: Write the PTF
import static org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE;
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;
public class EventCounter extends ProcessTableFunction<EventCounter.EventCount> {

    public static class EventCount {
        public int user_id;
        public String click_id;
        public int running_count;
    }

    public static class CountState {
        public int count = 0;
    }

    public void eval(
            @StateHint CountState state,
            @ArgumentHint(SET_SEMANTIC_TABLE) Row input) {
        state.count++;

        EventCount result = new EventCount();
        result.user_id = input.getFieldAs("user_id");
        result.click_id = input.getFieldAs("click_id");
        result.running_count = state.count;
        collect(result);
    }
}
This PTF:
Declares a CountState POJO with @StateHint. Flink automatically persists state per partition key.
Declares an EventCount output POJO so Flink can infer the output schema.
Increments the count each time a new event arrives for the same partition key.
Uses getFieldAs() for type-safe, named field access from the input row.
Step 2: Package and deploy
Package the PTF as a JAR, upload the artifact to Confluent Cloud (see Step 1: Build the artifact in Create a UDF), and register it:
CREATE FUNCTION EventCounter
AS 'com.example.EventCounter'
USING JAR '<your-artifact-uri>';
Step 3: Call the PTF from SQL
SELECT *
FROM EventCounter(
input => TABLE examples.marketplace.clicks PARTITION BY user_id,
uid => 'event-counter-v1'
);
PARTITION BY user_id tells Flink to maintain separate state for each user. The uid parameter assigns a stable identifier so state survives query restarts.
Step 4: View the results
Expected output:
user_id | click_id | running_count
--------|-----------|---------------
1234 | click-42 | 1
1234 | click-55 | 2
5678 | click-87 | 1
1234 | click-91 | 3
Each user has an independent running count. When a new click arrives for user_id = 1234, the count increments from where it left off.
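The per-key behavior above can be sketched in plain Java: under PARTITION BY, Flink effectively keeps one CountState per key, much like a map from key to count. This sketch is illustrative only and does not use the Flink API:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrates per-partition-key state: one counter per user_id,
// mirroring how Flink scopes a @StateHint POJO under PARTITION BY.
public class PerKeyCounterSketch {
    private final Map<Integer, Integer> countsByKey = new HashMap<>();

    // Returns the running_count that would be emitted with this event.
    int onEvent(int userId) {
        int next = countsByKey.getOrDefault(userId, 0) + 1;
        countsByKey.put(userId, next);
        return next;
    }

    public static void main(String[] args) {
        PerKeyCounterSketch counter = new PerKeyCounterSketch();
        System.out.println(counter.onEvent(1234)); // 1
        System.out.println(counter.onEvent(1234)); // 2
        System.out.println(counter.onEvent(5678)); // 1
        System.out.println(counter.onEvent(1234)); // 3
    }
}
```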
Note
For more PTF patterns, see the Apache Flink PTF documentation.
Troubleshooting
PTF not found
If you receive an error that the PTF is not found, ensure:
You are using a Flink compute pool in a region that supports PTFs. See Supported Cloud Regions for Confluent Cloud for Apache Flink.
Your Flink version supports PTFs. PTFs are available starting with Flink 2.1.
You have spelled the function name correctly. Function names are case-sensitive.
No output produced
If the PTF query runs but produces no output:
Verify that data is flowing into the source table by querying examples.marketplace.clicks directly.
Check that watermarks are being generated. Add a SELECT query with a TUMBLE window to verify watermark progress.
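A windowed query along these lines can confirm that watermarks advance, because tumbling windows only emit results once the watermark passes the window end. The table and column follow the sample data; $rowtime is the system-provided event-time column in Confluent Cloud:

```sql
SELECT window_start, window_end, COUNT(*) AS click_count
FROM TABLE(
  TUMBLE(TABLE examples.marketplace.clicks, DESCRIPTOR($rowtime), INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end;
```

If this query emits rows, watermarks are progressing and the PTF should receive input.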
High state size or degraded performance
If your PTF statement enters a DEGRADED state or accumulates excessive state:
Check the statement’s state size and CFU consumption in the Cloud Console or Query Profiler.
Reduce the number of unique partition keys to limit the total state entries.
Increase the compute pool’s MAX_CFU to provide more resources.
For long-running stateful PTFs, consider implementing manual state cleanup logic within the eval() method. See Process table function limitations.
Next steps
Learn about PTF limitations in the Early Access release. See Process table function limitations.
Review the Apache Flink PTF documentation for advanced stateful PTF patterns.