Writing Streaming Queries Against Apache Kafka® Using KSQL and Confluent Control Center¶
You can use KSQL in Confluent Control Center to write streaming queries against messages in Kafka.
Prerequisites:
- Confluent Platform is installed and running. This installation includes a Kafka broker, KSQL, Control Center, ZooKeeper, Schema Registry, REST Proxy, and Connect.
- If you installed Confluent Platform via TAR or ZIP, navigate into the installation directory. The paths and commands used throughout this tutorial assume that you are in this installation directory.
- Consider installing the Confluent CLI to start a local installation of Confluent Platform.
- Java: Minimum version 1.8. Install Oracle Java JRE or JDK >= 1.8 on your local machine.
Create Topics and Produce Data¶
Create the Kafka topics pageviews and users and produce data to them. These steps use the KSQL datagen tool that is included with Confluent Platform.
Create the pageviews topic and produce data using the data generator. The following example continuously generates data with a value in DELIMITED format.
$ <path-to-confluent>/bin/ksql-datagen quickstart=pageviews format=delimited topic=pageviews maxInterval=500
Produce Kafka data to the users topic using the data generator. The following example continuously generates data with a value in JSON format.
$ <path-to-confluent>/bin/ksql-datagen quickstart=users format=json topic=users maxInterval=100
Tip
You can also produce Kafka data using the kafka-console-producer CLI provided with Confluent Platform.
Launch the KSQL CLI¶
To launch the CLI, run the following command. It will route the CLI logs to the ./ksql_logs directory, relative to your current directory. By default, the CLI will look for a KSQL Server running at http://localhost:8088.
$ LOG_DIR=./ksql_logs <path-to-confluent>/bin/ksql
Inspect Topics By Using Control Center¶
Open your browser to http://localhost:9021. Confluent Control Center opens, showing the Home page for your clusters. In the navigation bar, click the cluster that you want to use with KSQL.
In the navigation menu, click Topics to view the pageviews and users topics that you created previously.
Inspect Topics By Using KSQL in Control Center¶
In the cluster submenu, click KSQL to open the KSQL clusters page, and click KSQL to open the KSQL Editor on the default application.
In the editing window, use the SHOW TOPICS statement to see the available topics on the Kafka cluster. Click Run to start the query.
SHOW TOPICS;
In the Query Results window, scroll to the bottom to view the pageviews and users topics that you created previously. Your output should resemble:
{
  "name": "pageviews",
  "registered": false,
  "replicaInfo": [ 1 ],
  "consumerCount": 0,
  "consumerGroupCount": 0
},
{
  "name": "users",
  "registered": false,
  "replicaInfo": [ 1 ],
  "consumerCount": 0,
  "consumerGroupCount": 0
}
The "registered": false indicator means that you haven’t created a stream or table on top of these topics, so you can’t write streaming queries against them yet.
In the editing window, use the PRINT TOPIC statement to inspect the records in the users topic. Click Run to start the query.
PRINT 'users' FROM BEGINNING;
The query continues until you end it explicitly. Click Stop to end the query.
Create a Stream and Table¶
To write streaming queries against the pageviews and users topics, register the topics with KSQL as a stream and a table. You can use the CREATE STREAM and CREATE TABLE statements in the KSQL Editor, or you can use the Control Center UI.
These examples query records from the pageviews and users topics using the following schema.

Create a Stream in the KSQL Editor¶
You can create a stream or table by using the CREATE STREAM and CREATE TABLE statements in the KSQL Editor, just as you would in the KSQL CLI.
Copy the following code into the editing window and click Run.
CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED');
In the editing window, use the SHOW TOPICS statement to inspect the status of the pageviews topic. Click Run to start the query.
SHOW TOPICS;
In the Query Results window, scroll to the bottom to view the pageviews topic. Your output should resemble:
{
  "name": "pageviews",
  "registered": true,
  "replicaInfo": [ 1 ],
  "consumerCount": 0,
  "consumerGroupCount": 0
},
The "registered": true indicator means that you have registered the topic and you can write streaming queries against it.
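As a complementary check from the editor, the DESCRIBE statement lists the columns that KSQL registered for the stream. This is a minimal sketch, assuming the pageviews_original stream from the previous step exists. Besides the three declared columns, the output typically also shows the implicit ROWTIME and ROWKEY columns.

```sql
-- List the columns and types registered for the stream created above.
DESCRIBE pageviews_original;
```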
Create a Table in the Control Center UI¶
Confluent Control Center guides you through the process of registering a topic as a stream or a table.
In the KSQL Editor, navigate to Tables and click Add a table. The Create a KSQL Table dialog opens.
Click users to fill in the details for the table. KSQL infers the table schema and displays the field names and types from the topic. You need to choose a few more settings.
- In the Encoding dropdown, select JSON.
- In the Key dropdown, select userid.
Click Save Table to create a table on the users topic.
The KSQL Editor opens with a suggested query.
The Query Results pane displays query status information, like Messages/sec, and it shows the fields that the query returns.
The query continues until you end it explicitly. Click Stop to end the query.
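The same registration can also be expressed as a statement in the KSQL Editor or the KSQL CLI, as an alternative to the dialog (use one approach or the other, not both). The sketch below assumes the four fields produced by the users quickstart of ksql-datagen (registertime, gender, regionid, userid). Adjust the column list if the schema inferred in the dialog differs.

```sql
-- Statement equivalent of the dialog: register the users topic as a table.
-- The column list is an assumption based on the ksql-datagen users quickstart.
CREATE TABLE users (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR)
  WITH (kafka_topic='users', value_format='JSON', key='userid');
```

The value_format and key properties correspond to the Encoding and Key selections made in the dialog.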
Write Persistent Queries¶
With the pageviews topic registered as a stream, and the users topic registered as a table, you can write streaming queries that run until you end them with the TERMINATE statement.
Copy the following code into the editing window and click Run.
CREATE STREAM pageviews_enriched AS SELECT users.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users ON pageviews_original.userid = users.userid;
To inspect your persistent queries, navigate to the Running Queries page, which shows details about the pageviews_enriched stream that you created in the previous query.
Click Explain to see the schema and query properties for the persistent query.
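The same information is available through statements. The sketch below assumes the query ID CSAS_PAGEVIEWS_ENRICHED_0, which matches the consumer group name shown in the monitoring section of this tutorial. Check the SHOW QUERIES output for the ID in your environment. TERMINATE is left commented out because the monitoring steps need the query to keep running.

```sql
-- List the running persistent queries and their IDs.
SHOW QUERIES;

-- Show the schema and execution plan for one query (ID is an assumption, see above).
EXPLAIN CSAS_PAGEVIEWS_ENRICHED_0;

-- Stop the persistent query once you no longer need it.
-- TERMINATE CSAS_PAGEVIEWS_ENRICHED_0;
```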
Monitor Persistent Queries¶
You can monitor your persistent queries visually by using Confluent Control Center.
In the cluster submenu, click Consumers and find the consumer group for the pageviews_enriched query, which is named _confluent-ksql-default_query_CSAS_PAGEVIEWS_ENRICHED_0.
Click Consumption to see the rate at which the pageviews_enriched query is consuming records.
Change the time scale from Last 4 hours to Last 30 minutes.
In the navigation menu, click Consumer lag and find the consumer group for the pageviews_enriched query, which is named _confluent-ksql-default_query_CSAS_PAGEVIEWS_ENRICHED_0. This view shows how well your persistent query is keeping up with the incoming data.
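For a quick textual check alongside the Control Center charts, DESCRIBE EXTENDED reports runtime statistics for a stream, such as message rates and totals, together with the queries that write to it. This is a minimal sketch, assuming the pageviews_enriched stream from the previous section exists.

```sql
-- Show the schema, runtime statistics, and the queries writing to the stream.
DESCRIBE EXTENDED pageviews_enriched;
```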
Query Properties¶
You can assign properties in the KSQL Editor before you run your queries.
In the cluster submenu, click KSQL to open the KSQL clusters page, and click KSQL to open the KSQL Editor on the default application.
Click Add query properties and set the auto.offset.reset field to Earliest.
Copy the following code into the editing window and click Run.
CREATE STREAM pageviews_female AS SELECT * FROM pageviews_enriched WHERE gender = 'FEMALE';
The pageviews_female stream starts with the earliest record in the pageviews topic, which means that it consumes all of the available records from the beginning.
Confirm that the auto.offset.reset property was applied to the pageviews_female stream. In the cluster submenu, click Consumers and find the consumer group for the pageviews_female stream, which is named _confluent-ksql-default_query_CSAS_PAGEVIEWS_FEMALE_1.
Click Consumption to see the rate at which the pageviews_female query is consuming records.
The graph is at 100 percent, because all of the records were consumed when the pageviews_female stream started.
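In the KSQL CLI, the SET statement plays the role of the Add query properties control for the current session. The sketch below assumes the pageviews_enriched stream exists. It sets the same property and then runs a transient query with LIMIT, so it does not create another stream.

```sql
-- Read from the earliest available offset for queries started in this session.
SET 'auto.offset.reset' = 'earliest';

-- Transient query: returns five matching records, then stops.
SELECT * FROM pageviews_enriched WHERE gender = 'FEMALE' LIMIT 5;
```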
View streams and tables¶
You can see all of your persistent queries, streams, and tables in a single, unified view.
Click KSQL Editor and find the All available streams and tables pane on the right side of the page.
Click KSQL_PROCESSING_LOG to open the processing log stream. The schema for the stream is displayed, including nested data structures.
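The pane has statement counterparts that can be run in the editor or the CLI. This is a minimal sketch. It assumes that the KSQL_PROCESSING_LOG stream exists, as shown in the pane.

```sql
-- List the registered streams and tables.
SHOW STREAMS;
SHOW TABLES;

-- Print the schema of the processing log stream, including nested fields.
DESCRIBE KSQL_PROCESSING_LOG;
```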
Download selected records¶
You can download records that you select in the query results window as a JSON file.
Copy the following code into the editing window and click Run.
SELECT * FROM PAGEVIEWS_FEMALE;
In the query results window, select some records and click Download.