Table Functions in ksqlDB for Confluent Platform¶
Synopsis¶
A table function is a function that returns a set of zero or more rows. Contrast this with a scalar function, which returns a single value.
Table functions are analogous to the FlatMap operation commonly found in functional programming or stream processing frameworks such as Kafka Streams.
Important
Table functions are supported only on stream sources.
Table functions are used in the SELECT clause of a query. They cause the query to output potentially more than one row for each input value.
The current implementation of table functions only allows a single column to be returned. This column can be any valid SQL type.
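To illustrate the contrast with scalar functions, here is a minimal sketch that assumes a hypothetical stream s with a VARCHAR column name and an ARRAY&lt;VARCHAR&gt; column tags; the stream and its columns are assumptions for illustration and don't appear elsewhere in this topic. The scalar function UCASE emits exactly one row per input row, while the table function EXPLODE emits one row per array element.
-- Scalar function: exactly one output row per input row.
SELECT UCASE(name) AS upper_name FROM s EMIT CHANGES;
-- Table function: zero or more output rows per input row,
-- one for each element of the tags array.
SELECT EXPLODE(tags) AS tag FROM s EMIT CHANGES;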
Here’s an example of the EXPLODE built-in table function, which takes an ARRAY and outputs one value for each element of the array. Given the following input data:
{sensor_id:12345 readings: [23, 56, 3, 76, 75]}
{sensor_id:54321 readings: [12, 65, 38]}
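For completeness, a source stream with this shape could be declared as in the following sketch; the topic name and the JSON value format are assumptions for illustration and aren’t part of the original example.
-- Assumed definition of the source stream used in this example.
CREATE STREAM batched_readings (sensor_id BIGINT, readings ARRAY<INT>)
  WITH (kafka_topic='batched_readings', value_format='JSON');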
The following stream:
CREATE STREAM exploded_stream AS
SELECT sensor_id, EXPLODE(readings) AS reading FROM batched_readings;
Would emit:
{sensor_id:12345 reading: 23}
{sensor_id:12345 reading: 56}
{sensor_id:12345 reading: 3}
{sensor_id:12345 reading: 76}
{sensor_id:12345 reading: 75}
{sensor_id:54321 reading: 12}
{sensor_id:54321 reading: 65}
{sensor_id:54321 reading: 38}
When scalar values are mixed with table function return values in a SELECT clause, the scalar values, like sensor_id in the previous example, are copied for each value returned from the table function.
You can also use multiple table functions in a SELECT clause. In this situation, the results of the table functions are “zipped” together. The total number of rows returned is equal to the greatest number of values returned from any of the table functions. If some of the functions return fewer rows than others, the missing values are replaced with null.
Here’s an example that illustrates using multiple table functions in a SELECT clause.
With the following input data:
{country:'UK', customer_names: ['john', 'paul', 'george', 'ringo'], customer_ages: [23, 56, 3]}
{country:'USA', customer_names: ['chad', 'chip', 'brad'], customer_ages: [56, 34, 76, 84, 56]}
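As before, a source stream with this shape might be declared as in the following sketch; the topic name and value format are assumed for illustration.
-- Assumed definition of the source stream used in this example.
CREATE STREAM country_batches (country VARCHAR, customer_names ARRAY<VARCHAR>, customer_ages ARRAY<INT>)
  WITH (kafka_topic='country_batches', value_format='JSON');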
And the following stream:
CREATE STREAM country_customers AS
SELECT country, EXPLODE(customer_names) AS name, EXPLODE(customer_ages) AS age FROM country_batches;
Would give:
{country: 'UK', name: 'john', age: 23}
{country: 'UK', name: 'paul', age: 56}
{country: 'UK', name: 'george', age: 3}
{country: 'UK', name: 'ringo', age: null}
{country: 'USA', name: 'chad', age: 56}
{country: 'USA', name: 'chip', age: 34}
{country: 'USA', name: 'brad', age: 76}
{country: 'USA', name: null, age: 84}
{country: 'USA', name: null, age: 56}
Functions¶
CUBE_EXPLODE¶
- Applies to: array
- Since: 0.7.0
CUBE_EXPLODE(array[col1, ..., colN])
For the specified array of columns, outputs all of their possible combinations.
The CUBE_EXPLODE function produces 2^d new rows, where d is the number of columns given as parameters. Duplicate entries for columns with NULL values are skipped.
For example, given the following input records:
{"topic": "test_topic", "key": "0", "value": {"col1": 1, "col2": 2}}
{"topic": "test_topic", "key": "1", "value": {"col1": 1, "col2": null}}
And the following stream:
CREATE STREAM TEST (col1 INT, col2 INT) WITH (kafka_topic='test_topic', value_format='JSON');
CREATE STREAM OUTPUT AS SELECT cube_explode(array[col1, col2]) VAL FROM TEST;
The CUBE_EXPLODE function produces the following output. The second input record yields only two rows, because col2 is NULL and the duplicate combinations are skipped:
{"topic": "OUTPUT", "key": "0", "value": {"VAL": [null, null]}}
{"topic": "OUTPUT", "key": "0", "value": {"VAL": [null, 2]}}
{"topic": "OUTPUT", "key": "0", "value": {"VAL": [1, null]}}
{"topic": "OUTPUT", "key": "0", "value": {"VAL": [1, 2]}}
{"topic": "OUTPUT", "key": "1", "value": {"VAL": [null, null]}}
{"topic": "OUTPUT", "key": "1", "value": {"VAL": [1, null]}}
EXPLODE¶
- Applies to: array
- Since: 0.6.0
EXPLODE(array)
Outputs one value for each of the elements in array. The output values have the same type as the array elements.
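A minimal usage sketch, assuming an existing stream s to serve as the row source; the inline ARRAY constructor builds a three-element array whose elements are emitted as separate rows.
-- Each input row in s produces three output rows: 1, 2, and 3.
SELECT EXPLODE(ARRAY[1, 2, 3]) AS val FROM s EMIT CHANGES;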