Create streaming queries in Confluent Cloud ksqlDB

Easily build stream processing applications with a simple and lightweight SQL syntax. Continuously transform, enrich, join, and aggregate your Kafka events without writing complex application code. As a fully managed service with a 99.9% uptime SLA, Confluent Cloud ksqlDB eliminates the operational overhead of provisioning and operating infrastructure, so you can focus on application development.

This quick start gets you up and running with Confluent Cloud ksqlDB. It shows how to create streams and tables, and how to write streaming queries on cloud-hosted ksqlDB.

Important

In Confluent Cloud Quick Start, you ran a small JavaScript file to produce data to the users topic in your Confluent Cloud cluster.

In this quick start, you create a ksqlDB table on the users topic. Also, you create a pageviews topic and register a ksqlDB stream on it.

With the stream and table, you create a pageviews_enriched persistent query that joins data from both sources into a single, combined stream and runs until you stop it.

Finally, you inspect the topology of your streaming queries by using the Flow page.

Kafka cluster ID

Some ccloud CLI commands require the ID of the Kafka cluster that your ksqlDB cluster communicates with. To get the Kafka cluster ID, use the following command.

ccloud kafka cluster list

Your output should resemble:

      Id      |       Name        | Provider |   Region    | Durability | Status
+-------------+-------------------+----------+-------------+------------+--------+
    lkc-o350p | My first cluster  | gcp      | us-central1 | LOW        | UP
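
If you script these steps, you can extract the Kafka cluster ID from the tabular output instead of copying it by hand. The following is a minimal sketch that parses the sample output shown above; the `output` variable stands in for piping the live `ccloud kafka cluster list` output.

```shell
# Sample `ccloud kafka cluster list` output, copied from above.
output='      Id      |       Name        | Provider |   Region    | Durability | Status
+-------------+-------------------+----------+-------------+------------+--------+
    lkc-o350p | My first cluster  | gcp      | us-central1 | LOW        | UP'

# Skip the header and separator rows, then take the first column
# of the first data row.
kafka_cluster_id=$(printf '%s\n' "$output" | awk 'NR > 2 { print $1; exit }')
echo "$kafka_cluster_id"   # prints: lkc-o350p
```

The same pattern works on the output of `ccloud ksql app list` if a script needs the ksqlDB cluster ID.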

Create a ksqlDB application in Confluent Cloud

To write queries against streams and tables, create a new ksqlDB application in Confluent Cloud.

  1. In the navigation menu, click ksqlDB.

    Screenshot of Confluent Cloud showing the ksqlDB Add Application page
  2. Click Add Application, and in the Application name field, enter ksqldb-app1. Click Continue. The ksqlDB app should have a PROVISIONING status.

    Screenshot of Confluent Cloud showing the ksqlDB Add Application wizard

    Note

    It may take a few minutes to provision the ksqlDB cluster. When ksqlDB is ready, its Status changes from Provisioning to Up.

    You must set ACLs before you can access the application by using the Confluent Cloud CLI.

  3. Configure the Confluent Cloud ACLs for ksqlDB. For more information about Confluent Cloud ACLs, see Access Control Lists (ACLs) for Confluent Cloud.

    1. In the Confluent Cloud CLI, find the ksqlDB cluster ID.

      ccloud ksql app list
      

      Your output should resemble:

             Id      |   Name    | Topic Prefix |   Kafka   | Storage |                         Endpoint                          | Status
      +--------------+-----------+--------------+-----------+---------+-----------------------------------------------------------+--------+
        lksqlc-lg0g3 | ksql-app1 | pksqlc-emko7 | lkc-emmox |     100 | https://pksqlc-emko7.us-central1.gcp.confluent.cloud:8088 | UP
      
    2. Grant ksqlDB access to the users topic.

      ccloud ksql app configure-acls <ksql-cluster-id> users --cluster <kafka-cluster-id>
      
  4. The new ksqlDB application appears in the All ksqlDB Applications list.

    Screenshot of Confluent Cloud showing the ksqlDB Applications page

Create the pageviews topic

In Confluent Cloud Quick Start, you created the users topic by using the Confluent Cloud CLI. In this step, you create the pageviews topic by using the Confluent Cloud UI.

  1. In the navigation bar, click Topics.

  2. Click Add a topic, and on the New Topic page, enter “pageviews” as the topic name.

    Screenshot of Confluent Cloud showing the Create Topic page
  3. Click Create with defaults to create the pageviews topic.

  4. In the Confluent Cloud CLI, grant ksqlDB access to the pageviews topic.

    ccloud ksql app configure-acls <ksql-cluster-id> pageviews --cluster <kafka-cluster-id>
    

Produce pageview data to Confluent Cloud

Use a JavaScript file to produce data to the pageviews topic, the same way that you produced data to the users topic in Confluent Cloud Quick Start.

  1. Copy and paste the following code into a file and save as generate-pageviews.js.

    const userIdBase = 'User_'
    const pageIdBase = 'Page_'
    const maxUserId = 100
    const maxPageId = 1000
    const interval = 1000;
    
    function generateUserId() {
        return generateId(userIdBase, maxUserId);
    }
    
    function generatePageId() {
        return generateId(pageIdBase, maxPageId);
    }
    
    function generateId(idBase, maxIdValue) {
        var id = Math.floor(Math.random() * maxIdValue);
        var idString = idBase + id;
        return idString;
    }
    
    function sleep(ms) {
      return new Promise(resolve=>{
          setTimeout(resolve,ms)
      })
    }
    
    async function begin() {
      while(true) {
    
        // Get a user ID.
        const userId = generateUserId();
    
        // Concatenate a delimited record of the form <key>:<value>.
        // - The record key is userId, delimited by a ':' character.
        // - The record value is a comma-delimited list of fields.
        //
        //   <userId>:<view-time>,<pageId>
        //
        // Example record:
        //   User_77:1567552403986,Page_132
    
        let record = userId + ":";
        record += new Date().valueOf() + ",";
        record += generatePageId();
        console.log(record);
    
        await sleep(interval);
      }
    }
    begin();
    
  2. Open a new terminal and run the following command to pipe the output of the generate-pageviews.js script to the pageviews topic in Confluent Cloud. Leave the command running; press Ctrl+C when you want to stop producing records.

    node generate-pageviews.js | ccloud kafka topic produce pageviews
    

    Your output should resemble:

    Starting Kafka Producer. ^C to exit
    
  3. If you stopped the generate-users.js script previously, start it again in a separate console:

    node generate-users.js | ccloud kafka topic produce users
    

Two producers are running and sending event streams to topics in your Kafka cluster in Confluent Cloud.
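
Before writing queries, you can sanity-check the delimited shape the producer emits. This sketch assembles one record by hand in the same `<userId>:<view-time>,<pageId>` format; the IDs are made-up examples.

```shell
# Assemble one record in the producer's format: the key and value are
# separated by ':', and the value fields are separated by ','.
user_id="User_77"
view_time=$(($(date +%s) * 1000))   # current time as epoch milliseconds
page_id="Page_132"
record="${user_id}:${view_time},${page_id}"
echo "$record"
```

The part before the colon becomes the record key, and ksqlDB splits the comma-delimited value into the remaining columns.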

Create a ksqlDB stream and ksqlDB table

To write streaming queries against the pageviews and users topics, register the topics with ksqlDB as a stream and a table. Use the CREATE STREAM and CREATE TABLE statements in the ksqlDB Editor.

These examples query records from the pageviews and users topics using the following schema.

ER diagram showing a pageviews stream and a users table with a common userid column

Create a stream in the ksqlDB editor

You can create a stream or table by using the CREATE STREAM and CREATE TABLE statements in the ksqlDB Editor, just as you would in the ksqlDB CLI.

  1. In the navigation bar, click ksqlDB.

  2. In the ksqlDB applications list, click ksqldb-app1.

  3. Copy the following code into the editor window and click Run.

    CREATE STREAM pageviews_original (userid varchar KEY, viewtime bigint, pageid varchar) WITH
    (kafka_topic='pageviews', value_format='DELIMITED');
    

    Your output should resemble:

    Screenshot of the ksqlDB CREATE STREAM statement in Confluent Cloud
  4. In the editor window, run the SHOW TOPICS statement to inspect the status of the pageviews topic. Click Run to execute the statement.

    SHOW TOPICS;
    
  5. In the Query Results window, scroll to view the pageviews topic. Your output should resemble:

    {
      "@type": "kafka_topics",
      "statementText": "SHOW TOPICS;",
      "topics": [
        {
          "name": "pageviews",
          "replicaInfo": [
            3,
            3,
            3,
            3,
            3,
            3
          ]
        }
      ]
    }
    

    To see the count of consumers and consumer groups, use the SHOW TOPICS EXTENDED statement.

  6. In the editor window, use a SELECT query to inspect records in the pageviews stream.

    SELECT * FROM PAGEVIEWS_ORIGINAL EMIT CHANGES;
    

    Your output should resemble:

    Screenshot of a ksqlDB SELECT query in Confluent Cloud
  7. The query continues until you end it explicitly. Click Stop to end the query.
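
The SHOW TOPICS response in step 5 is plain JSON, so if you save it to a file or variable you can post-process it with standard shell tools. A rough sketch, with a made-up `response` variable holding an abbreviated copy of the output above (a real script would use a proper JSON parser such as jq):

```shell
# Abbreviated SHOW TOPICS response, as returned in the Query Results window.
response='{
  "@type": "kafka_topics",
  "statementText": "SHOW TOPICS;",
  "topics": [ { "name": "pageviews", "replicaInfo": [3, 3, 3, 3, 3, 3] } ]
}'

# Pull out every "name" field value with grep and sed.
topic_names=$(printf '%s\n' "$response" \
  | grep -o '"name": "[^"]*"' \
  | sed 's/"name": "\(.*\)"/\1/')
echo "$topic_names"   # prints: pageviews
```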

Create a table in Confluent Cloud ksqlDB

Confluent Cloud ksqlDB guides you through the process of registering a topic as a stream or a table.

  1. Navigate to Tables and click Add a table. The Create a ksqlDB Table dialog opens.

    Screenshot of the Create a ksqlDB Table wizard in Confluent Cloud
  2. Click users to fill in the details for the table.

    • In the Encoding dropdown, select DELIMITED.
    • In the Field(s) you’d like to include in your TABLE, add these fields:
      • registertime with type BIGINT
      • userid with type VARCHAR
      • regionid with type VARCHAR
      • gender with type VARCHAR
    • ksqlDB tables have an optional key field. In the Key dropdown, select userid.
    Screenshot of the Create a ksqlDB Table wizard in Confluent Cloud
  3. Click Save TABLE to create a ksqlDB table on the users topic. The ksqlDB Editor opens with an example query on the USERS table.

  4. Click Tables, and in the table list, click USERS to open the details page.

    Screenshot of the ksqlDB Table summary page in Confluent Cloud
  5. Click Editor to open the query editor, and click Run to run the example query.

    Screenshot of a ksqlDB SELECT query in Confluent Cloud

    The query results pane displays query status information, like Messages/sec, and it shows the fields that the query returns.

  6. The query continues until you end it explicitly. Click Stop to end the query.

Write persistent queries

With the pageviews topic registered as a stream, and the users topic registered as a table, you can write a streaming join query that runs until you end it with the TERMINATE statement.

  1. Copy the following code into the editing window and click Run.

    CREATE STREAM pageviews_enriched AS
    SELECT users.userid AS userid, pageid, regionid, gender
    FROM pageviews_original
    LEFT JOIN users
      ON pageviews_original.userid = users.userid
    EMIT CHANGES;
    

    Your output should resemble:

    Screenshot of the ksqlDB CREATE STREAM AS SELECT statement in Confluent Cloud
  2. To inspect your persistent queries, navigate to the Running Queries page, which shows details about the pageviews_enriched stream that you created in the previous query.

    Screenshot of the ksqlDB Running Queries page in Confluent Cloud
  3. Click Explain to see the schema and query properties for the persistent query.

    Screenshot of the ksqlDB Explain Query dialog in Confluent Cloud

Flow View

To visualize data flow in your ksqlDB application, click the Flow tab to open the Flow View page.

Use Flow View to:

  • View the topology of your ksqlDB application.
  • Inspect the details of streams, tables, and the SQL statements that create them.
  • View events as they flow through your application.
ksqlDB application topology on the Flow View page in Confluent Cloud

Click the CREATE-STREAM node to see the query that you used to create the PAGEVIEWS_ENRICHED stream.

Details of a CREATE STREAM statement in the ksqlDB Flow View in Confluent Cloud

Click the PAGEVIEWS_ENRICHED node to see the stream’s events and schema.

Details of a stream in the ksqlDB Flow page in Confluent Cloud

Monitor persistent queries

You can monitor your persistent queries visually by using Confluent Cloud.

In the navigation menu, click Consumers and find the group that corresponds to your pageviews_enriched stream, for example _confluent-ksql-pksqlc-lgwpnquery_CSAS_PAGEVIEWS_ENRICHED_2. This view shows how well your persistent query is keeping up with the incoming data.

Screenshot of the Consumer Lag page in Confluent Cloud

Create an API key for Confluent Cloud ksqlDB through the Confluent Cloud CLI

This functionality is available in Confluent Cloud CLI v0.198.0 and later.

Run the following to update the Confluent Cloud CLI.

ccloud update

Run the following command to create a ksqlDB API key.

ccloud api-key create --resource <ksql-cluster-id>

Record your API key and secret. You won’t be able to view your secret again later.

Run the following command to see a list of all API keys associated with your ksqlDB cluster.

ccloud api-key list --resource <ksql-cluster-id>

Accessing a ksqlDB application in Confluent Cloud with API keys

ksqlDB in Confluent Cloud supports authentication with ksqlDB API keys.

Using ksqlDB CLI
To connect the ksqlDB CLI to a cluster, run the following command with your Confluent Cloud ksqlDB server URL specified.
<path-to-confluent>/bin/ksql -u <ksql-api-key> -p <secret> <ccloud-ksql-server-URL>
Using HTTPS Requests
Specify the Basic scheme in the Authorization header of your request, with your ksqlDB API key and secret, separated by a colon and base64-encoded, as the credentials.
curl -X "POST" "https://<ccloud-ksql-endpoint>/ksql" \
     -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
     -H "Authorization: Basic base64(<ksql-api-key>:<secret>)" \
     -d $'{
  "ksql": "LIST STREAMS;",
  "streamsProperties": {}
}'
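
The `base64(<ksql-api-key>:<secret>)` placeholder must be replaced with the actual encoded value. One way to produce it, using made-up placeholder credentials rather than a real key:

```shell
# Placeholder credentials -- substitute your real ksqlDB API key and secret.
KSQL_API_KEY="KEY"
KSQL_API_SECRET="SECRET"

# Join with ':' and base64-encode; printf avoids a stray trailing newline
# sneaking into the encoded credentials.
CREDENTIALS=$(printf '%s:%s' "$KSQL_API_KEY" "$KSQL_API_SECRET" | base64)
echo "Authorization: Basic $CREDENTIALS"
```

Pass the resulting header value to curl with `-H`, as in the request above.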

See also

For an example that shows fully managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL demo. The demo also shows how to use the Confluent Cloud CLI to manage your resources in Confluent Cloud.