User-defined Functions (UDFs) in ksqlDB for Confluent Platform

User-defined functions enable you to extend ksqlDB’s suite of built-in functions using Java hooks. This section is a reference for how they work. Use the how-to guide to learn how to create and use them.

Data type mapping

Because SQL has a type system that is independent from Java’s, user-defined functions (UDFs) need to use specific Java types so that ksqlDB can manage the correspondence from SQL to Java. Below is the mapping to use for all UDF parameters and return types. Use boxed types when you want to tolerate null values.

| SQL Type | Java Type |
|---|---|
| INT | int, java.lang.Integer |
| BOOLEAN | boolean, java.lang.Boolean |
| BIGINT | long, java.lang.Long |
| DOUBLE | double, java.lang.Double |
| DECIMAL | java.math.BigDecimal |
| VARCHAR | java.lang.String |
| BYTES | java.nio.ByteBuffer |
| TIME | java.sql.Time |
| DATE | java.sql.Date |
| TIMESTAMP | java.sql.Timestamp |
| ARRAY | java.util.List |
| MAP | java.util.Map |
| STRUCT | org.apache.kafka.connect.data.Struct |

Note

Using Struct or BigDecimal in your functions requires specifying the schema by using paramSchema, returnSchema, aggregateSchema, or a schema provider.
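
To make the mapping concrete, the following sketch shows plain Java methods whose signatures use only types from the table. The class and method names are hypothetical, and the ksqlDB annotations are left out so that the snippet compiles without any ksqlDB dependency; in a real UDF, each method would also carry the @Udf annotation described later on this page.

```java
import java.util.List;
import java.util.Map;

// Hypothetical class; the names are illustrative and not part of ksqlDB.
final class TypeMappingSketch {

  // SQL INT -> SQL BIGINT. Boxed types are used so that a null input is tolerated.
  static Long square(Integer x) {
    if (x == null) {
      return null;
    }
    long value = x;
    return value * value;
  }

  // SQL ARRAY<VARCHAR>, MAP<VARCHAR, DOUBLE> -> SQL DOUBLE.
  // Sums the map values for the keys listed in the array, skipping missing keys.
  static Double total(List<String> keys, Map<String, Double> prices) {
    if (keys == null || prices == null) {
      return null;
    }
    double sum = 0.0;
    for (String key : keys) {
      Double price = prices.get(key);
      if (price != null) {
        sum += price;
      }
    }
    return sum;
  }
}
```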

Classloading

How does ksqlDB choose which classes to load as user-defined functions? At startup, ksqlDB scans the JARs in its extensions directory for classes that carry UDF annotations. Each function that is found is parsed and, if parsing succeeds, loaded into ksqlDB.

Each function instance has its own child-first ClassLoader that is isolated from other functions. If you need to use any third-party libraries with your functions, they should also be part of your jar, which means that you should create an uberjar. The classes in your uberjar are loaded in preference to any classes on the ksqlDB classpath, excluding anything vital to the running of ksqlDB, i.e., classes that are part of org.apache.kafka and io.confluent.
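
The child-first behavior can be pictured with a small class loader that looks in its own JARs before asking its parent. This is a minimal sketch of the general technique and not ksqlDB's actual loader: the class name is hypothetical, and treating java. as parent-first is an assumption added here because the JVM does not allow those classes to be redefined.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical sketch of a child-first class loader; not the ksqlDB implementation.
final class ChildFirstClassLoaderSketch extends URLClassLoader {

  // Packages that are always resolved by the parent.
  // "java." is an assumption of this sketch; the other two come from the text above.
  private static final String[] PARENT_FIRST_PREFIXES = {
      "java.", "org.apache.kafka.", "io.confluent."
  };

  ChildFirstClassLoaderSketch(URL[] jarUrls, ClassLoader parent) {
    super(jarUrls, parent);
  }

  static boolean isParentFirst(String className) {
    for (String prefix : PARENT_FIRST_PREFIXES) {
      if (className.startsWith(prefix)) {
        return true;
      }
    }
    return false;
  }

  @Override
  protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
    synchronized (getClassLoadingLock(name)) {
      Class<?> loaded = findLoadedClass(name);
      if (loaded == null) {
        if (isParentFirst(name)) {
          // Protected packages keep the usual parent-first delegation.
          loaded = super.loadClass(name, false);
        } else {
          try {
            // Child first: look in this loader's own JARs before the parent.
            loaded = findClass(name);
          } catch (ClassNotFoundException e) {
            loaded = super.loadClass(name, false);
          }
        }
      }
      if (resolve) {
        resolveClass(loaded);
      }
      return loaded;
    }
  }
}
```

With a loader of this shape, a library bundled in the uberjar takes precedence over a different version of the same library on the server classpath.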

Annotations

Annotations not only help ksqlDB figure out which classes will be used as UDFs, they also help commands like DESCRIBE FUNCTION display helpful metadata.

Scalar functions

When a class is annotated with @UdfDescription, it’s scanned for any public methods that are annotated with @Udf. If it matches, the class is loaded as a scalar function. Each method’s parameters may optionally be annotated with @UdfParameter. Here is what each of these annotations can be parameterized with.

@UdfDescription

The @UdfDescription annotation is applied at the class level.

| Field | Description | Required |
|---|---|---|
| author | The author of the UDF. | No |
| category | For grouping similar functions in the output of SHOW FUNCTIONS. | No |
| description | A string describing generally what the function(s) in this class do. | Yes |
| name | The case-insensitive name of the UDF(s) represented by this class. | Yes |
| version | The version of the UDF. | No |

@Udf

The @Udf annotation is applied to public methods of a class annotated with @UdfDescription. Each annotated method will become an invocable function in SQL.

| Field | Description | Required |
|---|---|---|
| description | A string describing generally what a particular version of the UDF does. | No |
| schema | The ksqlDB schema for the return type of this UDF. | For complex types such as STRUCT if schemaProvider is not passed in. |
| schemaProvider | A reference to a method that computes the return schema of this UDF (e.g. dynamic return type). | For complex types, like STRUCT, if schema is not provided. |

@UdfParameter

The @UdfParameter annotation is applied to parameters of methods annotated with @Udf. ksqlDB uses the information in the @UdfParameter annotation to specify the parameter schema (if it can’t be inferred from the Java type) and to convey metadata.

| Field | Description | Required |
|---|---|---|
| value | The case-insensitive name of the parameter | Required if the UDF JAR was not compiled with the -parameters javac argument. |
| description | A string describing generally what the parameter represents | No |
| schema | The ksqlDB schema for the parameter. | For complex types, like STRUCT |

Note

If schema is supplied in the @UdfParameter annotation for a STRUCT, it is considered “strict”: any inputs must match exactly, including the order and names of the fields.

If your Java 8 class is compiled with the -parameters compiler flag, the name of the parameter will be inferred from the method declaration.
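
The scanning rule for scalar functions (a class-level annotation plus annotated public methods) can be pictured with plain Java reflection. The sketch below is only an illustration of that rule: DemoDescription and DemoFunction are hypothetical stand-ins defined inside the snippet so that it compiles without ksqlDB, and they are not the real @UdfDescription and @Udf annotations.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a class-level description annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface DemoDescription {
  String name();
  String description();
}

// Hypothetical stand-in for a method-level function annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoFunction {
  String description() default "";
}

@DemoDescription(name = "multiply", description = "Multiplies two numbers.")
final class MultiplySketch {

  @DemoFunction(description = "Multiplies two BIGINT values.")
  public long multiply(long a, long b) {
    return a * b;
  }

  @DemoFunction(description = "Multiplies two DOUBLE values.")
  public double multiply(double a, double b) {
    return a * b;
  }

  // Public but not annotated, so the scan below ignores it.
  public String helper() {
    return "ignored";
  }
}

final class AnnotationScanSketch {

  // Returns the names of the public annotated methods, or an empty list
  // when the class itself does not carry the class-level annotation.
  static List<String> functionMethods(Class<?> candidate) {
    if (!candidate.isAnnotationPresent(DemoDescription.class)) {
      return Collections.emptyList();
    }
    List<String> found = new ArrayList<>();
    for (Method method : candidate.getDeclaredMethods()) {
      if (Modifier.isPublic(method.getModifiers())
          && method.isAnnotationPresent(DemoFunction.class)) {
        found.add(method.getName());
      }
    }
    return found;
  }
}
```

Here both multiply overloads are picked up under the single function name, which mirrors how one annotated class can expose several signatures of the same function.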

Tabular functions

When a class is annotated with @UdtfDescription, it’s scanned for any public methods that are annotated with @Udtf. If it matches, the class is loaded as a tabular function. Each method’s parameters may optionally be annotated with @UdfParameter. Here is what each of these annotations can be parameterized with.

@UdtfDescription

The @UdtfDescription annotation is applied at the class level.

| Field | Description | Required |
|---|---|---|
| author | The author of the UDTF. | No |
| description | A string describing generally what the function(s) in this class do. | Yes |
| name | The case-insensitive name of the UDTF(s) represented by this class. | Yes |
| version | The version of the UDTF. | No |

@Udtf

The @Udtf annotation is applied to public methods of a class annotated with @UdtfDescription. Each annotated method becomes an invocable function in SQL. This annotation supports the following fields:

| Field | Required | Description |
|---|---|---|
| description | No | A string describing generally what a particular version of the UDTF does. |
| schema | For complex types like STRUCT if schemaProvider is not provided. | The ksqlDB schema for the return type of this UDTF. |
| schemaProvider | For complex types like STRUCT if schema is not provided. | A reference to a method that computes the return schema of this UDTF, e.g. dynamic return type. |

@UdfParameter

You can use the @UdfParameter annotation to provide extra information for UDTF parameters. This is the same annotation that is used for UDFs. See the @UdfParameter description under Scalar functions for details.

Aggregation functions

When a class is annotated with @UdafDescription, it’s scanned for any public static methods that are annotated with @UdafFactory and that return either Udaf or TableUdaf. If it matches, the class is loaded as an aggregation function. The factory methods represent a collection of UDAFs that share the same name but may have different arguments and return types. Here is what each of these annotations can be parameterized with.

Both Udaf and TableUdaf are parameterized by three generic types:

  1. I is the input type of the UDAF. I can be a tuple type, one of Pair, Triple, Quadruple, or Quintuple, when there are multiple column arguments. VariadicArgs can be nested inside a tuple to create a variadic column argument. A function can have at most one variadic argument anywhere in its signature (including the parameters of UdafFactory). A variadic column argument may have Object as its type parameter to accept any number of columns of any type, though a variadic Object factory argument is not supported. A variadic column argument outside a tuple is not supported.

  2. A is the data type of the intermediate storage used to keep track of the state of the UDAF.

  3. O is the data type of the return value.

Decoupling the data types of the state and return value enables you to define UDAFs like average, where the state can hold a running sum and count while the return value is their quotient.

When you create a UDAF, you can use the map method to provide the logic that transforms an intermediate aggregate value to the returned value.

The merge method is called only when sessions are merged under session windowing.
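
As a minimal sketch of why the state type and the return type are separate, the following average keeps a sum and a count as its intermediate state and converts them to a double only in map. AggregateSketch is a hypothetical stand-in interface declared in the snippet so that it compiles on its own; it mirrors the three type parameters described above but is not the ksqlDB Udaf interface, and the initialize method name is an assumption of this sketch.

```java
// Hypothetical stand-in with the three type parameters I, A, and O.
// It is NOT the ksqlDB Udaf interface.
interface AggregateSketch<I, A, O> {
  A initialize();
  A aggregate(I current, A aggregate);
  A merge(A left, A right);
  O map(A aggregate);
}

// Average of BIGINT values: input Long, state {sum, count}, output Double.
final class AverageSketch implements AggregateSketch<Long, long[], Double> {

  @Override
  public long[] initialize() {
    return new long[] {0L, 0L};
  }

  @Override
  public long[] aggregate(Long current, long[] aggregate) {
    if (current == null) {
      return aggregate; // null inputs do not change the state
    }
    return new long[] {aggregate[0] + current, aggregate[1] + 1};
  }

  @Override
  public long[] merge(long[] left, long[] right) {
    // Combines two partial states, for example when two sessions are merged.
    return new long[] {left[0] + right[0], left[1] + right[1]};
  }

  @Override
  public Double map(long[] aggregate) {
    if (aggregate[1] == 0) {
      return null; // no values seen yet
    }
    return (double) aggregate[0] / aggregate[1];
  }

  // Small driver that feeds values through the aggregate and maps the result.
  static Double averageOf(long... values) {
    AverageSketch average = new AverageSketch();
    long[] state = average.initialize();
    for (long value : values) {
      state = average.aggregate(value, state);
    }
    return average.map(state);
  }
}
```

Keeping the sum and count in the state means that partial results can be combined exactly in merge, which would not be possible if only the running average were stored.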

@UdafDescription

The @UdafDescription annotation is applied at the class level.

| Field | Description | Required |
|---|---|---|
| name | The case-insensitive name of the UDAF(s) represented by this class. | Yes |
| description | A string describing generally what the function(s) in this class do. | Yes |
| author | The author of the UDAF. | No |
| version | The version of the UDAF. | No |

@UdafFactory

The @UdafFactory annotation is applied to public static methods of a class annotated with @UdafDescription. The method must return either Udaf, or, if it supports table aggregations, TableUdaf. Each annotated method is a factory for an invocable aggregate function in SQL. The annotation supports the following fields:

| Field | Required | Description |
|---|---|---|
| aggregateSchema | For complex types, like STRUCT | The ksqlDB schema for the intermediate state. |
| description | Yes | A string describing generally what the function(s) in this class do. |
| paramSchema | For complex types, like STRUCT | The ksqlDB schema(s) for the input parameter(s). If you provide fewer schemas than there are parameters, the schemas for the remaining parameters default to being empty. If you provide more schemas than there are arguments, the extra schemas are ignored. |
| returnSchema | For complex types, like STRUCT | The ksqlDB schema for the return value. |

Note

If paramSchema, aggregateSchema, or returnSchema is supplied in the @UdafFactory annotation for a STRUCT, it’s considered “strict”: any inputs must match exactly, including order and names of the fields.

Null values

If a user-defined function uses primitive types in its signature, it indicates that the corresponding parameters should never be null. Conversely, using boxed types indicates that the function can accept null values for those parameters. It’s up to the implementer of the UDF to choose which is more appropriate. A common pattern is to return null if the input is null, though generally this applies only to parameters that are expected to be supplied from the source row being processed.

For example, a substring(String str, int pos) UDF might return null if str is null, but a null value for the pos parameter would be treated as an error, so pos should be a primitive. (In fact, the built-in substring is more lenient and returns null if pos is null.)

The return type of a UDF can also be a primitive or boxed type. A primitive return type indicates the function will never return null, whereas a boxed type indicates that it may return null.

ksqlDB checks the value that’s passed to each parameter and reports an error to the server log for any null values being passed to a primitive type. The associated column in the output row will be null.
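
The null check described above can be sketched with reflection: before the method is called, each argument bound to a primitive parameter is tested for null. This is an illustration under stated assumptions rather than ksqlDB's implementation. The class and helper names are hypothetical, System.err stands in for the server log, and pos is a zero-based Java index in this sketch.

```java
import java.lang.reflect.Method;

// Hypothetical sketch; not the ksqlDB invocation code.
final class NullHandlingSketch {

  // Hypothetical UDF body. str is a reference type, so it may be null;
  // pos is a primitive, so a null argument must never reach this method.
  static String substring(String str, int pos) {
    if (str == null) {
      return null;
    }
    return str.substring(pos);
  }

  // Mimics the check described above: a null bound to a primitive parameter
  // is reported, and the result is null instead of a method call.
  static Object invoke(Method method, Object... args) {
    Class<?>[] types = method.getParameterTypes();
    for (int i = 0; i < types.length; i++) {
      if (types[i].isPrimitive() && args[i] == null) {
        System.err.println(
            "Null passed to primitive parameter " + i + " of " + method.getName());
        return null;
      }
    }
    try {
      return method.invoke(null, args);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(e);
    }
  }

  // Convenience wrapper that looks up substring(String, int) and invokes it.
  static Object callSubstring(String str, Integer pos) {
    try {
      Method method =
          NullHandlingSketch.class.getDeclaredMethod("substring", String.class, int.class);
      return invoke(method, str, pos);
    } catch (NoSuchMethodException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

Calling callSubstring("hello", null) reports the null and yields null, while callSubstring(null, 2) reaches the method body and returns null through the function's own null handling.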

Dynamic types

UDFs support dynamic return types that are resolved at runtime. This is useful if you want to implement a UDF with a non-deterministic return type, like DECIMAL or STRUCT. For example, a UDF that returns BigDecimal, which maps to the SQL DECIMAL type, may vary the precision and scale of the output based on the input schema.

To use this functionality, you need to specify a method with signature public SqlType <your-method-name>(final List<SqlArgument> params) and annotate it with @UdfSchemaProvider. Also, you need to link it to the corresponding UDF by using the schemaProvider=<your-method-name> parameter of the @Udf annotation.

When implementing dynamic returns for a UDTF function, if your method returns a value of type List<T>, the type referred to by the schema provider method is the type T, not the type List<T>.

For dynamic UDAFs, the aggregate or map methods may depend on the input SQL type, so implementations of the Udaf interface override some of the following three methods: initializeTypeArguments(List<SqlArgument> argTypeList), getAggregateSqlType(), and getReturnSqlType().
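
The kind of logic a schema provider contains can be sketched without any ksqlDB types. In the snippet below, DecimalTypeSketch is a hypothetical stand-in for a DECIMAL(precision, scale) type, and the rule for the result of adding two decimals is one common convention, used here only as an illustration. In a real UDF, the equivalent method would take List<SqlArgument>, return SqlType, and be annotated with @UdfSchemaProvider, as described above.

```java
import java.util.List;

// Hypothetical stand-in for a DECIMAL(precision, scale) SQL type; not a ksqlDB class.
final class DecimalTypeSketch {
  final int precision;
  final int scale;

  DecimalTypeSketch(int precision, int scale) {
    this.precision = precision;
    this.scale = scale;
  }
}

final class SchemaProviderSketch {

  // Derives the return type of a two-argument decimal addition from the
  // argument types: keep the larger scale, keep the larger number of
  // integer digits, and add one digit for a possible carry.
  static DecimalTypeSketch resolveSumType(List<DecimalTypeSketch> params) {
    if (params.size() != 2) {
      throw new IllegalArgumentException("Expected exactly two DECIMAL arguments");
    }
    DecimalTypeSketch left = params.get(0);
    DecimalTypeSketch right = params.get(1);
    int scale = Math.max(left.scale, right.scale);
    int integerDigits =
        Math.max(left.precision - left.scale, right.precision - right.scale);
    return new DecimalTypeSketch(integerDigits + scale + 1, scale);
  }
}
```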

Generics

A UDF declaration can utilize generics if they match the following conditions:

  1. Any generic in the return value of a method must appear in at least one of the method parameters.

  2. The generic must not adhere to any interface. For example, <T extends Number> is not valid.

  3. The generic does not support type coercion or inheritance. For example, add(T a, T b) will accept BIGINT, BIGINT but not INT, BIGINT.
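
The conditions above can be illustrated with plain Java method signatures. The class and method names are hypothetical, and the @Udf annotation is omitted so that the snippet compiles without ksqlDB.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical signatures that illustrate the conditions above.
final class GenericSignatureSketch {

  // Fits condition 1: T in the return type also appears in the parameters.
  static <T> T firstOrDefault(List<T> values, T fallback) {
    if (values == null || values.isEmpty()) {
      return fallback;
    }
    return values.get(0);
  }

  // Both parameters share T, so condition 3 requires both arguments
  // to have the same SQL type when called from SQL.
  static <T> boolean same(T a, T b) {
    return Objects.equals(a, b);
  }

  // Shapes that the conditions rule out:
  //   <T> T create()                     T appears only in the return type
  //   <T extends Number> T negate(T v)   T is bounded by an interface or class
}
```

Note that the Java compiler itself would accept same(1, 2L) by inferring a common supertype. The restriction in condition 3 is applied by ksqlDB when it resolves SQL types, not by the compiler.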

External parameters

If the UDF class needs access to the ksqlDB Server configuration, it can implement org.apache.kafka.common.Configurable. configure() will be invoked with the map of server parameters. This can be useful for parameterizing a function on a per-deployment basis.

For security reasons, only settings whose name is prefixed with ksql.functions.<lowercase-udfname>. or ksql.functions._global_. are propagated to the UDF.
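
The prefix rule can be sketched as a simple filter over the server configuration. This is an illustration of the rule as stated above, not ksqlDB's code: the class and method names are hypothetical, and the sketch assumes that property names are passed through unchanged.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch of the prefix filter; not the ksqlDB implementation.
final class UdfConfigFilterSketch {

  static final String FUNCTION_PREFIX = "ksql.functions.";
  static final String GLOBAL_PREFIX = FUNCTION_PREFIX + "_global_.";

  // Returns only the settings that a UDF with the given name would receive.
  static Map<String, Object> visibleTo(String udfName, Map<String, ?> serverConfig) {
    String udfPrefix = FUNCTION_PREFIX + udfName.toLowerCase(Locale.ROOT) + ".";
    Map<String, Object> visible = new LinkedHashMap<>();
    for (Map.Entry<String, ?> entry : serverConfig.entrySet()) {
      String key = entry.getKey();
      if (key.startsWith(udfPrefix) || key.startsWith(GLOBAL_PREFIX)) {
        visible.put(key, entry.getValue());
      }
    }
    return visible;
  }
}
```

For a UDF named FORMULA, a hypothetical setting such as ksql.functions.formula.base.value would pass the filter, while an unrelated server setting or a setting for a different function would not.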

Security

Blacklisting

In some deployment environments, it may be necessary to restrict the classes that UDFs have access to, as they may represent a security risk. To reduce the attack surface of ksqlDB user-defined functions, you can optionally blacklist classes and packages so that they can’t be used from a UDF. An example blacklist is in a file named resource-blacklist.txt in the extensions directory. All of the entries in the default version of the file are commented out, but it shows how you can use the blacklist.

This file contains one entry per line, where each line is a class or package that should be blacklisted. Names are matched using a regular expression, so consider an entry like this:

java.lang.Process

This matches any paths that begin with java.lang.Process, like java.lang.Process, java.lang.ProcessBuilder, etc.

If you want to blacklist a single class, for example, java.lang.Compiler, then you would add:

java.lang.Compiler$

Any blank lines or lines beginning with # are ignored. If the file is not present, or is empty, then no classes are blacklisted.
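
The matching rules above can be sketched with java.util.regex. This is a hedged reconstruction from the behavior described in this section, not ksqlDB's own code: the class name is hypothetical, and escaping the dots and anchoring each entry at the start of the name are assumptions chosen to reproduce the documented examples.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical sketch of prefix-based blacklist matching; not the ksqlDB implementation.
final class BlacklistSketch {

  // Null when the file is missing or has no entries, so nothing is blacklisted.
  private final Pattern pattern;

  BlacklistSketch(List<String> lines) {
    List<String> entries = new ArrayList<>();
    for (String raw : lines) {
      String line = raw.trim();
      if (line.isEmpty() || line.startsWith("#")) {
        continue; // blank lines and comments are ignored
      }
      // Treat dots as literal package separators; a trailing $ stays a regex anchor.
      entries.add(line.replace(".", "\\."));
    }
    pattern = entries.isEmpty()
        ? null
        : Pattern.compile("^(?:" + String.join("|", entries) + ")");
  }

  boolean isBlacklisted(String className) {
    return pattern != null && pattern.matcher(className).find();
  }
}
```

With the two example entries from this section, java.lang.ProcessBuilder is blocked through the prefix match on java.lang.Process, while the trailing $ limits the second entry to java.lang.Compiler itself.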

Security Manager

By default, ksqlDB installs a simple Java security manager for executing user-defined functions. The security manager blocks attempts by any function to fork processes from the ksqlDB Server. It also prevents functions from calling System.exit(..).

You can disable the security manager by setting ksql.udf.enable.security.manager to false.

Disabling ksqlDB Custom Functions

You can disable the loading of all UDFs in the extensions directory by setting ksql.udfs.enabled to false. By default, they are enabled.

Metrics

Metric collection can be enabled by setting the config ksql.udf.collect.metrics to true. This defaults to false and is generally not recommended for production usage, as metrics are collected on each invocation and introduce some overhead to processing time. See more details in the UDF metrics reference section.