User-defined Functions in Confluent Cloud for Apache Flink¶
Confluent Cloud for Apache Flink®️ supports user-defined functions (UDFs), which are extension points for running custom logic that you can’t express with the system-provided functions in Flink SQL queries.
You can implement user-defined functions in Java, and you can use third-party libraries within a UDF. Currently, Confluent Cloud for Apache Flink supports only scalar functions, which map scalar values to a new scalar value.
Implementation considerations¶
All UDFs adhere to a few common implementation principles, which are described in the following sections.
- Function class
- Evaluation methods
- Type inference
- Named parameters
- Constant expression reduction
- Scalar functions
The following code example shows how to implement a simple scalar function and how to call it in Flink SQL and the Table API.
For the Table API, you can register the function or use it directly inline.
For SQL queries, your UDF must be registered by using the CREATE FUNCTION statement. For more information, see Create a User-defined Function.
import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.ScalarFunction;
import static org.apache.flink.table.api.Expressions.*;
// define function logic
public static class SubstringFunction extends ScalarFunction {
public String eval(String s, Integer begin, Integer end) {
return s.substring(begin, end);
}
}
TableEnvironment env = TableEnvironment.create(...);
// Call the function inline without registering it in the Table API.
env.from("MyTable").select(call(SubstringFunction.class, $("myField"), 5, 12));
// Register the function.
env.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);
// Call the registered function in the Table API.
env.from("MyTable").select(call("SubstringFunction", $("myField"), 5, 12));
// Call the registered function in SQL.
env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable");
// Call the registered function in SQL using named parameters.
env.sqlQuery("SELECT SubstringFunction(param1 => myField, param2 => 5, param3 => 12) FROM MyTable");
Function class¶
Your implementation class must extend one of the system-provided base classes. For example, scalar functions extend the org.apache.flink.table.functions.ScalarFunction class.
The class must be declared public, not abstract, and must be accessible globally. Non-static inner or anonymous classes are not supported.
For storing a user-defined function in a persistent catalog, the class must have a default constructor and must be instantiable during runtime. Anonymous functions in the Table API can only be persisted if the function is not stateful, which means that it contains only transient and static fields.
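For example, the following minimal sketch satisfies these requirements; the class name and trimming logic are hypothetical:
import org.apache.flink.table.functions.ScalarFunction;
// A public, non-abstract, globally accessible top-level class with a default
// constructor, so it can also be stored in a persistent catalog.
public class TrimFunction extends ScalarFunction {
  public String eval(String s) {
    return s == null ? null : s.trim();
  }
}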
Evaluation methods¶
The ScalarFunction base class provides a set of methods that you can override, like open(), close(), isDeterministic(), and supportsConstantFolding().
You define the behavior of a scalar function by implementing a custom evaluation method, named eval, which must be declared public. You can overload evaluation methods by implementing multiple methods named eval.
The evaluation method is called by code-generated operators during runtime.
Regular JVM method-calling semantics apply, so these implementation options are available:
- You can implement overloaded methods, like eval(Integer) and eval(LocalDateTime).
- You can use var-args, like eval(Integer...).
- You can use object inheritance, like eval(Object), which takes both LocalDateTime and Integer.
- You can use combinations of these, like eval(Object...), which takes all kinds of arguments.
Internally, Table API and SQL code generation works with primitive values where possible. To reduce overhead during runtime, a user-defined scalar function should declare parameters and result types as primitive types instead of their boxed classes. For example, DATE/TIME is equal to int, and TIMESTAMP is equal to long.
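For example, the following sketch declares a primitive parameter and result type; the function name is hypothetical, and note that primitive types also declare the corresponding arguments and result as NOT NULL:
import org.apache.flink.table.functions.ScalarFunction;
// Uses primitive long instead of the boxed Long to avoid boxing overhead
// in the generated code.
public static class MillisToSecondsFunction extends ScalarFunction {
  public long eval(long millis) {
    return millis / 1000;
  }
}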
The following code example shows a user-defined function that has overloaded eval methods.
import org.apache.flink.table.functions.ScalarFunction;
// function with overloaded evaluation methods
public static class SumFunction extends ScalarFunction {
public Integer eval(Integer a, Integer b) {
return a + b;
}
public Integer eval(String a, String b) {
return Integer.valueOf(a) + Integer.valueOf(b);
}
public Integer eval(Double... d) {
double result = 0;
for (double value : d)
result += value;
return (int) result;
}
}
Type inference¶
The Table API is strongly typed, so both function parameters and return types must be mapped to a data type.
The Flink planner needs information about expected types, precision, and scale. It also needs information about how internal data structures are represented as JVM objects when calling a user-defined function.
Type inference is the process of validating input arguments and deriving data types for both the parameters and the result of a function.
User-defined functions in Flink implement automatic type-inference extraction that derives data types from the function’s class and its evaluation methods by using reflection. If this implicit extraction approach with reflection fails, you can help the extraction process by annotating affected parameters, classes, or methods with @DataTypeHint and @FunctionHint.
If more advanced type inference logic is required, you can override the getTypeInference() method explicitly in every user-defined function, but the annotation approach is preferable, because it keeps custom type inference logic close to the affected locations and falls back to the default behavior for the remaining implementation.
Automatic type inference¶
Automatic type inference inspects the function’s class and evaluation methods to derive data types for the arguments and return value of a function. The @DataTypeHint and @FunctionHint annotations support automatic extraction.
For a list of classes that implicitly map to a data type, see Data type extraction.
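For example, in the following sketch no hints are required, because String and Boolean map implicitly to the STRING and BOOLEAN data types; the function itself is hypothetical:
import org.apache.flink.table.functions.ScalarFunction;
// Parameter and result classes map implicitly to SQL data types,
// so no @DataTypeHint or @FunctionHint annotations are needed.
public static class StartsWithFunction extends ScalarFunction {
  public Boolean eval(String s, String prefix) {
    if (s == null || prefix == null) {
      return null;
    }
    return s.startsWith(prefix);
  }
}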
Data type hints¶
In some situations, you may need to support automatic extraction inline for parameters and return types of a function. In these cases, you can use data type hints and the @DataTypeHint annotation to define data types.
The following code example shows how to use data type hints.
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
// user-defined function that has overloaded evaluation methods.
public static class OverloadedFunction extends ScalarFunction {
// No hint required for type inference.
public Long eval(long a, long b) {
return a + b;
}
// Define the precision and scale of a decimal.
public @DataTypeHint("DECIMAL(12, 3)") BigDecimal eval(double a, double b) {
return BigDecimal.valueOf(a + b);
}
// Define a nested data type.
@DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")
public Row eval(int i) {
return Row.of(String.valueOf(i), Instant.ofEpochSecond(i));
}
// Enable wildcard input and custom serialized output.
@DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class)
public ByteBuffer eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
return MyUtils.serializeToByteBuffer(o);
}
}
Function hints¶
In some situations, you may want one evaluation method to handle multiple different data types, or you may have overloaded evaluation methods with a common result type that should be declared only once.
The @FunctionHint annotation provides a mapping from argument data types to a result data type. It enables annotating entire function classes or evaluation methods for input, accumulator, and result data types. You can declare one or more annotations on a class, or individually for each evaluation method to overload function signatures.
All hint parameters are optional. If a parameter is not defined, the default reflection-based extraction is used. Hint parameters defined on a function class are inherited by all evaluation methods.
The following code example shows how to use function hints.
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
// User-defined function with overloaded evaluation methods
// but globally defined output type.
@FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
public static class OverloadedFunction extends TableFunction<Row> {
public void eval(int a, int b) {
collect(Row.of("Sum", a + b));
}
// Overloading arguments is still possible.
public void eval() {
collect(Row.of("Empty args", -1));
}
}
// Decouples the type inference from evaluation methods.
// The type inference is entirely determined by the function hints.
@FunctionHint(
input = {@DataTypeHint("INT"), @DataTypeHint("INT")},
output = @DataTypeHint("INT")
)
@FunctionHint(
input = {@DataTypeHint("BIGINT"), @DataTypeHint("BIGINT")},
output = @DataTypeHint("BIGINT")
)
@FunctionHint(
input = {},
output = @DataTypeHint("BOOLEAN")
)
public static class OverloadedFunction extends TableFunction<Object> {
// Ensure a method exists that the JVM can call.
public void eval(Object... o) {
if (o.length == 0) {
collect(false);
return;
}
collect(o[0]);
}
}
Custom type inference¶
For most scenarios, the @DataTypeHint and @FunctionHint annotations are sufficient to model user-defined functions. But by overriding the automatic type inference defined in the getTypeInference() method, you can create arbitrary functions that behave like built-in system functions.
The following example shows the potential of custom type-inference logic. It uses a string literal argument to determine the result type of a function. The function takes two string arguments: the first argument represents the string to be parsed, the second argument represents the target type.
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.types.Row;
import java.util.Optional;
public static class LiteralFunction extends ScalarFunction {
public Object eval(String s, String type) {
switch (type) {
case "INT":
return Integer.valueOf(s);
case "DOUBLE":
return Double.valueOf(s);
case "STRING":
default:
return s;
}
}
// The automatic, reflection-based type inference is disabled and
// replaced by the following logic.
@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
return TypeInference.newBuilder()
// Specify typed arguments.
// Parameters are cast implicitly to these types, if necessary.
.typedArguments(DataTypes.STRING(), DataTypes.STRING())
// Specify a strategy for the result data type of the function.
.outputTypeStrategy(callContext -> {
if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
throw callContext.newValidationError("Literal expected for second argument.");
}
// Return a data type based on a literal.
final String literal = callContext.getArgumentValue(1, String.class).orElse("STRING");
switch (literal) {
case "INT":
return Optional.of(DataTypes.INT().notNull());
case "DOUBLE":
return Optional.of(DataTypes.DOUBLE().notNull());
case "STRING":
default:
return Optional.of(DataTypes.STRING());
}
})
.build();
}
}
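For example, after registering the function, the result type of each call is derived from the literal in the second argument. The following usage sketch assumes a table named MyTable with a STRING column named myField:
TableEnvironment env = TableEnvironment.create(...);
env.createTemporarySystemFunction("LiteralFunction", LiteralFunction.class);
// The result type is INT NOT NULL, because the second argument is the literal 'INT'.
env.sqlQuery("SELECT LiteralFunction(myField, 'INT') FROM MyTable");
// The result type is STRING, the default branch of the output type strategy.
env.sqlQuery("SELECT LiteralFunction(myField, 'STRING') FROM MyTable");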
Named parameters¶
When you call a user-defined function, you can use parameter names to specify the values of the parameters. Named parameters enable passing both the parameter name and value to a function. This approach avoids confusion caused by incorrect parameter order, and it improves code readability and maintainability. Also, named parameters can omit optional parameters, which are filled with null by default. Use the @ArgumentHint annotation to specify the name, type, and whether a parameter is required.
The following code examples demonstrate how to use @ArgumentHint in different scopes.
Use the @ArgumentHint annotation on the parameters of the eval method of the function:
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
public static class NamedParameterClass extends ScalarFunction {
  // Use the @ArgumentHint annotation to specify the name, type, and whether a parameter is required.
  public String eval(@ArgumentHint(name = "param1", isOptional = false, type = @DataTypeHint("STRING")) String s1,
                     @ArgumentHint(name = "param2", isOptional = true, type = @DataTypeHint("INT")) Integer s2) {
    return s1 + ", " + s2;
  }
}
Use the @ArgumentHint annotation on the eval method of the function:
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.ScalarFunction;
public static class NamedParameterClass extends ScalarFunction {
  // Use the @ArgumentHint annotation to specify the name, type, and whether a parameter is required.
  @FunctionHint(
    argument = {@ArgumentHint(name = "param1", isOptional = false, type = @DataTypeHint("STRING")),
                @ArgumentHint(name = "param2", isOptional = true, type = @DataTypeHint("INTEGER"))}
  )
  public String eval(String s1, Integer s2) {
    return s1 + ", " + s2;
  }
}
Use the @ArgumentHint annotation on the class of the function:
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.ScalarFunction;
// Use the @ArgumentHint annotation to specify the name, type, and whether a parameter is required.
@FunctionHint(
  argument = {@ArgumentHint(name = "param1", isOptional = false, type = @DataTypeHint("STRING")),
              @ArgumentHint(name = "param2", isOptional = true, type = @DataTypeHint("INTEGER"))}
)
public static class NamedParameterClass extends ScalarFunction {
  public String eval(String s1, Integer s2) {
    return s1 + ", " + s2;
  }
}
The @ArgumentHint annotation already contains the @DataTypeHint annotation, so you can’t combine it with @DataTypeHint in a @FunctionHint.
When applied to function parameters, @ArgumentHint can’t be used together with @DataTypeHint at the same time, so use @ArgumentHint instead.
Named parameters take effect only when the corresponding class doesn’t contain overloaded functions or variable-argument (varargs) functions. Otherwise, using named parameters causes an error.
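For example, the following usage sketch assumes that the NamedParameterClass function from the previous examples is registered under the name NamedFunction and that MyTable has a STRING column named myField:
TableEnvironment env = TableEnvironment.create(...);
env.createTemporarySystemFunction("NamedFunction", NamedParameterClass.class);
// Pass arguments by name, so the order of the parameters doesn't matter.
env.sqlQuery("SELECT NamedFunction(param2 => 42, param1 => myField) FROM MyTable");
// Omit the optional parameter, which is filled with NULL by default.
env.sqlQuery("SELECT NamedFunction(param1 => myField) FROM MyTable");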
Determinism¶
Every user-defined function class can declare whether it produces deterministic results or not by overriding the isDeterministic() method. If the function is not purely functional, like random(), date(), or now(), the method must return false. By default, isDeterministic() returns true.
Also, the isDeterministic() method may influence the runtime behavior. A runtime implementation might be called at two different stages.
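For example, the following sketch declares a function as non-deterministic; the random-suffix logic is hypothetical:
import org.apache.flink.table.functions.ScalarFunction;
import java.util.concurrent.ThreadLocalRandom;
public static class RandomSuffixFunction extends ScalarFunction {
  public String eval(String s) {
    return s + "#" + ThreadLocalRandom.current().nextInt(1000);
  }

  // The result depends on a random value, so the function is not purely
  // functional and must declare itself non-deterministic.
  @Override
  public boolean isDeterministic() {
    return false;
  }
}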
During planning¶
During planning, in the so-called pre-flight phase, if a function is called with constant expressions, or if constant expressions can be derived from the given statement, a function is pre-evaluated for constant expression reduction and might not be executed on the cluster. In these cases, you can use the isDeterministic() method to disable constant expression reduction. For example, the following calls to ABS are executed during planning:
SELECT ABS(-1) FROM t;
SELECT ABS(field) FROM t WHERE field = -1;
But the following call to ABS is not executed during planning:
SELECT ABS(field) FROM t;
During runtime¶
If a function is called with non-constant expressions, or if isDeterministic() returns false, the function is executed on the cluster.
System function determinism¶
The determinism of system (built-in) functions is immutable. According to Apache Calcite’s SqlOperator definition, there are two kinds of functions that are not deterministic: dynamic functions and non-deterministic functions.
/**
* Returns whether a call to this operator is guaranteed to always return
* the same result given the same operands; true is assumed by default.
*/
public boolean isDeterministic() {
return true;
}
/**
* Returns whether it is unsafe to cache query plans referencing this
* operator; false is assumed by default.
*/
public boolean isDynamicFunction() {
return false;
}
The isDeterministic() method declares the determinism of a function. If it returns false, the function is evaluated per record during runtime.
The isDynamicFunction() method, if it returns true, implies that the function can be evaluated only at query-start. It is pre-evaluated during planning only in batch mode. In streaming mode, a dynamic function is equivalent to a non-deterministic function, because the query is executed continuously under the abstraction of a continuous query over dynamic tables, so dynamic functions are re-evaluated for each query execution, which is equivalent to per-record evaluation in the current implementation.
The isDynamicFunction() method applies only to system functions.
The following system functions are always non-deterministic, which means they are evaluated per-record during runtime, both in batch and streaming mode.
- CURRENT_ROW_TIMESTAMP
- RAND
- RAND_INTEGER
- UNIX_TIMESTAMP
- UUID
The following system temporal functions are dynamic and are pre-evaluated during planning (query-start) for batch mode and evaluated per-record for streaming mode.
- CURRENT_DATE
- CURRENT_TIME
- CURRENT_TIMESTAMP
- LOCALTIME
- LOCALTIMESTAMP
- NOW
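For example, in the following streaming query both functions are evaluated per record, while in batch mode NOW() would be pre-evaluated at query-start; the table and column names are hypothetical:
// UUID() is always non-deterministic; NOW() is a dynamic temporal function.
env.sqlQuery("SELECT UUID() AS id, NOW() AS ts, myField FROM MyTable");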
Constant expression reduction¶
In some cases, calls to user-defined functions with constant arguments can be reduced and simplified. You can declare whether a function supports constant expression reduction by overriding the supportsConstantFolding() method. For example, the user-defined function call PlusOne(10) could be simplified to 11 in an expression. This optimization happens at planning time, resulting in a plan that uses only the reduced value. Usually, this is desirable, so it’s enabled by default, but there are some cases in which it should be disabled.
One case is when the function call is not deterministic, as described in the Determinism section. Declaring a function as non-deterministic prevents function call expression reduction, even if supportsConstantFolding() returns true.
A function call may also have side effects, even if it always returns deterministic results. In this case, constant expression reduction wouldn’t affect the correctness of the query within Flink, but it may still be undesirable. Overriding the supportsConstantFolding() method to return false also prevents constant expression reduction, ensuring invocation at runtime.
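For example, the following sketch disables constant expression reduction for a deterministic function that has a side effect; the logging call stands in for any side effect, and the function name is hypothetical:
import org.apache.flink.table.functions.ScalarFunction;
public static class AuditedPlusOneFunction extends ScalarFunction {
  public int eval(int value) {
    // Side effect that should happen for every invocation at runtime.
    System.out.println("AuditedPlusOneFunction called with " + value);
    return value + 1;
  }

  // Prevent calls like AuditedPlusOneFunction(10) from being pre-evaluated
  // to 11 during planning.
  @Override
  public boolean supportsConstantFolding() {
    return false;
  }
}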
Scalar functions¶
A user-defined scalar function maps zero, one, or multiple scalar values to a new scalar value. You can use any data type listed in Data Types as a parameter or return type of an evaluation method.
To define a scalar function, extend the ScalarFunction base class in org.apache.flink.table.functions and implement one or more evaluation methods named eval(...).
The following code example shows how to define your own hash code function and call it in a query.
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.ScalarFunction;
import static org.apache.flink.table.api.Expressions.*;
public static class HashFunction extends ScalarFunction {
// take any data type and return INT
public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
return o.hashCode();
}
}
TableEnvironment env = TableEnvironment.create(...);
// call function "inline" without registration in Table API
env.from("MyTable").select(call(HashFunction.class, $("myField")));
// register function
env.createTemporarySystemFunction("HashFunction", HashFunction.class);
// call registered function in Table API
env.from("MyTable").select(call("HashFunction", $("myField")));
// call registered function in SQL
env.sqlQuery("SELECT HashFunction(myField) FROM MyTable");