Create KSQL Streaming Queries in Confluent Cloud

This quick start gets you up and running with KSQL on Confluent Cloud. It shows how to query KSQL streams and tables and write streaming queries on cloud-hosted KSQL.

Important

In Confluent Cloud Quick Start, you ran a small JavaScript file to produce data to the users topic in your Confluent Cloud cluster.

In this quick start, you create a KSQL table on the users topic. You also create a pageviews topic and register a KSQL stream on it.

With the stream and table, you create a pageviews_enriched persistent query that joins the streaming data from both sources into a single, combined stream that continues until you stop it.

Create a KSQL Application in Confluent Cloud

To write queries against KSQL streams and tables, create a new KSQL application in Confluent Cloud.

  1. In the navigation menu, click KSQL.

    Screenshot of Confluent Cloud showing the KSQL Add Application page
  2. Click Add Application, and in the Application name field, enter ksql-app1. Click Continue. The KSQL app should have a PROVISIONING status. Before it can come up, you must set ACLs for it by using the Confluent Cloud CLI.

    Screenshot of Confluent Cloud showing the KSQL Add Application wizard
  3. Configure the Confluent Cloud ACL for KSQL. For more information about Confluent Cloud ACLs, see Access Control Lists.

    1. In the Confluent Cloud CLI, find the KSQL cluster ID.

      ccloud ksql app list
      

      Your output should resemble:

             Id      |   Name    | Topic Prefix |   Kafka   | Storage |                         Endpoint                          | Status
      +--------------+-----------+--------------+-----------+---------+-----------------------------------------------------------+--------+
        lksqlc-lg0g3 | ksql-app1 | pksqlc-emko7 | lkc-emmox |     100 | https://pksqlc-emko7.us-central1.gcp.confluent.cloud:8088 | UP
      
    2. Grant KSQL access to the users topic.

      ccloud ksql app configure-acls <ksql-cluster-id> users --cluster <kafka-cluster-id>
      
  4. The new KSQL application appears in the Application list.

    Screenshot of Confluent Cloud showing the KSQL Applications page

    Note

    It may take a few minutes to provision the KSQL cluster. When KSQL is ready, its Status changes from Provisioning to Up.

Create the Pageviews Topic

In Confluent Cloud Quick Start, you created the users topic by using the Confluent Cloud CLI. In this step, you create the pageviews topic by using the Confluent Cloud UI.

  1. In the navigation bar, click Topics.

  2. Click Add a topic, and on the New Topic page, enter "pageviews" for the topic name.

  3. Click Create with defaults to create the pageviews topic.

  4. In the Confluent Cloud CLI, grant KSQL access to the pageviews topic.

    ccloud ksql app configure-acls <ksql-cluster-id> pageviews --cluster <kafka-cluster-id>
    

Produce Pageview Data to Confluent Cloud

Use a JavaScript file to produce data to the pageviews topic, the same way that you produced data to the users topic in Confluent Cloud Quick Start.

  1. Copy and paste the following code into a file and save as generate-pageviews.js.

    const userIdBase = 'User_';
    const pageIdBase = 'Page_';
    const maxUserId = 100;
    const maxPageId = 1000;
    const interval = 1000;
    
    function generateUserId() {
        return generateId(userIdBase, maxUserId);
    }
    
    function generatePageId() {
        return generateId(pageIdBase, maxPageId);
    }
    
    function generateId(idBase, maxIdValue) {
        const id = Math.floor(Math.random() * maxIdValue);
        const idString = idBase + id;
        return idString;
    }
    
    function sleep(ms) {
      return new Promise(resolve=>{
          setTimeout(resolve,ms)
      })
    }
    
    async function begin() {
      while(true) {
    
        // Get a user ID.
        const userId = generateUserId();
    
        // Concatenate a delimited record of the form <key>:<value>.
        // - The record key is userId, delimited by a ':' character.
        // - The record value is a comma-delimited list of fields.
        //
        //   <userId>:<view-time>,<userId>,<pageId>
        //
        // Example record:
        //   User_77:1567552403986,User_77,Page_132
    
        let record = userId + ":";
        record += new Date().valueOf()  + ",";
        record += userId + ",";
        record += generatePageId();
        console.log(record);
    
        await sleep(interval);
      }
    }
    begin();
    
  2. Open a new terminal and run the following command to pipe the output of the generate-pageviews.js file to the pageviews topic in Confluent Cloud. Leave this running in your terminal. Press Ctrl+C to end.

    node generate-pageviews.js | ccloud kafka topic produce pageviews
    

    Your output should resemble:

    Starting Kafka Producer. ^C to exit
    
  3. If you stopped the generate-users.js script previously, start it again:

    node generate-users.js | ccloud kafka topic produce users
    

Two producers are running and sending event streams to topics in your Kafka cluster in Confluent Cloud.
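
To make the record layout from generate-pageviews.js concrete, here is a minimal sketch that splits one produced line into its key and value. It assumes the first ':' character separates the key from the value, as the comments in the script describe. The splitRecord helper is hypothetical and is not part of the Confluent Cloud CLI.

```javascript
// Split one producer line of the form <key>:<value> into its parts.
// Assumption: the first ':' separates the key from the value, matching
// the record layout documented in generate-pageviews.js.
function splitRecord(line) {
  const delimiterIndex = line.indexOf(':');
  if (delimiterIndex === -1) {
    throw new Error('Record has no key delimiter: ' + line);
  }
  return {
    key: line.slice(0, delimiterIndex),
    value: line.slice(delimiterIndex + 1),
  };
}

const example = splitRecord('User_77:1567552403986,User_77,Page_132');
console.log(example.key);   // User_77
console.log(example.value); // 1567552403986,User_77,Page_132
```

The value part is the comma-delimited field list that KSQL later reads with the DELIMITED format.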

Create a KSQL Stream and KSQL Table

To write streaming queries against the pageviews and users topics, register the topics with KSQL as a stream and a table. Use the CREATE STREAM and CREATE TABLE statements in the KSQL Editor.

These examples query records from the pageviews and users topics using the following schema.

ER diagram showing a pageviews stream and a users table with a common userid column

Create a Stream in the KSQL Editor

You can create a stream or table by using the CREATE STREAM and CREATE TABLE statements in the KSQL Editor, just as you use them in the KSQL CLI.

  1. Copy the following code into the editor window and click Run.

    CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH
    (kafka_topic='pageviews', value_format='DELIMITED');
    

    Your output should resemble:

    Screenshot of the KSQL CREATE STREAM statement in Confluent Cloud
  2. In the editor window, run the SHOW TOPICS statement to inspect the status of the pageviews topic. Click Run to start the query.

    SHOW TOPICS;
    
  3. In the Query Results window, scroll to view the pageviews topic. Your output should resemble:

    {
      "@type": "kafka_topics",
      "statementText": "SHOW TOPICS;",
      "topics": [
        {
          "name": "pageviews",
          "registered": true,
          "replicaInfo": [
            3,
            3,
            3,
            3,
            3,
            3
          ],
          "consumerCount": 0,
          "consumerGroupCount": 0
        }
      ]
    }
    

    The "registered": true indicator means that you have registered the topic and you can write streaming queries against it.

  4. In the editor window, use a SELECT query to inspect records in the pageviews stream.

    SELECT * FROM PAGEVIEWS_ORIGINAL;
    

    Your output should resemble:

    Screenshot of a KSQL SELECT query in Confluent Cloud
  5. The query continues until you end it explicitly. Click Stop to end the query.
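
The CREATE STREAM statement declares three columns over a DELIMITED value, which means the fields are matched to columns by position. The following sketch illustrates that idea outside of KSQL. The parseDelimitedValue helper is hypothetical, and it is a simplified model rather than how KSQL deserializes records.

```javascript
// Column names and types copied from the CREATE STREAM statement.
const pageviewColumns = [
  { name: 'viewtime', type: 'bigint' },
  { name: 'userid', type: 'varchar' },
  { name: 'pageid', type: 'varchar' },
];

// Hypothetical helper: map a comma-delimited value onto columns by position.
function parseDelimitedValue(value, columns) {
  const fields = value.split(',');
  if (fields.length !== columns.length) {
    throw new Error(`Expected ${columns.length} fields but got ${fields.length}`);
  }
  const row = {};
  columns.forEach((column, i) => {
    // A JavaScript number is precise enough for millisecond timestamps.
    row[column.name] = column.type === 'bigint' ? Number(fields[i]) : fields[i];
  });
  return row;
}

// Logs an object with a numeric viewtime and string userid and pageid.
console.log(parseDelimitedValue('1567552403986,User_77,Page_132', pageviewColumns));
```

Because the mapping is positional, the order of the columns in the statement has to match the order of the fields that the producer writes.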

Create a Table in Confluent Cloud KSQL

Confluent Cloud KSQL guides you through the process of registering a topic as a stream or a table.

  1. In the KSQL Editor, navigate to Tables and click Add a table. The Create a KSQL Table dialog opens.

    Screenshot of the Create a KSQL Table wizard in Confluent Cloud
  2. Click users to fill in the details for the table.

    • In the Encoding dropdown, select DELIMITED.
    • In the Field(s) you'd like to include in your TABLE, add these fields:
      • registertime with type BIGINT
      • userid with type VARCHAR
      • regionid with type VARCHAR
      • gender with type VARCHAR
    • KSQL tables have an optional key field. In the Key dropdown, select userid.
    Screenshot of the Create a KSQL Table wizard in Confluent Cloud
  3. Click Save TABLE to create a KSQL table on the users topic.

  4. In the tables list, find the USERS table, click the ellipsis (...), and select Query to open the KSQL Editor with a suggested query.

    Screenshot of the KSQL Table context menu in Confluent Cloud
  5. The query runs automatically, and the query results are displayed.

    Screenshot of a KSQL SELECT query in Confluent Cloud

    The Query Results pane displays query status information, like Messages/sec, and it shows the fields that the query returns.

  6. The query continues until you end it explicitly. Click Stop to end the query.
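
The key field matters because a table holds the latest row for each key, while a stream keeps every event. The sketch below models that behavior with a Map keyed by userid. It is a simplified illustration, and the user records are made up rather than taken from generate-users.js.

```javascript
// Simplified model: a table keeps only the most recent row for each key.
function applyToTable(table, row) {
  table.set(row.userid, row);
  return table;
}

// Hypothetical user records; the values are invented for illustration.
const userRecords = [
  { registertime: 1500000000000, userid: 'User_1', regionid: 'Region_1', gender: 'OTHER' },
  { registertime: 1500000000001, userid: 'User_2', regionid: 'Region_5', gender: 'FEMALE' },
  { registertime: 1500000000002, userid: 'User_1', regionid: 'Region_9', gender: 'OTHER' },
];

const usersTable = userRecords.reduce(applyToTable, new Map());
console.log(usersTable.size);                   // 2
console.log(usersTable.get('User_1').regionid); // Region_9
```

The second record for User_1 replaces the first, so three input records produce a table with two rows.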

Write Persistent Queries

With the pageviews topic registered as a stream, and the users topic registered as a table, you can write streaming queries that run until you end them with the TERMINATE statement.

  1. Copy the following code into the editing window and click Run.

    CREATE STREAM pageviews_enriched AS
    SELECT users.userid AS userid, pageid, regionid, gender
    FROM pageviews_original
    LEFT JOIN users
      ON pageviews_original.userid = users.userid;
    

    Your output should resemble:

    Screenshot of the KSQL CREATE STREAM AS SELECT statement in Confluent Cloud
  2. To inspect your persistent queries, navigate to the Running Queries page, which shows details about the pageviews_enriched stream that you created in the previous query.

    Screenshot of the KSQL Running Queries page in Confluent Cloud
  3. Click Explain to see the schema and query properties for the persistent query.

    Screenshot of the KSQL Explain Query dialog in Confluent Cloud
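
As a rough mental model of what pageviews_enriched computes, the sketch below joins each pageview event against an in-memory lookup of users. It is a simplified illustration and not how KSQL executes the query. The enrichPageview helper and the sample rows are hypothetical.

```javascript
// Simplified in-memory model of the LEFT JOIN in pageviews_enriched.
// usersTable maps userid -> latest user row, like a KSQL table.
function enrichPageview(pageview, usersTable) {
  const user = usersTable.get(pageview.userid);
  return {
    // The query selects users.userid, so it is null when no user row matches.
    userid: user ? user.userid : null,
    pageid: pageview.pageid,
    regionid: user ? user.regionid : null,
    gender: user ? user.gender : null,
  };
}

// Hypothetical table contents for illustration.
const users = new Map([
  ['User_77', { userid: 'User_77', regionid: 'Region_3', gender: 'MALE' }],
]);

const matched = enrichPageview(
  { viewtime: 1567552403986, userid: 'User_77', pageid: 'Page_132' }, users);
const unmatched = enrichPageview(
  { viewtime: 1567552404986, userid: 'User_5', pageid: 'Page_9' }, users);

console.log(matched.regionid);   // Region_3
console.log(unmatched.regionid); // null
```

With a LEFT JOIN, every pageview produces an output row, and the user columns are null when the table has no row for that userid.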

Monitor Persistent Queries

You can monitor your persistent queries visually by using Confluent Cloud.

In the navigation menu, click Consumers and find the group that corresponds with your pageviews_enriched stream, for example _confluent-ksql-pksqlc-lgwpnquery_CSAS_PAGEVIEWS_ENRICHED_2. This view shows how well your persistent query is keeping up with the incoming data.

Screenshot of the Consumer Lag page in Confluent Cloud
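
Consumer lag is the gap between the newest offset in a partition and the offset that the consumer group has committed. The sketch below shows the arithmetic with invented offsets. The totalLag helper and the field names are hypothetical and do not correspond to a Confluent Cloud API.

```javascript
// Lag for one partition = latest offset in the log minus the offset that
// the consumer group has committed. Total lag sums this over partitions.
function totalLag(partitions) {
  return partitions.reduce(
    (sum, p) => sum + Math.max(0, p.endOffset - p.committedOffset),
    0
  );
}

// Hypothetical offsets, invented for illustration.
const partitions = [
  { partition: 0, endOffset: 1200, committedOffset: 1200 },
  { partition: 1, endOffset: 980, committedOffset: 950 },
  { partition: 2, endOffset: 1010, committedOffset: 1000 },
];

console.log(totalLag(partitions)); // 40
```

A total that stays near zero suggests the persistent query is keeping up, while a total that keeps growing suggests it is falling behind the producers.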