Writing Streaming Queries Against Apache Kafka® Using KSQL (Docker)
This tutorial demonstrates a simple workflow using KSQL to write streaming queries against messages in Kafka in a Docker environment.
To get started, you must start a Kafka cluster, including ZooKeeper and a Kafka broker. KSQL will then query messages from this Kafka cluster. KSQL is installed in the Confluent Platform by default.
- Prerequisites
- Docker
- Docker version 1.11 or later is installed and running.
- Docker Compose is installed. It is installed by default with Docker for Mac and Windows.
- Docker is allocated at least 8 GB of memory.
- Git
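Tip
If you are not sure how much memory Docker has, you can check before starting. This is an optional sanity check using the standard docker info command; the exact output format varies by Docker version.

# Look for the "Total Memory" line in the engine details.
docker info | grep "Total Memory"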
Download the Tutorial and Start KSQL
Clone the Confluent KSQL repository.
git clone https://github.com/confluentinc/ksql.git
cd ksql
Switch to the correct Confluent Platform release branch:
git checkout 4.1.0-post
Navigate to the KSQL repository docs/tutorials/ directory and launch the tutorial in Docker. Depending on your network speed, this may take 5 to 10 minutes.

cd docs/tutorials/
docker-compose up -d
From the host machine, start the KSQL CLI in the container.
docker-compose exec ksql-cli ksql http://ksql-server:8088
                  ===========================================
                  =        _  __ _____  ____  _             =
                  =       | |/ // ____|/ __ \| |            =
                  =       | ' /| (___ | |  | | |            =
                  =       |  <  \___ \| |  | | |            =
                  =       | . \ ____) | |__| | |____        =
                  =       |_|\_\_____/ \___\_\______|       =
                  =                                         =
                  =  Streaming SQL Engine for Apache Kafka® =
                  ===========================================

Copyright 2018 Confluent Inc.

CLI v4.1.3, Server v4.1.3 located at http://localhost:8088

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>
Note
The 4.1.0 ksql-cli image will display version 4.1.1-SNAPSHOT due to a cosmetic bug.
Create a Stream and Table
These examples query messages from Kafka topics called pageviews and users. The schemas of these topics are declared in the CREATE STREAM and CREATE TABLE statements below.
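For reference, raw messages on these topics look like the sample records below (taken from the verification steps in the Appendix): a pageviews value is DELIMITED text in the order viewtime,userid,pageid, and a users value is a JSON object.

1491040409254,User_5,Page_70
{"registertime":1509789307038,"gender":"FEMALE","regionid":"Region_1","userid":"User_2"}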
Create a stream pageviews_original from the Kafka topic pageviews, specifying the value_format of DELIMITED. Describe the new STREAM and notice that KSQL created additional columns called ROWTIME, which corresponds to the Kafka message timestamp, and ROWKEY, which corresponds to the Kafka message key.

CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH \
(kafka_topic='pageviews', value_format='DELIMITED');
Your output should resemble:
 Message
----------------
 Stream created
----------------
Tip
You can run DESCRIBE pageviews_original; to describe the stream.
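The DESCRIBE output lists the two system columns alongside the columns you declared. It should resemble the following sketch; the exact formatting may vary between KSQL versions.

 Field    | Type
--------------------------------------
 ROWTIME  | BIGINT           (system)
 ROWKEY   | VARCHAR(STRING)  (system)
 VIEWTIME | BIGINT
 USERID   | VARCHAR(STRING)
 PAGEID   | VARCHAR(STRING)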
Create a table users_original from the Kafka topic users, specifying the value_format of JSON.

CREATE TABLE users_original (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR) WITH \
(kafka_topic='users', value_format='JSON', key = 'userid');
Your output should resemble:
 Message
---------------
 Table created
---------------
Tip
You can run DESCRIBE users_original; to describe the table.

Optional: Show all streams and tables.

ksql> SHOW STREAMS;

 Stream Name        | Kafka Topic | Format
--------------------------------------------
 PAGEVIEWS_ORIGINAL | pageviews   | DELIMITED

ksql> SHOW TABLES;

 Table Name     | Kafka Topic | Format | Windowed
---------------------------------------------------
 USERS_ORIGINAL | users       | JSON   | false
Write Queries
These examples write queries using KSQL.

Note: By default, KSQL reads the topics for streams and tables from the latest offset.
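If you want queries to read a topic from the beginning instead, you can change the offset reset property for your current CLI session before issuing queries:

SET 'auto.offset.reset' = 'earliest';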
Use SELECT to create a query that returns data from a STREAM. This query includes the LIMIT keyword to limit the number of rows returned in the query result. Note that exact data output may vary because of the randomness of the data generation.

SELECT pageid FROM pageviews_original LIMIT 3;
Your output should resemble:
Page_24
Page_73
Page_78
LIMIT reached for the partition.
Query terminated
Create a persistent query by using the CREATE STREAM keywords to precede the SELECT statement. The results from this query are written to the PAGEVIEWS_ENRICHED Kafka topic. The following query enriches the pageviews_original STREAM by doing a LEFT JOIN with the users_original TABLE on the user ID.

CREATE STREAM pageviews_enriched AS SELECT users_original.userid AS userid, pageid, regionid, gender \
FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid;
Your output should resemble:
 Message
----------------------------
 Stream created and running
----------------------------
Tip
You can run DESCRIBE pageviews_enriched; to describe the stream.

Use SELECT to view query results as they come in. To stop viewing the query results, press <ctrl-c>. This stops printing to the console, but it does not terminate the actual query; the query continues to run in the underlying KSQL application.

SELECT * FROM pageviews_enriched;
Your output should resemble:
1519746861328 | User_4 | User_4 | Page_58 | Region_5 | OTHER
1519746861794 | User_9 | User_9 | Page_94 | Region_9 | MALE
1519746862164 | User_1 | User_1 | Page_90 | Region_7 | FEMALE
^CQuery terminated
Create a new persistent query where a condition limits the stream's content, using WHERE. Results from this query are written to a Kafka topic called PAGEVIEWS_FEMALE.

CREATE STREAM pageviews_female AS SELECT * FROM pageviews_enriched WHERE gender = 'FEMALE';
Your output should resemble:
 Message
----------------------------
 Stream created and running
----------------------------
Tip
You can run DESCRIBE pageviews_female; to describe the stream.

Create a new persistent query where another condition is met, using LIKE. Results from this query are written to the pageviews_enriched_r8_r9 Kafka topic.

CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', \
value_format='DELIMITED') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';
Your output should resemble:
 Message
----------------------------
 Stream created and running
----------------------------
Create a new persistent query that counts the pageviews for each region and gender combination in a tumbling window of 30 seconds when the count is greater than 1. Results from this query are written to the PAGEVIEWS_REGIONS Kafka topic in the Avro format. KSQL will register the Avro schema with the configured Schema Registry when it writes the first message to the PAGEVIEWS_REGIONS topic.

CREATE TABLE pageviews_regions WITH (value_format='avro') AS SELECT gender, regionid, COUNT(*) AS numusers \
FROM pageviews_enriched WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid HAVING COUNT(*) > 1;
Your output should resemble:
 Message
---------------------------
 Table created and running
---------------------------
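Optional: If you want to confirm the schema registration, you can query the Schema Registry REST API from the host machine. This sketch assumes the default subject naming (the topic name with a -value suffix) and the 8081 port mapping from this tutorial's Compose file.

# PAGEVIEWS_REGIONS-value appears in the list once the first message is written.
curl http://localhost:8081/subjects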
Tip
You can run DESCRIBE pageviews_regions; to describe the table.

Optional: View results from the above queries using SELECT.

SELECT gender, regionid, numusers FROM pageviews_regions LIMIT 5;
Your output should resemble:
FEMALE | Region_6 | 3
FEMALE | Region_1 | 4
FEMALE | Region_9 | 6
MALE   | Region_8 | 2
OTHER  | Region_5 | 4
LIMIT reached for the partition.
Query terminated
ksql>
Optional: Show all persistent queries.
SHOW QUERIES;
Your output should resemble:
 Query ID                      | Kafka Topic              | Query String
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 CTAS_PAGEVIEWS_REGIONS        | PAGEVIEWS_REGIONS        | CREATE TABLE pageviews_regions WITH (value_format='avro') AS SELECT gender, regionid , COUNT(*) AS numusers FROM pageviews_female WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid HAVING COUNT(*) > 1;
 CSAS_PAGEVIEWS_FEMALE         | PAGEVIEWS_FEMALE         | CREATE STREAM pageviews_female AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid WHERE gender = 'FEMALE';
 CSAS_PAGEVIEWS_FEMALE_LIKE_89 | pageviews_enriched_r8_r9 | CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', value_format='DELIMITED') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
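To see more detail about a running query, such as its execution plan, you can also pass its query ID to the EXPLAIN statement, for example:

EXPLAIN CTAS_PAGEVIEWS_REGIONS;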
Terminate and Exit
KSQL

Important: Queries run continuously as KSQL applications until they are manually terminated. Exiting KSQL does not terminate persistent queries.
From the output of SHOW QUERIES; identify a query ID you would like to terminate. For example, if you wish to terminate the query ID CTAS_PAGEVIEWS_REGIONS:

TERMINATE CTAS_PAGEVIEWS_REGIONS;
Run the exit command to leave the KSQL CLI.

ksql> exit
Exiting KSQL.
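When you are finished with the tutorial, you can stop the Docker environment. From the docs/tutorials/ directory on the host machine:

# Stop and remove the tutorial's containers.
docker-compose down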
Appendix
The following instructions in the Appendix are not required to run the quick start. They are optional steps to produce extra topic data and verify the environment.
Produce more topic data
The Compose file automatically runs a data generator that continuously produces data to two Kafka topics, pageviews and users. No further action is required if you want to use just the data available. You can return to the main KSQL quick start to start querying the data in these two topics.
However, if you want to produce additional data, you can use any of the following methods.
Produce Kafka data with the Kafka command line tool kafka-console-producer. The following example generates data with a value in DELIMITED format.

docker-compose exec kafka kafka-console-producer --topic t1 --broker-list kafka:29092 --property parse.key=true --property key.separator=:
Your data input should resemble this.
key1:v1,v2,v3
key2:v4,v5,v6
key3:v7,v8,v9
key1:v10,v11,v12
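If you then want to query this data from KSQL, register a stream over the new topic first. A minimal sketch, assuming the t1 topic from the command above and three VARCHAR value columns (the stream and column names here are hypothetical):

CREATE STREAM t1_stream (col1 VARCHAR, col2 VARCHAR, col3 VARCHAR) WITH \
(kafka_topic='t1', value_format='DELIMITED');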
Produce Kafka data with the Kafka command line tool kafka-console-producer. The following example generates data with a value in JSON format.

docker-compose exec kafka kafka-console-producer --topic t2 --broker-list kafka:29092 --property parse.key=true --property key.separator=:
Your data input should resemble this.
key1:{"id":"key1","col1":"v1","col2":"v2","col3":"v3"}
key2:{"id":"key2","col1":"v4","col2":"v5","col3":"v6"}
key3:{"id":"key3","col1":"v7","col2":"v8","col3":"v9"}
key1:{"id":"key1","col1":"v10","col2":"v11","col3":"v12"}
Verify your environment
The next three steps are optional verification steps to ensure your environment is properly set up.
Verify that six Docker containers were created.
docker-compose ps
Your output should resemble this. Take note of the Up state.

                Name                               Command               State                        Ports
-------------------------------------------------------------------------------------------------------------------------
quickstart_kafka_1                    /etc/confluent/docker/run        Up      0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp
quickstart_ksql-cli_1                 perl -e while(1){ sleep 99 ...   Up
quickstart_ksql-datagen-pageviews_1   bash -c echo Waiting for K ...   Up
quickstart_ksql-datagen-users_1       bash -c echo Waiting for K ...   Up
quickstart_schema-registry_1          /etc/confluent/docker/run        Up      0.0.0.0:8081->8081/tcp
quickstart_zookeeper_1                /etc/confluent/docker/run        Up      2181/tcp, 2888/tcp, 0.0.0.0:32181->32181/tcp, 3888/tcp
The docker-compose file already runs a data generator that pre-populates two Kafka topics, pageviews and users, with mock data. Verify that the data generator created these two topics.

docker-compose exec kafka kafka-topics --zookeeper zookeeper:32181 --list
Your output should resemble this.
_confluent-metrics
_schemas
pageviews
users
Use the kafka-console-consumer to view a few messages from each topic. The topic pageviews has a key that is a mock timestamp and a value that is in DELIMITED format. The topic users has a key that is the user ID and a value that is in JSON format.

docker-compose exec kafka kafka-console-consumer --topic pageviews --bootstrap-server kafka:29092 --from-beginning --max-messages 3 --property print.key=true
Your output should resemble this.
1491040409254   1491040409254,User_5,Page_70
1488611895904   1488611895904,User_8,Page_76
1504052725192   1504052725192,User_8,Page_92
docker-compose exec kafka kafka-console-consumer --topic users --bootstrap-server kafka:29092 --from-beginning --max-messages 3 --property print.key=true
Your output should resemble this.
User_2   {"registertime":1509789307038,"gender":"FEMALE","regionid":"Region_1","userid":"User_2"}
User_6   {"registertime":1498248577697,"gender":"OTHER","regionid":"Region_8","userid":"User_6"}
User_8   {"registertime":1494834474504,"gender":"MALE","regionid":"Region_5","userid":"User_8"}
Next steps
Try the end-to-end Clickstream Analysis demo, which shows how to build an application that performs real-time user analytics.