Important

You are viewing documentation for an older version of Confluent Platform. For the latest, click here.

KSQL Testing Tool

Use the KSQL testing tool to test a set of KSQL statements. The KSQL testing tool is a command line utility that enables testing KSQL statements without requiring any infrastructure, like Apache Kafka® and KSQL clusters. The KSQL testing tool is a great way to design your KSQL pipeline and ensure the expected results are generated. You can collaborate on designing your KSQL statements by sharing the test files. To test a set of KSQL statements, you provide three files, one file containing the KSQL statements and two JSON files containing the input records and the expected output records.

NAME
        ksql-test-runner - The KSQL testing tool

SYNOPSIS
        ksql-test-runner {--input-file | -i} <inputFile>
                {--output-file | -o} <outputFile>
                {--sql-file | -s} <statementsFile>

OPTIONS
        --input-file <inputFile>, -i <inputFile>
            A JSON file containing the input records.

            This option may occur a maximum of 1 time


        --output-file <outputFile>, -o <outputFile>
            A JSON file containing the expected output records.

            This option may occur a maximum of 1 time


        --sql-file <statementsFile>, -s <statementsFile>
            A SQL file containing KSQL statements to be tested.

            This option may occur a maximum of 1 time

Provide required jar files

Starting in version 5.3.0, ksql-test-runner throws a ClassNotFoundException, because the the following jars aren’t packaged in KSQL.

  • ksql-functional-tests/hamcrest-all-1.3.jar
  • ksql-functional-tests/junit-4.12.jar

The workaround is to copy the required jars into the ksql classpath at confluent-5.4.11/share/java/ksql. For example, use the following commands for the classpath /opt/confluent/confluent-5.4.11/share/java/ksql.

cd /opt/confluent/confluent-5.4.11/share/java/ksql
wget https://repo1.maven.org/maven2/junit/junit/4.12/junit-4.12.jar
wget https://repo1.maven.org/maven2/org/hamcrest/hamcrest-all/1.3/hamcrest-all-1.3.jar

Test File Structure

Statements File

The statements file contains the KSQL statements to test. The following are the supported statements in the testing tool:

  • CREATE STREAM
  • CREATE TABLE
  • CREATE STREAM AS SELECT
  • CREATE TABLE AS SELECT
  • INSERT INTO SELECT

Here is a sample statements file for the testing tool:

CREATE STREAM orders (ORDERUNITS double) WITH (kafka_topic='test_topic', value_format='JSON');
CREATE STREAM S1 AS SELECT ORDERUNITS, CASE WHEN orderunits < 2.0 THEN 'small' WHEN orderunits < 4.0 THEN 'medium' ELSE 'large' END AS case_resault FROM orders EMIT CHANGES;

Input File

The input file is a JSON file with one array field named “inputs”. Each element in the array is the representation of input messages. The input messages array cannot be empty. A message should have a topic, a key, a value and a timestamp. The following is a sample input file for the above test:

{
  "inputs": [
          {"topic": "test_topic", "timestamp": 0, "value": {"ORDERUNITS": 2.0}, "key": 0},
          {"topic": "test_topic", "timestamp": 0, "value": {"ORDERUNITS": 4.0}, "key": 100},
          {"topic": "test_topic", "timestamp": 0, "value": {"ORDERUNITS": 6.0 }, "key": 101},
          {"topic": "test_topic", "timestamp": 0, "value": {"ORDERUNITS": 3.0}, "key": 101},
          {"topic": "test_topic", "timestamp": 0, "value": {"ORDERUNITS": 1.0}, "key": 101}
        ]
}

Output File

The output file is a JSON file with an array field named “outputs”. Similar to the input file, each element in the array is the representation of the expected output messages. The output messages array cannot be empty. An expected output message should have a topic, a key, a value and a timestamp. The following is a sample expected output file for the above test:

{
  "outputs": [
          {"topic": "S1", "timestamp": 0, "value": {"ORDERUNITS": 2.0, "CASE_RESAULT": "medium"}, "key": 0},
          {"topic": "S1", "timestamp": 0, "value": {"ORDERUNITS": 4.0, "CASE_RESAULT": "large"}, "key": 100},
          {"topic": "S1", "timestamp": 0, "value": {"ORDERUNITS": 6.0, "CASE_RESAULT": "large"}, "key": 101},
          {"topic": "S1", "timestamp": 0, "value": {"ORDERUNITS": 3.0, "CASE_RESAULT": "medium"}, "key": 101},
          {"topic": "S1", "timestamp": 0, "value": {"ORDERUNITS": 1.0, "CASE_RESAULT": "small"},"key": 101}
        ]
}

In the input and output files you can have messages with windowed keys. Such messages can be generated by windowed aggretations in KSQL. To specify a window for a message you can add a “window” field to the message. A window field has three fields:

  • start: the start time for the window.
  • end: the end time for the window.
  • type: the type of the window. A window type can be time or session.

The following is an example expected output file with records that have a window field:

{
   "outputs": [
     {"topic": "S2", "key": 0, "value": "0,0", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}},
     {"topic": "S2", "key": 0, "value": "0,5", "timestamp": 10000, "window": {"start": 0, "end": 30000, "type": "time"}},
     {"topic": "S2", "key": 100, "value": "100,100", "timestamp": 30000, "window": {"start": 30000, "end": 60000, "type": "time"}},
     {"topic": "S2", "key": 100, "value": "100,100", "timestamp": 45000, "window": {"start": 30000, "end": 60000, "type": "time"}}
   ]
}

Currently, in the input files you can only have record with session window types.

The testing tool will indicate the success or failure of a test by printing the corresponding message. The following is the result of a successful test:

ksql-test-runner -s statements.sql -i input.json -o output.json

Your output should resemble:

Test passed!

If a test fails, the testing tool will indicate the failure along with the cause. Here is an example of the output for a failing test:

ksql-test-runner -s statements_bad.sql -i input_bad.json -o output_bad.json

Your output should resemble:

Test failed: Expected <900, {T_ID=90, NAME=ninety}> with timestamp=17000 but was <90, {T_ID=90, NAME=ninety}> with timestamp=17000

Query Execution in the KSQL Testing Tool

To use the KSQL testing tool effectively, you need to understand the query execution logic in the testing tool. Although the final results should be deterministic, the intermediate results in KSQL queries (Kafka Streams Apps) may vary based on several factors, such as order of reading input or config properties like the producer buffer size. In order to make the composition of output for the test cases simpler, the KSQL testing tool executes queries in a predictable way. Consider the following guidance when you prepare the output for your tests.

Input Consumption

Before processing the next input message, the testing tool processes input messages for each query one-by-one and writes the generated message(s) for each input message into the result topic. This means that for the queries running in the testing tool, we have the same behavior as when cache.max.bytes.buffering = 0. This is especially important in aggregate queries where we may not see some of the intermediate results in real executions because of buffering, while during execution of the testing tool every possible intermediate result is created.

Kafka Cluster

The KSQL testing tool doesn’t use a real Kafka cluster and simulates the behavior of a cluster with single broker for the KSQL queries. This means that the testing tool ignores configuration settings for the input and output topics, like the number of partitions or replicas.

Processing Order

The testing tool processes the statements in the order that you provide them. So, for a given statement, only the statements before it can potentially affect its results. This is a different behavior than KSQ cluster where statements that are submitted later can affect the output of a query. For example, consider the following set of statements:

CREATE STREAM orders (ORDERUNITS double) WITH (kafka_topic='test_topic', value_format='JSON');
INSERT INTO orders VALUES(10.0);
INSERT INTO orders VALUES(15.0);
INSERT INTO orders VALUES(20.0);
CREATE STREAM S1 AS SELECT ORDERUNITS, CASE WHEN orderunits < 2.0 THEN 'small' WHEN orderunits < 4.0 THEN 'medium' ELSE 'large' END AS case_resault FROM orders EMIT CHANGES;
INSERT INTO orders VALUES(25.0);
INSERT INTO orders VALUES(30.0);

If you run the above statements in a real KSQL cluster, you see one result generated for each INSERT INTO statements, and you have five messages in the output. On the other hand, if you run the previous statements in the testing tool, only the INSERT INTO statements before the CSAS query generate results, and the testing tool won’t run the query for the messages generated by the INSERT INTO statements after the CSAS statement.

Another important detail about processing order to keep in mind is the order in which the input data for a query is processed. For a given query, the testing tool first processes the input messages provided in the input file. After fully processing these messages, the testing tool inspects the source topics for the query in the simulated Kafka cluster and processes any messages in these topics. For the JOIN queries where we have more than one source topic, the testing tool first processes the left-side topic and then processes the right hand side topic.