Create streaming queries in Confluent Cloud ksqlDB

Easily build stream processing applications with a simple and lightweight SQL syntax. Continuously transform, enrich, join, and aggregate your Kafka events without writing complex application code. As a fully managed service with a 99.9% uptime SLA, Confluent Cloud ksqlDB eliminates the overhead of provisioning and operating infrastructure, so you can focus on application development.

This quick start gets you up and running with Confluent Cloud ksqlDB. It shows how to create streams and tables, and how to write streaming queries on cloud-hosted ksqlDB.

Important

This quick start assumes that you have completed Quick Start for Apache Kafka using Confluent Cloud, in which you installed a Datagen connector that produces data to the users topic in your Confluent Cloud cluster.

In this quick start, you perform the following steps.

  1. Create a ksqlDB application in Confluent Cloud.
  2. Install the Confluent Cloud CLI.
  3. Enable ksqlDB access in Confluent Cloud.
  4. Create the pageviews topic.
  5. Produce pageview data to Confluent Cloud.
  6. Create a stream in the ksqlDB editor.
  7. Create a table in the ksqlDB editor.
  8. Write a persistent query.
  9. Monitor persistent queries.

Note

This quick start uses the Confluent Cloud UI to create a ksqlDB application. For an introduction that uses the ksqlDB CLI exclusively, see ksqlDB Quickstart for Confluent Cloud.

Create a ksqlDB application in Confluent Cloud

To write queries against streams and tables, create a new ksqlDB application in Confluent Cloud.

  1. In the navigation menu, click ksqlDB.

    Screenshot of Confluent Cloud showing the ksqlDB Add Application page
  2. Click Add Application, and in the Application name field, enter ksqldb-app1. Click Continue. The ksqlDB app should have a PROVISIONING status.

    Screenshot of Confluent Cloud showing the ksqlDB Add Application wizard

    Note

    It may take a few minutes to provision the ksqlDB cluster. When ksqlDB is ready, its Status changes from Provisioning to Up.

  3. Confirm that the new ksqlDB application appears in the Applications list.

    Screenshot of Confluent Cloud showing the ksqlDB Applications page

Install the Confluent Cloud CLI

After you have a working Kafka cluster in Confluent Cloud, you can use the Confluent Cloud CLI to interact with the cluster from your local computer. For example, you can produce messages to and consume messages from your topics by using the Confluent Cloud CLI.

Scripted installation

Run the following command to install the Confluent Cloud CLI. The command creates a bin directory in the location you specify (<path-to-directory>/bin). On Windows, you may need to install an appropriate Linux environment, such as the Windows Subsystem for Linux, to make the curl and sh commands available.

Important

The CLI installation location must be in your PATH (e.g. /usr/local/bin).

curl -L --http1.1 https://cnfl.io/ccloud-cli | sh -s -- -b /<path-to-directory>/bin
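
After the script finishes, you can confirm that the binary is reachable. A quick check, assuming the install directory has been added to your PATH:

ccloud version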

Tarball installation

Download and install the raw binaries by platform.

Tip

For examples of using multiple languages to connect Kafka client applications to Confluent Cloud, see the Code Examples.

In the following steps, you log in to Confluent Cloud and connect to your Kafka cluster with the API key you created in Step 3: Create a Sample Producer.

For more information about Confluent Cloud CLI commands, see Confluent Cloud CLI Command Reference.

  1. Log in to your Confluent Cloud cluster.

    ccloud login
    

    Your output should resemble:

    Enter your Confluent credentials:
    Email: jdoe@myemail.io
    Password:
    
    Logged in as jdoe@myemail.io
    Using environment t118 ("default")
    
  2. View your cluster.

    ccloud kafka cluster list
    

    Your output should resemble:

          Id      |       Name        |     Type     | Provider |  Region  | Availability | Status
    +-------------+-------------------+--------------+----------+----------+--------------+--------+
        lkc-emmox | ccloud-quickstart | BASIC_LEGACY | gcp      | us-west2 | HIGH         | UP
    
  3. Set the active Kafka cluster. In this example, the cluster ID is lkc-emmox.

    ccloud kafka cluster use lkc-emmox
    

    Tip

    The lkc prefix in the cluster ID is an acronym for “logical Kafka cluster”.

  4. Add the API secret with ccloud api-key store <key> <secret>. When you create an API key with the CLI, it is automatically stored locally. However, when you create an API key by using the UI, the API, or the CLI on another machine, the secret is not available for CLI use until you store it. This step is required because secrets cannot be retrieved after creation.

    From the Confluent Cloud CLI, type the following command:

    ccloud api-key store <api-key> <secret> --resource <cluster-id>
    

    For an API key/secret pair on the example cluster with ID lkc-emmox, the command might resemble the following:

    ccloud api-key store LD35EN2ZJTCTRQRM 67JImN+9vk+Hj3eak2/UcwUlbDNlGGC3KAIOy5JNRVSnweumPBVpW31JWZSBeawz --resource lkc-emmox
    
  5. Set the API key to use for Confluent Cloud CLI commands with the ccloud api-key use command.

    ccloud api-key use --resource <resource-id> <api-key>
    

    For an API key/secret pair on the previously shown cluster with ID lkc-emmox, the command might resemble the following:

    ccloud api-key use --resource lkc-emmox LD35EN2ZJTCTRQRM
    
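To confirm which keys are stored for the cluster, you can list them. This uses the same api-key list command that appears later in this guide:

ccloud api-key list --resource lkc-emmox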

Enable ksqlDB access in Confluent Cloud

To grant the ksqlDB application access to your topics, set ACLs by using the Confluent Cloud CLI.

  1. Configure the Confluent Cloud ACLs for ksqlDB. For more information about Confluent Cloud ACLs, see Access Control Lists (ACLs) for Confluent Cloud.

    1. In the Confluent Cloud CLI, find the ksqlDB cluster ID.

      ccloud ksql app list
      

      Your output should resemble:

             Id      |   Name      | Topic Prefix |   Kafka   | Storage |                         Endpoint                          | Status
      +--------------+-------------+--------------+-----------+---------+-----------------------------------------------------------+--------+
        lksqlc-lg0g3 | ksqldb-app1 | pksqlc-4v5nn | lkc-emmox |     500 | https://pksqlc-emko7.us-central1.gcp.confluent.cloud:8088 | UP
      

      Tip

      The lksqlc and pksqlc prefixes are acronyms for “logical ksqlDB cluster” and “physical ksqlDB cluster”.

    2. Grant ksqlDB access to the users topic.

      ccloud ksql app configure-acls <ksqldb-cluster-id> users --cluster <kafka-cluster-id>
      

      For the example clusters shown previously, the following command gives ksqlDB access to the users topic.

      ccloud ksql app configure-acls lksqlc-lg0g3 users --cluster lkc-emmox
      
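Optionally, you can verify the ACLs that the previous command created. A quick check, assuming lkc-emmox is still the active cluster:

ccloud kafka acl list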

Create the pageviews topic

In Quick Start for Apache Kafka using Confluent Cloud, you created the users topic by using the Confluent Cloud UI. In this step, you create the pageviews topic by using the Confluent Cloud CLI.

  1. Run the following command to create the pageviews topic in your Confluent Cloud cluster.

    ccloud kafka topic create pageviews
    

    Your output should resemble:

    Created topic "pageviews".
    
  2. Run the following command to list the topics in your cluster.

    ccloud kafka topic list
    

    Your output should resemble:

                                      Name
    +------------------------------------------------------------------------------+
      _confluent-ksql-pksqlc-4v5nn_command_topic
      pageviews
      pksqlc-4v5nn-processing-log
      users
    
  3. In the Confluent Cloud CLI, grant ksqlDB access to the pageviews topic.

    ccloud ksql app configure-acls <ksqldb-cluster-id> pageviews --cluster <kafka-cluster-id>
    

For more information about Confluent Cloud CLI commands, see Confluent Cloud CLI Command Reference.

Produce pageview data to Confluent Cloud

In this step, you create a Datagen connector for the pageviews topic, using the same procedure that you used to create DatagenSourceConnector_users.

  1. Return to the Confluent Cloud UI. In the navigation bar, click Connectors, and click Add connector.

  2. Select the Datagen Source Connector tile and fill in the form with the following values.

    Field                                       Value
    ------------------------------------------  ------------------------------------------
    Name                                        Enter "DatagenSourceConnector_pageviews"
    Which topic do you want to send data to?    Select pageviews
    Output Messages                             Select JSON
    Quickstart                                  Select PAGEVIEWS
    Max interval between messages               Enter "100" for a 0.1-second interval
    Number of tasks for this connector          Enter "1"

    When the form is filled in, it should resemble the following.

    Screenshot of Confluent Cloud showing the Add Datagen Source Connector page
  3. At the bottom of the form, click Continue to review the details for your connector, and click Launch to start it.

    Screenshot of Confluent Cloud showing two running Datagen Source Connectors

    When the status of the new connector changes from Provisioning to Running, you have two producers sending event streams to topics in your Confluent Cloud cluster.

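Before moving on, you can optionally confirm that the connector is producing records by consuming a few of them from the command line, as mentioned in the CLI section earlier. The -b flag reads from the beginning of the topic; press Ctrl+C to stop consuming.

ccloud kafka topic consume -b pageviews
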
Create a stream and a table

To write streaming queries against the pageviews and users topics, register the topics with ksqlDB as a stream and a table, respectively. You can use the CREATE STREAM and CREATE TABLE statements in the ksqlDB Editor.

These examples query records from the pageviews and users topics using the following schema.

ER diagram showing a pageviews stream and a users table with a common userid column

Create a stream in the ksqlDB editor

You can create a stream or table by using the CREATE STREAM and CREATE TABLE statements in the ksqlDB Editor, similar to how you use them in the ksqlDB CLI.

  1. In the navigation bar, click ksqlDB.

  2. In the ksqlDB applications list, click ksqldb-app1.

  3. Copy the following code into the editor window and click Run.

    CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH
    (kafka_topic='pageviews', value_format='JSON');
    

    Your output should resemble:

    Screenshot of the ksqlDB CREATE STREAM statement in Confluent Cloud
  4. In the editor window, use a SELECT query to inspect records in the pageviews stream.

    SELECT * FROM PAGEVIEWS_ORIGINAL EMIT CHANGES;
    

    Your output should resemble:

    Screenshot of a ksqlDB SELECT query in Confluent Cloud
  5. The query continues until you end it explicitly. Click Stop to end the query.
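
You can also inspect the new stream's schema and metadata with the standard DESCRIBE statement. Run it in the editor window the same way you run a query:

DESCRIBE PAGEVIEWS_ORIGINAL;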

Create a table in the ksqlDB editor

Use the CREATE TABLE statement to register a table on a topic.

  1. Copy the following code into the editor window and click Run.

    CREATE TABLE users (userid VARCHAR PRIMARY KEY, registertime BIGINT, gender VARCHAR, regionid VARCHAR) WITH
    (KAFKA_TOPIC='users', VALUE_FORMAT='JSON');
    

    Your output should resemble:

    Screenshot of the ksqlDB CREATE TABLE statement in Confluent Cloud
  2. In the editor window, use a SELECT query to inspect records in the users table.

    SELECT * FROM users EMIT CHANGES;
    

    Your output should resemble:

    Screenshot of a ksqlDB SELECT query on a table in Confluent Cloud
  3. The query continues until you end it explicitly. Click Stop to end the query.

  4. Click Tables, and in the list, click USERS to open the details page.

    Screenshot of the ksqlDB Table summary page in Confluent Cloud
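
As an optional exercise, you can run an aggregating push query against the table. The following sketch counts users by gender and emits an updated row whenever the underlying table changes; click Stop to end it, as before.

SELECT gender, COUNT(*) AS user_count
FROM users
GROUP BY gender
EMIT CHANGES;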

Write a persistent query

With the pageviews topic registered as a stream, and the users topic registered as a table, you can write a streaming join query that runs until you end it with the TERMINATE statement.

  1. Copy the following code into the editor and click Run.

    CREATE STREAM pageviews_enriched AS
    SELECT users.userid AS userid, pageid, regionid, gender
    FROM pageviews_original
    LEFT JOIN users
      ON pageviews_original.userid = users.userid
    EMIT CHANGES;
    

    Your output should resemble:

    Screenshot of the ksqlDB CREATE STREAM AS SELECT statement in Confluent Cloud
  2. To inspect your persistent queries, navigate to the Running Queries page, which shows details about the pageviews_enriched stream that you created in the previous query.

    Screenshot of the ksqlDB Running Queries page in Confluent Cloud
  3. Click Explain to see the schema and query properties for the persistent query.

    Screenshot of the ksqlDB Explain Query dialog in Confluent Cloud
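
To end a persistent query, use the TERMINATE statement with the query ID shown on the Running Queries page. A sketch, assuming the generated ID is CSAS_PAGEVIEWS_ENRICHED_2 (your ID may differ; run SHOW QUERIES; to find it):

SHOW QUERIES;

TERMINATE CSAS_PAGEVIEWS_ENRICHED_2;

Terminating the query stops processing but keeps the pageviews_enriched stream registered. If you terminate it now, re-create the query before continuing, because the following sections rely on it.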

Use Flow View to inspect data

To visualize data flow in your ksqlDB application, click the Flow tab to open the Flow View page.

Use Flow View to:

  • View the topology of your ksqlDB application.
  • Inspect the details of streams, tables, and the SQL statements that create them.
  • View events as they flow through your application.

ksqlDB application topology on the Flow View page in Confluent Cloud

Click the CREATE-STREAM node to see the query that you used to create the PAGEVIEWS_ENRICHED stream.

Details of a CREATE STREAM statement in the ksqlDB Flow View in Confluent Cloud

Click the PAGEVIEWS_ENRICHED node to see the stream's events and schema.

Details of a stream in the ksqlDB Flow page in Confluent Cloud

Monitor persistent queries

You can monitor your persistent queries visually by using Confluent Cloud.

In the navigation menu, click Consumers and find the consumer group that corresponds to your pageviews_enriched stream, for example _confluent-ksql-pksqlc-4v5nnquery_CSAS_PAGEVIEWS_ENRICHED_2. This view shows how well your persistent query is keeping up with the incoming data.

Screenshot of the Consumer Lag page in Confluent Cloud

Create an API key for Confluent Cloud ksqlDB through the Confluent Cloud CLI

Starting with Confluent Cloud CLI v0.198.0, you can create API keys by using the Confluent Cloud CLI. For more information, see Create Resource-Specific API Keys using the CLI.

Important

The API key and secret that you create in this step are distinct from the key pair that you created when you installed the Confluent Cloud CLI. This key pair is for the ksqlDB cluster specifically and can only be created by using the ccloud api-key create --resource <ksqldb-cluster-id> command.

Run the following command to see a list of all API keys associated with your ksqlDB cluster.

ccloud api-key list --resource <ksqldb-cluster-id>

If the output is empty, follow the steps in Create Resource-Specific API Keys using the CLI.
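
If you need to create a key, the command named in the Important callout above creates a key pair scoped to your ksqlDB cluster:

ccloud api-key create --resource <ksqldb-cluster-id>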

Access a ksqlDB application in Confluent Cloud with an API key

ksqlDB in Confluent Cloud supports authentication with ksqlDB API keys.

Using ksqlDB CLI

To connect the ksqlDB CLI to a cluster, run the following command with your Confluent Cloud ksqlDB server URL specified.

<path-to-confluent>/bin/ksql -u <ksql-api-key> -p <secret> <ccloud-ksql-server-URL>

Using HTTPS Requests

Specify application/vnd.ksql.v1+json in the Accept header of your request. Also send your ksqlDB API key and secret, separated by a colon, as the --user credentials, and include the --basic flag to select HTTP Basic authentication.

  1. Run the ccloud ksql app list command to get the URL of the ksqlDB endpoint.

    ccloud ksql app list
    
           Id      |   Name      | Topic Prefix |   Kafka   | Storage |                         Endpoint                         | Status
    +--------------+-------------+--------------+-----------+---------+----------------------------------------------------------+--------+
      lksqlc-lg0g3 | ksqldb-app1 | pksqlc-4v5nn | lkc-emmox |     500 | https://pksqlc-emko7.us-central1.gcp.confluent.cloud:443 | UP
    
  2. Run the curl command to send a POST request to the ksql endpoint:

    curl -X "POST" "https://<cloud-ksqldb-endpoint>:443/ksql" \
         -H "Accept: application/vnd.ksql.v1+json" \
         --basic --user "<ksqldb-api-key>:<secret>" \
         -d $'{
      "ksql": "LIST STREAMS;",
      "streamsProperties": {}
    }'
    

    An example command for the previous ksqlDB cluster might resemble:

    curl -X "POST" "https://pksqlc-4v5nn.us-west2.gcp.confluent.cloud:443/ksql" \
         -H "Accept: application/vnd.ksql.v1+json" \
         --basic --user "7DOQ6QCUWCE6M0HD:25UCfE+p25wPXXHMJcIBg3dANJvr76LG4VZ5Hold+X2dVD5drvm8CdDaVdHia8d7" \
         -d $'{
      "ksql": "LIST STREAMS;",
      "streamsProperties": {}
    }'
    

    Your output should resemble:

    [
      {
        "@type": "streams",
        "statementText": "LIST STREAMS;",
        "streams": [
          {
            "type": "STREAM",
            "name": "KSQL_PROCESSING_LOG",
            "topic": "pksqlc-4v5nn-processing-log",
            "format": "JSON"
          },
          {
            "type": "STREAM",
            "name": "PAGEVIEWS_ENRICHED",
            "topic": "pksqlc-4v5nnPAGEVIEWS_ENRICHED",
            "format": "JSON"
          },
          {
            "type": "STREAM",
            "name": "PAGEVIEWS_ORIGINAL",
            "topic": "pageviews",
            "format": "JSON"
          }
        ],
        "warnings": []
      }
    ]
    
  3. Run the following command to inspect the streaming data in the PAGEVIEWS_ENRICHED stream. Press Ctrl+C to stop the query.

    curl -X "POST" "https://<cloud-ksqldb-endpoint>:443/query-stream" \
         --basic --user "<ksqldb-api-key>:<secret>" \
          -d $'{
       "sql": "SELECT * FROM PAGEVIEWS_ENRICHED EMIT CHANGES;",
       "streamsProperties": {}
     }'
    

    Your output should resemble:

    {"queryId":"d14df2c1-5995-4855-b300-f7710464c81c","columnNames":["USERID","PAGEID","REGIONID","GENDER"],"columnTypes":["STRING","STRING","STRING","STRING"]}
    ["User_7","Page_85","Region_8","FEMALE"]
    ["User_9","Page_47","Region_7","MALE"]
    ["User_4","Page_98","Region_7","OTHER"]
    ^C
    

See also

For an example that shows fully managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. The example also shows how to use the Confluent Cloud CLI to manage your resources in Confluent Cloud.
