Writing Streaming Queries Against Kafka Using KSQL (Docker)

This tutorial demonstrates a simple workflow using KSQL to write streaming queries against messages in Kafka in a Docker environment.

To get started, you must start a Kafka cluster, including ZooKeeper and a Kafka broker. KSQL will then query messages from this Kafka cluster. KSQL is installed in the Confluent Platform by default.

Prerequisites:

Download the Tutorial and Start KSQL

  1. Clone the Confluent KSQL repository.

    $ git clone https://github.com/confluentinc/ksql.git
    $ cd ksql
    
  2. Switch to the correct Confluent Platform release branch:

    $ git checkout 4.1.0-post
    
  3. Navigate to the docs/tutorials/ directory of the KSQL repository and launch the tutorial in Docker. Depending on your network speed, this may take 5 to 10 minutes.

    $ cd docs/tutorials/
    $ docker-compose up -d
    
  4. From the host machine, start KSQL CLI on the container.

    $ docker-compose exec ksql-cli ksql http://ksql-server:8088
    
                      ===========================================
                      =        _  __ _____  ____  _             =
                      =       | |/ // ____|/ __ \| |            =
                      =       | ' /| (___ | |  | | |            =
                      =       |  <  \___ \| |  | | |            =
                      =       | . \ ____) | |__| | |____        =
                      =       |_|\_\_____/ \___\_\______|       =
                      =                                         =
                      =  Streaming SQL Engine for Apache Kafka® =
                      ===========================================
    
    Copyright 2018 Confluent Inc.
    
    CLI v4.1.0, Server v4.1.0 located at http://localhost:8088
    
    Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
    
    ksql>
    

Note

The 4.1.0 ksql-cli image will display version 4.1.1-SNAPSHOT due to a cosmetic bug.

Create a Stream and Table

These examples query messages from Kafka topics called pageviews and users using the following schemas:

[Figure: schemas of the pageviews and users topics (ksql-quickstart-schemas.jpg)]

  1. Create a stream pageviews_original from the Kafka topic pageviews, specifying the value_format of DELIMITED.

    When you describe the new stream, notice that KSQL has added two columns: ROWTIME, which corresponds to the Kafka message timestamp, and ROWKEY, which corresponds to the Kafka message key.

    ksql> CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH \
    (kafka_topic='pageviews', value_format='DELIMITED');
    

    Your output should resemble:

     Message
    ---------------
     Stream created
    ---------------
    

    Tip

    You can run DESCRIBE pageviews_original; to describe the stream.

  2. Create a table users_original from the Kafka topic users, specifying the value_format of JSON.

    ksql> CREATE TABLE users_original (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR) WITH \
    (kafka_topic='users', value_format='JSON', key = 'userid');
    

    Your output should resemble:

     Message
    ---------------
     Table created
    ---------------
    

    Tip

    You can run DESCRIBE users_original; to describe the table.

  3. Optional: Show all streams and tables.

    ksql> SHOW STREAMS;
    
     Stream Name              | Kafka Topic              | Format
    -----------------------------------------------------------------
     PAGEVIEWS_ORIGINAL       | pageviews                | DELIMITED
    
    ksql> SHOW TABLES;
    
     Table Name        | Kafka Topic       | Format    | Windowed
    --------------------------------------------------------------
     USERS_ORIGINAL    | users             | JSON      | false
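
As a rough illustration of the DELIMITED mapping in step 1, the following shell sketch splits one pageviews value into the three declared columns. The sample record is copied from the console-consumer output in the Appendix, and the variable names simply mirror the column names. It only mimics how a comma-separated value lines up with the schema; it is not how KSQL itself parses messages.

```shell
# A sample pageviews value, taken from the console-consumer output in the Appendix.
record='1491040409254,User_5,Page_70'

# Split on commas in the same order as the columns declared in CREATE STREAM:
# viewtime bigint, userid varchar, pageid varchar.
IFS=',' read -r viewtime userid pageid <<EOF
$record
EOF

echo "viewtime=$viewtime userid=$userid pageid=$pageid"
```

The column order in the CREATE STREAM statement has to match the field order in the value, because a DELIMITED value carries no field names.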
    

Write Queries

These examples write queries using KSQL.

Note: By default, KSQL reads the topics for streams and tables from the latest offset.

  1. Use SELECT to create a query that returns data from a STREAM. This query includes the LIMIT keyword to limit the number of rows returned in the query result. The exact output may vary because the data is randomly generated.

    ksql> SELECT pageid FROM pageviews_original LIMIT 3;
    

    Your output should resemble:

    Page_24
    Page_73
    Page_78
    LIMIT reached for the partition.
    Query terminated
    
  2. Create a persistent query by preceding the SELECT statement with the CREATE STREAM keywords. The results from this query are written to the PAGEVIEWS_ENRICHED Kafka topic. The following query enriches the pageviews_original STREAM by doing a LEFT JOIN with the users_original TABLE on the user ID.

    ksql> CREATE STREAM pageviews_enriched AS SELECT users_original.userid AS userid, pageid, regionid, gender \
    FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid;
    

    Your output should resemble:

     Message
    ----------------------------
     Stream created and running
    ----------------------------
    

    Tip

    You can run DESCRIBE pageviews_enriched; to describe the stream.

  3. Use SELECT to view query results as they come in. To stop viewing the query results, press <ctrl-c>. This stops printing to the console, but it does not terminate the persistent query created in the previous step, which continues to run in the underlying KSQL application.

    ksql> SELECT * FROM pageviews_enriched;
    

    Your output should resemble:

    1519746861328 | User_4 | User_4 | Page_58 | Region_5 | OTHER
    1519746861794 | User_9 | User_9 | Page_94 | Region_9 | MALE
    1519746862164 | User_1 | User_1 | Page_90 | Region_7 | FEMALE
    ^CQuery terminated
    
  4. Create a new persistent query where a condition limits the stream's content, using WHERE. Results from this query are written to a Kafka topic called PAGEVIEWS_FEMALE.

    ksql> CREATE STREAM pageviews_female AS SELECT * FROM pageviews_enriched WHERE gender = 'FEMALE';
    

    Your output should resemble:

     Message
    ----------------------------
     Stream created and running
    ----------------------------
    

    Tip

    You can run DESCRIBE pageviews_female; to describe the stream.

  5. Create a new persistent query where another condition is met, using LIKE. Results from this query are written to the pageviews_enriched_r8_r9 Kafka topic.

    ksql> CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', \
    value_format='DELIMITED') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';
    

    Your output should resemble:

     Message
    ----------------------------
     Stream created and running
    ----------------------------
    
  6. Create a new persistent query that counts the pageviews for each region and gender combination in a tumbling window of 30 seconds when the count is greater than 1. Results from this query are written to the PAGEVIEWS_REGIONS Kafka topic in the Avro format. KSQL will register the Avro schema with the configured schema registry when it writes the first message to the PAGEVIEWS_REGIONS topic.

    ksql> CREATE TABLE pageviews_regions WITH (value_format='avro') AS SELECT gender, regionid , COUNT(*) AS numusers \
    FROM pageviews_enriched WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid HAVING COUNT(*) > 1;
    

    Your output should resemble:

     Message
    ---------------------------
     Table created and running
    ---------------------------
    

    Tip

    You can run DESCRIBE pageviews_regions; to describe the table.

  7. Optional: View results from the above queries using SELECT.

    ksql> SELECT gender, regionid, numusers FROM pageviews_regions LIMIT 5;
    

    Your output should resemble:

    FEMALE | Region_6 | 3
    FEMALE | Region_1 | 4
    FEMALE | Region_9 | 6
    MALE | Region_8 | 2
    OTHER | Region_5 | 4
    LIMIT reached for the partition.
    Query terminated
    ksql>
    
  8. Optional: Show all persistent queries.

    ksql> SHOW QUERIES;
    

    Your output should resemble:

    Query ID                      | Kafka Topic              | Query String
    ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    CTAS_PAGEVIEWS_REGIONS        | PAGEVIEWS_REGIONS        | CREATE TABLE pageviews_regions WITH (value_format='avro') AS SELECT gender, regionid , COUNT(*) AS numusers FROM pageviews_enriched WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid HAVING COUNT(*) > 1;
    CSAS_PAGEVIEWS_FEMALE         | PAGEVIEWS_FEMALE         | CREATE STREAM pageviews_female AS SELECT * FROM pageviews_enriched WHERE gender = 'FEMALE';
    CSAS_PAGEVIEWS_FEMALE_LIKE_89 | pageviews_enriched_r8_r9 | CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', value_format='DELIMITED') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';
    ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
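
The tumbling-window count in step 6 can be approximated offline with a small shell sketch. It assumes that windows are aligned to the epoch, with an inclusive start and an exclusive end, so an event belongs to the window starting at its timestamp minus (timestamp modulo 30000) milliseconds. The events below are made-up sample data in a hypothetical "timestamp,gender,regionid" layout, not real output from the tutorial, and the shell is assumed to have 64-bit arithmetic.

```shell
# Hypothetical enriched events: "epoch_millis,gender,regionid" (made-up sample data).
events='1519746861328,OTHER,Region_5
1519746861794,MALE,Region_9
1519746862164,FEMALE,Region_7
1519746865000,OTHER,Region_5
1519746869999,FEMALE,Region_7
1519746870000,FEMALE,Region_7'

window_ms=30000

# Replace each timestamp with the start of its 30-second window.
windowed=$(printf '%s\n' "$events" | while IFS=',' read -r ts gender regionid; do
  start=$(( $ts - $ts % $window_ms ))
  echo "$start,$gender,$regionid"
done)

# Count rows per (window, gender, regionid) and keep counts greater than 1,
# mirroring GROUP BY gender, regionid HAVING COUNT(*) > 1.
counts=$(printf '%s\n' "$windowed" | sort | uniq -c | awk '$1 > 1 { print $2 "," $1 }')

printf '%s\n' "$counts"
```

In this sample, the last event falls exactly on a window boundary, so it opens a new window and is not counted with the two earlier FEMALE, Region_7 events.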
    

Terminate and Exit

KSQL

Important: Queries will continuously run as KSQL applications until they are manually terminated. Exiting KSQL does not terminate persistent queries.

  1. From the output of SHOW QUERIES; identify a query ID you would like to terminate. For example, if you wish to terminate query ID CTAS_PAGEVIEWS_REGIONS:

    ksql> TERMINATE CTAS_PAGEVIEWS_REGIONS;
    
  2. Run this command to exit the KSQL CLI.

    ksql> exit
    

Appendix

The instructions in this Appendix are not required to run the quick start. They are optional steps for producing extra topic data and verifying the environment.

Produce more topic data

The Compose file automatically runs a data generator that continuously produces data to two Kafka topics, pageviews and users. No further action is required if the generated data is all you need. You can return to the main KSQL quick start to start querying the data in these two topics.

However, if you want to produce additional data, you can use any of the following methods.

  • Produce Kafka data with the Kafka command-line tool kafka-console-producer. The following example generates data with a value in DELIMITED format.

    $ docker-compose exec kafka kafka-console-producer --topic t1 --broker-list kafka:29092  --property parse.key=true --property key.separator=:
    

    Your data input should resemble this.

    key1:v1,v2,v3
    key2:v4,v5,v6
    key3:v7,v8,v9
    key1:v10,v11,v12
    
  • Produce Kafka data with the Kafka command-line tool kafka-console-producer. The following example generates data with a value in JSON format.

    $ docker-compose exec kafka kafka-console-producer --topic t2 --broker-list kafka:29092  --property parse.key=true --property key.separator=:
    

    Your data input should resemble this.

    key1:{"id":"key1","col1":"v1","col2":"v2","col3":"v3"}
    key2:{"id":"key2","col1":"v4","col2":"v5","col3":"v6"}
    key3:{"id":"key3","col1":"v7","col2":"v8","col3":"v9"}
    key1:{"id":"key1","col1":"v10","col2":"v11","col3":"v12"}
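
Typing records by hand becomes tedious for larger test sets. A minimal sketch of a generator for the DELIMITED case is shown here; the function name gen_records and the key and value naming are placeholders modeled on the sample input above, not part of the tutorial.

```shell
# Print N keyed records in key:value form, matching key.separator=: above.
# Key names and values are placeholders modeled on the sample input.
gen_records() {
  count=$1
  i=1
  while [ "$i" -le "$count" ]; do
    base=$(( (i - 1) * 3 ))
    echo "key$i:v$((base + 1)),v$((base + 2)),v$((base + 3))"
    i=$((i + 1))
  done
}

records=$(gen_records 3)
printf '%s\n' "$records"
```

The output could then be piped into the producer instead of being typed interactively, for example with docker-compose exec -T (which disables pseudo-TTY allocation so that stdin can be piped). That combination has not been verified against this Compose file.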
    

Verify your environment

The next three steps are optional verification steps to ensure your environment is set up properly.

  1. Verify that six Docker containers were created.

    $ docker-compose ps
    

    Your output should resemble this. Take note of the Up state.

            Name                        Command               State                           Ports
    -------------------------------------------------------------------------------------------------------------------------
    quickstart_kafka_1                    /etc/confluent/docker/run        Up      0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp
    quickstart_ksql-cli_1                 perl -e while(1){ sleep 99 ...   Up
    quickstart_ksql-datagen-pageviews_1   bash -c echo Waiting for K ...   Up
    quickstart_ksql-datagen-users_1       bash -c echo Waiting for K ...   Up
    quickstart_schema-registry_1          /etc/confluent/docker/run        Up      0.0.0.0:8081->8081/tcp
    quickstart_zookeeper_1                /etc/confluent/docker/run        Up      2181/tcp, 2888/tcp, 0.0.0.0:32181->32181/tcp, 3888/tcp
    
  2. The docker-compose file already runs a data generator that pre-populates two Kafka topics, pageviews and users, with mock data. Verify that both topics were created.

    $ docker-compose exec kafka kafka-topics --zookeeper zookeeper:32181 --list
    

    Your output should resemble this.

    _confluent-metrics
    _schemas
    pageviews
    users
    
  3. Use the kafka-console-consumer to view a few messages from each topic. The topic pageviews has a key that is a mock timestamp and a value that is in DELIMITED format. The topic users has a key that is the user ID and a value that is in JSON format.

    $ docker-compose exec kafka kafka-console-consumer --topic pageviews --bootstrap-server kafka:29092 --from-beginning --max-messages 3 --property print.key=true
    

    Your output should resemble this.

    1491040409254    1491040409254,User_5,Page_70
    1488611895904    1488611895904,User_8,Page_76
    1504052725192    1504052725192,User_8,Page_92
    
    $ docker-compose exec kafka kafka-console-consumer --topic users --bootstrap-server kafka:29092 --from-beginning --max-messages 3 --property print.key=true
    

    Your output should resemble this.

    User_2   {"registertime":1509789307038,"gender":"FEMALE","regionid":"Region_1","userid":"User_2"}
    User_6   {"registertime":1498248577697,"gender":"OTHER","regionid":"Region_8","userid":"User_6"}
    User_8   {"registertime":1494834474504,"gender":"MALE","regionid":"Region_5","userid":"User_8"}
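
To check the key and value layout described in step 3 without reading the output by eye, a short shell sketch can split one consumer line into its key and value. It assumes the console consumer separates key and value with a tab when print.key=true is set; the sample line is copied from the pageviews output above.

```shell
# One line of pageviews consumer output: key, a tab, then the DELIMITED value.
tab=$(printf '\t')
line="1491040409254${tab}1491040409254,User_5,Page_70"

# cut splits on tabs by default: field 1 is the key, field 2 is the value.
key=$(printf '%s\n' "$line" | cut -f1)
value=$(printf '%s\n' "$line" | cut -f2)

# The first comma-separated field of the value is the viewtime.
viewtime=${value%%,*}

if [ "$key" = "$viewtime" ]; then
  status="key matches viewtime"
else
  status="key differs from viewtime"
fi
echo "$status"
```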
    

Next steps

Try the end-to-end Clickstream Analysis demo, which shows how to build an application that performs real-time user analytics.