Create a User Defined Function for Flink SQL¶
Note
User-Defined Function (UDF) support is an Early Access Program feature in Confluent Cloud. This feature should be used only for evaluation and non-production testing purposes or to provide feedback to Confluent, particularly as it becomes more widely available in follow-on preview editions. To participate in this Early Access Program, contact your Confluent account manager.
Early Access Program features are intended for evaluation use in development and testing environments only, and not for production use. Early Access Program features are provided: (a) without support; (b) “AS IS”; and (c) without indemnification, warranty, or condition of any kind. No service level commitment will apply to Early Access Program features. Early Access Program features are considered to be a Proof of Concept as defined in the Confluent Cloud Terms of Service. Confluent may discontinue providing preview releases of the Early Access Program features at any time in Confluent’s sole discretion.
A user-defined function (UDF) extends the capabilities of Confluent Cloud for Apache Flink® and enables you to implement custom logic beyond what is supported by SQL. For example, you can implement functions that encode or decode strings, perform geospatial calculations, or encrypt and decrypt fields, or you can reuse an existing library or code from a third-party supplier.
Confluent Cloud for Apache Flink supports UDFs written in Java. Package your custom function and its dependencies into a jar file and upload it to Confluent Cloud. Register the function in a Flink database by using the CREATE FUNCTION statement, and invoke your UDF in Flink SQL. Confluent Cloud provides the infrastructure to run your code.
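For orientation, the following is a minimal sketch of the shape of a Java UDF: a public class that extends ScalarFunction and defines a public eval method that Flink calls for each row. The package, class name, and logic shown here are illustrative placeholders only; the rest of this topic builds a complete, working example.
// Minimal illustrative sketch of a scalar UDF.
// The package, class name, and logic are placeholders, not part of this guide's example.
package com.example.udfs;

import org.apache.flink.table.functions.ScalarFunction;

public class ToHexString extends ScalarFunction {
    // Flink invokes the public eval method once per input row.
    public String eval(Integer value) {
        return value == null ? null : Integer.toHexString(value);
    }
}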
The following steps show how to implement a simple UDF, upload it to Confluent Cloud, and use it in a Flink SQL statement.
- Step 1: Build the uber jar
- Step 2: Upload the jar as a Flink artifact
- Step 3: Create the UDF
- Step 4: Use the UDF in a Flink SQL query
- Step 5: Delete the UDF
Prerequisites¶
You need the following prerequisites to use Confluent Cloud for Apache Flink.
- Access to Confluent Cloud.
- The organization ID, environment ID, and compute pool ID for your organization.
- The OrganizationAdmin, EnvironmentAdmin, or FlinkAdmin role for creating compute pools, or the FlinkDeveloper role if you already have a compute pool. If you don’t have the appropriate role, reach out to your OrganizationAdmin or EnvironmentAdmin.
- The Confluent CLI. To use the Flink SQL shell, update to the latest version of the Confluent CLI by running the following command:

  confluent update --yes

  If you used Homebrew to install the Confluent CLI, update the CLI by using the brew upgrade command, instead of confluent update. For more information, see Confluent CLI.
- A provisioned Flink compute pool in Confluent Cloud.
- The Apache Maven software project management tool (see Installing Apache Maven).
- Java 11 to Java 17.
- Sufficient permissions to upload and invoke UDFs in Confluent Cloud. For more information, see Flink RBAC.
- Flink versions 1.18.x and 1.19.x of flink-table-api-java are supported.
Step 1: Build the uber jar¶
In this section, you compile a simple Java class, named TShirtSizingIsSmaller, into a jar file. The project is based on the ScalarFunction class in the Flink Table API. The TShirtSizingIsSmaller.java class has an eval function that compares two T-shirt sizes and returns true if the first size is smaller than the second.
Copy the following project object model into a file named pom.xml.
Important
You can’t use your own Flink-related jars. If you package Flink core dependencies as part of the jar, they can conflict with the Flink runtime that Confluent Cloud provides, which is why the flink-table-api-java dependency in this example uses provided scope.
Also, this example captures all dependencies greedily, which may include more than you need. As an alternative, you can reduce the artifact size by listing only the dependencies your UDF requires, along with their transitive dependencies.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>example</groupId>
  <artifactId>udf_example</artifactId>
  <version>1.0</version>
  <properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java</artifactId>
      <version>1.18.1</version>
      <scope>provided</scope>
    </dependency>
    <!-- Dependencies -->
  </dependencies>
  <build>
    <sourceDirectory>./example</sourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.6.0</version>
        <configuration>
          <artifactSet>
            <includes>
              <!-- Include all UDF dependencies and their transitive dependencies here. -->
              <!-- This example shows how to capture all of them greedily. -->
              <include>*:*</include>
            </includes>
          </artifactSet>
          <filters>
            <filter>
              <artifact>*</artifact>
              <excludes>
                <!-- Do not copy the signatures in the META-INF folder.
                     Otherwise, this might cause SecurityExceptions when using the JAR. -->
                <exclude>META-INF/*.SF</exclude>
                <exclude>META-INF/*.DSA</exclude>
                <exclude>META-INF/*.RSA</exclude>
              </excludes>
            </filter>
          </filters>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
Create a directory named “example”.
mkdir example && cd example
Create a file named TShirtSizingIsSmaller.java in the example directory.

touch TShirtSizingIsSmaller.java
Copy the following code into TShirtSizingIsSmaller.java.
package io.confluent.flink.table.modules.remoteudf;

import org.apache.flink.table.functions.ScalarFunction;

import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

/** TShirt sizing function for demo. */
public class TShirtSizingIsSmaller extends ScalarFunction {
    public static final String NAME = "IS_SMALLER";

    private static final List<Size> ORDERED_SIZES =
            Arrays.asList(
                    new Size("X-Small", "XS"),
                    new Size("Small", "S"),
                    new Size("Medium", "M"),
                    new Size("Large", "L"),
                    new Size("X-Large", "XL"),
                    new Size("XX-Large", "XXL"));

    public boolean eval(String shirt1, String shirt2) {
        int size1 = findSize(shirt1);
        int size2 = findSize(shirt2);
        // If either can't be found, just say false rather than throw an error.
        if (size1 == -1 || size2 == -1) {
            return false;
        }
        return size1 < size2;
    }

    private int findSize(String shirt) {
        return IntStream.range(0, ORDERED_SIZES.size())
                .filter(
                        i -> {
                            Size s = ORDERED_SIZES.get(i);
                            return s.name.equalsIgnoreCase(shirt)
                                    || s.abbreviation.equalsIgnoreCase(shirt);
                        })
                .findFirst()
                .orElse(-1);
    }

    private static class Size {
        private final String name;
        private final String abbreviation;

        public Size(String name, String abbreviation) {
            this.name = name;
            this.abbreviation = abbreviation;
        }
    }
}
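Optionally, you can sanity-check the eval logic before packaging it. The following throwaway class is a sketch for local testing only; the Harness name is hypothetical, and you should compile and run it outside the example directory (or remove it afterward) so it isn't packaged into the jar you upload.

// Quick local check of TShirtSizingIsSmaller.eval. For local experimentation only.
package io.confluent.flink.table.modules.remoteudf;

public class Harness {
    public static void main(String[] args) {
        TShirtSizingIsSmaller udf = new TShirtSizingIsSmaller();
        System.out.println(udf.eval("M", "L"));    // true: Medium is smaller than Large
        System.out.println(udf.eval("XL", "L"));   // false: X-Large is not smaller than Large
        System.out.println(udf.eval("tiny", "L")); // false: unknown sizes return false
    }
}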
Run the following command to build the jar file.
mvn clean package
Run the following command to check the contents of your jar.
jar -tf target/udf_example-1.0.jar | grep -i TShirtSizingIsSmaller
Your output should resemble:
io/confluent/flink/table/modules/remoteudf/TShirtSizingIsSmaller$Size.class
io/confluent/flink/table/modules/remoteudf/TShirtSizingIsSmaller.class
Step 2: Upload the jar as a Flink artifact¶
Log in to Confluent Cloud.
confluent login --organization-id ${ORG_ID} --prompt
Run the following command to upload the jar to Confluent Cloud.
confluent flink artifact create udf_example \
  --artifact-file target/udf_example-1.0.jar \
  --cloud ${CLOUD_PROVIDER} \
  --region ${CLOUD_REGION} \
  --environment ${ENV_ID}
Your output should resemble:
+----------------+-------------+
| ID             | cfa-4n06xv  |
| Name           | udf_example |
| Version        | ver-4k5jmp  |
| Class          | default     |
| Content Format | JAR         |
| Cloud          | aws         |
| Region         | us-east-1   |
| Environment    | env-z3q9rd  |
+----------------+-------------+
Run the following command to view all of the available UDFs.
confluent flink artifact list \
  --cloud ${CLOUD_PROVIDER} \
  --region ${CLOUD_REGION}
Your output should resemble:
      ID     |    Name     | Cloud |  Region   | Environment
-------------+-------------+-------+-----------+--------------
  cfa-4n06xv | udf_example | AWS   | us-east-1 | env-z3q9rd
Run the following command to view the details of a UDF. You can use either the artifact ID (the ID value from the previous step, also called the plugin ID) or the artifact name to specify your UDF.
# use the plugin ID
confluent flink artifact describe \
  cfa-4n06xv \
  --cloud ${CLOUD_PROVIDER} \
  --region ${CLOUD_REGION}

# use the artifact name
confluent flink artifact describe \
  udf_example \
  --cloud ${CLOUD_PROVIDER} \
  --region ${CLOUD_REGION}
Your output should resemble:
+----------------+-------------+
| ID             | cfa-4n06xv  |
| Name           | udf_example |
| Version        | ver-4k5jmp  |
| Class          | default     |
| Cloud          | AWS         |
| Region         | us-east-1   |
| Environment    | env-z3q9rd  |
| Content Format | JAR         |
+----------------+-------------+
Step 3: Create the UDF¶
UDFs are registered inside a database, which means that you must specify the Confluent Cloud environment (Flink catalog) and Kafka cluster (Flink database) you want to use.
You can use the Confluent Cloud Console or the Confluent CLI to register your UDF. In the Cloud Console:
- Log in to Confluent Cloud and navigate to your Flink workspace.
- In the Use catalog dropdown, select your environment.
- In the Use database dropdown, select your Kafka cluster.
To register the UDF with the Confluent CLI instead, run the following command to start the Flink shell.
confluent flink shell --environment ${ENV_ID} --compute-pool ${COMPUTE_POOL_ID}
Run the following statement to specify the catalog.

-- Specify your catalog. This example uses the default.
use catalog default;
Your output should resemble:
+---------------------+---------+
| Key                 | Value   |
+---------------------+---------+
| sql.current-catalog | default |
+---------------------+---------+
Specify the database you want to use, for example, cluster_0.

-- Specify your database. This example uses cluster_0.
use cluster_0;
Your output should resemble:
+----------------------+-----------+
| Key                  | Value     |
+----------------------+-----------+
| sql.current-database | cluster_0 |
+----------------------+-----------+
Run the CREATE FUNCTION statement to register your UDF in the current catalog and database. Substitute your UDF’s values for <plugin-id> and <version-id>. In this example, the plugin ID is cfa-4n06xv and the version ID is ver-4k5jmp, as shown in the output from Step 2.

CREATE FUNCTION is_smaller
  AS 'io.confluent.flink.table.modules.remoteudf.TShirtSizingIsSmaller'
  USING JAR 'confluent-artifact://<plugin-id>/<version-id>';
Your output should resemble:
Function 'is_smaller' created.
Step 4: Use the UDF in a Flink SQL query¶
Once it is registered, your UDF is available to use in queries.
Run the following statement to view the UDFs in the current database.
show user functions;
Your output should resemble:
+---------------+
| function name |
+---------------+
| is_smaller    |
+---------------+
Run the following statement to create a sizes table.

CREATE TABLE sizes (
  `size_1` STRING,
  `size_2` STRING
);
Run the following statement to populate the sizes table with values.

INSERT INTO sizes VALUES
  ('XL', 'L'),
  ('small', 'L'),
  ('M', 'L'),
  ('XXL', 'XL');
Run the following statement to view the rows in the sizes table.

SELECT * FROM sizes;
Your output should resemble:
size_1  size_2
XL      L
small   L
M       L
XXL     XL
Run the following statement to execute the is_smaller function on the data in the sizes table.

SELECT size_1, size_2, IS_SMALLER(size_1, size_2) AS is_smaller FROM sizes;
Your output should resemble:
size_1  size_2  is_smaller
XL      L       FALSE
small   L       TRUE
M       L       TRUE
XXL     XL      FALSE
Step 5: Delete the UDF¶
When you’re finished using the UDF, you can delete it from the current database.
Run the following statement to remove the is_smaller function from the current database.

DROP FUNCTION is_smaller;
Your output should resemble:
Function 'is_smaller' dropped.
Currently running statements are not affected and continue running.