Use Lambda Functions to Transform Columns with Structured Data in ksqlDB for Confluent Platform

Context

You want to transform a column with structured data in a particular way, but no built-in function suits your needs and you can’t implement and deploy a user-defined function. ksqlDB can compose existing functions to create new expressions over structured data. These expressions are called lambda functions.

In action

CREATE STREAM stream1 (
  id INT,
  lambda_map MAP<STRING, INTEGER>
) WITH (
  kafka_topic = 'stream1',
  partitions = 1,
  value_format = 'avro'
);

CREATE STREAM output AS
  SELECT id,
  TRANSFORM(lambda_map, (k, v) => UCASE(k), (k, v) => v + 5) AS final_output
  FROM stream1
  EMIT CHANGES;

Syntax

The arguments for the lambda function are separated from the body of the lambda with the lambda operator, =>.

When there are two or more arguments, you must enclose the arguments in parentheses. Parentheses are optional for lambda functions with one argument.

Currently, ksqlDB supports up to three arguments in a single lambda function.

x => x + 5

(x,y) => x - y

(x,y,z) => z AND x OR y
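
As a minimal sketch of the parentheses rule, the query below applies the same one-argument lambda twice, once without and once with parentheses. The stream demo_arrays, its column nums, and the two aliases are hypothetical names used only for this illustration.

```sql
-- Hypothetical stream used only for this illustration.
CREATE STREAM demo_arrays (
  id INT,
  nums ARRAY<INTEGER>
) WITH (
  kafka_topic = 'demo_arrays',
  partitions = 1,
  value_format = 'avro'
);

-- Both columns apply the same single-argument lambda;
-- only the optional parentheses differ.
SELECT id,
  TRANSFORM(nums, x => x + 5) AS without_parens,
  TRANSFORM(nums, (x) => x + 5) AS with_parens
FROM demo_arrays
EMIT CHANGES;
```

Both columns should contain the same array, with 5 added to each element.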

Invocation UDFs

Lambda functions must be used inside designated invocation functions. The available invocation functions are TRANSFORM, REDUCE, and FILTER.

Create a lambda-compatible stream

Invocation functions require either a map or array input. The following example creates a stream with a column type of MAP<STRING, INTEGER>.

CREATE STREAM stream1 (
  id INT,
  lambda_map MAP<STRING, INTEGER>
) WITH (
  kafka_topic = 'stream1',
  partitions = 1,
  value_format = 'avro'
);

Apply a lambda invocation function

A lambda invocation function is a scalar UDF, and you use it like other scalar functions.

The following example transforms both the keys and the values of a map and produces a new map. The first lambda function converts each key to an uppercase string with the built-in UCASE function, and the second adds 5 to each value. The order of the arguments is important: the first item in the arguments list, named k in this example, is treated as the key, and the second, named v in this example, is treated as the value. Pay attention to this order if the keys and values of your map have different types. Note that TRANSFORM on a map requires two lambda functions, while TRANSFORM on an array requires one.

CREATE STREAM output AS
  SELECT id,
  TRANSFORM(lambda_map, (k, v) => UCASE(k), (k, v) => v + 5) AS final_output
  FROM stream1
  EMIT CHANGES;

Insert some values into stream1.

INSERT INTO stream1 (
  id, lambda_map
) VALUES (
  3, MAP('hello':= 15, 'goodbye':= -5)
);

Query the output. Make sure to set auto.offset.reset = earliest.
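
In the ksqlDB CLI, one way to apply this setting for the current session is a SET statement issued before the query. This is a sketch of that step and assumes you are working in an interactive session:

```sql
-- Read the topic from the beginning so previously inserted rows are returned.
SET 'auto.offset.reset' = 'earliest';
```

The same statement applies to the other queries on this page that ask for this setting.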

SELECT * FROM output AS final_output EMIT CHANGES;

Your output should resemble:

+------------------------------+------------------------------+
|id                            |final_output                  |
+------------------------------+------------------------------+
|3                             |{HELLO: 20, GOODBYE: 0}       |

Use a reduce lambda invocation function

The following example creates a stream with a column type ARRAY<INTEGER> and applies the reduce lambda invocation function.

CREATE STREAM stream2 (
  id INT,
  lambda_arr ARRAY<INTEGER>
) WITH (
  kafka_topic = 'stream2',
  partitions = 1,
  value_format = 'avro'
);

CREATE STREAM output2 AS
  SELECT id,
  REDUCE(lambda_arr, 2, (s, x) => ceil(x/s)) AS final_output
  FROM stream2
  EMIT CHANGES;

Insert some values into stream2.

INSERT INTO stream2 (
  id, lambda_arr
) VALUES (
  1, ARRAY[2, 3, 4, 5]
);

Query the output. Make sure to set auto.offset.reset = earliest.

SELECT * FROM output2 AS final_output EMIT CHANGES;

You should see something similar to:

+------------------------------+------------------------------+
|id                            |final_output                  |
+------------------------------+------------------------------+
|1                             |5                             |
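
REDUCE can also take a map as input, which is where a three-argument lambda comes in. The sketch below assumes the lambda receives the running state first, followed by the key and the value, and reuses stream1 from earlier on this page. The alias total is illustrative.

```sql
-- Sum all values in the map, starting from an initial state of 0.
-- s is the running state, k the key (unused here), and v the value.
SELECT id,
  REDUCE(lambda_map, 0, (s, k, v) => s + v) AS total
FROM stream1
EMIT CHANGES;
```

For the row inserted into stream1 earlier, with values 15 and -5, the total would be 10.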

Use a filter lambda invocation function

Create a stream with a column type MAP<STRING, INTEGER> and apply the filter lambda invocation function.

CREATE STREAM stream3 (
  id INT,
  lambda_map MAP<STRING, INTEGER>
) WITH (
  kafka_topic = 'stream3',
  partitions = 1,
  value_format = 'avro'
);

CREATE STREAM output3 AS
  SELECT id,
  FILTER(lambda_map, (k, v) => instr(k, 'name') > 0 AND v != 0) AS final_output
  FROM stream3
  EMIT CHANGES;

Insert some values into stream3.

INSERT INTO stream3 (
  id, lambda_map
) VALUES (
  1, MAP('first name':= 15, 'middle':= 25, 'last name':= 0, 'alt name':= 33)
);

Query the output. Make sure to set auto.offset.reset = earliest.

SELECT * FROM output3 AS final_output EMIT CHANGES;

Your output should resemble:

+------------------------------+-----------------------------------------------+
|id                            |final_output                                   |
+------------------------------+-----------------------------------------------+
|1                             |{first name: 15, alt name: 33}                 |
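
FILTER also works on arrays, where the lambda takes a single argument for the element. As a minimal sketch, the query below reuses stream2 and its lambda_arr column from the reduce example. The alias large_values and the threshold 3 are illustrative.

```sql
-- Keep only the array elements greater than 3.
SELECT id,
  FILTER(lambda_arr, x => x > 3) AS large_values
FROM stream2
EMIT CHANGES;
```

For the array [2, 3, 4, 5] inserted earlier, the result would be [4, 5].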

Advanced lambda use cases

The following example creates a stream with a column type MAP<STRING, ARRAY<DECIMAL(3,2)>> and applies the transform lambda invocation function with a nested transform lambda invocation function.

CREATE STREAM stream4 (
  id INT,
  lambda_map MAP<STRING, ARRAY<DECIMAL(3,2)>>
) WITH (
  kafka_topic = 'stream4',
  partitions = 1,
  value_format = 'avro'
);

CREATE STREAM output4 AS
  SELECT id,
  TRANSFORM(lambda_map, (k, v) => concat(k, '_new'), (k, v) => transform(v, x => round(x))) AS final_output
  FROM stream4
  EMIT CHANGES;

Insert some values into stream4.

INSERT INTO stream4 (
  id, lambda_map
) VALUES (
  1, MAP('Mary':= ARRAY[1.23, 3.65, 8.45], 'Jose':= ARRAY[5.23, 1.65])
);

Query the output. Make sure to set auto.offset.reset = earliest.

SELECT * FROM output4 AS final_output EMIT CHANGES;

Your output should resemble:

+------------------------------+----------------------------------------------------------+
|id                            |final_output                                              |
+------------------------------+----------------------------------------------------------+
|1                             |{Mary_new: [1, 4, 8], Jose_new: [5, 2]}                   |