Create a Process Table Function in Confluent Cloud for Apache Flink

Extend Flink SQL in Confluent Cloud for Apache Flink® with custom row-by-row table transformations by writing a process table function (PTF) in Java and deploying it as a user-defined function. This walkthrough builds one stateless PTF and several stateful PTFs that cover trailing state, timers, state TTL, multi-table input, and pass-through columns. It then packages them, registers them, and calls them from SQL.

Note

Process Table Functions are an Open Preview feature in Confluent Cloud.

A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing preview releases of the Preview features at any time in Confluent’s sole discretion.

For information about limitations and unsupported features in the Open Preview release, see Process table function limitations.

Flink PTFs are available in all regions that support UDFs. For more information, see UDF regional availability.

Prerequisites

Before you begin, ensure you have the following:

  • A Confluent Cloud account with an active organization.

  • A Kafka cluster with at least one topic containing streaming data.

  • Java 11 or later and a build tool like Maven or Gradle.

Example: Stateless threshold filter PTF

This example shows a PTF that emits a boolean flag indicating whether each row’s numeric value column exceeds a threshold. The trait PASS_COLUMNS_THROUGH copies the input columns into the output automatically, so the function only declares the new boolean output column. The column name in the sink is determined by the sink table’s schema during positional INSERT binding — for example, the sink can name it above_threshold for readability:

import static org.apache.flink.table.annotation.ArgumentTrait.PASS_COLUMNS_THROUGH;
import static org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

public class ThresholdFilter extends ProcessTableFunction<Boolean> {

    public void eval(
        @ArgumentHint({SET_SEMANTIC_TABLE, PASS_COLUMNS_THROUGH}) Row input,
        Double threshold
    ) {
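        Double value = input.getFieldAs("value");
        // Guard both operands: a NULL value or NULL threshold yields false
        // instead of a NullPointerException during unboxing.
        collect(value != null && threshold != null && value > threshold);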
    }
}

This PTF:

  • Takes a table and a threshold value as parameters.

  • Reads the value column from each input row.

  • Emits one boolean per row, tagged onto the original columns via PASS_COLUMNS_THROUGH.

To use this PTF in SQL, register it with your Flink environment, as shown in Create a UDF, then call it in a query. The input table must have a numeric value column:

SELECT *
FROM ThresholdFilter(
    input     => TABLE sensor_readings PARTITION BY sensor_id,
    threshold => 100.0
);

Example: Trailing median PTF

The following example shows a stateful PTF that computes a trailing median of temperature readings per sensor. Each input row carries one temperature reading; the PTF buffers the last N readings per sensor in state and emits the current reading together with the median of the buffer.

This pattern adapts the Process Table Functions tutorial on developer.confluent.io.

Step 1: Write the PTF

import static org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

@DataTypeHint("ROW<temperature DOUBLE, median DOUBLE>")
public class Median extends ProcessTableFunction<Row> {

    public static class TempsState {
        public List<Double> temps = new ArrayList<>();
    }

    public void eval(
        @StateHint TempsState trailingTemps,
        @ArgumentHint(SET_SEMANTIC_TABLE) Row input,
        Integer numTrailing
    ) {
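        Double temperature = input.getFieldAs("temperature");
        if (temperature == null) {
            // Skip rows without a reading; a null would break the sort below.
            return;
        }

        trailingTemps.temps.add(temperature);
        // Evict the oldest readings until the buffer holds at most numTrailing.
        while (trailingTemps.temps.size() > numTrailing) {
            trailingTemps.temps.remove(0);
        }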

        List<Double> sorted = new ArrayList<>(trailingTemps.temps);
        Collections.sort(sorted);
        int n = sorted.size();
        double median = (n % 2 == 1)
            ? sorted.get(n / 2)
            : (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;

        collect(Row.of(temperature, median));
    }
}

This PTF:

  • Declares the output schema with a class-level @DataTypeHint. Flink prepends the partition key (sensor_id) to the output automatically, so the PTF emits only temperature and median.

  • Holds a sliding window of the last numTrailing readings per sensor in a TempsState POJO, registered through @StateHint.

  • Accepts numTrailing as a scalar Integer argument so each query can choose its own window size.

  • Uses getFieldAs() for type-safe, named field access from the input row.

Step 2: Package and deploy

Package the PTF as a JAR, upload the artifact to Confluent Cloud (see Step 1: Build the artifact), and register it:

CREATE FUNCTION Median
AS 'com.example.Median'
USING JAR '<your-artifact-uri>';

Your PTF JAR can include any third-party libraries it needs (within the artifact size limit); they ship inside the uploaded artifact and are loaded alongside your function class.

Step 3: Call the PTF from SQL

Create a source table with a temperature column and a watermarked timestamp, then call Median over it:

CREATE TABLE temperature_readings (
  sensor_id INT,
  temperature DOUBLE,
  ts TIMESTAMP(3),
  WATERMARK FOR ts AS ts
);

SELECT *
FROM Median(
  input        => TABLE temperature_readings PARTITION BY sensor_id,
  numTrailing  => 3,
  uid          => 'median-v1'
);

PARTITION BY sensor_id tells Flink to maintain a separate trailing window per sensor. The uid parameter assigns a stable identifier so state survives query restarts.

Step 3a: Call the PTF from the Table API

You can call the same PTF from the Flink Table API. Partition the source first, then chain .process() with the PTF class and any scalar arguments:

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

tableEnv.from("temperature_readings")
    .partitionBy($("sensor_id"))
    .process(Median.class, lit(3).asArgument("numTrailing"))
    .execute();

Step 4: View the results

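With numTrailing => 3, a sensor whose readings arrive as 20.0, 22.0, 21.0, 24.0, 23.0 produces the following output. Each row carries the current reading and the median of the trailing window, which holds fewer than three readings for the first two rows: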

sensor_id | temperature | median
----------|-------------|--------
1         | 20.0        | 20.0
1         | 22.0        | 21.0
1         | 21.0        | 21.0
1         | 24.0        | 22.0
1         | 23.0        | 23.0
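
The medians in the table above can be checked outside Flink. The following is a minimal plain-Java sketch of the same windowing arithmetic for a single sensor. The class name TrailingMedianSketch and its medians() helper are hypothetical and are not part of the Flink API; the sketch keeps the window in an in-memory deque rather than in Flink state.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Hypothetical helper for illustration only; not a Flink class.
final class TrailingMedianSketch {

    // Returns the median that would be emitted after each reading of one sensor.
    static List<Double> medians(List<Double> readings, int numTrailing) {
        Deque<Double> window = new ArrayDeque<>();
        List<Double> out = new ArrayList<>();
        for (double reading : readings) {
            window.addLast(reading);
            // Evict the oldest readings once the window exceeds numTrailing.
            while (window.size() > numTrailing) {
                window.removeFirst();
            }
            // Sort a copy so the arrival order in the window is preserved.
            List<Double> sorted = new ArrayList<>(window);
            Collections.sort(sorted);
            int n = sorted.size();
            double median = (n % 2 == 1)
                ? sorted.get(n / 2)
                : (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;
            out.add(median);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Double> readings = List.of(20.0, 22.0, 21.0, 24.0, 23.0);
        // Prints [20.0, 21.0, 21.0, 22.0, 23.0]
        System.out.println(medians(readings, 3));
    }
}
```

The second row shows why the median is 21.0 and not 22.0: with only two readings buffered, the window size is even and the two middle values are averaged.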

Note

For more PTF patterns, see Common PTF patterns and the Apache Flink PTF documentation.

Example: Inactivity alert PTF with timers

The following example shows a PTF that uses event-time timers to detect inactive users. If a user does not produce a new event within 60 seconds, the PTF fires a timer and emits an alert.

Step 1: Write the PTF

import static org.apache.flink.table.annotation.ArgumentTrait.REQUIRE_ON_TIME;
import static org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE;

import java.time.Duration;
import java.time.Instant;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.table.functions.ProcessTableFunction.TimeContext;
import org.apache.flink.types.Row;

// Output excludes the partition key — Flink auto-prepends user_id from PARTITION BY.
@DataTypeHint("ROW<alert_type STRING, idle_ms BIGINT>")
public class InactivityAlert extends ProcessTableFunction<Row> {

    private static final Duration TIMEOUT = Duration.ofSeconds(60);

    public static class ActivityState {
        public long lastSeenMs = 0L;
    }

    public void eval(
        Context ctx,
        @StateHint ActivityState state,
        @ArgumentHint({SET_SEMANTIC_TABLE, REQUIRE_ON_TIME}) Row input
    ) {
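        // Wall-clock time of the latest event. This is processing time, not
        // event time, so idle_ms reports real elapsed time at the operator.
        state.lastSeenMs = System.currentTimeMillis();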

        // Cancel the previous deadline, then schedule a fresh one. Re-registering
        // the same name has the same effect, but the explicit clear makes the
        // "this event reset the inactivity clock" intent obvious to readers.
        TimeContext<Instant> timeCtx = ctx.timeContext(Instant.class);
        timeCtx.clearTimer("inactivity");
        timeCtx.registerOnTime("inactivity", timeCtx.time().plus(TIMEOUT));
    }

    public void onTimer(ActivityState state) {
        // Timer fired — no new event arrived within the timeout. Report
        // how long the partition has been idle.
        long idleMs = System.currentTimeMillis() - state.lastSeenMs;
        collect(Row.of("inactive", idleMs));
    }
}

This PTF:

  • Uses REQUIRE_ON_TIME to declare that the PTF needs a watermarked timestamp column.

  • Registers a named timer called "inactivity" each time an event arrives. Because the PTF reuses the timer name, each new event replaces the previous timer, effectively resetting the inactivity clock.

  • Emits an alert row from onTimer() when the timer fires, which happens once 60 seconds of event time pass without a new event for a partition key.
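
The timer-replacement behavior described in the list above can be modeled without Flink. The following plain-Java sketch is a simplified model, not the Flink runtime: it assumes one named timer per key, and it assumes a timer fires as soon as the watermark reaches its deadline. The class InactivityTimerSketch and its methods are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of one named event-time timer per key; not Flink API.
final class InactivityTimerSketch {
    static final long TIMEOUT_MS = 60_000L;

    // key -> deadline of the single "inactivity" timer for that key
    private final Map<String, Long> deadlines = new HashMap<>();
    final List<String> alerts = new ArrayList<>();

    // A new event overwrites any pending deadline for its key,
    // which is the effect of re-registering a timer under the same name.
    void onEvent(String key, long eventTimeMs) {
        deadlines.put(key, eventTimeMs + TIMEOUT_MS);
    }

    // Advancing the watermark fires and removes every timer whose
    // deadline is at or before the watermark (a modeling assumption).
    void onWatermark(long watermarkMs) {
        deadlines.entrySet().removeIf(e -> {
            if (e.getValue() <= watermarkMs) {
                alerts.add(e.getKey() + " inactive");
                return true;
            }
            return false;
        });
    }

    public static void main(String[] args) {
        InactivityTimerSketch s = new InactivityTimerSketch();
        s.onEvent("alice", 0L);        // deadline 60 000
        s.onEvent("alice", 30_000L);   // deadline replaced with 90 000
        s.onWatermark(60_000L);        // nothing fires
        s.onWatermark(90_000L);        // the replaced timer fires
        System.out.println(s.alerts);  // prints [alice inactive]
    }
}
```

In this model the first deadline never fires because the second event overwrote it, which is the "reset the inactivity clock" effect. Progress depends on the watermark, so a source that stops advancing its watermark would also stop alerts from being emitted.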

Step 2: Deploy and call from SQL

Package the PTF as a JAR, upload the artifact to Confluent Cloud (see Step 1: Build the artifact), register it, and then call it from SQL:

CREATE FUNCTION InactivityAlert
AS 'com.example.InactivityAlert'
USING JAR '<your-artifact-uri>';

SELECT *
FROM InactivityAlert(
    input => TABLE user_events PARTITION BY user_id,
    on_time => DESCRIPTOR(event_time),
    uid => 'inactivity-v1'
);

on_time => DESCRIPTOR(event_time) tells Flink which column provides the event-time timestamp. The event_time column must have a watermark declaration on the source table.

Example: Bounded state with TTL

The following example shows how to use @StateHint(ttl=…) to bound the size of long-running stateful PTFs. The PTF counts events per partition key, but the per-key counter is automatically reset when the key is idle for 30 seconds.

Step 1: Write the PTF

import static org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

// Output excludes the partition key — Flink auto-prepends it from PARTITION BY.
@DataTypeHint("ROW<counter BIGINT>")
public class TtlCountdown extends ProcessTableFunction<Row> {

    public static class CountState {
        public long counter = 0L;
    }

    public void eval(
        @StateHint(ttl = "30 seconds") CountState state,
        @ArgumentHint(SET_SEMANTIC_TABLE) Row input
    ) {
        state.counter++;
        collect(Row.of(state.counter));
    }
}

The ttl = "30 seconds" attribute tells Flink to garbage-collect the CountState entry for any partition key that receives no read or write for 30 consecutive seconds. The next event for that key sees a fresh CountState with counter = 0.

Step 2: Package and deploy

CREATE FUNCTION TtlCountdown
AS 'com.example.TtlCountdown'
USING JAR '<your-artifact-uri>';

Step 3: Call the PTF from SQL

SELECT *
FROM TtlCountdown(
    input => TABLE examples.marketplace.clicks PARTITION BY user_id,
    uid => 'ttl-countdown-v1'
);

Step 4: View the results

For a steady stream of clicks, each user’s counter grows monotonically. When a user goes quiet for 30 seconds, the next click for that user starts again from 1:

user_id | counter
--------|---------
1234    | 1
1234    | 2
1234    | 3
    (30 s of no activity for user 1234)
1234    | 1
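
The reset shown in the table above can be reproduced with a small plain-Java model. This is a sketch under stated assumptions rather than a description of how Flink implements state TTL: it expires an entry eagerly when the next event arrives after 30 seconds or more of inactivity, and it takes timestamps as explicit arguments. The class TtlCounterSketch is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a per-key counter with idle expiry; not Flink API.
final class TtlCounterSketch {
    static final long TTL_MS = 30_000L;

    private static final class Entry {
        long counter;
        long lastAccessMs;
    }

    private final Map<String, Entry> state = new HashMap<>();

    // Returns the counter value that would be emitted for this event.
    long onEvent(String key, long nowMs) {
        Entry e = state.get(key);
        // Treat an entry idle for TTL_MS or longer as expired and start over.
        if (e == null || nowMs - e.lastAccessMs >= TTL_MS) {
            e = new Entry();
            state.put(key, e);
        }
        e.counter++;
        e.lastAccessMs = nowMs;
        return e.counter;
    }

    public static void main(String[] args) {
        TtlCounterSketch s = new TtlCounterSketch();
        System.out.println(s.onEvent("1234", 0L));       // 1
        System.out.println(s.onEvent("1234", 1_000L));   // 2
        System.out.println(s.onEvent("1234", 2_000L));   // 3
        System.out.println(s.onEvent("1234", 40_000L));  // 1, after 38 s idle
    }
}
```

Each event refreshes lastAccessMs, so a key that keeps receiving events more often than every 30 seconds never expires in this model.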

For more information about state TTL, see State TTL.

Example: Multi-table control stream

The following example shows a PTF that consumes a low-cardinality control stream and a high-volume event stream and tags each event with the active per-tenant mode. The control stream mutates per-tenant behavior; the event stream carries the tagged output.

Step 1: Write the PTF

import static org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

// Output excludes the partition key — Flink auto-prepends tenant_id from PARTITION BY.
@DataTypeHint("ROW<payload STRING, active_mode STRING>")
public class MtControlStream extends ProcessTableFunction<Row> {

    public static class TenantConfig {
        public String activeMode = "default";
    }

    public void eval(
        @StateHint TenantConfig state,
        @ArgumentHint(SET_SEMANTIC_TABLE) Row events,
        @ArgumentHint(SET_SEMANTIC_TABLE) Row control
    ) {
        // Control rows update the per-tenant active mode; no row is emitted.
        if (control != null) {
            String mode = control.getFieldAs("mode");
            if (mode != null) {
                state.activeMode = mode;
            }
            return;
        }
        // Event rows are tagged with the current mode.
        if (events != null) {
            String payload = events.getFieldAs("payload");
            collect(Row.of(payload, state.activeMode));
        }
    }
}

Step 2: Package and deploy

CREATE FUNCTION MtControlStream
AS 'com.example.MtControlStream'
USING JAR '<your-artifact-uri>';

Step 3: Call the PTF from SQL

SELECT *
FROM MtControlStream(
    events  => TABLE events  PARTITION BY tenant_id,
    control => TABLE control PARTITION BY tenant_id,
    uid     => 'mt-control-stream-v1'
);

Both inputs must be co-partitioned by the same key — tenant_id in this example — so each PTF instance sees the events and control rows for a single tenant.

Step 4: View the results

After a control row for tenant T1 sets mode = 'turbo', subsequent events for T1 are tagged 'turbo'. Tenant T2 is unaffected:

tenant_id | payload | active_mode
----------|---------|-------------
T1        | p1      | turbo
T1        | p2      | turbo
T2        | p3      | default
T2        | p4      | default
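
The per-tenant tagging in the table above can be traced with a plain-Java model. The sketch assumes control and event rows are delivered one at a time, keyed by tenant, and it stores the mode in an in-memory map rather than in Flink state. The class ControlStreamSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of per-tenant mode state; not Flink API.
final class ControlStreamSketch {
    // tenant_id -> active mode; tenants without an entry use "default"
    private final Map<String, String> activeMode = new HashMap<>();
    final List<String> output = new ArrayList<>();

    // Control rows update state and emit nothing.
    void onControl(String tenantId, String mode) {
        if (mode != null) {
            activeMode.put(tenantId, mode);
        }
    }

    // Event rows are tagged with the tenant's current mode.
    void onEvent(String tenantId, String payload) {
        String mode = activeMode.getOrDefault(tenantId, "default");
        output.add(tenantId + " | " + payload + " | " + mode);
    }

    public static void main(String[] args) {
        ControlStreamSketch s = new ControlStreamSketch();
        s.onControl("T1", "turbo");
        s.onEvent("T1", "p1");
        s.onEvent("T1", "p2");
        s.onEvent("T2", "p3");
        s.onEvent("T2", "p4");
        s.output.forEach(System.out::println);
    }
}
```

An event that arrives before its tenant's control row is tagged with the mode in effect at that moment, so the relative arrival order of the two streams affects the output.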

For more information about multi-table PTFs, see Multi-table inputs.

Example: Pass-through columns

The following example shows a PTF that enriches every input row with a running_count value. Because the input is declared with PASS_COLUMNS_THROUGH, Flink splices the input columns into the output automatically — the PTF only declares the new column.

Step 1: Write the PTF

import static org.apache.flink.table.annotation.ArgumentTrait.PASS_COLUMNS_THROUGH;
import static org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

public class PassThroughEnrich extends ProcessTableFunction<Long> {

    public static class CountState {
        public long count = 0L;
    }

    public void eval(
        @StateHint CountState state,
        @ArgumentHint({SET_SEMANTIC_TABLE, PASS_COLUMNS_THROUGH}) Row input
    ) {
        state.count++;
        collect(state.count);
    }
}

Because the PTF extends ProcessTableFunction<Long>, Flink infers a single-column BIGINT output. The trait PASS_COLUMNS_THROUGH then places every column of the input table in front of that one column in the output.

Step 2: Package and deploy

CREATE FUNCTION PassThroughEnrich
AS 'com.example.PassThroughEnrich'
USING JAR '<your-artifact-uri>';

Step 3: Call the PTF from SQL

SELECT *
FROM PassThroughEnrich(
    input => TABLE examples.marketplace.clicks PARTITION BY user_id,
    uid   => 'pass-through-enrich-v1'
);

Step 4: View the results

Every input column is preserved and a single BIGINT column from collect() is appended. The column name in the sink is determined by the sink table’s schema during positional INSERT binding — for example, the sink can name it running_count for readability. The partition key (user_id) appears exactly once: Flink does not duplicate the partition column when PASS_COLUMNS_THROUGH is set:

user_id | click_id  | url      | running_count
--------|-----------|----------|---------------
1234    | click-42  | /home    | 1
1234    | click-55  | /shop    | 2
5678    | click-87  | /home    | 1
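
The output layout above, input columns first and the function's value last, can be illustrated with a plain-Java model. The sketch assumes the first element of each input row is the partition key and represents rows as lists. The class PassThroughSketch is a hypothetical name and does not reflect how Flink assembles rows internally.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of pass-through output assembly; not Flink API.
final class PassThroughSketch {
    // partition key -> number of rows seen so far
    private final Map<String, Long> counts = new HashMap<>();

    // Assumes inputRow.get(0) is the partition key (user_id in the example).
    List<Object> onRow(List<Object> inputRow) {
        String key = String.valueOf(inputRow.get(0));
        long count = counts.merge(key, 1L, Long::sum);
        // Copy the input columns unchanged, then append the single output value.
        List<Object> out = new ArrayList<>(inputRow);
        out.add(count);
        return out;
    }

    public static void main(String[] args) {
        PassThroughSketch s = new PassThroughSketch();
        System.out.println(s.onRow(List.of("1234", "click-42", "/home")));
        System.out.println(s.onRow(List.of("1234", "click-55", "/shop")));
        System.out.println(s.onRow(List.of("5678", "click-87", "/home")));
    }
}
```

The function body only produces the count; every other column in the output row comes from the input unchanged.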

For more information about pass-through PTFs, see Pass-through columns.

Troubleshooting

PTF not found

If you receive an error that the PTF is not found, ensure:

No output produced

If the PTF query runs but produces no output:

  • Verify that data is flowing into the source table by querying examples.marketplace.clicks directly.

  • Check whether Flink is generating watermarks. Add a SELECT query with a TUMBLE window to verify watermark progress.

High state size or degraded performance

If your PTF statement enters a DEGRADED state or accumulates excessive state:

  • Check the statement’s state size and CFU consumption in the Cloud Console or Query Profiler.

  • Reduce the number of unique partition keys to limit the total state entries.

  • Increase the compute pool’s MAX_CFU to provide more resources.

  • For long-running stateful PTFs, consider implementing manual state cleanup logic within the eval() method. See Process table function limitations.

Next steps