KSQL Testing Tool¶
Use the KSQL testing tool to test a set of KSQL statements. The testing tool is a command line utility that lets you test KSQL statements without requiring any infrastructure, like Apache Kafka® or KSQL clusters. It's a great way to design your KSQL pipeline and make sure it produces the expected results, and you can collaborate on designing your KSQL statements by sharing the test files. To test a set of KSQL statements, you provide three files: a SQL file containing the KSQL statements and two JSON files containing the input records and the expected output records.
NAME
ksql-test-runner - The KSQL testing tool
SYNOPSIS
ksql-test-runner {--input-file | -i} <inputFile>
{--output-file | -o} <outputFile>
{--sql-file | -s} <statementsFile>
OPTIONS
--input-file <inputFile>, -i <inputFile>
A JSON file containing the input records.
This option may occur a maximum of 1 time
--output-file <outputFile>, -o <outputFile>
A JSON file containing the expected output records.
This option may occur a maximum of 1 time
--sql-file <statementsFile>, -s <statementsFile>
A SQL file containing KSQL statements to be tested.
This option may occur a maximum of 1 time
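For example, assuming test files named statements.sql, input.json, and output.json in the current directory, you can invoke the tool with the long-form options:
ksql-test-runner --sql-file statements.sql --input-file input.json --output-file output.json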
Provide required jar files¶
Starting in version 5.3.0, ksql-test-runner throws a ClassNotFoundException, because the following jars aren't packaged in KSQL:
- ksql-functional-tests/hamcrest-all-1.3.jar
- ksql-functional-tests/junit-4.12.jar
The workaround is to copy the required jars into the KSQL classpath at confluent-5.3.1/share/java/ksql. For example, if the classpath is /opt/confluent/confluent-5.3.1/share/java/ksql, use the following commands:
cd /opt/confluent/confluent-5.3.1/share/java/ksql
wget https://repo1.maven.org/maven2/junit/junit/4.12/junit-4.12.jar
wget https://repo1.maven.org/maven2/org/hamcrest/hamcrest-all/1.3/hamcrest-all-1.3.jar
Test File Structure¶
Statements File¶
The statements file contains the KSQL statements to test. The following are the supported statements in the testing tool:
- CREATE STREAM
- CREATE TABLE
- CREATE STREAM AS SELECT
- CREATE TABLE AS SELECT
- INSERT INTO SELECT
Here is a sample statements file for the testing tool:
CREATE STREAM orders (ORDERUNITS double) WITH (kafka_topic='test_topic', value_format='JSON');
CREATE STREAM S1 AS SELECT ORDERUNITS, CASE WHEN orderunits < 2.0 THEN 'small' WHEN orderunits < 4.0 THEN 'medium' ELSE 'large' END AS case_result FROM orders;
Input File¶
The input file is a JSON file with one array field named “inputs”. Each element in the array represents an input message. The input messages array cannot be empty. A message should have a topic, a key, a value, and a timestamp. The following is a sample input file for the above test:
{
"inputs": [
{"topic": "test_topic", "timestamp": 0, "value": {"ORDERUNITS": 2.0}, "key": 0},
{"topic": "test_topic", "timestamp": 0, "value": {"ORDERUNITS": 4.0}, "key": 100},
{"topic": "test_topic", "timestamp": 0, "value": {"ORDERUNITS": 6.0 }, "key": 101},
{"topic": "test_topic", "timestamp": 0, "value": {"ORDERUNITS": 3.0}, "key": 101},
{"topic": "test_topic", "timestamp": 0, "value": {"ORDERUNITS": 1.0}, "key": 101}
]
}
Output File¶
The output file is a JSON file with an array field named “outputs”. Similar to the input file, each element in the array represents an expected output message. The output messages array cannot be empty. An expected output message should have a topic, a key, a value, and a timestamp. The following is a sample expected output file for the above test:
{
"outputs": [
{"topic": "S1", "timestamp": 0, "value": {"ORDERUNITS": 2.0, "CASE_RESAULT": "medium"}, "key": 0},
{"topic": "S1", "timestamp": 0, "value": {"ORDERUNITS": 4.0, "CASE_RESAULT": "large"}, "key": 100},
{"topic": "S1", "timestamp": 0, "value": {"ORDERUNITS": 6.0, "CASE_RESAULT": "large"}, "key": 101},
{"topic": "S1", "timestamp": 0, "value": {"ORDERUNITS": 3.0, "CASE_RESAULT": "medium"}, "key": 101},
{"topic": "S1", "timestamp": 0, "value": {"ORDERUNITS": 1.0, "CASE_RESAULT": "small"},"key": 101}
]
}
In the input and output files you can have messages with windowed keys. Such messages can be generated by windowed aggregations in KSQL. To specify a window for a message, add a “window” field to the message. A window field has three fields:
- start: the start time for the window.
- end: the end time for the window.
- type: the type of the window. A window type can be time or session.
The following is an example expected output file with records that have a window field:
{
"outputs": [
{"topic": "S2", "key": 0, "value": "0,0", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}},
{"topic": "S2", "key": 0, "value": "0,5", "timestamp": 10000, "window": {"start": 0, "end": 30000, "type": "time"}},
{"topic": "S2", "key": 100, "value": "100,100", "timestamp": 30000, "window": {"start": 30000, "end": 60000, "type": "time"}},
{"topic": "S2", "key": 100, "value": "100,100", "timestamp": 45000, "window": {"start": 30000, "end": 60000, "type": "time"}}
]
}
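The statements file for a windowed example like this isn't shown in this topic. As a minimal sketch, a tumbling-window aggregation along the following lines could produce windowed records on a topic like S2; the stream name, column names, and source topic are illustrative assumptions, not part of the example above:
-- Hypothetical statements file (assumed names): a 30-second tumbling-window aggregation
CREATE STREAM events (ID BIGINT, UNITS BIGINT) WITH (kafka_topic='events_topic', value_format='DELIMITED');
CREATE TABLE S2 AS SELECT ID, SUM(UNITS) FROM events WINDOW TUMBLING (SIZE 30 SECONDS) GROUP BY ID;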
Currently, input files can only contain records with session window types.
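For instance, an input record with a session window could look like the following; the topic, key, value, and window bounds are illustrative only:
{"topic": "test_topic", "key": 0, "value": "0,5", "timestamp": 0, "window": {"start": 0, "end": 0, "type": "session"}}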
The testing tool indicates the success or failure of a test by printing a corresponding message. The following is the result of a successful test:
ksql-test-runner -s statements.sql -i input.json -o output.json
Your output should resemble:
Test passed!
If a test fails, the testing tool will indicate the failure along with the cause. Here is an example of the output for a failing test:
ksql-test-runner -s statements_bad.sql -i input_bad.json -o output_bad.json
Your output should resemble:
Test failed: Expected <900, {T_ID=90, NAME=ninety}> with timestamp=17000 but was <90, {T_ID=90, NAME=ninety}> with timestamp=17000