Create a User-Defined Function with Confluent Cloud for Apache Flink

A user-defined function (UDF) extends the capabilities of Confluent Cloud for Apache Flink® and enables you to implement custom logic beyond what is supported by SQL. For example, you can implement functions like encoding and decoding a string, performing geospatial calculations, encrypting and decrypting fields, or reusing an existing library or code from a third-party supplier.

Confluent Cloud for Apache Flink supports UDFs written in Java. Package your custom function and its dependencies into a JAR file and upload it as an artifact to Confluent Cloud. Register the function in a Flink database by using the CREATE FUNCTION statement, and invoke your UDF in Flink SQL or the Table API. Confluent Cloud provides the infrastructure to run your code.

The following steps show how to implement a simple user-defined scalar function, upload it to Confluent Cloud, and use it in a Flink SQL statement.

After you build and run the scalar function, try building a table function.

For more code examples, see Flink UDF Java Examples.

Prerequisites

You need the following prerequisites to use Confluent Cloud for Apache Flink.

  • Access to Confluent Cloud.

  • The organization ID, environment ID, and compute pool ID for your organization.

  • The OrganizationAdmin, EnvironmentAdmin, or FlinkAdmin role for creating compute pools, or the FlinkDeveloper role if you already have a compute pool. If you don’t have the appropriate role, reach out to your OrganizationAdmin or EnvironmentAdmin.

  • The Confluent CLI. To use the Flink SQL shell, update to the latest version of the Confluent CLI by running the following command:

    confluent update --yes
    

    If you used Homebrew to install the Confluent CLI, update the CLI by using the brew upgrade command instead of confluent update.

    For more information, see Confluent CLI.

  • A provisioned Flink compute pool in Confluent Cloud.

  • The Apache Maven software project management tool (see Installing Apache Maven).

  • Java 11 to Java 17

  • Sufficient permissions to upload and invoke UDFs in Confluent Cloud. For more information, see Flink RBAC.

  • If you use the Table API, only Flink versions 1.18.x and 1.19.x of flink-table-api-java are supported.

Step 1: Build the uber jar

In this section, you compile a simple Java class named TShirtSizingIsSmaller into a jar file. The project is based on the ScalarFunction class in the Flink Table API. The TShirtSizingIsSmaller class implements an eval method that compares two T-shirt sizes and returns whether the first size is smaller than the second.

  1. Copy the following project object model into a file named pom.xml.

    Important

    You can’t use your own Flink-related jars. If you package Flink core dependencies as part of the jar, you may cause version conflicts with the Flink runtime.

    Also, this example captures all dependencies greedily, which can include more than you actually need. As an alternative, you can reduce the artifact size by listing only the dependencies you require and including their transitive dependencies.
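
    A minimal POM along the following lines works for this walkthrough. It is a sketch rather than the only valid configuration: the Flink Table API dependency uses provided scope so that Flink classes stay out of the uber jar, and the Maven Shade plugin produces the jar. The group ID, dependency version, and plugin version shown here are illustrative; adjust them to your project.

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>

      <groupId>com.example.my</groupId>
      <artifactId>udf_example</artifactId>
      <version>1.0</version>
      <packaging>jar</packaging>

      <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      </properties>

      <dependencies>
        <!-- provided scope: the Flink Table API is supplied by the Confluent Cloud
             runtime and must not be packaged into the uber jar -->
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-api-java</artifactId>
          <version>1.18.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>

      <build>
        <!-- the Java sources live in the "example" directory created in the next steps -->
        <sourceDirectory>example</sourceDirectory>
        <plugins>
          <!-- the Shade plugin bundles compile-scope dependencies into a single uber jar -->
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.4.1</version>
            <executions>
              <execution>
                <phase>package</phase>
                <goals>
                  <goal>shade</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </project>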

  2. Create a directory named “example”.

    mkdir example
    
  3. In the example directory, create a file named TShirtSizingIsSmaller.java.

    touch example/TShirtSizingIsSmaller.java
    
  4. Copy the following code into TShirtSizingIsSmaller.java.

    package com.example.my;
    
    import org.apache.flink.table.functions.ScalarFunction;
    
    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.IntStream;
    
    /** TShirt sizing function for demo. */
    public class TShirtSizingIsSmaller extends ScalarFunction {
       public static final String NAME = "IS_SMALLER";
    
       private static final List<Size> ORDERED_SIZES =
                Arrays.asList(
                      new Size("X-Small", "XS"),
                      new Size("Small", "S"),
                      new Size("Medium", "M"),
                      new Size("Large", "L"),
                      new Size("X-Large", "XL"),
                      new Size("XX-Large", "XXL"));
    
       public boolean eval(String shirt1, String shirt2) {
          int size1 = findSize(shirt1);
          int size2 = findSize(shirt2);
          // If either can't be found just say false rather than throw an error
          if (size1 == -1 || size2 == -1) {
                return false;
          }
          return size1 < size2;
       }
    
       private int findSize(String shirt) {
          return IntStream.range(0, ORDERED_SIZES.size())
                   .filter(
                            i -> {
                               Size s = ORDERED_SIZES.get(i);
                               return s.name.equalsIgnoreCase(shirt)
                                        || s.abbreviation.equalsIgnoreCase(shirt);
                            })
                   .findFirst()
                   .orElse(-1);
       }
    
       private static class Size {
          private final String name;
          private final String abbreviation;
    
          public Size(String name, String abbreviation) {
                this.name = name;
                this.abbreviation = abbreviation;
          }
       }
    }
    
  5. Run the following command to build the jar file.

    mvn clean package
    
  6. Run the following command to check the contents of your jar.

    jar -tf target/udf_example-1.0.jar | grep -i TShirtSizingIsSmaller
    

    Your output should resemble:

    com/example/my/TShirtSizingIsSmaller$Size.class
    com/example/my/TShirtSizingIsSmaller.class
    

Step 2: Upload the jar as a Flink artifact

You can use the Confluent Cloud Console, the Confluent CLI, or the REST API to upload your UDF.

  1. Log in to Confluent Cloud and navigate to your Flink workspace.
  2. Navigate to the environment where you want to run the UDF.
  3. Click Flink, and in the Flink page, click Artifacts.
  4. Click Upload artifact to open the upload pane.
  5. In the Cloud provider dropdown, select AWS, and in the Region dropdown, select the cloud region.
  6. Click Upload your JAR file and navigate to the location of your JAR file, which in the current example is target/udf_example-1.0.jar.
  7. When your JAR file is uploaded, it appears in the Artifacts list. In the list, click the row for your UDF artifact to open the details pane.

Step 3: Register the UDF

UDFs are registered inside a Flink database, which means that you must specify the Confluent Cloud environment (Flink catalog) and Kafka cluster (Flink database) where you want to use the UDF.

You can use the Confluent Cloud Console, the Confluent CLI, the Confluent Terraform provider, or the REST API to register your UDF.

  1. In the Flink page, click Compute pools.
  2. In the tile for the compute pool where you want to run the UDF, click Open SQL workspace.
  3. In the Use catalog dropdown, select the environment where you want to run the UDF.
  4. In the Use database dropdown, select the Kafka cluster where you want to run the UDF.
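
  • If you use the Flink SQL shell instead of a workspace, you can set the same catalog and database with USE statements. For example, with placeholder environment and cluster names:

    USE CATALOG `my-environment`;
    USE `my-kafka-cluster`;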

  • In Cloud Console or the Confluent CLI, run the CREATE FUNCTION statement to register your UDF in the current catalog and database. Substitute your UDF’s values for <artifact-id> and <version-id>.

    CREATE FUNCTION is_smaller
      AS 'com.example.my.TShirtSizingIsSmaller'
      USING JAR 'confluent-artifact://<artifact-id>/<version-id>';
    

    Your output should resemble:

    Function 'is_smaller' created.
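
Step 4: Use the UDF in a query

After the function is registered, you can call it anywhere a built-in scalar function is allowed. The following statement is a minimal sketch that calls is_smaller on literal values; in a real query, you would pass columns from a table in the current database.

    SELECT
      is_smaller('S', 'XL') AS s_smaller_than_xl,
      is_smaller('XXL', 'M') AS xxl_smaller_than_m;

    Based on the size ordering in TShirtSizingIsSmaller, the first call returns TRUE and the second returns FALSE.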
    

Step 5: Implement UDF logging (optional)

If you want to log UDF status messages to a Kafka topic, follow the steps in Enable UDF Logging.

Step 6: Delete the UDF

When you’re finished using the UDF, you can delete it from the current database.

You can use the Confluent Cloud Console, the Confluent CLI, the Confluent Terraform provider, or the REST API to delete your UDF.

Drop the function

  1. Run the following statement to remove the is_smaller function from the current database.

    DROP FUNCTION is_smaller;
    

    Your output should resemble:

    Function 'is_smaller' dropped.
    

    Currently running statements are not affected and continue running.

  2. Exit the Flink shell.

    exit;
    

Delete the JAR artifact

  1. Navigate to the environment where your UDF is registered.
  2. Click Flink, and in the Flink page, click Artifacts.
  3. In the artifacts list, find the UDF you want to delete.
  4. In the Actions column, click the icon, and in the context menu, select Delete artifact.
  5. In the confirmation dialog, type “udf_example”, and click Confirm. The “Artifact deleted successfully” message appears.

Implement a user-defined table function

In the previous steps, you implemented a UDF with a simple scalar function. Confluent Cloud for Apache Flink also supports user-defined table functions (UDTFs), which, like scalar functions, take zero, one, or more scalar values as input arguments, but return any number of rows as output instead of a single value.

The following steps show how to implement a simple UDTF, upload it to Confluent Cloud, and use it in a Flink SQL statement.
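
Step 1: Build the UDTF jar

The CREATE FUNCTION statement in Step 3 below registers a class named com.example.my.SplitFunction. The following is one possible implementation, based on the TableFunction class in the Flink Table API; the space-delimited splitting logic and the ROW<word STRING, length INT> output type are illustrative choices. Add the class to the same Maven project you used for the scalar function.

  1. In the example directory, create a file named SplitFunction.java.

    touch example/SplitFunction.java

  2. Copy the following code into SplitFunction.java.

    package com.example.my;

    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.annotation.FunctionHint;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.types.Row;

    /** Splits a string on spaces and emits one row per word. */
    @FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
    public class SplitFunction extends TableFunction<Row> {

       public void eval(String str) {
          if (str == null) {
             return;
          }
          for (String s : str.split(" ")) {
             // Each call to collect emits one output row.
             collect(Row.of(s, s.length()));
          }
       }
    }

  3. Run the following command to rebuild the jar file so it contains both functions.

    mvn clean package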

Step 2: Upload the UDTF jar as a Flink artifact

  1. Log in to Confluent Cloud and navigate to your Flink workspace.
  2. Navigate to the environment where you want to run the UDF.
  3. Click Flink, and in the Flink page, click Artifacts.
  4. Click Upload artifact to open the upload pane.
  5. In the Cloud provider dropdown, select AWS, and in the Region dropdown, select the cloud region.
  6. Click Upload your JAR file and navigate to the location of your JAR file, which in the current example is target/udf_example-1.0.jar.
  7. When your JAR file is uploaded, it appears in the Artifacts list. In the list, click the row for your UDF artifact to open the details pane.

Step 3: Register the UDTF

  1. In the Flink shell or the Cloud Console, specify the catalog and database (environment and cluster) where you want to use the UDTF, as you did in the previous section.

  2. Run the CREATE FUNCTION statement to register your UDTF in the current catalog and database. Substitute your UDTF’s values for <artifact-id> and <version-id>.

    CREATE FUNCTION split_string
      AS 'com.example.my.SplitFunction'
      USING JAR 'confluent-artifact://<artifact-id>/<version-id>';
    

    Your output should resemble:

    Function 'split_string' created.
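
Step 4: Use the UDTF in a query

To expand the rows produced by a table function, join it to the input with LATERAL TABLE. The following statement is a minimal sketch that assumes the SplitFunction shown in Step 1, which emits a (word, length) row per space-delimited token; the input value is a placeholder.

    SELECT sentence, word, length
    FROM (VALUES ('confluent cloud for apache flink')) AS t(sentence),
      LATERAL TABLE(split_string(sentence));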