Write streaming queries against Apache Kafka® using ksqlDB (Local)¶
This tutorial demonstrates a simple workflow using ksqlDB to write streaming queries against messages in Kafka.
To get started, you must start a Kafka cluster, including ZooKeeper and a Kafka broker. ksqlDB will then query messages from this Kafka cluster. ksqlDB is installed in Confluent Platform by default.
Prerequisites:
- Confluent Platform is installed and running. This installation includes a Kafka broker, ksqlDB, Control Center, ZooKeeper, Schema Registry, REST Proxy, and Connect.
- If you installed Confluent Platform using TAR or ZIP, navigate into the installation directory. The paths and commands used throughout this tutorial assume that you are in this installation directory.
- Consider installing the Confluent CLI to start a local installation of Confluent Platform.
- Java: Install Oracle Java JRE or JDK 1.8 or later on your local machine.
Create Topics and Produce Data¶
Create and produce data to the Kafka topics pageviews and users. These steps use the ksqlDB datagen tool that's included with Confluent Platform.
Open a new terminal window and run the following command to create the pageviews topic and produce data to it using the data generator. The following example continuously generates data in JSON format.

$CONFLUENT_HOME/bin/ksql-datagen quickstart=pageviews format=json topic=pageviews msgRate=5
Open another terminal window and run the following command to produce Kafka data to the users topic using the data generator. The following example continuously generates data in Avro format.

$CONFLUENT_HOME/bin/ksql-datagen quickstart=users format=avro topic=users msgRate=1
Tip
You can also produce Kafka data using the kafka-console-producer
CLI provided with Confluent Platform.
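For example, assuming a broker listening on localhost:9092 (the Confluent Platform default), a sketch of producing a single JSON-formatted record to the pageviews topic looks like this:

```bash
# Pipe one JSON record to the pageviews topic.
# Assumes a broker at localhost:9092; adjust to your environment.
echo '{"viewtime":1620337225843,"userid":"User_1","pageid":"Page_1"}' | \
  $CONFLUENT_HOME/bin/kafka-console-producer \
    --bootstrap-server localhost:9092 \
    --topic pageviews
```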
Launch the ksqlDB CLI¶
Open a new terminal window and run the following command to set the LOG_DIR
environment variable and launch the ksqlDB CLI.
LOG_DIR=./ksql_logs $CONFLUENT_HOME/bin/ksql
This command routes the CLI logs to the ./ksql_logs directory, relative to your current directory. By default, the CLI looks for a ksqlDB Server running at http://localhost:8088.
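If your ksqlDB Server is listening on a different address, pass its URL as an argument when starting the CLI. A sketch, assuming a hypothetical server named ksqldb.example.com on the default port:

```bash
# Point the CLI at a non-default ksqlDB Server.
# ksqldb.example.com is a placeholder host; substitute your own.
LOG_DIR=./ksql_logs $CONFLUENT_HOME/bin/ksql http://ksqldb.example.com:8088
```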
Important
By default ksqlDB attempts to store its logs in a directory called logs
that is relative to the location
of the ksql
executable. For example, if ksql
is installed at /usr/local/bin/ksql
, then it would
attempt to store its logs in /usr/local/logs
. If you are running ksql
from the default Confluent Platform
location, $CONFLUENT_HOME/bin
, you must override this default behavior by using the LOG_DIR
variable.
After ksqlDB is started, your terminal should resemble this.
===========================================
= _ _ ____ ____ =
= | | _____ __ _| | _ \| __ ) =
= | |/ / __|/ _` | | | | | _ \ =
= | <\__ \ (_| | | |_| | |_) | =
= |_|\_\___/\__, |_|____/|____/ =
= |_| =
= Event Streaming Database purpose-built =
= for stream processing apps =
===========================================
Copyright 2017-2021 Confluent Inc.
CLI v6.2.15, Server v6.2.15 located at http://localhost:8088
Server Status: RUNNING
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
Inspect Kafka Topics By Using SHOW and PRINT Statements¶
ksqlDB enables inspecting Kafka topics and messages in real time.
- Use the SHOW TOPICS statement to list the available topics in the Kafka cluster.
- Use the PRINT statement to see a topic’s messages as they arrive.
In the ksqlDB CLI, run the following statement:
SHOW TOPICS;
Your output should resemble:
Kafka Topic | Partitions | Partition Replicas
--------------------------------------------------------------
default_ksql_processing_log | 1 | 1
pageviews | 1 | 1
users | 1 | 1
--------------------------------------------------------------
By default, ksqlDB hides internal and system topics. Use the SHOW ALL TOPICS statement to see the full list of topics in the Kafka cluster:
SHOW ALL TOPICS;
Your output should resemble:
Kafka Topic | Partitions | Partition Replicas
---------------------------------------------------------------------------------------------------------------------------------
_confluent-command | 1 | 1
_confluent-controlcenter-... | 2 | 1
...
_confluent-ksql-default__command_topic | 1 | 1
_confluent-license | 1 | 1
_confluent-metrics | 12 | 1
_confluent-monitoring | 2 | 1
_confluent-telemetry-metrics | 12 | 1
_confluent_balancer_api_state | 1 | 1
_confluent_balancer_broker_samples | 32 | 1
_confluent_balancer_partition_samples | 32 | 1
_schemas | 1 | 1
connect-configs | 1 | 1
connect-offsets | 25 | 1
connect-statuses | 5 | 1
default_ksql_processing_log | 1 | 1
pageviews | 1 | 1
users | 1 | 1
---------------------------------------------------------------------------------------------------------------------------------
Note
Your output should show numerous _confluent-controlcenter
topics. These
have been removed for clarity.
Inspect the users
topic by using the PRINT statement:
PRINT users;
Note
The PRINT statement is one of the few case-sensitive commands in ksqlDB, even when the topic name is not quoted.
Your output should resemble:
Key format: KAFKA_STRING
Value format: AVRO
rowtime: 2021/05/06 21:39:36.663 Z, key: User_5, value: {"registertime":1512340535197,"userid":"User_5","regionid":"Region_2","gender":"FEMALE"}, partition: 0
rowtime: 2021/05/06 21:39:37.662 Z, key: User_7, value: {"registertime":1506274681444,"userid":"User_7","regionid":"Region_7","gender":"OTHER"}, partition: 0
rowtime: 2021/05/06 21:39:38.662 Z, key: User_1, value: {"registertime":1497598939522,"userid":"User_1","regionid":"Region_5","gender":"OTHER"}, partition: 0
^CTopic printing ceased
Press CTRL+C to stop printing messages.
Inspect the pageviews
topic by using the PRINT statement:
PRINT pageviews;
Your output should resemble:
Key format: KAFKA_BIGINT or KAFKA_DOUBLE
Value format: JSON or KAFKA_STRING
rowtime: 2021/05/06 21:40:25.843 Z, key: 1620337225843, value: {"viewtime":1620337225843,"userid":"User_6","pageid":"Page_40"}, partition: 0
rowtime: 2021/05/06 21:40:26.044 Z, key: 1620337226044, value: {"viewtime":1620337226044,"userid":"User_7","pageid":"Page_83"}, partition: 0
rowtime: 2021/05/06 21:40:26.243 Z, key: 1620337226243, value: {"viewtime":1620337226243,"userid":"User_1","pageid":"Page_63"}, partition: 0
^CTopic printing ceased
Press CTRL+C to stop printing messages.
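PRINT can also replay a topic from its earliest offset and stop automatically after a fixed number of messages, which is convenient for quick spot checks. A sketch using the FROM BEGINNING and LIMIT clauses of the PRINT syntax:

```sql
-- Print the two oldest messages in the topic, then stop.
PRINT pageviews FROM BEGINNING LIMIT 2;
```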
For more information, see ksqlDB Syntax Reference.
Create a Stream and Table¶
These examples create a stream and a table over the pageviews and users Kafka topics.
Create a stream, named pageviews_original, from the pageviews Kafka topic, specifying the value_format of JSON.

CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='JSON');
Your output should resemble:
 Message
---------------
 Stream created
---------------
Tip

You can run DESCRIBE pageviews_original; to see the schema for the stream. Notice that ksqlDB created an additional column, named ROWTIME, which corresponds with the Kafka message timestamp.

Create a table, named users_original, from the users Kafka topic, specifying the value_format of AVRO.

CREATE TABLE users_original (id VARCHAR PRIMARY KEY) WITH (kafka_topic='users', value_format='AVRO');
Your output should resemble:
 Message
---------------
 Table created
---------------
Tip

You can run DESCRIBE users_original; to see the schema for the table.

Note

You may have noticed that the CREATE TABLE statement did not define the set of columns, as the CREATE STREAM statement did. This is because the value format is Avro, and the datagen tool publishes the Avro schema to Schema Registry. ksqlDB retrieves the schema from Schema Registry and uses it to build the SQL schema for the table. You may still provide the schema if you wish.
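If you want to confirm what datagen registered, you can ask Schema Registry directly over its REST API. A sketch, assuming Schema Registry on its default port 8081 and the default TopicNameStrategy subject name users-value:

```bash
# Fetch the latest registered Avro schema for the users topic's values.
curl http://localhost:8081/subjects/users-value/versions/latest
```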
Optional: Show all streams and tables.
ksql> SHOW STREAMS;

 Stream Name         | Kafka Topic                 | Key Format | Value Format | Windowed
------------------------------------------------------------------------------------------
 KSQL_PROCESSING_LOG | default_ksql_processing_log | KAFKA      | JSON         | false
 PAGEVIEWS_ORIGINAL  | pageviews                   | KAFKA      | JSON         | false
------------------------------------------------------------------------------------------

ksql> SHOW TABLES;

 Table Name     | Kafka Topic | Key Format | Value Format | Windowed
---------------------------------------------------------------------
 USERS_ORIGINAL | users       | KAFKA      | AVRO         | false
---------------------------------------------------------------------
Tip

Notice the KSQL_PROCESSING_LOG stream listed in the SHOW STREAMS output? ksqlDB appends messages that describe any issues it encountered while processing your data to this stream. If things aren't working as you expect, check the contents of this stream to see whether ksqlDB is encountering data errors.
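For example, you can inspect recent processing errors with an ordinary push query against that stream. A sketch; the stream stays empty unless errors have occurred, so the query may block until the LIMIT is reached:

```sql
-- Show the message payload of the next five processing-log records.
SELECT message FROM KSQL_PROCESSING_LOG EMIT CHANGES LIMIT 5;
```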
View your data¶
Use SELECT to create a query that returns data from a table. This query includes the LIMIT keyword, which limits the number of rows returned in the query result, and the EMIT CHANGES keywords, which indicate that you want to stream results back. This is known as a push query. For an explanation of the different query types, see Queries. Note that the exact data output may vary because of the randomness of the data generation.
SELECT * FROM users_original EMIT CHANGES LIMIT 5;
Your output should resemble:
+---------------+---------------+---------------+---------------+---------------+
|ID             |REGISTERTIME   |USERID         |REGIONID       |GENDER         |
+---------------+---------------+---------------+---------------+---------------+
|User_2         |1502155111606  |User_2         |Region_8       |OTHER          |
|User_1         |1499783711681  |User_1         |Region_3       |OTHER          |
|User_9         |1504556621362  |User_9         |Region_5       |FEMALE         |
|User_6         |1488869543103  |User_6         |Region_4       |OTHER          |
|User_3         |1512248344223  |User_3         |Region_9       |FEMALE         |
Limit Reached
Query terminated
Note

Push queries on tables output the full history of the table that is stored in the Kafka changelog topic, which means that they output historic data, followed by the stream of updates to the table. It is therefore likely that rows with matching ID are output as existing rows in the table are updated.

View the data in your pageviews_original stream by issuing the following push query:

SELECT viewtime, userid, pageid FROM pageviews_original EMIT CHANGES LIMIT 3;
Your output should resemble:
+--------------+--------------+--------------+
|VIEWTIME      |USERID        |PAGEID        |
+--------------+--------------+--------------+
|1581078296791 |User_1        |Page_54       |
|1581078297792 |User_8        |Page_93       |
|1581078298792 |User_6        |Page_26       |
Limit Reached
Query terminated
Note
By default, push queries on streams output only changes that occur after the query is started, which means that historic data is not included. Run
set 'auto.offset.reset'='earliest';
to update your session properties if you want to see the historic data.
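For example, to replay a stream from the earliest offset for the rest of your CLI session and then revert to the default behavior, a sketch:

```sql
-- Read from the beginning of each topic for this session.
SET 'auto.offset.reset'='earliest';

-- Now historic rows are included in push query results.
SELECT * FROM pageviews_original EMIT CHANGES LIMIT 3;

-- Revert to the default (latest) for subsequent queries.
UNSET 'auto.offset.reset';
```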
Write Queries¶
These examples write queries using ksqlDB.
Note
By default ksqlDB reads the topics for streams and tables from the latest offset.
Create a query that enriches the pageviews data with the user's gender and regionid from the users table. The following query enriches the pageviews_original stream by doing a LEFT JOIN with the users_original table on the userid column.

SELECT users_original.id AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.id EMIT CHANGES LIMIT 5;
Your output should resemble:
+-------------------+-------------------+-------------------+-------------------+
|ID                 |PAGEID             |REGIONID           |GENDER             |
+-------------------+-------------------+-------------------+-------------------+
|User_7             |Page_23            |Region_2           |OTHER              |
|User_3             |Page_42            |Region_2           |MALE               |
|User_7             |Page_87            |Region_2           |OTHER              |
|User_2             |Page_57            |Region_5           |FEMALE             |
|User_9             |Page_59            |Region_1           |OTHER              |
Limit Reached
Query terminated
Create a persistent query by using the CREATE STREAM keywords to precede the SELECT statement. The results from this query are written to the PAGEVIEWS_ENRICHED Kafka topic. The following query enriches the pageviews_original stream by doing a LEFT JOIN with the users_original table on the user ID.

CREATE STREAM pageviews_enriched AS SELECT users_original.id AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.id EMIT CHANGES;
Your output should resemble:
 Message
--------------------------------------------------
 Created query with ID CSAS_PAGEVIEWS_ENRICHED_33
--------------------------------------------------
Tip

You can run DESCRIBE pageviews_enriched; to describe the stream.

Use SELECT to view query results as they come in. To stop viewing the query results, press Ctrl+C. This stops printing to the console, but it doesn't terminate the actual query. The query continues to run in the underlying ksqlDB application.

SELECT * FROM pageviews_enriched EMIT CHANGES;
Your output should resemble:
+---------------------+---------------------+---------------------+---------------------+
|ID                   |PAGEID               |REGIONID             |GENDER               |
+---------------------+---------------------+---------------------+---------------------+
|User_8               |Page_41              |Region_4             |FEMALE               |
|User_2               |Page_87              |Region_3             |OTHER                |
|User_3               |Page_84              |Region_8             |FEMALE               |
^CQuery terminated
Use Ctrl+C to terminate the query.
Create a new persistent query that limits the stream's content by using a WHERE condition. Results from this query are written to a Kafka topic named PAGEVIEWS_FEMALE.

CREATE STREAM pageviews_female AS SELECT * FROM pageviews_enriched WHERE gender = 'FEMALE' EMIT CHANGES;
Your output should resemble:
 Message
------------------------------------------------
 Created query with ID CSAS_PAGEVIEWS_FEMALE_35
------------------------------------------------
Tip

You can run DESCRIBE pageviews_female; to describe the stream.

Create a new persistent query where another condition is met, using LIKE. Results from this query are written to the pageviews_enriched_r8_r9 Kafka topic.

CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9' EMIT CHANGES;
Your output should resemble:
 Message
--------------------------------------------------------
 Created query with ID CSAS_PAGEVIEWS_FEMALE_LIKE_89_37
--------------------------------------------------------
Create a new persistent query that counts the pageviews for each region and gender combination in a tumbling window of 30 seconds. Results from this query are written to the PAGEVIEWS_REGIONS Kafka topic in the Avro format. ksqlDB registers the Avro schema with the configured Schema Registry when it writes the first message to the PAGEVIEWS_REGIONS topic.

CREATE TABLE pageviews_regions WITH (KEY_FORMAT='json') AS SELECT gender, regionid, COUNT(*) AS numusers FROM pageviews_enriched WINDOW TUMBLING (SIZE 30 SECONDS) GROUP BY gender, regionid EMIT CHANGES;
Your output should resemble:
 Message
-------------------------------------------------
 Created query with ID CTAS_PAGEVIEWS_REGIONS_39
-------------------------------------------------
Tip

You can run DESCRIBE pageviews_regions; to describe the table.

Optional: View results from the above queries by using a push query.
SELECT * FROM pageviews_regions EMIT CHANGES LIMIT 5;
Your output should resemble:
+----------------------+----------------------+----------------------+----------------------+----------------------+
|GENDER                |REGIONID              |WINDOWSTART           |WINDOWEND             |NUMUSERS              |
+----------------------+----------------------+----------------------+----------------------+----------------------+
|OTHER                 |Region_1              |1620681750000         |1620681780000         |3                     |
|FEMALE                |Region_5              |1620681750000         |1620681780000         |3                     |
|OTHER                 |Region_3              |1620681750000         |1620681780000         |5                     |
|MALE                  |Region_4              |1620681750000         |1620681780000         |7                     |
|FEMALE                |Region_9              |1620681750000         |1620681780000         |5                     |
Limit Reached
Query terminated
Note

Notice the addition of the WINDOWSTART and WINDOWEND columns. These are available because pageviews_regions aggregates data per 30-second window. ksqlDB adds these system columns automatically for windowed results.

Optional: View results from the previous queries by using a pull query.
When a CREATE TABLE statement contains a GROUP BY clause, ksqlDB internally builds a table that contains the results of the aggregation. ksqlDB supports pull queries against such aggregation results.
Unlike the push query used in the previous step, which pushes a stream of results to you, pull queries pull a result set and automatically terminate.
Pull queries do not have the EMIT CHANGES clause.
View all of the windows and user counts available for a specific gender and region by using a pull query:
SELECT * FROM pageviews_regions WHERE gender='FEMALE' AND regionid='Region_4';
Your output should resemble:
+----------------------+----------------------+----------------------+----------------------+----------------------+
|GENDER                |REGIONID              |WINDOWSTART           |WINDOWEND             |NUMUSERS              |
+----------------------+----------------------+----------------------+----------------------+----------------------+
|FEMALE                |Region_4              |1620681780000         |1620681810000         |17                    |
|FEMALE                |Region_4              |1620681810000         |1620681840000         |19                    |
|FEMALE                |Region_4              |1620681840000         |1620681870000         |19                    |
Query terminated
Pull queries on windowed tables like pageviews_regions also support querying a single window's result:

SELECT NUMUSERS FROM pageviews_regions WHERE gender='FEMALE' AND regionid='Region_4' AND WINDOWSTART=1620681780000;
Note

You must change the value of WINDOWSTART in the previous SQL to match one of the window boundaries in your data. Otherwise, no results are returned.

Your output should resemble:
+----------+
|NUMUSERS  |
+----------+
|17        |
Query terminated
To query a range of windows:
SELECT WINDOWSTART, WINDOWEND, NUMUSERS FROM pageviews_regions WHERE gender='FEMALE' AND regionid='Region_4' AND 1620681780000 <= WINDOWSTART AND WINDOWSTART <= 1620681840000;
Note

You must change the values of WINDOWSTART in the previous SQL to match window boundaries in your data. Otherwise, no results are returned.

Your output should resemble:
+--------------------------------------+--------------------------------------+--------------------------------------+
|WINDOWSTART                           |WINDOWEND                             |NUMUSERS                              |
+--------------------------------------+--------------------------------------+--------------------------------------+
|1620681780000                         |1620681810000                         |17                                    |
|1620681810000                         |1620681840000                         |19                                    |
|1620681840000                         |1620681870000                         |19                                    |
Query terminated
Optional: Show all persistent queries.
SHOW QUERIES;
Your output should resemble:
 Query ID                         | Query Type | Status    | Sink Name                | Sink Kafka Topic         | Query String
------------------------------------------------------------------------------------------------------------------------
 CSAS_PAGEVIEWS_ENRICHED_85       | PERSISTENT | RUNNING:1 | PAGEVIEWS_ENRICHED       | PAGEVIEWS_ENRICHED       | CREATE STREAM PAGEVIEWS_ENRICHED WITH (KAFKA_TOPIC='PAGEVIEWS_ENRICHED', PARTITIONS=1, REPLICAS=1) AS SELECT USERS_ORIGINAL.ID USERID, PAGEVIEWS_ORIGINAL.PAGEID PAGEID, USERS_ORIGINAL.REGIONID REGIONID, USERS_ORIGINAL.GENDER GENDER FROM PAGEVIEWS_ORIGINAL PAGEVIEWS_ORIGINAL LEFT OUTER JOIN USERS_ORIGINAL USERS_ORIGINAL ON ((PAGEVIEWS_ORIGINAL.USERID = USERS_ORIGINAL.ID)) EMIT CHANGES;
 CSAS_PAGEVIEWS_FEMALE_LIKE_89_89 | PERSISTENT | RUNNING:1 | PAGEVIEWS_FEMALE_LIKE_89 | pageviews_enriched_r8_r9 | CREATE STREAM PAGEVIEWS_FEMALE_LIKE_89 WITH (KAFKA_TOPIC='pageviews_enriched_r8_r9', PARTITIONS=1, REPLICAS=1) AS SELECT * FROM PAGEVIEWS_FEMALE PAGEVIEWS_FEMALE WHERE ((PAGEVIEWS_FEMALE.REGIONID LIKE '%_8') OR (PAGEVIEWS_FEMALE.REGIONID LIKE '%_9')) EMIT CHANGES;
 CSAS_PAGEVIEWS_FEMALE_87         | PERSISTENT | RUNNING:1 | PAGEVIEWS_FEMALE         | PAGEVIEWS_FEMALE         | CREATE STREAM PAGEVIEWS_FEMALE WITH (KAFKA_TOPIC='PAGEVIEWS_FEMALE', PARTITIONS=1, REPLICAS=1) AS SELECT * FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED WHERE (PAGEVIEWS_ENRICHED.GENDER = 'FEMALE') EMIT CHANGES;
 CTAS_PAGEVIEWS_REGIONS_91        | PERSISTENT | RUNNING:1 | PAGEVIEWS_REGIONS        | PAGEVIEWS_REGIONS        | CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', KEY_FORMAT='json', PARTITIONS=1, REPLICAS=1) AS SELECT PAGEVIEWS_ENRICHED.GENDER GENDER, PAGEVIEWS_ENRICHED.REGIONID REGIONID, COUNT(*) NUMUSERS FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED WINDOW TUMBLING ( SIZE 30 SECONDS ) GROUP BY PAGEVIEWS_ENRICHED.GENDER, PAGEVIEWS_ENRICHED.REGIONID EMIT CHANGES;
------------------------------------------------------------------------------------------------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;
Optional: Examine query run-time metrics and details. Information including the target Kafka topic is available, as well as throughput figures for the messages being processed.
DESCRIBE pageviews_regions EXTENDED;
Your output should resemble:
Name                 : PAGEVIEWS_REGIONS
Type                 : TABLE
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : AVRO
Kafka topic          : PAGEVIEWS_REGIONS (partitions: 1, replication: 1)
Statement            : CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro') AS SELECT PAGEVIEWS_ENRICHED.GENDER GENDER, PAGEVIEWS_ENRICHED.REGIONID REGIONID, COUNT(*) NUMUSERS FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED WINDOW TUMBLING ( SIZE 30 SECONDS ) GROUP BY PAGEVIEWS_ENRICHED.GENDER, PAGEVIEWS_ENRICHED.REGIONID EMIT CHANGES;

 Field      | Type
---------------------------------------------------------------------
 KSQL_COL_0 | VARCHAR(STRING)  (primary key) (Window type: TUMBLING)
 NUMUSERS   | BIGINT
---------------------------------------------------------------------

Queries that write from this TABLE
-----------------------------------
CTAS_PAGEVIEWS_REGIONS_39 (RUNNING) : CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro') AS SELECT PAGEVIEWS_ENRICHED.GENDER GENDER, PAGEVIEWS_ENRICHED.REGIONID REGIONID, COUNT(*) NUMUSERS FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED WINDOW TUMBLING ( SIZE 30 SECONDS ) GROUP BY PAGEVIEWS_ENRICHED.GENDER, PAGEVIEWS_ENRICHED.REGIONID EMIT CHANGES;

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------
messages-per-sec: 2.89   total-messages: 3648   last-message: 2021-01-27T19:36:11.197Z
(Statistics of the local KSQL server interaction with the Kafka topic PAGEVIEWS_REGIONS)

Consumer Groups summary:

Consumer Group : _confluent-ksql-default_query_CTAS_PAGEVIEWS_REGIONS_39

Kafka topic : PAGEVIEWS_ENRICHED
Max lag     : 5

 Partition | Start Offset | End Offset | Offset | Lag
------------------------------------------------------
 0         | 0            | 7690       | 7685   | 5
------------------------------------------------------

Kafka topic : _confluent-ksql-default_query_CTAS_PAGEVIEWS_REGIONS_39-Aggregate-GroupBy-repartition
Max lag     : 5

 Partition | Start Offset | End Offset | Offset | Lag
------------------------------------------------------
 0         | 6224         | 6229       | 6224   | 5
------------------------------------------------------
Using Nested Schemas (STRUCT) in ksqlDB¶
Struct support enables the modeling and access of nested data in Kafka topics, from both JSON and Avro.
Here we’ll use the ksql-datagen
tool to create some sample data
which includes a nested address
field. Run this in a new window, and
leave it running.
$CONFLUENT_HOME/bin/ksql-datagen \
quickstart=orders \
format=avro \
topic=orders \
msgRate=1
From the ksqlDB command prompt, register the topic in ksqlDB:
CREATE STREAM orders
(
ordertime BIGINT,
orderid INT,
itemid STRING,
orderunits DOUBLE,
address STRUCT<city STRING, state STRING, zipcode BIGINT>
)
WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='avro');
Your output should resemble:
Message
----------------
Stream created
----------------
Use the DESCRIBE statement to observe the schema, which includes a STRUCT:
DESCRIBE orders;
Your output should resemble:
Field | Type
----------------------------------------------------------------------------------
ORDERTIME | BIGINT
ORDERID | INTEGER
ITEMID | VARCHAR(STRING)
ORDERUNITS | DOUBLE
ADDRESS | STRUCT<CITY VARCHAR(STRING), STATE VARCHAR(STRING), ZIPCODE BIGINT>
----------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
Query the data by using ->
notation to access the Struct contents:
SELECT ORDERID, ADDRESS->CITY FROM ORDERS EMIT CHANGES LIMIT 5;
Your output should resemble:
+-----------------------------------+-----------------------------------+
|ORDERID |ADDRESS__CITY |
+-----------------------------------+-----------------------------------+
|1188 |City_95 |
|1189 |City_24 |
|1190 |City_57 |
|1191 |City_37 |
|1192 |City_82 |
Limit Reached
Query terminated
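Note the generated column name ADDRESS__CITY in the output above. You can assign a friendlier name with an alias, and nested fields are reached the same way with -> notation. A sketch against the same orders stream:

```sql
-- Alias struct fields to control the output column names.
SELECT ORDERID,
       ADDRESS->CITY AS CITY,
       ADDRESS->ZIPCODE AS ZIPCODE
FROM ORDERS EMIT CHANGES LIMIT 5;
```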
INSERT INTO¶
You can use the INSERT INTO syntax to merge the contents of multiple streams. An example of this could be where the same event type is coming from different sources.
Open two new terminals and run two Datagen processes, each writing to a different topic. This simulates order data arriving from a local installation and from a third-party.
Tip
Each of these commands should be run in a separate window. When the exercise is finished, exit them by pressing Ctrl-C.
$CONFLUENT_HOME/bin/ksql-datagen \
quickstart=orders \
format=json \
topic=orders_local \
msgRate=2
$CONFLUENT_HOME/bin/ksql-datagen \
quickstart=orders \
format=json \
topic=orders_3rdparty \
msgRate=2
In the ksqlDB CLI, register the source topic for each:
CREATE STREAM orders_src_local
(
ordertime BIGINT,
orderid INT,
itemid STRING,
orderunits DOUBLE,
address STRUCT<city STRING, state STRING, zipcode BIGINT>
)
WITH (KAFKA_TOPIC='orders_local', VALUE_FORMAT='JSON');
CREATE STREAM orders_src_3rdparty
(
ordertime BIGINT,
orderid INT,
itemid STRING,
orderunits DOUBLE,
address STRUCT<city STRING, state STRING, zipcode BIGINT>
)
WITH (KAFKA_TOPIC='orders_3rdparty', VALUE_FORMAT='JSON');
After each CREATE STREAM statement you should get the message:
Message
----------------
Stream created
----------------
Create the output stream, using the standard CREATE STREAM … AS
syntax. Because multiple sources of data are being joined into a common target,
it is useful to add in lineage information. This can be done by simply including it
as part of the SELECT
:
CREATE STREAM all_orders AS SELECT 'LOCAL' AS SRC, * FROM orders_src_local EMIT CHANGES;
Your output should resemble:
Message
------------------------------------------
Created query with ID CSAS_ALL_ORDERS_71
------------------------------------------
Use the DESCRIBE
command to observe the schema of the target stream.
DESCRIBE all_orders;
Your output should resemble:
Name : ALL_ORDERS
Field | Type
----------------------------------------------------------------------------------
SRC | VARCHAR(STRING)
ORDERTIME | BIGINT
ORDERID | INTEGER
ITEMID | VARCHAR(STRING)
ORDERUNITS | DOUBLE
ADDRESS | STRUCT<CITY VARCHAR(STRING), STATE VARCHAR(STRING), ZIPCODE BIGINT>
----------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
Add the stream of third-party orders to the existing output stream:
INSERT INTO all_orders SELECT '3RD PARTY' AS SRC, * FROM orders_src_3rdparty EMIT CHANGES;
Your output should resemble:
Message
--------------------------------------
Created query with ID INSERTQUERY_73
--------------------------------------
Query the output stream to verify that data from each source is being written to it:
SELECT * FROM all_orders EMIT CHANGES;
Your output should resemble the following. Note that there are messages from both source
topics (denoted by LOCAL
and 3RD PARTY
respectively).
+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|SRC |ORDERTIME |ORDERID |ITEMID |ORDERUNITS |ADDRESS |
+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|3RD PARTY |1491006356222 |1583 |Item_169 |2.091966572094054 |{CITY=City_91, STATE=|
| | | | | |State_51, ZIPCODE=184|
| | | | | |74} |
|LOCAL |1504382324241 |1630 |Item_410 |0.6462658185260942 |{CITY=City_55, STATE=|
| | | | | |State_38, ZIPCODE=372|
| | | | | |44} |
|3RD PARTY |1512567250385 |1584 |Item_357 |7.205193136057381 |{CITY=City_91, STATE=|
| | | | | |State_19, ZIPCODE=457|
| | | | | |45} |
^CQuery terminated
Press Ctrl+C to cancel the SELECT
query and return to the ksqlDB prompt.
Terminate and Exit¶
ksqlDB¶
Note
Persistent queries run continuously as ksqlDB applications until they are terminated manually. Exiting the ksqlDB CLI does not terminate persistent queries.
From the output of SHOW QUERIES; identify a query ID you would like to terminate. For example, if you wish to terminate query ID CTAS_PAGEVIEWS_REGIONS_15:

TERMINATE CTAS_PAGEVIEWS_REGIONS_15;

Tip

The actual ID of the running query may vary; refer to the output of SHOW QUERIES;.

Run the exit command to leave the ksqlDB CLI.

ksql> exit
Exiting ksqlDB.
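If you want to stop every persistent query at once instead of terminating them one by one, recent ksqlDB versions also accept a bulk form; check the syntax reference for your version before relying on it:

```sql
-- Terminate all persistent queries on the server.
TERMINATE ALL;
```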
Confluent CLI¶
If you are running Confluent Platform using the CLI, you can stop it with this command.
$CONFLUENT_HOME/bin/confluent local services stop