Write Streaming Queries Against Apache Kafka® Using ksqlDB and Confluent Control Center

You can use ksqlDB in Confluent Control Center to write streaming queries against messages in Kafka.


Prerequisites

  • Confluent Platform is installed and running. This installation includes a Kafka broker, ksqlDB, Control Center, ZooKeeper, Schema Registry, REST Proxy, and Connect.
  • If you installed Confluent Platform using TAR or ZIP, navigate into the installation directory. The paths and commands used throughout this tutorial assume that you are in this installation directory, indicated as $CONFLUENT_HOME.
  • Consider installing the Confluent CLI to start a local installation of Confluent Platform.
  • Java: Minimum version 1.8. Install Oracle Java JRE or JDK >= 1.8 on your local machine.

Create Topics and Produce Data

Create and produce data to the Kafka topics pageviews and users. These steps use the ksqlDB datagen tool that’s included with Confluent Platform.

  1. Open a new terminal window and run the following command to create the pageviews topic and produce data using the data generator. The following example continuously generates data in DELIMITED format.

    $CONFLUENT_HOME/bin/ksql-datagen quickstart=pageviews format=delimited topic=pageviews msgRate=5
  2. Open another terminal window and run the following command to produce data to the users topic using the data generator. The following example continuously generates data in AVRO format.

    $CONFLUENT_HOME/bin/ksql-datagen quickstart=users format=avro topic=users msgRate=1


You can also produce Kafka data using the kafka-console-producer CLI provided with Confluent Platform.

Launch the ksqlDB CLI

Open a new terminal window and run the following command to set the LOG_DIR environment variable and launch the ksqlDB CLI.

LOG_DIR=./ksql_logs $CONFLUENT_HOME/bin/ksql

This command routes the CLI logs to the ./ksql_logs directory, relative to your current directory. By default, the CLI looks for a ksqlDB Server running at http://localhost:8088.

Inspect Topics By Using Control Center

  1. Open your browser to http://localhost:9021/. Confluent Control Center opens, showing the Home page for your clusters. In the navigation bar, click the cluster that you want to use with ksqlDB.

  2. In the navigation menu, click Topics to view the pageviews and users topics that you created previously.

    Screenshot of Confluent Control Center showing the Topics page

Inspect Topics By Using ksqlDB in Control Center

  1. In the navigation menu, click ksqlDB to open the ksqlDB clusters page, and click the listed ksqlDB application to open the ksqlDB Editor.

    Screenshot of Confluent Control Center showing the ksqlDB application list
  2. In the editing window, use the SHOW TOPICS statement to see the available topics on the Kafka cluster. Click Run query to start the query.

    Screenshot of Confluent Control Center showing the ksqlDB Editor
  3. In the Query Results window, scroll to the bottom to view the pageviews and users topics that you created previously. Your output should resemble:

      "name": "pageviews",
      "replicaInfo": [
      "name": "users",
      "replicaInfo": [

    To see the count of consumers and consumer groups, use the SHOW TOPICS EXTENDED command.

  4. In the editing window, use the PRINT TOPIC statement to inspect the records in the users topic. Click Run query to start the query.


    Your output should resemble:

    Screenshot of the ksqlDB PRINT TOPIC statement in Confluent Control Center
  5. The query continues until you end it explicitly. Click Stop to end the query.
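For reference, the statements used in the steps above can be typed into the editor roughly as follows. This is a sketch that assumes the users topic created earlier; the FROM BEGINNING clause is an optional addition, not something the steps require. Run each statement separately.

```sql
-- List the topics on the Kafka cluster.
SHOW TOPICS;

-- Same listing, with consumer and consumer group counts added.
SHOW TOPICS EXTENDED;

-- Print records from the users topic.
-- FROM BEGINNING starts at the earliest offset; without it,
-- only records that arrive after the statement starts are shown.
PRINT 'users' FROM BEGINNING;
```

The PRINT statement keeps running until you stop it, which is why the last step asks you to click Stop.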

Create a Stream and Table

To write streaming queries against the pageviews and users topics, register the topics with ksqlDB as a stream and a table. You can use the CREATE STREAM and CREATE TABLE statements in the ksqlDB Editor, or you can use the Control Center UI.

These examples query records from the pageviews and users topics using the following schema.

ER diagram showing a pageviews stream and a users table with a common userid column

Create a stream in the ksqlDB editor

You can create a stream or table by using the CREATE STREAM and CREATE TABLE statements in ksqlDB Editor, just like you use them in the ksqlDB CLI.

  1. Copy the following code into the editing window and click Run.

    CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH
    (kafka_topic='pageviews', value_format='DELIMITED');

    Your output should resemble:

    Screenshot of the ksqlDB CREATE STREAM statement in Confluent Control Center
  2. Click Streams to inspect the pageviews_original stream that you created.

    Screenshot of the ksqlDB Streams page in Confluent Control Center
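If you prefer statements to the Streams page, the same check can be sketched in the editor or the ksqlDB CLI as shown here. It assumes the pageviews_original stream from step 1 already exists.

```sql
-- List all registered streams; pageviews_original should appear.
SHOW STREAMS;

-- Show the column names and types of the new stream.
DESCRIBE pageviews_original;
```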

Create a table in the ksqlDB editor

Use the CREATE TABLE statement to register a table on a topic.

  1. Copy the following code into the editor window and click Run query.

    CREATE TABLE users (userid VARCHAR PRIMARY KEY, registertime BIGINT, id VARCHAR, regionid VARCHAR, gender VARCHAR) WITH
    (kafka_topic='users', value_format='AVRO');

    Your output should resemble:

    Screenshot of the ksqlDB CREATE TABLE statement in Confluent Control Center
  2. In the editor window, use a SELECT query to inspect records in the users table.


    Your output should resemble:

    Screenshot of a ksqlDB SELECT query on a table in Confluent Control Center
  3. The query continues until you end it explicitly. Click Stop to end the query.
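A SELECT query for step 2 might look like the following sketch. It assumes the users table from step 1; the column list and LIMIT clause in the second statement are illustrative choices, not requirements.

```sql
-- Push query: streams every change to the users table until stopped.
SELECT * FROM users EMIT CHANGES;

-- Variant that ends on its own after five rows.
SELECT userid, regionid, gender FROM users EMIT CHANGES LIMIT 5;
```

If no rows appear, the query may be reading only new records. Confirm that the data generator for the users topic is still running.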

Write Persistent Queries

With the pageviews topic registered as a stream, and the users topic registered as a table, you can write streaming queries that run until you end them with the TERMINATE statement.

  1. Copy the following code into the editing window and click Run.

    CREATE STREAM pageviews_enriched AS
    SELECT users.userid AS userid, pageid, regionid, gender
    FROM pageviews_original
    LEFT JOIN users
      ON pageviews_original.userid = users.userid
    EMIT CHANGES;

    Your output should resemble:

    Screenshot of the ksqlDB CREATE STREAM AS SELECT statement in Confluent Control Center
  2. To inspect your persistent queries, navigate to the Running Queries page, which shows details about the pageviews_enriched stream that you created in the previous query.

    Screenshot of the ksqlDB Running Queries page in Confluent Control Center
  3. Click Explain to see the schema and query properties for the persistent query.

    Screenshot of the ksqlDB Explain Query page in Confluent Control Center
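The Running Queries and Explain pages have statement equivalents, sketched below. The query ID CSAS_PAGEVIEWS_ENRICHED_0 is a hypothetical placeholder: ksqlDB generates the ID, so copy the actual value from the SHOW QUERIES output.

```sql
-- List persistent queries and their generated IDs.
SHOW QUERIES;

-- Show the schema and execution plan for one query.
-- CSAS_PAGEVIEWS_ENRICHED_0 is a placeholder; use the ID that SHOW QUERIES reports.
EXPLAIN CSAS_PAGEVIEWS_ENRICHED_0;

-- End the persistent query. Run this only when you no longer need
-- pageviews_enriched to receive new records.
TERMINATE CSAS_PAGEVIEWS_ENRICHED_0;
```

Later sections read from pageviews_enriched, so hold off on TERMINATE until you finish the tutorial.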

Monitor Persistent Queries

You can monitor your persistent queries visually by using Confluent Control Center.

  1. In the navigation menu, click Consumers and find the consumer group for the pageviews_enriched query, which has a name that starts with _confluent-ksql-default_query_CSAS_PAGEVIEWS_ENRICHED_. The Consumer lag page opens.

    Screenshot of the Consumer Lag page in Confluent Control Center
  2. Click Consumption to see the rate at which the pageviews_enriched query is consuming records. Click Last four hours and from the list, select Last 30 minutes and click Apply.

    Your output should resemble:

    Screenshot of the Consumption page in Confluent Control Center

Query Properties

You can assign properties in the ksqlDB Editor before you run your queries.

  1. In the navigation menu, click ksqlDB to open the ksqlDB clusters page, and click the default application to open the ksqlDB Editor.

  2. Click Add query properties and set the auto.offset.reset field to Earliest.

  3. Copy the following code into the editing window and click Run.

    CREATE STREAM pageviews_female AS
    SELECT * FROM pageviews_enriched
    WHERE gender = 'FEMALE'
    EMIT CHANGES;

    Screenshot showing how to set a query property in the ksqlDB Editor page

    The pageviews_female stream starts with the earliest record in the topic that backs the pageviews_enriched stream, which means that it consumes all of the available records from the beginning.

  4. Confirm that the auto.offset.reset property was applied to the pageviews_female stream. In the navigation menu, click Consumers and find the consumer group for the pageviews_female stream, which has a name that starts with _confluent-ksql-default_query_CSAS_PAGEVIEWS_FEMALE_.

    Click Consumption to see the rate at which the pageviews_female query is consuming records. Ensure that the time scale is set to Last 30 minutes.

    Screenshot of the Consumption page in Confluent Control Center

    The graph is at 100 percent, because all of the records were consumed when the pageviews_female stream started.
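In the ksqlDB CLI there is no Add query properties button; the same property can be assigned with SET before running a query. The sketch below shows only the property handling and assumes a CLI session connected to the same ksqlDB Server.

```sql
-- Read from the earliest available offset for queries started in this session.
SET 'auto.offset.reset' = 'earliest';

-- Confirm the current property values.
SHOW PROPERTIES;

-- Restore the server default when you are done.
UNSET 'auto.offset.reset';
```

A property set this way applies to statements issued afterward in the same session, so set it before you run the CREATE STREAM AS SELECT statement.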

View streams and tables

You can see all of your persistent queries, streams, and tables in a single, unified view.

  1. In the navigation menu, click ksqlDB to open the ksqlDB clusters page, and click the default application to open the ksqlDB Editor.

  2. On the right side of the page, find the All available streams and tables section.

  3. Click KSQL_PROCESSING_LOG to open the stream. The schema for the stream is displayed, including nested data structures.

    Screenshot of the unified ksqlDB streams and tables pane in Confluent Control Center
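The schema shown in step 3 can also be retrieved with a statement. This sketch assumes the KSQL_PROCESSING_LOG stream exists in your ksqlDB application, as it does in the steps above.

```sql
-- Show the columns of the processing log stream, including nested STRUCT fields.
DESCRIBE KSQL_PROCESSING_LOG;
```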

Download selected records

You can download records that you select in the query results window as a JSON file.

  1. Copy the following code into the editing window and click Run.

  2. In the query results window, click the pause button, select some records, and click Download.

    Screenshot showing how to download SQL query results to a JSON file in Confluent Control Center
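Any push query that returns rows gives you records to select in step 1. As an assumption for illustration, this sketch reads from the pageviews_female stream created in the Query Properties section.

```sql
-- Stream rows into the query results window; pause the output to select records.
SELECT * FROM pageviews_female EMIT CHANGES;
```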

Use Flow View to inspect your topology

Control Center enables you to see how events flow through your ksqlDB application.

  1. In the ksqlDB page, click Flow.

    Screenshot showing the ksqlDB Flow View in Confluent Control Center
  2. Click the PAGEVIEWS_ENRICHED node in the graph to see details about the PAGEVIEWS_ENRICHED stream, including current messages and schema.

    Screenshot showing the ksqlDB Flow View in Confluent Control Center
  3. Click the other nodes in the graph to see details about the topology of your ksqlDB application.


Run shutdown and cleanup tasks.

  • You can stop each of the running producers (sending data to the users and pageviews topics) by using Ctrl+C in their respective command windows.
  • To stop Confluent Platform, type confluent local services stop.
  • If you would like to clear out existing data (topics, schemas, and messages) before starting again with another test, type confluent local destroy.