User-defined Functions in Confluent Cloud for Apache Flink¶
Confluent Cloud for Apache Flink® supports user-defined functions (UDFs), which are extension points for running custom logic that you can’t express in the system-provided Flink SQL queries or with the Table API.
You can implement user-defined functions in Java, and you can use third-party libraries within a UDF. Confluent Cloud for Apache Flink supports scalar functions (UDFs), which map scalar values to a new scalar value, and table functions (UDTFs), which map multiple scalar values to multiple output rows.
- Create an example UDF: Create a User Defined Function
- Add logging to your UDFs: Enable Logging in a User Defined Function
- Availability: UDF regional availability
- Limitations: UDF limitations
- Example code: Flink UDF Java Examples
Artifacts¶
Artifacts are Java packages, or JAR files, that contain user-defined functions and all of the required dependencies. Artifacts are uploaded to Confluent Cloud and scoped to a specific region in a Confluent Cloud environment. To be used for UDFs, artifacts must follow a few common implementation principles, which are described in the following sections.
To use a UDF, you must register one or more functions that reference the artifact.
Functions¶
Functions are SQL objects that reference a class in an artifact and can be used in any SQL statement or Table API program.
Once an artifact is uploaded, you register a function by using the CREATE FUNCTION statement.
Once a function is registered, you can invoke it from any SQL statement or Table API program.
The following example shows how to register a TShirtSizingIsSmaller function and invoke it in a SQL statement.
-- Register the function.
CREATE FUNCTION is_smaller
AS 'com.example.my.TShirtSizingIsSmaller'
USING JAR 'confluent-artifact://<artifact-id>/<version-id>';
-- Invoke the function.
SELECT IS_SMALLER('L', 'M');
To build and upload a UDF to Confluent Cloud for Apache Flink for use in Flink SQL or the Table API, see Create a UDF.
RBAC¶
To upload artifacts, register functions, and invoke functions, you must have the FlinkDeveloper role or higher. For more information, see Grant Role-Based Access.
Scalar functions¶
A user-defined scalar function maps zero, one, or multiple scalar values to a new scalar value. You can use any data type listed in Data Types as a parameter or return type of an evaluation method.
To define a scalar function, extend the ScalarFunction base class in org.apache.flink.table.functions and implement one or more evaluation methods named eval(...).
The following code example shows how to define your own hash code function.
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.ScalarFunction;
import static org.apache.flink.table.api.Expressions.*;

public static class HashFunction extends ScalarFunction {

  // Take any data type and return INT.
  public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
    return o.hashCode();
  }
}
The following example shows how to call the HashFunction UDF in a Flink SQL statement.
SELECT HashFunction(myField) FROM MyTable;
To build and upload a UDF to Confluent Cloud for Apache Flink for use in Flink SQL, see Create a User Defined Function.
Table functions¶
Confluent Cloud for Apache Flink also supports user-defined table functions (UDTFs), which take multiple scalar values as input arguments and return multiple rows as output, instead of a single value.
To create a user-defined table function, extend the TableFunction base class in org.apache.flink.table.functions and implement one or more evaluation methods named eval(...). Input and output data types are inferred automatically by using reflection, including the generic argument T of the class for determining the output data type. Unlike scalar functions, the evaluation method itself doesn't have a return type. Instead, a table function provides a collect(T) method that's called within every evaluation method to emit zero, one, or more records.
In the Table API, a table function is used with the .joinLateral(...) or .leftOuterJoinLateral(...) operators. The joinLateral operator cross-joins each row from the outer table (the table on the left of the operator) with all rows produced by the table-valued function (on the right side of the operator). The leftOuterJoinLateral operator joins each row from the outer table with all rows produced by the table-valued function and preserves outer rows for which the table function returns an empty table.
Note
User-defined table functions are distinct from the Table API but can be used in Table API code.
In SQL, use LATERAL TABLE(<TableFunction>) with JOIN or LEFT JOIN and an ON TRUE join condition.
The following code example shows how to implement a simple string splitting function.
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.*;
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
public static class SplitFunction extends TableFunction<Row> {

  public void eval(String str) {
    for (String s : str.split(" ")) {
      // Use collect(...) to emit a row.
      collect(Row.of(s, s.length()));
    }
  }
}
The following example shows how to call the SplitFunction UDTF in a Flink SQL statement.
SELECT myField, word, length
FROM MyTable
LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE;
To build and upload a user-defined table function to Confluent Cloud for Apache Flink for use in Flink SQL, see Create a User Defined Table Function.
Implementation considerations¶
All UDFs adhere to a few common implementation principles, which are described in the following sections.
The following code example shows how to implement a simple scalar function and how to call it in Flink SQL.
For the Table API, you can register the function in code and invoke it.
For SQL queries, your UDF must be registered by using the CREATE FUNCTION statement. For more information, see Create a User-defined Function.
import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.ScalarFunction;
import static org.apache.flink.table.api.Expressions.*;

// Define the function logic.
public static class SubstringFunction extends ScalarFunction {

  public String eval(String s, Integer begin, Integer end) {
    return s.substring(begin, end);
  }
}
The following example shows how to call the SubstringFunction UDF in a Flink SQL statement.
SELECT SubstringFunction('test string', 2, 5);
Function class¶
Your implementation class must extend one of the system-provided base classes.
- Scalar functions extend the org.apache.flink.table.functions.ScalarFunction class.
- Table functions extend the org.apache.flink.table.functions.TableFunction class.
The class must be declared public, not abstract, and must be accessible globally. Non-static inner or anonymous classes are not supported.
Evaluation methods¶
You define the behavior of a scalar function by implementing a custom evaluation method named eval, which must be declared public. You can overload evaluation methods by implementing multiple methods named eval.
The evaluation method is called by code-generated operators during runtime.
Regular JVM method-calling semantics apply, so these implementation options are available:
- You can implement overloaded methods, like eval(Integer) and eval(LocalDateTime).
- You can use var-args, like eval(Integer...).
- You can use object inheritance, like eval(Object), which takes both LocalDateTime and Integer.
- You can use combinations of these, like eval(Object...), which takes all kinds of arguments.
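These options follow ordinary JVM overload resolution and can be seen in plain Java, outside of Flink. The following sketch (the class name and return strings are illustrative, not part of any Flink API) shows which eval method the JVM selects for each call:

```java
import java.time.LocalDateTime;

// Plain-Java sketch of the overload options listed above; no Flink required.
class OverloadDemo {

  // Exact match for a single Integer argument.
  public String eval(Integer i) {
    return "Integer";
  }

  // Object inheritance: matches any other single reference argument, like LocalDateTime.
  public String eval(Object o) {
    return "Object";
  }

  // Var-args: matches calls with a different number of arguments.
  public String eval(Object... os) {
    return "varargs:" + os.length;
  }

  public static void main(String[] args) {
    OverloadDemo d = new OverloadDemo();
    System.out.println(d.eval(42));                  // most specific: eval(Integer)
    System.out.println(d.eval(LocalDateTime.now())); // falls back to eval(Object)
    System.out.println(d.eval("a", "b"));            // only var-args accepts two arguments
  }
}
```

The same resolution order applies when Flink's code-generated operators call your eval methods at runtime.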
The ScalarFunction base class provides a set of optional methods that you can override: open(), close(), isDeterministic(), and supportsConstantFolding(). You can use the open() method for initialization work and the close() method for cleanup work.
Internally, Table API and SQL code generation works with primitive values where possible. To reduce overhead during runtime, a user-defined scalar function should declare parameters and result types as primitive types instead of their boxed classes. For example, DATE/TIME is equal to int, and TIMESTAMP is equal to long.
The following code example shows a user-defined function that has overloaded eval methods.
import org.apache.flink.table.functions.ScalarFunction;

// Function with overloaded evaluation methods.
public static class SumFunction extends ScalarFunction {

  public Integer eval(Integer a, Integer b) {
    return a + b;
  }

  public Integer eval(String a, String b) {
    return Integer.valueOf(a) + Integer.valueOf(b);
  }

  public Integer eval(Double... d) {
    double result = 0;
    for (double value : d) {
      result += value;
    }
    return (int) result;
  }
}
Type inference¶
The Table API is strongly typed, so both function parameters and return types must be mapped to a data type.
The Flink planner needs information about expected types, precision, and scale. It also needs information about how internal data structures are represented as JVM objects when calling a user-defined function.
Type inference is the process of validating input arguments and deriving data types for both the parameters and the result of a function.
User-defined functions in Flink implement automatic type-inference extraction that derives data types from the function's class and its evaluation methods by using reflection. If this implicit reflection-based extraction fails, you can help the extraction process by annotating affected parameters, classes, or methods with @DataTypeHint and @FunctionHint.
Automatic type inference¶
Automatic type inference inspects the function's class and evaluation methods to derive data types for the arguments and return value of a function. The @DataTypeHint and @FunctionHint annotations support automatic extraction.
For a list of classes that implicitly map to a data type, see Data type extraction.
Data type hints¶
In some situations, you may need to support automatic extraction inline for parameters and return types of a function. In these cases, you can use data type hints with the @DataTypeHint annotation to define data types.
The following code example shows how to use data type hints.
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;

// User-defined function that has overloaded evaluation methods.
public static class OverloadedFunction extends ScalarFunction {

  // No hint required for type inference.
  public Long eval(long a, long b) {
    return a + b;
  }

  // Define the precision and scale of a decimal.
  public @DataTypeHint("DECIMAL(12, 3)") BigDecimal eval(double a, double b) {
    return BigDecimal.valueOf(a + b);
  }

  // Define a nested data type.
  @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")
  public Row eval(int i) {
    return Row.of(String.valueOf(i), Instant.ofEpochSecond(i));
  }

  // Enable wildcard input and custom serialized output.
  // MyUtils is a placeholder for your own serialization helper.
  @DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class)
  public ByteBuffer eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
    return MyUtils.serializeToByteBuffer(o);
  }
}
Function hints¶
In some situations, you may want one evaluation method to handle multiple different data types, or you may have overloaded evaluation methods with a common result type that should be declared only once.
The @FunctionHint annotation provides a mapping from argument data types to a result data type. It enables annotating entire function classes or evaluation methods for input, accumulator, and result data types. You can declare one or more annotations on a class, or individually for each evaluation method to overload function signatures.
All hint parameters are optional. If a parameter is not defined, the default reflection-based extraction is used. Hint parameters defined on a function class are inherited by all evaluation methods.
The following code example shows how to use function hints.
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
// User-defined function with overloaded evaluation methods
// but a globally defined output type.
@FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
public static class OverloadedFunction extends TableFunction<Row> {

  public void eval(int a, int b) {
    collect(Row.of("Sum", a + b));
  }

  // Overloading arguments is still possible.
  public void eval() {
    collect(Row.of("Empty args", -1));
  }
}
// Decouples the type inference from evaluation methods.
// The type inference is entirely determined by the function hints.
@FunctionHint(
input = {@DataTypeHint("INT"), @DataTypeHint("INT")},
output = @DataTypeHint("INT")
)
@FunctionHint(
input = {@DataTypeHint("BIGINT"), @DataTypeHint("BIGINT")},
output = @DataTypeHint("BIGINT")
)
@FunctionHint(
input = {},
output = @DataTypeHint("BOOLEAN")
)
public static class OverloadedFunction extends TableFunction<Object> {

  // Ensure a method exists that the JVM can call.
  public void eval(Object... o) {
    if (o.length == 0) {
      collect(false);
      return;
    }
    collect(o[0]);
  }
}
Named parameters¶
When you call a user-defined function, you can use parameter names to specify the values of the parameters. Named parameters enable passing both the parameter name and value to a function. This approach avoids confusion caused by incorrect parameter order, and it improves code readability and maintainability. Also, named parameters can omit optional parameters, which are filled with null by default. Use the @ArgumentHint annotation to specify the name, type, and whether a parameter is required.
The following code examples demonstrate how to use @ArgumentHint in different scopes.
Use the @ArgumentHint annotation on the parameters of the eval method of the function:

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

public static class NamedParameterClass extends ScalarFunction {

  // Use the @ArgumentHint annotation to specify the name, type, and whether a parameter is required.
  public String eval(@ArgumentHint(name = "param1", isOptional = false, type = @DataTypeHint("STRING")) String s1,
                     @ArgumentHint(name = "param2", isOptional = true, type = @DataTypeHint("INT")) Integer s2) {
    return s1 + ", " + s2;
  }
}
Use the @ArgumentHint annotation on the eval method of the function:

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.ScalarFunction;

public static class NamedParameterClass extends ScalarFunction {

  // Use the @ArgumentHint annotation to specify the name, type, and whether a parameter is required.
  @FunctionHint(
    argument = {@ArgumentHint(name = "param1", isOptional = false, type = @DataTypeHint("STRING")),
                @ArgumentHint(name = "param2", isOptional = true, type = @DataTypeHint("INTEGER"))}
  )
  public String eval(String s1, Integer s2) {
    return s1 + ", " + s2;
  }
}
Use the @ArgumentHint annotation on the class of the function:

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.ScalarFunction;

// Use the @ArgumentHint annotation to specify the name, type, and whether a parameter is required.
@FunctionHint(
  argument = {@ArgumentHint(name = "param1", isOptional = false, type = @DataTypeHint("STRING")),
              @ArgumentHint(name = "param2", isOptional = true, type = @DataTypeHint("INTEGER"))}
)
public static class NamedParameterClass extends ScalarFunction {

  public String eval(String s1, Integer s2) {
    return s1 + ", " + s2;
  }
}
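Assuming a function like the one above has been registered under the name NamedParameterFunction (an assumed name for illustration), it can be invoked with named arguments in SQL. Arguments are passed by name with the => syntax, so their order no longer matters, and the optional param2 can be omitted:

```sql
-- Pass arguments by name; order no longer matters.
SELECT NamedParameterFunction(param2 => 42, param1 => 'hello');

-- Omit the optional parameter; param2 is filled with NULL.
SELECT NamedParameterFunction(param1 => 'hello');
```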
The @ArgumentHint annotation already contains the @DataTypeHint annotation, so you can't combine it with @DataTypeHint in a @FunctionHint. Similarly, when applied to function parameters, @ArgumentHint can't be used with @DataTypeHint at the same time, so use @ArgumentHint instead.
Named parameters take effect only when the corresponding class doesn't contain overloaded or variable-argument evaluation methods. Otherwise, using named parameters causes an error.
Determinism¶
Every user-defined function class can declare whether it produces deterministic results by overriding the isDeterministic() method. If the function is not purely functional, like random(), date(), or now(), the method must return false. By default, isDeterministic() returns true.
Also, the isDeterministic() method may influence the runtime behavior. A runtime implementation might be called at two different stages.
During planning¶
During planning, in the so-called pre-flight phase, if a function is called with constant expressions, or if constant expressions can be derived from the given statement, the function is pre-evaluated for constant expression reduction and might not be executed on the cluster. In these cases, you can use the isDeterministic() method to disable constant expression reduction. For example, the following calls to ABS are executed during planning:
SELECT ABS(-1) FROM t;
SELECT ABS(field) FROM t WHERE field = -1;
But the following call to ABS is not executed during planning:
SELECT ABS(field) FROM t;
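To keep a non-deterministic function from being pre-evaluated this way, return false from isDeterministic(). The following is a minimal plain-Java sketch: the stub base class stands in for org.apache.flink.table.functions.ScalarFunction so the example compiles without Flink, and the function name is illustrative:

```java
import java.util.concurrent.ThreadLocalRandom;

// Stand-in for org.apache.flink.table.functions.ScalarFunction,
// so the sketch has no Flink dependency.
abstract class DeterminismStub {
  public boolean isDeterministic() {
    return true; // Flink's default
  }
}

// Adds random jitter, so results differ between calls with equal input.
class JitterFunction extends DeterminismStub {

  public double eval(double base) {
    return base + ThreadLocalRandom.current().nextDouble();
  }

  @Override
  public boolean isDeterministic() {
    // Tells the planner not to apply constant expression reduction, so a call
    // like JitterFunction(1.0) is not replaced by a constant at plan time.
    return false;
  }
}
```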
During runtime¶
If a function is called with non-constant expressions, or if isDeterministic() returns false, the function is executed on the cluster.
System function determinism¶
The determinism of system (built-in) functions is immutable. According to Apache Calcite's SqlOperator definition, there are two kinds of functions that are not deterministic: dynamic functions and non-deterministic functions.
/**
 * Returns whether a call to this operator is guaranteed to always return
 * the same result given the same operands; true is assumed by default.
 */
public boolean isDeterministic() {
  return true;
}

/**
 * Returns whether it is unsafe to cache query plans referencing this
 * operator; false is assumed by default.
 */
public boolean isDynamicFunction() {
  return false;
}
When isDeterministic() returns false, the function is evaluated per-record during runtime.
When isDynamicFunction() returns true, the function can be pre-evaluated only at query start, and this pre-evaluation during planning happens only in batch mode. In streaming mode, a dynamic function is equivalent to a non-deterministic function, because the query is executed continuously under the abstraction of a continuous query over dynamic tables, so dynamic functions are re-evaluated for each query execution, which is equivalent to per-record evaluation in the current implementation. The isDynamicFunction() method applies only to system functions.
The following system functions are always non-deterministic, which means they are evaluated per-record during runtime, both in batch and streaming mode.
- CURRENT_ROW_TIMESTAMP
- RAND
- RAND_INTEGER
- UNIX_TIMESTAMP
- UUID
The following system temporal functions are dynamic and are pre-evaluated during planning (query-start) for batch mode and evaluated per-record for streaming mode.
- CURRENT_DATE
- CURRENT_TIME
- CURRENT_TIMESTAMP
- LOCALTIME
- LOCALTIMESTAMP
- NOW
UDF regional availability¶
Flink UDFs are available in the following AWS regions.
- ap-northeast-2
- ap-south-1
- ap-southeast-1
- ap-southeast-2
- ca-central-1
- eu-central-1
- eu-central-2
- eu-west-1
- eu-west-2
- sa-east-1
- us-east-1
- us-east-2
- us-west-2
UDF limitations¶
- Confluent CLI version 4.13.0 or later is required.
- External network calls from UDFs are not supported.
- JDK 17 is the latest supported Java version for uploaded JAR files.
- Each Flink statement can have no more than 10 UDFs.
- Each organization/cloud/region/environment can have no more than 100 Flink artifacts.
- The size limit of each artifact is 100 MB.
- Aggregates are not supported.
- Table aggregates are not supported.
- Temporary functions are not supported.
- The ALTER FUNCTION statement is not supported.
- Vararg functions are not supported.
- User-defined structured types are not supported.
- Python is not supported.
- Both inputs and outputs of the UDF have a row-size limit of 4 MB.
- Custom type inference is not supported.
- Constant expression reduction is not supported.
- The UDF feature is optimized for stream processing, so the initial query may be slow, but after the initial query, a UDF runs with low latency.
UDF logging limitations¶
- Public Kafka destinations only: Private networked cluster types aren’t supported as logging destinations.
- Log4j logging only: External UDF loggers can be used only with the Apache Log4j logging framework.
- Burst rate up to 1000/s: UDF logging supports up to 1000 log events per second for each UDF during a short burst of high activity. This helps to optimize performance and to reduce noise in logs. Events that exceed the maximum rate are dropped.