Important
You are viewing documentation for an older version of Confluent Platform.
Implement a User-defined Function (UDF and UDAF)¶
Prerequisites:
- Apache Maven
- Confluent Platform installed locally
- Internet connectivity for downloading Confluent POM files
Create a user-defined function (UDF) or a user-defined aggregation function (UDAF) by following these steps:
- Create the KSQL extensions directory that contains your UDF and UDAF packages.
- Create Java source and project files for your implementation.
- Build the package for your UDF or UDAF.
- Use your custom function in a KSQL query or statement.
For more information on custom functions, see KSQL Custom Function Reference (UDF and UDAF).
Create the KSQL Extensions Directory¶
When you create a custom user-defined function (UDF), you implement it in Java and deploy it as a JAR to the KSQL extensions directory. By default, this directory doesn’t exist, so you need to create it and set its path in the KSQL Server configuration.
Create the KSQL extensions directory, <path-to-confluent>/etc/ksql/ext:
mkdir confluent-5.1.4/etc/ksql/ext
Edit the ksql-server.properties configuration file in <path-to-confluent>/etc/ksql to add the fully qualified path to the ext directory:
ksql.extension.dir=/home/my-home-dir/confluent-5.1.4/etc/ksql/ext
Note
Use the fully qualified path or the relative path from <path-to-confluent>/bin, which is ../etc/ksql/ext.
KSQL Server won’t load extensions if the path begins with ~.
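The following Java sketch illustrates the path arithmetic behind this note. It assumes a POSIX file system and that a relative value is resolved against <path-to-confluent>/bin; the class and method names are hypothetical. Java's file APIs don't expand ~ the way a shell does, which is one plausible reason a path that begins with ~ fails: it's treated as an ordinary directory named ~.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical sketch: how a relative ksql.extension.dir value resolves when the
// base directory is <path-to-confluent>/bin, and why a leading "~" is a problem.
class ExtensionDirCheck {

  // Resolve a configured path against a base directory using java.nio rules.
  // Note: java.nio does NOT expand "~"; only shells do that.
  static Path resolve(String baseDir, String configured) {
    return Paths.get(baseDir).resolve(configured).normalize();
  }

  // Hypothetical helper: flags values that a shell would expand but Java would not.
  static boolean startsWithTilde(String configured) {
    return configured.startsWith("~");
  }

  public static void main(String[] args) {
    String bin = "/home/my-home-dir/confluent-5.1.4/bin";
    // Prints /home/my-home-dir/confluent-5.1.4/etc/ksql/ext
    System.out.println(resolve(bin, "../etc/ksql/ext"));
    // "~" stays a literal directory name. Prints
    // /home/my-home-dir/confluent-5.1.4/bin/~/confluent-5.1.4/etc/ksql/ext
    System.out.println(resolve(bin, "~/confluent-5.1.4/etc/ksql/ext"));
    // Prints true
    System.out.println(startsWithTilde("~/confluent-5.1.4/etc/ksql/ext"));
  }
}
```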
Create the Source and Project Files¶
The following steps show how to implement your UDF in a Java class and build it by defining a Maven POM file.
- Create a root directory for your UDF’s source code and project files.
- Create the source code directory, which has a path that corresponds with the package name.
- Create the Java source code file in the source code directory.
- Create a Project Object Model (POM) file that defines how Maven builds the source code.
Create a Project Root Directory¶
Create the directory that holds your UDF or UDAF project:
mkdir ksql-udf-demo && cd ksql-udf-demo
Create the Source Code Directory¶
From the root directory for your UDF, create the source code directory. In this example, the package name is my.company.ksql.udfdemo.
mkdir -p src/main/java/my/company/ksql/udfdemo
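Maven expects sources under src/main/java, with one directory per segment of the package name. This small sketch (PackageDir is a hypothetical name) derives that path for the example package, assuming a POSIX file system:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical sketch: maps a Java package name to its Maven source directory.
class PackageDir {

  // Maven's default main source root plus one directory per package segment.
  static Path sourceDirFor(String packageName) {
    return Paths.get("src/main/java", packageName.split("\\."));
  }

  public static void main(String[] args) {
    // Prints src/main/java/my/company/ksql/udfdemo
    System.out.println(sourceDirFor("my.company.ksql.udfdemo"));
  }
}
```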
Create the Java Source Code File¶
The following Java code defines four overloads for a multiply function. The UdfDescription and Udf annotations tell KSQL Server to load the Multiply class and look for methods to add to its list of available functions. For more information, see KSQL Custom Function Reference (UDF and UDAF).
Copy the following code into a new file named Multiply.java:
package my.company.ksql.udfdemo;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;

// KSQL Server registers this class as the MULTIPLY function.
@UdfDescription(name = "multiply", description = "multiplies 2 numbers")
public class Multiply {

  @Udf(description = "multiply two non-nullable INTs.")
  public long multiply(final int v1, final int v2) {
    // Widen to long before multiplying; int * int would overflow first.
    return (long) v1 * v2;
  }

  @Udf(description = "multiply two non-nullable BIGINTs.")
  public long multiply(final long v1, final long v2) {
    return v1 * v2;
  }

  @Udf(description = "multiply two nullable BIGINTs. If either param is null, null is returned.")
  public Long multiply(final Long v1, final Long v2) {
    // Return null instead of throwing a NullPointerException on unboxing.
    return v1 == null || v2 == null ? null : v1 * v2;
  }

  @Udf(description = "multiply two non-nullable DOUBLEs.")
  public double multiply(final double v1, final double v2) {
    return v1 * v2;
  }
}
Save the file to the source code directory that you created in the previous step, src/main/java/my/company/ksql/udfdemo.
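To picture what "look for methods" means, here is a simplified sketch of annotation-driven discovery using Java reflection. It defines its own stand-in annotations, FakeUdfDescription and FakeUdf, so it runs without KSQL on the classpath. It is not KSQL Server's actual loading code, only an illustration of the general mechanism.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of annotation-driven function discovery. The annotations are
// hypothetical stand-ins, NOT io.confluent.ksql.function.udf.*, and this is not
// KSQL Server's real loader.
class UdfScanSketch {

  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.TYPE)
  @interface FakeUdfDescription {
    String name();
    String description();
  }

  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface FakeUdf {
    String description();
  }

  @FakeUdfDescription(name = "multiply", description = "multiplies 2 numbers")
  static class Multiply {
    @FakeUdf(description = "multiply two non-nullable INTs.")
    public long multiply(final int v1, final int v2) {
      return (long) v1 * v2;
    }

    // Not annotated, so the scan below skips it.
    public String helper() {
      return "ignored";
    }
  }

  // Returns "name(paramTypes)" for each annotated method of an annotated class.
  static List<String> discover(Class<?> clazz) {
    List<String> found = new ArrayList<>();
    FakeUdfDescription desc = clazz.getAnnotation(FakeUdfDescription.class);
    if (desc == null) {
      return found; // the class isn't marked as a function container
    }
    for (Method m : clazz.getDeclaredMethods()) {
      if (m.isAnnotationPresent(FakeUdf.class)) {
        StringBuilder sig = new StringBuilder(desc.name()).append('(');
        Class<?>[] params = m.getParameterTypes();
        for (int i = 0; i < params.length; i++) {
          if (i > 0) {
            sig.append(", ");
          }
          sig.append(params[i].getSimpleName());
        }
        found.add(sig.append(')').toString());
      }
    }
    return found;
  }

  public static void main(String[] args) {
    // Prints [multiply(int, int)]
    System.out.println(discover(Multiply.class));
  }
}
```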
Create the POM File¶
In the root directory for your custom UDF implementation, create the Project Object Model (POM) file for the Maven build, and name it pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <!-- Specify the package details for the custom UDF -->
  <groupId>my.company.ksql.udfdemo</groupId>
  <artifactId>ksql-udf-demo</artifactId>
  <version>1.0</version>

  <!-- Specify the repository for Confluent dependencies.
       Use HTTPS: Maven 3.8.1 and later block plain-HTTP repositories by default. -->
  <repositories>
    <repository>
      <id>confluent</id>
      <url>https://packages.confluent.io/maven/</url>
    </repository>
  </repositories>

  <!-- Specify build properties -->
  <properties>
    <exec.mainClass>my.company.ksql.udfdemo.thisisignored</exec.mainClass>
    <java.version>1.8</java.version>
    <kafka.version>2.0.0</kafka.version>
    <kafka.scala.version>2.11</kafka.scala.version>
    <scala.version>${kafka.scala.version}.8</scala.version>
    <!-- Match this to your Confluent Platform version -->
    <confluent.version>5.1.4</confluent.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <!-- Specify the ksql-udf dependency -->
  <dependencies>
    <!-- KSQL dependency is needed to write your own UDF -->
    <dependency>
      <groupId>io.confluent.ksql</groupId>
      <artifactId>ksql-udf</artifactId>
      <version>${confluent.version}</version>
    </dependency>
  </dependencies>

  <!-- Build boilerplate -->
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.6.1</version>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
        </configuration>
      </plugin>
      <!-- Package all dependencies as one jar -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.5.2</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
              <mainClass>${exec.mainClass}</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>assemble-all</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
Important
For production environments, we strongly recommend that you write comprehensive tests to cover your custom functions.
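As a starting point for such tests, the following dependency-free sketch checks the multiply logic with plain Java. MultiplyLogic is a hypothetical copy of the Multiply method bodies without the KSQL annotations; in a real project you would test Multiply itself, for example with JUnit. The INT check targets a common pitfall: multiplying two int values overflows before the result is widened to long, so the INT overload should cast an operand to long first.

```java
// Dependency-free test sketch. MultiplyLogic is a hypothetical copy of the
// Multiply method bodies without the KSQL annotations.
class MultiplyTests {

  static class MultiplyLogic {
    long multiply(final int v1, final int v2) {
      return (long) v1 * v2; // widen first to avoid int overflow
    }

    Long multiply(final Long v1, final Long v2) {
      return v1 == null || v2 == null ? null : v1 * v2;
    }

    double multiply(final double v1, final double v2) {
      return v1 * v2;
    }
  }

  // Minimal assertion helper so the sketch doesn't depend on -ea or JUnit.
  static void check(boolean condition, String message) {
    if (!condition) {
      throw new AssertionError(message);
    }
  }

  public static void main(String[] args) {
    MultiplyLogic udf = new MultiplyLogic();
    // 100000 * 100000 exceeds Integer.MAX_VALUE; widening keeps it exact.
    check(udf.multiply(100_000, 100_000) == 10_000_000_000L, "INT overflow");
    // A null in either position yields null.
    check(udf.multiply(null, 5L) == null, "null first arg");
    check(udf.multiply(Long.valueOf(5), null) == null, "null second arg");
    check(udf.multiply(Long.valueOf(6), Long.valueOf(7)) == 42L, "BIGINT");
    check(udf.multiply(1.5, 2.0) == 3.0, "DOUBLE");
    System.out.println("all checks passed");
  }
}
```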
Build the UDF Package¶
Use Maven to build the package and create a JAR. Copy the JAR to the KSQL extensions directory.
In the root folder for your UDF, run Maven to build the package:
mvn clean package
After a great deal of build info, your output should resemble:
...
[INFO] --- maven-assembly-plugin:2.5.2:single (assemble-all) @ ksql-udf-demo ---
[INFO] Building jar: /home/my-home-dir/ksql-udf-demo/target/ksql-udf-demo-1.0-jar-with-dependencies.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 17.511 s
[INFO] Finished at: 2018-12-17T22:07:08Z
[INFO] Final Memory: 26M/280M
[INFO] ------------------------------------------------------------------------
The Maven build creates a directory named target and saves the build output there. Copy the JAR file, ksql-udf-demo-1.0-jar-with-dependencies.jar, from the target directory to the ext directory of your KSQL installation.
For example, if your Confluent Platform installation is at /home/my-home-dir/confluent-5.1.4, copy the JAR to /home/my-home-dir/confluent-5.1.4/etc/ksql/ext.
cp target/ksql-udf-demo-1.0-jar-with-dependencies.jar <path-to-confluent>/etc/ksql/ext
The custom UDF is deployed and ready to run.
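The JAR name comes from values in pom.xml. Maven's default finalName is <artifactId>-<version>, and the assembly plugin appends the descriptor ID (jar-with-dependencies) because its appendAssemblyId setting defaults to true. The sketch below, with a hypothetical helper name, reproduces that naming, which is worth remembering when you change <version> and need to update the cp command.

```java
// Hypothetical sketch of how the assembly JAR name in target/ is composed.
class AssemblyJarName {

  // finalName defaults to artifactId-version; the assembly plugin then appends
  // the descriptor id because appendAssemblyId defaults to true.
  static String jarName(String artifactId, String version, String descriptorId) {
    String finalName = artifactId + "-" + version;
    return finalName + "-" + descriptorId + ".jar";
  }

  public static void main(String[] args) {
    // Prints ksql-udf-demo-1.0-jar-with-dependencies.jar
    System.out.println(jarName("ksql-udf-demo", "1.0", "jar-with-dependencies"));
  }
}
```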
Use Your Custom UDF in a KSQL Query¶
When your custom UDF is deployed in the KSQL extensions directory, KSQL Server loads it automatically on startup, and you can use it like any built-in KSQL function.
Note
KSQL loads UDFs and UDAFs only on startup, so when you make changes to your UDF code and re-deploy the JAR, you must restart KSQL Server to get the latest version of your UDF.
Start Confluent Platform and KSQL Server:
<path-to-confluent>/bin/confluent start ksql-server
Start the KSQL CLI:
LOG_DIR=./ksql_logs <path-to-confluent>/bin/ksql
In the KSQL CLI, list the available functions to ensure that KSQL Server loaded the MULTIPLY user-defined function:
LIST FUNCTIONS;
Your output should resemble:
Function Name | Type
-------------------------------
ABS | SCALAR
ARRAYCONTAINS | SCALAR
... |
MULTIPLY | SCALAR
... |
SUBSTRING | SCALAR
SUM | AGGREGATE
... |
-------------------------------
Inspect the details of the MULTIPLY function:
DESCRIBE FUNCTION MULTIPLY;
Your output should resemble:
Name : MULTIPLY
Overview : multiplies 2 numbers
Type : scalar
Jar : /home/my-home-dir/confluent-5.1.4/etc/ksql/ext/ksql-udf-demo-1.0-jar-with-dependencies.jar
Variations :
Variation : MULTIPLY(BIGINT, BIGINT)
Returns : BIGINT
Description : multiply two nullable BIGINTs. If either param is null, null is
returned.
Variation : MULTIPLY(DOUBLE, DOUBLE)
Returns : DOUBLE
Description : multiply two non-nullable DOUBLEs.
Variation : MULTIPLY(INT, INT)
Returns : BIGINT
Description : multiply two non-nullable INTs.
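The Java class defines four overloads, but DESCRIBE FUNCTION lists three variations. One explanation consistent with this output is that both long and Long map to the KSQL BIGINT type, so the two BIGINT overloads share a single SQL signature. The sketch below groups the overloads by SQL signature under that assumed type mapping (int to INT, long and Long to BIGINT, double to DOUBLE); SignatureGrouping is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: groups Multiply's Java overloads by SQL-level signature.
class SignatureGrouping {

  // Assumed Java-to-KSQL type mapping, limited to the types Multiply uses.
  static final Map<String, String> JAVA_TO_KSQL = new HashMap<>();
  static {
    JAVA_TO_KSQL.put("int", "INT");
    JAVA_TO_KSQL.put("long", "BIGINT");
    JAVA_TO_KSQL.put("Long", "BIGINT");
    JAVA_TO_KSQL.put("double", "DOUBLE");
  }

  // Builds the SQL-level signature for one Java overload.
  static String ksqlSignature(String... javaParamTypes) {
    StringBuilder sb = new StringBuilder("MULTIPLY(");
    for (int i = 0; i < javaParamTypes.length; i++) {
      if (i > 0) {
        sb.append(", ");
      }
      sb.append(JAVA_TO_KSQL.get(javaParamTypes[i]));
    }
    return sb.append(')').toString();
  }

  // Counts how many Java overloads share each SQL-level signature.
  static Map<String, Integer> countBySignature(String[][] overloads) {
    Map<String, Integer> counts = new TreeMap<>(); // sorted for stable output
    for (String[] params : overloads) {
      counts.merge(ksqlSignature(params), 1, Integer::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    String[][] overloads = {
        {"int", "int"}, {"long", "long"}, {"Long", "Long"}, {"double", "double"}
    };
    // Prints {MULTIPLY(BIGINT, BIGINT)=2, MULTIPLY(DOUBLE, DOUBLE)=1, MULTIPLY(INT, INT)=1}
    System.out.println(countBySignature(overloads));
  }
}
```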
Use the MULTIPLY function in a query. If you follow the steps in Writing Streaming Queries Against Apache Kafka® Using KSQL (Local), you can multiply the two BIGINT fields in the pageviews_original stream:
SELECT MULTIPLY(rowtime, viewtime) FROM pageviews_original;
Your output should resemble:
2027398056717155428
2028560009956135428
2029465468198408945
2030608879630876785
2031171314443704673
2032147849613387385
2032926605508340785
^CQuery terminated
Press Ctrl+C to terminate the query.
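The sample results are around 2.03 × 10^18, within a factor of about 4.5 of Long.MAX_VALUE (roughly 9.22 × 10^18), so larger inputs can overflow BIGINT. Java long multiplication, which the BIGINT overloads use, wraps around silently on overflow. If you'd rather detect overflow, one option, not part of the original example, is Math.multiplyExact, which throws ArithmeticException instead. The sketch contrasts the two in plain Java; the class and method names are hypothetical.

```java
// Hypothetical sketch: silent long wraparound vs. Math.multiplyExact.
class OverflowDemo {

  // Same arithmetic as the BIGINT overloads of Multiply.
  static long wrapping(long v1, long v2) {
    return v1 * v2;
  }

  // Alternative that throws ArithmeticException instead of wrapping.
  static long checked(long v1, long v2) {
    return Math.multiplyExact(v1, v2);
  }

  public static void main(String[] args) {
    long big = 4_000_000_000L; // squared is 1.6e19, above Long.MAX_VALUE
    // Prints -2446744073709551616 (the product wrapped modulo 2^64)
    System.out.println(wrapping(big, big));
    try {
      checked(big, big);
    } catch (ArithmeticException e) {
      System.out.println("overflow detected");
    }
  }
}
```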
Custom Aggregation Function (UDAF)¶
Implementing a user-defined aggregation function (UDAF) is similar to the way that you implement a UDF. You use the UdafDescription and UdafFactory annotations in your Java code, and you deploy a JAR to the KSQL extensions directory. For more information, see UDAFs.
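The sketch below shows, in plain Java, the general aggregation lifecycle, assuming the initialize, aggregate, and merge steps that KSQL UDAFs implement: start a value for each group, fold each row into it, and combine partial results. AggregatorSketch is a hypothetical stand-in, not KSQL's own UDAF interface; see UDAFs for the exact interface and the UdafFactory requirements in your KSQL version.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of an initialize/aggregate/merge lifecycle. AggregatorSketch
// is a hypothetical stand-in, NOT KSQL's Udaf interface.
class UdafSketch {

  interface AggregatorSketch<V, A> {
    A initialize();               // starting value for a new group
    A aggregate(V value, A agg);  // fold one input row into the aggregate
    A merge(A left, A right);     // combine two partial aggregates
  }

  // Example aggregate: sum of BIGINT-like values.
  static final AggregatorSketch<Long, Long> SUM = new AggregatorSketch<Long, Long>() {
    @Override public Long initialize() { return 0L; }
    @Override public Long aggregate(Long value, Long agg) { return agg + value; }
    @Override public Long merge(Long left, Long right) { return left + right; }
  };

  // Runs initialize, then aggregate once per row.
  static <V, A> A fold(AggregatorSketch<V, A> udaf, List<V> rows) {
    A agg = udaf.initialize();
    for (V row : rows) {
      agg = udaf.aggregate(row, agg);
    }
    return agg;
  }

  public static void main(String[] args) {
    // Two partial aggregates over separate batches of rows.
    Long partA = fold(SUM, Arrays.asList(1L, 2L, 3L));
    Long partB = fold(SUM, Arrays.asList(10L, 20L));
    // Prints 36
    System.out.println(SUM.merge(partA, partB));
  }
}
```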