User-defined Functions in Confluent Cloud for Apache Flink

Confluent Cloud for Apache Flink®️ supports user-defined functions (UDFs), which are extension points for running custom logic that you can’t express in the system-provided Flink SQL queries.

You can implement user-defined functions in Java, and you can use third-party libraries within a UDF. Currently, Confluent Cloud for Apache Flink supports only scalar functions, which map scalar values to a new scalar value.

Implementation considerations

All UDFs adhere to a few common implementation principles, which are described in the following sections.

The following code example shows how to implement a simple scalar function and how to call it in Flink SQL and the Table API.

For the Table API, you can register the function or use it directly inline.

For SQL queries, your UDF must be registered by using the CREATE FUNCTION statement. For more information, see Create a User-defined Function.

import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.ScalarFunction;
import static org.apache.flink.table.api.Expressions.*;

// define function logic
public static class SubstringFunction extends ScalarFunction {
  public String eval(String s, Integer begin, Integer end) {
    return s.substring(begin, end);
  }
}

TableEnvironment env = TableEnvironment.create(...);

// Call the function inline without registering it in the Table API.
env.from("MyTable").select(call(SubstringFunction.class, $("myField"), 5, 12));

// Register the function.
env.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);

// Call the registered function in the Table API.
env.from("MyTable").select(call("SubstringFunction", $("myField"), 5, 12));

// Call the registered function in SQL.
env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable");

// Call the registered function in SQL using named parameters.
env.sqlQuery("SELECT SubstringFunction(param1 => myField, param2 => 5, param3 => 12) FROM MyTable");

Function class

Your implementation class must extend one of the system-provided base classes. For example, scalar functions extend the org.apache.flink.table.functions.ScalarFunction class.

The class must be declared public, not abstract, and must be accessible globally. Non-static inner or anonymous classes are not supported.

For storing a user-defined function in a persistent catalog, the class must have a default constructor and must be instantiable during runtime. Anonymous functions in the Table API can only be persisted if the function is not stateful, which means that it contains only transient and static fields.
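
The class requirements above can be checked mechanically, for example in a unit test before you upload an artifact. The following is a minimal sketch in plain Java that uses only reflection. The UdfClassCheck class and its nested sample classes are hypothetical names for illustration, and the check approximates the rules in this section rather than reproducing Flink's own validation.

```java
import java.lang.reflect.Modifier;

// Hypothetical helper, not part of Flink: approximates the class rules above.
final class UdfClassCheck {

  // Returns true if the class is public, concrete, not an inner, local, or
  // anonymous class, and has a public no-argument constructor.
  static boolean looksRegistrable(Class<?> c) {
    int m = c.getModifiers();
    if (!Modifier.isPublic(m) || Modifier.isAbstract(m)) {
      return false;
    }
    // Reject anonymous, local, and non-static member (inner) classes.
    if (c.isAnonymousClass() || c.isLocalClass()
        || (c.isMemberClass() && !Modifier.isStatic(m))) {
      return false;
    }
    try {
      c.getConstructor(); // throws if there is no public default constructor
      return true;
    } catch (NoSuchMethodException e) {
      return false;
    }
  }

  // Sample classes that stand in for UDF implementations.
  public static class GoodFunction {}
  public abstract static class AbstractFunction {}
  public class InnerFunction {}
}
```

In this sketch, only GoodFunction passes: AbstractFunction is abstract, and InnerFunction is a non-static inner class.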

Evaluation methods

The ScalarFunction base class provides a set of methods that you can override, like open(), close(), isDeterministic(), and supportsConstantFolding().

You define the behavior of a scalar function by implementing a custom evaluation method, named eval, which must be declared public. You can overload evaluation methods by implementing multiple methods named eval.

The evaluation method is called by code-generated operators during runtime.

Regular JVM method-calling semantics apply, so these implementation options are available:

  • You can implement overloaded methods, like eval(Integer) and eval(LocalDateTime).
  • You can use var-args, like eval(Integer...).
  • You can use object inheritance, like eval(Object) that takes both LocalDateTime and Integer.
  • You can use combinations of these, like eval(Object...) that takes all kinds of arguments.
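
The options in the preceding list follow ordinary Java overload resolution. As a rough illustration, the following plain-Java sketch omits the ScalarFunction base class so that it runs without Flink on the classpath. The EvalDispatch class name and the returned labels are made up for the example. It shows which eval overload the Java compiler selects for different argument lists; in a real UDF, Flink's type inference also has to map the SQL argument types to one of these signatures.

```java
import java.time.LocalDateTime;

// Plain-Java stand-in for a UDF class; a real UDF would extend ScalarFunction.
final class EvalDispatch {

  // Overloads for specific argument types.
  public String eval(Integer i) {
    return "eval(Integer)";
  }

  public String eval(LocalDateTime t) {
    return "eval(LocalDateTime)";
  }

  // Var-args combined with object inheritance: accepts any argument list
  // that no fixed-arity overload matches.
  public String eval(Object... args) {
    return "eval(Object...) with " + args.length + " argument(s)";
  }
}
```

Fixed-arity overloads are preferred over the var-args method, so the var-args method acts as a catch-all for argument lists that match nothing more specific.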

Internally, Table API and SQL code generation works with primitive values where possible. To reduce overhead during runtime, a user-defined scalar function should declare parameters and result types as primitive types instead of their boxed classes. For example, DATE/TIME is equal to int, and TIMESTAMP is equal to long.

The following code example shows a user-defined function that has overloaded eval methods.

import org.apache.flink.table.functions.ScalarFunction;

// function with overloaded evaluation methods
public static class SumFunction extends ScalarFunction {

  public Integer eval(Integer a, Integer b) {
    return a + b;
  }

  public Integer eval(String a, String b) {
    return Integer.valueOf(a) + Integer.valueOf(b);
  }

  public Integer eval(Double... d) {
    double result = 0;
    for (double value : d)
      result += value;
    return (int) result;
  }
}
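
Boxed parameter types such as Integer can carry null, and unboxing a null reference throws a NullPointerException. The following sketch, written in plain Java without the Flink base class so that it runs standalone, shows a null-tolerant variant of the Integer overload above. It assumes that the function can receive null for a SQL NULL argument and that returning null for a missing operand is the desired behavior. The NullSafeSum class name is hypothetical.

```java
// Plain-Java stand-in; a real UDF would extend ScalarFunction.
final class NullSafeSum {

  // Returns null if either operand is null instead of failing on unboxing.
  public Integer eval(Integer a, Integer b) {
    if (a == null || b == null) {
      return null;
    }
    return a + b;
  }
}
```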

Type inference

The Table API is strongly typed, so both function parameters and return types must be mapped to a data type.

The Flink planner needs information about expected types, precision, and scale. It also needs to know how internal data structures are represented as JVM objects when calling a user-defined function.

Type inference is the process of validating input arguments and deriving data types for both the parameters and the result of a function.

User-defined functions in Flink implement automatic type-inference extraction that derives data types from the function’s class and its evaluation methods by using reflection. If this implicit extraction approach with reflection fails, you can help the extraction process by annotating affected parameters, classes, or methods with @DataTypeHint and @FunctionHint.

If more advanced type inference logic is required, you can override the getTypeInference() method explicitly in every user-defined function, but the annotation approach is preferable, because it keeps custom type inference logic close to the affected locations and falls back to the default behavior for the remaining implementation.

Automatic type inference

Automatic type inference inspects the function’s class and evaluation methods to derive data types for the arguments and return value of a function. The @DataTypeHint and @FunctionHint annotations support automatic extraction.

For a list of classes that implicitly map to a data type, see Data type extraction.

Data type hints

In some situations, you may need to support automatic extraction inline for parameters and return types of a function. In these cases you can use data type hints and the @DataTypeHint annotation to define data types.

The following code example shows how to use data type hints.

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

// user-defined function that has overloaded evaluation methods.
public static class OverloadedFunction extends ScalarFunction {

  // No hint required for type inference.
  public Long eval(long a, long b) {
    return a + b;
  }

  // Define the precision and scale of a decimal.
  public @DataTypeHint("DECIMAL(12, 3)") BigDecimal eval(double a, double b) {
    return BigDecimal.valueOf(a + b);
  }

  // Define a nested data type.
  @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")
  public Row eval(int i) {
    return Row.of(String.valueOf(i), Instant.ofEpochSecond(i));
  }

  // Enable wildcard input and custom serialized output.
  // MyUtils is a placeholder for your own serialization helper.
  @DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class)
  public ByteBuffer eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
    return MyUtils.serializeToByteBuffer(o);
  }
}

Function hints

In some situations, you may want one evaluation method to handle multiple different data types, or you may have overloaded evaluation methods with a common result type that should be declared only once.

The @FunctionHint annotation provides a mapping from argument data types to a result data type. It enables annotating entire function classes or evaluation methods for input, accumulator, and result data types. You can declare one or more annotations on a class or individually for each evaluation method for overloading function signatures.

All hint parameters are optional. If a parameter is not defined, the default reflection-based extraction is used. Hint parameters defined on a function class are inherited by all evaluation methods.

The following code example shows how to use function hints.

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

// User-defined function with overloaded evaluation methods
// but globally defined output type.
@FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
public static class OverloadedFunction extends ScalarFunction {

  public Row eval(int a, int b) {
    return Row.of("Sum", a + b);
  }

  // Overloading arguments is still possible.
  public Row eval() {
    return Row.of("Empty args", -1);
  }
}

// Decouples the type inference from evaluation methods.
// The type inference is entirely determined by the function hints.
@FunctionHint(
  input = {@DataTypeHint("INT"), @DataTypeHint("INT")},
  output = @DataTypeHint("INT")
)
@FunctionHint(
  input = {@DataTypeHint("BIGINT"), @DataTypeHint("BIGINT")},
  output = @DataTypeHint("BIGINT")
)
@FunctionHint(
  input = {},
  output = @DataTypeHint("BOOLEAN")
)
public static class OverloadedFunction extends ScalarFunction {

  // Ensure a method exists that the JVM can call.
  public Object eval(Object... o) {
    // Return early so that o[0] is never read from an empty array.
    if (o.length == 0) {
      return false;
    }
    return o[0];
  }
}

Custom type inference

For most scenarios, the @DataTypeHint and @FunctionHint annotations are sufficient to model user-defined functions. But by overriding the automatic type inference defined in the getTypeInference() method, you can create arbitrary functions that behave like built-in system functions.

The following example shows the potential of custom type-inference logic. It uses a string literal argument to determine the result type of a function. The function takes two string arguments: the first argument represents the string to be parsed, the second argument represents the target type.

import java.util.Optional;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.types.inference.TypeInference;

public static class LiteralFunction extends ScalarFunction {
  public Object eval(String s, String type) {
    switch (type) {
      case "INT":
        return Integer.valueOf(s);
      case "DOUBLE":
        return Double.valueOf(s);
      case "STRING":
      default:
        return s;
    }
  }

  // The automatic, reflection-based type inference is disabled and
  // replaced by the following logic.
  @Override
  public TypeInference getTypeInference(DataTypeFactory typeFactory) {
    return TypeInference.newBuilder()
      // Specify typed arguments.
      // Parameters are cast implicitly to these types, if necessary.
      .typedArguments(DataTypes.STRING(), DataTypes.STRING())
      // Specify a strategy for the result data type of the function.
      .outputTypeStrategy(callContext -> {
        if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
          throw callContext.newValidationError("Literal expected for second argument.");
        }
        // Return a data type based on a literal.
        final String literal = callContext.getArgumentValue(1, String.class).orElse("STRING");
        switch (literal) {
          case "INT":
            return Optional.of(DataTypes.INT().notNull());
          case "DOUBLE":
            return Optional.of(DataTypes.DOUBLE().notNull());
          case "STRING":
          default:
            return Optional.of(DataTypes.STRING());
        }
      })
      .build();
  }
}

Named parameters

When you call a user-defined function, you can use parameter names to specify the values of the parameters. Named parameters pass both the parameter name and its value to a function. This approach avoids confusion caused by incorrect parameter order, and it improves code readability and maintainability. Also, with named parameters you can omit optional parameters, which are filled with null by default. Use the @ArgumentHint annotation to specify the name and type of a parameter and whether it is required.

The following code examples demonstrate how to use @ArgumentHint in different scopes.

  1. Use the @ArgumentHint annotation on the parameters of the eval method of the function:

    import org.apache.flink.table.annotation.ArgumentHint;
    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.functions.ScalarFunction;
    
    public static class NamedParameterClass extends ScalarFunction {
    
        // Use the @ArgumentHint annotation to specify the name, type, and whether a parameter is required.
        public String eval(@ArgumentHint(name = "param1", isOptional = false, type = @DataTypeHint("STRING")) String s1,
                          @ArgumentHint(name = "param2", isOptional = true, type = @DataTypeHint("INT")) Integer s2) {
            return s1 + ", " + s2;
        }
    }
    
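  2. Use the @ArgumentHint annotation on the eval method of the function:

    import org.apache.flink.table.annotation.ArgumentHint;
    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.annotation.FunctionHint;
    import org.apache.flink.table.functions.ScalarFunction;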
    
    public static class NamedParameterClass extends ScalarFunction {
    
      // Use the @ArgumentHint annotation to specify the name, type, and whether a parameter is required.
      @FunctionHint(
              argument = {@ArgumentHint(name = "param1", isOptional = false, type = @DataTypeHint("STRING")),
                      @ArgumentHint(name = "param2", isOptional = true, type = @DataTypeHint("INTEGER"))}
      )
      public String eval(String s1, Integer s2) {
        return s1 + ", " + s2;
      }
    }
    
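  3. Use the @ArgumentHint annotation on the class of the function:

    import org.apache.flink.table.annotation.ArgumentHint;
    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.annotation.FunctionHint;
    import org.apache.flink.table.functions.ScalarFunction;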
    
    // Use the @ArgumentHint annotation to specify the name, type, and whether a parameter is required.
    @FunctionHint(
            argument = {@ArgumentHint(name = "param1", isOptional = false, type = @DataTypeHint("STRING")),
                    @ArgumentHint(name = "param2", isOptional = true, type = @DataTypeHint("INTEGER"))}
    )
    public static class NamedParameterClass extends ScalarFunction {
    
      public String eval(String s1, Integer s2) {
        return s1 + ", " + s2;
      }
    }
    

The @ArgumentHint annotation already includes a @DataTypeHint through its type attribute, so within @FunctionHint you can't combine it with a separate @DataTypeHint. The same applies to evaluation-method parameters: don't annotate a parameter with both @ArgumentHint and @DataTypeHint. Use @ArgumentHint alone.

Named parameters take effect only when the corresponding class contains no overloaded evaluation methods and no variable-argument evaluation methods. Otherwise, using named parameters causes an error.
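
Because omitted optional parameters are filled with null, an evaluation method with optional parameters should handle null explicitly. The following plain-Java sketch mirrors the eval method from the examples above without the Flink base class and annotations, so that it runs standalone, and substitutes a default when param2 is missing. The NamedParameterDefaults class name and the default value 0 are assumptions for illustration.

```java
// Plain-Java stand-in; a real UDF would extend ScalarFunction and carry
// the @ArgumentHint annotations shown above.
final class NamedParameterDefaults {

  // s2 corresponds to the optional "param2"; null means it was omitted.
  public String eval(String s1, Integer s2) {
    int value = (s2 == null) ? 0 : s2; // assumed default, chosen for this sketch
    return s1 + ", " + value;
  }
}
```

Without the null check, the eval method from the earlier examples concatenates the null reference directly, so an omitted param2 produces the text "null" in the result.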

Determinism

Every user-defined function class can declare whether it produces deterministic results or not by overriding the isDeterministic() method. If the function is not purely functional, like random(), date(), or now(), the method must return false. By default, isDeterministic() returns true.

Also, the isDeterministic() method may influence the runtime behavior. A runtime implementation might be called at two different stages.

During planning

During planning, in the so-called pre-flight phase, if a function is called with constant expressions, or if constant expressions can be derived from the given statement, a function is pre-evaluated for constant expression reduction and might not be executed on the cluster. In these cases, you can use the isDeterministic() method to disable constant expression reduction. For example, the following calls to ABS are executed during planning:

SELECT ABS(-1) FROM t;
SELECT ABS(field) FROM t WHERE field = -1;

But the following call to ABS is not executed during planning:

SELECT ABS(field) FROM t;

During runtime

If a function is called with non-constant expressions or isDeterministic() returns false, the function is executed on the cluster.

System function determinism

The determinism of system (built-in) functions is immutable. According to Apache Calcite’s SqlOperator definition, there are two kinds of functions which are not deterministic: dynamic functions and non-deterministic functions.

/**
 * Returns whether a call to this operator is guaranteed to always return
 * the same result given the same operands; true is assumed by default.
 */
public boolean isDeterministic() {
  return true;
}

/**
 * Returns whether it is unsafe to cache query plans referencing this
 * operator; false is assumed by default.
 */
public boolean isDynamicFunction() {
  return false;
}

The isDeterministic() method indicates whether a function is deterministic. If it returns false, the function is evaluated per record during runtime.

If the isDynamicFunction() method returns true, the function can be evaluated only at query start. It is pre-evaluated during planning only in batch mode. In streaming mode, it is equivalent to a non-deterministic function: the query runs continuously, under the abstraction of a continuous query over dynamic tables, so dynamic functions are re-evaluated for each query execution, which in the current implementation means per record.

The isDynamicFunction() method applies only to system functions.

The following system functions are always non-deterministic, which means they are evaluated per-record during runtime, both in batch and streaming mode.

  • CURRENT_ROW_TIMESTAMP
  • RAND
  • RAND_INTEGER
  • UNIX_TIMESTAMP
  • UUID

The following system temporal functions are dynamic and are pre-evaluated during planning (query-start) for batch mode and evaluated per-record for streaming mode.

  • CURRENT_DATE
  • CURRENT_TIME
  • CURRENT_TIMESTAMP
  • LOCALTIME
  • LOCALTIMESTAMP
  • NOW

Constant expression reduction

In some cases, calls to user-defined functions with constant arguments can be reduced and simplified. You can declare whether a function supports constant expression reduction by overriding the supportsConstantFolding() method. An example could be the user-defined function call PlusOne(10), which could be simplified to 11 in an expression. This optimization happens at planning time, resulting in a plan utilizing only the reduced value. Usually, this is desirable, so it’s enabled by default, but there are some cases in which it should be disabled.

One case is when the function call is not deterministic, as described in the Determinism section. Setting a function as non-deterministic has the effect of preventing function call expression reduction, even if supportsConstantFolding() is true.

A function call may also have some side effects, even if it always returns deterministic results. This means that the correctness of the query within Flink may permit constant expression reduction, but it may not be desired. In this case, setting the supportsConstantFolding() method to return false also has the effect of preventing constant expression reduction, ensuring invocation at runtime.
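
The interaction between constant arguments, isDeterministic(), and supportsConstantFolding() described in this section can be summarized as a single condition. The following plain-Java sketch models that rule. The ConstantFoldingRule class and its method are hypothetical and aren't part of Flink's planner, which is more involved.

```java
// Hypothetical model of the rule described above; not Flink planner code.
final class ConstantFoldingRule {

  // A call is reduced at planning time only if all of its arguments are
  // constant, the function is deterministic, and it supports constant folding.
  static boolean reducedAtPlanningTime(
      boolean allArgumentsConstant,
      boolean isDeterministic,
      boolean supportsConstantFolding) {
    return allArgumentsConstant && isDeterministic && supportsConstantFolding;
  }
}
```

For example, a call like PlusOne(10) with the default settings satisfies all three conditions, so it can be replaced by 11 in the plan. Returning false from either method keeps the call in the plan so that it runs on the cluster.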

Scalar functions

A user-defined scalar function maps zero, one, or multiple scalar values to a new scalar value. You can use any data type listed in Data Types as a parameter or return type of an evaluation method.

To define a scalar function, extend the ScalarFunction base class in org.apache.flink.table.functions and implement one or more evaluation methods named eval(...).

The following code example shows how to define your own hash code function and call it in a query.

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.ScalarFunction;
import static org.apache.flink.table.api.Expressions.*;

public static class HashFunction extends ScalarFunction {

  // take any data type and return INT
  public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
    return o.hashCode();
  }
}

TableEnvironment env = TableEnvironment.create(...);

// call function "inline" without registration in Table API
env.from("MyTable").select(call(HashFunction.class, $("myField")));

// register function
env.createTemporarySystemFunction("HashFunction", HashFunction.class);

// call registered function in Table API
env.from("MyTable").select(call("HashFunction", $("myField")));

// call registered function in SQL
env.sqlQuery("SELECT HashFunction(myField) FROM MyTable");