Create a Process Table Function in Confluent Cloud for Apache Flink
This guide shows you how to create custom process table functions (PTFs) in Java and deploy them to Confluent Cloud for Apache Flink®. You build a stateless PTF and a stateful PTF, then register and call them from SQL.
Note
Process Table Functions are an Early Access Program feature in Confluent Cloud.
An Early Access feature is a component of Confluent Cloud introduced to gain feedback. This feature should be used only for evaluation and non-production testing purposes or to provide feedback to Confluent, particularly as it becomes more widely available in follow-on preview editions.
Early Access Program features are intended for evaluation use in development and testing environments only, and not for production use. Early Access Program features are provided: (a) without support; (b) “AS IS”; and (c) without indemnification, warranty, or condition of any kind. No service level commitment will apply to Early Access Program features. Early Access Program features are considered to be a Proof of Concept as defined in the Confluent Cloud Terms of Service. Confluent may discontinue providing preview releases of the Early Access Program features at any time in Confluent’s sole discretion.
For information about limitations and unsupported features in the Early Access release, see Process table function limitations.
Flink PTFs are available in all regions where UDFs are available. For more information, see UDF regional availability.
Prerequisites
Before you begin, ensure you have the following:
A Confluent Cloud account with an active organization.
A Kafka cluster with at least one topic containing streaming data.
Java 11 or later and a build tool like Maven or Gradle.
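Your build must provide the Flink Table API classes used in the examples below. A minimal Maven dependency sketch (the artifact and version shown are assumptions; match the Flink release your Confluent Cloud environment targets, and mark it provided so it is not bundled into your JAR):

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java</artifactId>
  <version>2.1.0</version>
  <scope>provided</scope>
</dependency>
```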
Example: Stateless threshold filter PTF
This example shows a PTF that filters rows where a numeric value exceeds a threshold:
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.ArgumentTrait;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.types.Row;
public class ThresholdFilter extends ProcessTableFunction<Row> {

    public void eval(
            @ArgumentHint(value = ArgumentTrait.SET_SEMANTIC_TABLE) Row row,
            @DataTypeHint("DOUBLE") Double threshold) {
        // Get the numeric value from the first column
        Double value = (Double) row.getField(0);
        // Only emit rows that exceed the threshold
        if (value != null && value > threshold) {
            collect(row);
        }
    }
}
This PTF:
Takes a table and a threshold value as parameters.
Examines each row’s first column value.
Only emits rows where the value exceeds the threshold.
To use this PTF in SQL, register it with your Flink environment, as shown in Create a UDF, then call it in a query:
SELECT *
FROM ThresholdFilter(TABLE sensor_readings, 100.0);
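The filtering rule itself is ordinary Java, so you can exercise it without a Flink runtime. The following sketch (the class and method names are illustrative, not part of the Flink API) mirrors the eval() predicate:

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of the ThresholdFilter predicate; no Flink runtime involved. */
public class ThresholdCheck {

    // Mirrors the eval() condition: keep only non-null values strictly above the threshold.
    static boolean exceeds(Double value, double threshold) {
        return value != null && value > threshold;
    }

    public static void main(String[] args) {
        List<Double> readings = new ArrayList<>();
        readings.add(42.0);
        readings.add(120.5);
        readings.add(null);   // null fields are skipped, just as in the PTF
        readings.add(150.0);

        List<Double> kept = new ArrayList<>();
        for (Double r : readings) {
            if (exceeds(r, 100.0)) {
                kept.add(r);
            }
        }
        System.out.println(kept); // [120.5, 150.0]
    }
}
```

Note that a value exactly equal to the threshold is dropped, because the PTF uses a strict greater-than comparison.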
Example: Stateful event counter PTF
The following example shows a stateful PTF that counts events per partition key and emits a running total with each event. This example uses the examples.marketplace.clicks sample data table available in Confluent Cloud.
Step 1: Write the PTF
import static org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE;
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;
public class EventCounter extends ProcessTableFunction<EventCounter.EventCount> {

    public static class EventCount {
        public int user_id;
        public String click_id;
        public int running_count;
    }

    public static class CountState {
        public int count = 0;
    }

    public void eval(
            @StateHint CountState state,
            @ArgumentHint(SET_SEMANTIC_TABLE) Row input) {
        state.count++;
        EventCount result = new EventCount();
        result.user_id = input.getFieldAs("user_id");
        result.click_id = input.getFieldAs("click_id");
        result.running_count = state.count;
        collect(result);
    }
}
This PTF:
Declares a CountState POJO with @StateHint. Flink automatically persists state per partition key.
Declares an EventCount output POJO so Flink can infer the output schema.
Increments the count each time a new event arrives for the same partition key.
Uses getFieldAs() for type-safe, named field access from the input row.
Step 2: Package and deploy
Package the PTF as a JAR, upload the artifact to Confluent Cloud (see Step 1: Build the artifact), and register it:
CREATE FUNCTION EventCounter
AS 'com.example.EventCounter'
USING JAR '<your-artifact-uri>';
Step 3: Call the PTF from SQL
SELECT *
FROM EventCounter(
input => TABLE examples.marketplace.clicks PARTITION BY user_id,
uid => 'event-counter-v1'
);
PARTITION BY user_id tells Flink to maintain separate state for each user. The uid parameter assigns a stable identifier so state survives query restarts.
Step 4: View the results
Expected output:
user_id | click_id | running_count
--------|----------|--------------
1234    | click-42 | 1
1234    | click-55 | 2
5678    | click-87 | 1
1234    | click-91 | 3
Each user has an independent running count. When a new click arrives for user_id = 1234, the count increments from where it left off.
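Conceptually, Flink keeps one CountState instance per partition key. The following plain-Java sketch (a HashMap standing in for Flink's managed keyed state; the class is illustrative, not part of the Flink API) reproduces the counting behavior shown above:

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrates how Flink scopes PTF state per partition key: one counter per user_id. */
public class KeyedCountSketch {

    // Stand-in for Flink's keyed state: partition key -> CountState.count
    private final Map<Integer, Integer> countsByKey = new HashMap<>();

    // Equivalent of eval(): bump the counter for this key and return the running total.
    int onEvent(int userId) {
        int next = countsByKey.getOrDefault(userId, 0) + 1;
        countsByKey.put(userId, next);
        return next;
    }

    public static void main(String[] args) {
        KeyedCountSketch counter = new KeyedCountSketch();
        System.out.println(counter.onEvent(1234)); // 1
        System.out.println(counter.onEvent(1234)); // 2
        System.out.println(counter.onEvent(5678)); // 1  (independent state per key)
        System.out.println(counter.onEvent(1234)); // 3
    }
}
```

The difference in the real PTF is that Flink, not your code, routes events to the right state instance and persists it across restarts (given a stable uid).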
Note
For more PTF patterns, see the Apache Flink PTF documentation.
Example: Inactivity alert PTF with timers
The following example shows a PTF that uses event-time timers to detect inactive users. If a user does not produce a new event within 60 seconds, the PTF fires a timer and emits an alert.
Step 1: Write the PTF
import static org.apache.flink.table.annotation.ArgumentTrait.REQUIRE_ON_TIME;
import static org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE;
import java.time.Duration;
import java.time.Instant;
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.table.functions.ProcessTableFunction.TimeContext;
import org.apache.flink.types.Row;
public class InactivityAlert extends ProcessTableFunction<InactivityAlert.Alert> {

    private static final Duration TIMEOUT = Duration.ofSeconds(60);

    public static class Alert {
        public int user_id;
        public String alert_type;
    }

    public static class ActivityState {
        public int userId = 0;
    }

    public void eval(
            Context ctx,
            @StateHint ActivityState state,
            @ArgumentHint({SET_SEMANTIC_TABLE, REQUIRE_ON_TIME}) Row input) {
        state.userId = input.getFieldAs("user_id");
        // Register or replace a named timer 60 seconds from now
        TimeContext<Instant> timeCtx = ctx.timeContext(Instant.class);
        timeCtx.registerOnTime("inactivity", timeCtx.time().plus(TIMEOUT));
    }

    public void onTimer(ActivityState state) {
        // Timer fired: no new event arrived within the timeout
        Alert alert = new Alert();
        alert.user_id = state.userId;
        alert.alert_type = "inactive";
        collect(alert);
    }
}
This PTF:
Uses REQUIRE_ON_TIME to declare that the PTF needs a watermarked timestamp column.
Registers a named timer called "inactivity" each time an event arrives. Because the timer name is reused, each new event replaces the previous timer, effectively resetting the inactivity clock.
When 60 seconds pass without a new event for a partition key, the timer fires and onTimer() emits an alert.
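The replace-the-named-timer pattern can be modeled in plain Java to make the reset semantics concrete. In this sketch (illustrative only; Flink manages the real timers, state, and watermark), each event overwrites the user's single deadline, and the "timer fires" once the watermark reaches that deadline:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of the "replace the named timer on every event" pattern. */
public class InactivityDeadlineSketch {

    static final Duration TIMEOUT = Duration.ofSeconds(60);

    // Stand-in for the per-key "inactivity" timer: partition key -> deadline.
    private final Map<Integer, Instant> deadlineByUser = new HashMap<>();

    // Like eval(): each event overwrites the user's single named deadline.
    void onEvent(int userId, Instant eventTime) {
        deadlineByUser.put(userId, eventTime.plus(TIMEOUT));
    }

    // Like the timer firing: true once the watermark has reached the deadline.
    boolean isInactive(int userId, Instant watermark) {
        Instant deadline = deadlineByUser.get(userId);
        return deadline != null && !watermark.isBefore(deadline);
    }

    public static void main(String[] args) {
        InactivityDeadlineSketch sketch = new InactivityDeadlineSketch();
        Instant t0 = Instant.parse("2024-01-01T00:00:00Z");
        sketch.onEvent(1234, t0);
        // A later event replaces the deadline, resetting the inactivity clock
        sketch.onEvent(1234, t0.plusSeconds(30));
        System.out.println(sketch.isInactive(1234, t0.plusSeconds(60))); // false
        System.out.println(sketch.isInactive(1234, t0.plusSeconds(90))); // true
    }
}
```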
Step 2: Deploy and call from SQL
Package the PTF as a JAR, upload the artifact to Confluent Cloud (see Step 1: Build the artifact), and register it:
CREATE FUNCTION InactivityAlert
AS 'com.example.InactivityAlert'
USING JAR '<your-artifact-uri>';
SELECT *
FROM InactivityAlert(
input => TABLE user_events PARTITION BY user_id,
on_time => DESCRIPTOR(event_time),
uid => 'inactivity-v1'
);
on_time => DESCRIPTOR(event_time) tells Flink which column provides the event-time timestamp. The event_time column must have a watermark declaration on the source table.
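If your source table does not yet declare a watermark, you can define one when creating the table. A sketch in standard Flink SQL, assuming the column names from the example above (the 5-second out-of-orderness bound is an arbitrary choice; on Confluent Cloud, existing tables may already carry a default watermark on the system rowtime column):

```sql
CREATE TABLE user_events (
  user_id INT,
  event_time TIMESTAMP_LTZ(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
);
```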
Troubleshooting
PTF not found
If you receive an error that the PTF is not found, ensure:
You are using a Flink compute pool in a region that supports PTFs. See Supported Cloud Regions for Confluent Cloud for Apache Flink.
Your Flink version supports PTFs. PTFs are available starting with Flink 2.1.
You have spelled the function name correctly. Function names are case-sensitive.
No output produced
If the PTF query runs but produces no output:
Verify that data is flowing into the source table by querying examples.marketplace.clicks directly.
Check that watermarks are being generated. Add a SELECT query with a TUMBLE window to verify watermark progress.
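A quick watermark check might look like the following sketch, which assumes the Confluent-provided $rowtime system column as the time attribute. If watermarks advance, windows close and rows appear; if nothing is returned after the window interval passes, watermarks are likely stalled:

```sql
SELECT window_start, window_end, COUNT(*) AS events
FROM TABLE(
  TUMBLE(TABLE examples.marketplace.clicks, DESCRIPTOR($rowtime), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end;
```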
High state size or degraded performance
If your PTF statement enters a DEGRADED state or accumulates excessive state:
Check the statement’s state size and CFU consumption in the Cloud Console or Query Profiler.
Reduce the number of unique partition keys to limit the total state entries.
Increase the compute pool's MAX_CFU to provide more resources.
For long-running stateful PTFs, consider implementing manual state cleanup logic within the eval() method. See Process table function limitations.
Next steps
Learn about PTF limitations in the Early Access release. See Process table function limitations.
Review the Apache Flink PTF documentation for advanced stateful PTF patterns.