Important

You are viewing documentation for an older version of Confluent Platform.

Writing Streaming Queries Against Apache Kafka® Using KSQL and Confluent Control Center

You can use KSQL in Confluent Control Center to write streaming queries against messages in Kafka.

Prerequisites:

  • Confluent Platform is installed and running. This installation includes a Kafka broker, KSQL, Control Center, ZooKeeper, Schema Registry, REST Proxy, and Kafka Connect.
  • If you installed Confluent Platform via TAR or ZIP, navigate into the installation directory. The paths and commands used throughout this tutorial assume that you are in this installation directory.
  • Java: version 1.8 or later. Install the Oracle Java JRE or JDK on your local machine.

Create Topics and Produce Data

Create the Kafka topics pageviews and users and produce data to them. These steps use the KSQL datagen tool that is included with Confluent Platform.

  1. Create the pageviews topic and produce data using the data generator. The following example continuously generates data with a value in DELIMITED format.

    $ <path-to-confluent>/bin/ksql-datagen quickstart=pageviews format=delimited topic=pageviews maxInterval=500
    
  2. Produce Kafka data to the users topic using the data generator. The following example continuously generates data with a value in JSON format.

    $ <path-to-confluent>/bin/ksql-datagen quickstart=users format=json topic=users maxInterval=100
    

Tip

You can also produce Kafka data using the kafka-console-producer CLI provided with Confluent Platform.
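As a sketch, a single DELIMITED pageviews-style record could be produced from the console like this. The record value and broker address are illustrative assumptions; this requires a Kafka broker listening on localhost:9092.

```shell
# Hypothetical example: produce one DELIMITED record to the pageviews topic.
# Assumes a Kafka broker is listening on localhost:9092.
echo "1549300000000,User_5,Page_12" | \
  <path-to-confluent>/bin/kafka-console-producer \
    --broker-list localhost:9092 \
    --topic pageviews
```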

Launch the KSQL CLI

To launch the CLI, run the following command. This command routes the CLI logs to the ./ksql_logs directory, relative to your current directory. By default, the CLI looks for a KSQL server running at http://localhost:8088.

$ LOG_DIR=./ksql_logs <path-to-confluent>/bin/ksql
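If your KSQL server is not running at the default address, the CLI accepts the server URL as an argument. This is a sketch; adjust the host and port to match your deployment.

```shell
# Point the CLI at a specific KSQL server (the default is http://localhost:8088).
$ LOG_DIR=./ksql_logs <path-to-confluent>/bin/ksql http://localhost:8088
```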

Inspect Topics By Using Control Center

  1. Open your browser to http://localhost:9021. Confluent Control Center opens, showing the System health view.

  2. In the navigation menu, click Topics to view the pageviews and users topics that you created previously.

    Screenshot of Confluent Control Center showing the Topics page

Inspect Topics By Using KSQL in Control Center

  1. In the navigation menu, click KSQL to open the KSQL Editor.

  2. In the editing window, use the SHOW TOPICS statement to see the available topics on the Kafka cluster. Click Run to start the query.

    SHOW TOPICS;
    
    Screenshot of Confluent Control Center showing the KSQL Editor
  3. In the Query Results window, scroll to the bottom to view the pageviews and users topics that you created previously. Your output should resemble:

    {
      "name": "pageviews",
      "registered": false,
      "replicaInfo": [
        1
      ],
      "consumerCount": 0,
      "consumerGroupCount": 0
    },
    {
      "name": "users",
      "registered": false,
      "replicaInfo": [
        1
      ],
      "consumerCount": 0,
      "consumerGroupCount": 0
    }
    

    The "registered": false indicator means that you haven’t created a stream or table on top of these topics, so you can’t write streaming queries against them yet.

  4. In the editing window, use the PRINT TOPIC statement to inspect the records in the users topic. Click Run to start the query.

    PRINT 'users' FROM BEGINNING;
    

    Your output should resemble:

    Screenshot of the KSQL PRINT statement in Confluent Control Center
  5. The query continues until you end it explicitly. Click Stop to end the query.

Create a Stream and Table

To write streaming queries against the pageviews and users topics, register the topics with KSQL as a stream and a table. You can use the CREATE STREAM and CREATE TABLE statements in the KSQL Editor, or you can use the Control Center UI.

These examples query records from the pageviews and users topics using the following schema.

ER diagram showing a pageviews stream and a users table with a common userid column
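Based on the standard ksql-datagen quickstarts, the two schemas can be sketched as follows. The field names are assumptions drawn from the datagen defaults; verify them against your own topics.

```sql
-- pageviews (stream): one record per page view
--   viewtime BIGINT, userid VARCHAR, pageid VARCHAR
-- users (table): latest profile record per user
--   registertime BIGINT, userid VARCHAR, regionid VARCHAR, gender VARCHAR
-- The stream and table join on the common userid column.
```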

Create a Stream in the KSQL Editor

You can create a stream or table by using the CREATE STREAM and CREATE TABLE statements in KSQL Editor, just like you use them in the KSQL CLI.

  1. Copy the following code into the editing window and click Run.

    CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH
    (kafka_topic='pageviews', value_format='DELIMITED');
    

    Your output should resemble:

    Screenshot of the KSQL CREATE STREAM statement in Confluent Control Center
  2. In the editing window, use the SHOW TOPICS statement to inspect the status of the pageviews topic. Click Run to start the query.

    SHOW TOPICS;
    
  3. In the Query Results window, scroll to the bottom to view the pageviews topic. Your output should resemble:

    {
      "name": "pageviews",
      "registered": true,
      "replicaInfo": [
        1
      ],
      "consumerCount": 0,
      "consumerGroupCount": 0
    },
    

    The "registered": true indicator means that you have registered the topic and you can write streaming queries against it.

Create a Table in the Control Center UI

Confluent Control Center guides you through the process of registering a topic as a stream or a table.

  1. In the KSQL Editor, navigate to Tables and click Add a table. The Create a KSQL Table dialog opens.

    Screenshot of the Create a KSQL Table wizard in Confluent Control Center
  2. Click users to fill in the details for the table. KSQL infers the table schema and displays the field names and types from the topic. You need to choose a few more settings.

    • In the How are your messages encoded? dropdown, select JSON.
    • In the Key dropdown, select userid.
  3. Click Save Table to create a table on the users topic.

    Screenshot of the Create a KSQL Table wizard in Confluent Control Center
  4. The KSQL Editor opens with a suggested query. Click Run to display the query results.

    Screenshot of a KSQL SELECT query in Confluent Control Center

    The Query Results pane displays query status information, like Messages/sec, and it shows the fields that the query returns.

  5. The query continues until you end it explicitly. Click Stop to end the query.
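Behind the scenes, the wizard issues a CREATE TABLE statement for you. An equivalent statement that you could run directly in the KSQL Editor looks roughly like this; the table name and field list are illustrative, assuming the JSON encoding and userid key selected above.

```sql
CREATE TABLE users_original (registertime BIGINT, userid VARCHAR, regionid VARCHAR, gender VARCHAR) WITH
  (kafka_topic='users', value_format='JSON', key='userid');
```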

Write Persistent Queries

With the pageviews topic registered as a stream, and the users topic registered as a table, you can write streaming queries that run until you end them with the TERMINATE statement.
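When you no longer need a persistent query, you can stop it from the editor. The following is a sketch; the query ID shown is illustrative, so use the ID that SHOW QUERIES reports in your environment.

```sql
-- List running persistent queries and note the query ID.
SHOW QUERIES;

-- Stop a persistent query by ID (the ID below is illustrative).
TERMINATE CSAS_PAGEVIEWS_ENRICHED_0;
```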

  1. Copy the following code into the editing window and click Run.

    CREATE STREAM pageviews_enriched AS
    SELECT users.userid AS userid, pageid, regionid, gender
    FROM pageviews_original
    LEFT JOIN users
      ON pageviews_original.userid = users.userid;
    

    Your output should resemble:

    Screenshot of the KSQL CREATE STREAM AS SELECT statement in Confluent Control Center
  2. To inspect your persistent queries, navigate to the Running Queries page, which shows details about the pageviews_enriched stream that you created in the previous query.

    Screenshot of the KSQL Running Queries page in Confluent Control Center
  3. Click Explain to see the schema and query properties for the persistent query.
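The Explain button corresponds to the EXPLAIN statement, which you can also run directly in the editor. The query ID below is illustrative; use the ID that SHOW QUERIES reports.

```sql
-- Show the schema, execution plan, and properties of a persistent query.
EXPLAIN CSAS_PAGEVIEWS_ENRICHED_0;
```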

Monitor Persistent Queries

You can monitor your persistent queries visually by using Confluent Control Center.

  1. In the navigation menu, click Data streams and find the consumer group for the pageviews_enriched query, which is named _confluent-ksql-default_query_CSAS_PAGEVIEWS_ENRICHED_0.

  2. Change the time scale from Last 4 hours to Last 30 minutes.

    Your output should resemble:

    Screenshot of the Data Streams page in Confluent Control Center

    The graph is shaded red, because in the time since you started ksql-datagen, records have accumulated in the pageviews and users topics, with no consumer groups to consume them. The green bar on the right indicates that the pageviews_enriched query has recently started consuming records.

  3. In the navigation menu, click Consumer lag and find the consumer group for the pageviews_enriched query, which is named _confluent-ksql-default_query_CSAS_PAGEVIEWS_ENRICHED_0. This view shows how well your persistent query is keeping up with the incoming data.

    Screenshot of the Consumer Lag page in Confluent Control Center

Query Properties

You can assign properties in the KSQL Editor before you run your queries.

  1. In the navigation menu, click KSQL to open the KSQL Editor.

  2. Click Query properties and set the auto.offset.reset field to Earliest.

  3. Copy the following code into the editing window and click Run.

    CREATE STREAM pageviews_female AS
    SELECT * FROM pageviews_enriched
    WHERE gender = 'FEMALE';
    

    The pageviews_female stream starts with the earliest record in the underlying pageviews_enriched topic, which means that it consumes all of the available records from the beginning.

  4. Confirm that the auto.offset.reset property was applied to the pageviews_female stream. In the navigation menu, click Data streams and find the consumer group for the pageviews_female stream, which is named _confluent-ksql-default_query_CSAS_PAGEVIEWS_FEMALE_1.

    Screenshot of the Data Streams page in Confluent Control Center

    The graph is shaded green, because all of the records were consumed when the pageviews_female stream started.
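The Query properties pane in Control Center corresponds to the SET statement in the KSQL CLI. As a sketch, the same offset behavior can be configured like this before running a query:

```sql
-- Equivalent CLI configuration for the Query properties setting above.
SET 'auto.offset.reset' = 'earliest';
```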