ksqlDB for Confluent Cloud Quick Start

This quick start gets you up and running with ksqlDB in the Confluent Cloud Console.

In this quick start guide, you perform the following steps to create a simple streaming application that tracks the latest location of simulated riders.

Prerequisites

  • Access to Confluent Cloud.

Step 1: Create a ksqlDB cluster

  1. Log in to Cloud Console.

  2. In the navigation menu, click Environments and click the default tile.

  3. If you don’t have an Apache Kafka® cluster yet, click Add cluster and follow the on-screen instructions to create one.

    When you have a Kafka cluster available, click the tile of the cluster where you want ksqlDB to run.

  4. In the cluster navigation menu, click ksqlDB, and in the ksqlDB overview page, click Add cluster.

  5. In the New cluster page, click the My account tile and click Continue.

  6. In the Cluster name textbox, enter a name for your cluster or accept the default cluster name, which resembles “ksqlDB_cluster_0”. Accept the default configuration options.

  7. Click Launch cluster to create your new ksqlDB cluster.

    The ksqlDB overview page opens and shows your cluster with the Provisioning status.

    It may take a few minutes to complete cluster provisioning.

Step 2: Create a stream

When your ksqlDB cluster shows the Up status, you can start running queries. In this step, you create a data stream. A stream associates a schema with an underlying Kafka topic.

  1. Click the cluster you just created to open the query editor.

  2. Copy the following query into the editor window and click Run query to create a stream named “riderLocations”.

    CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
      WITH (kafka_topic='quickstart-locations', value_format='json', partitions=1);
    

    Your output should resemble:

    {
      "@type": "currentStatus",
      "statementText": "CREATE STREAM RIDERLOCATIONS (PROFILEID STRING, LATITUDE DOUBLE, LONGITUDE DOUBLE) WITH (CLEANUP_POLICY='delete', KAFKA_TOPIC='quickstart-locations', KEY_FORMAT='KAFKA', PARTITIONS=1, VALUE_FORMAT='JSON');",
      "commandId": "stream/`RIDERLOCATIONS`/create",
      "commandStatus": {
        "status": "SUCCESS",
        "message": "Stream created",
        "queryId": null
      },
      "commandSequenceNumber": 2,
      "warnings": []
    }
    

    ksqlDB automatically creates the underlying Kafka topic, which is named “quickstart-locations”.
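
    To inspect the new stream, you can optionally run ksqlDB's metadata statements, which list the registered streams and show a stream's columns and serialization settings:

    SHOW STREAMS;
    DESCRIBE riderLocations;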

    Here’s what each parameter in the CREATE STREAM statement does:

    • kafka_topic - Name of the Kafka topic underlying the stream. In this case, it is created automatically, because it doesn’t exist yet, but you can create streams over topics that exist already.

    • value_format - Encoding of the messages stored in the Kafka topic. For JSON encoding, each row is stored as a JSON object whose keys and values are column names and values, for example:

      {"profileId": "c2309eec", "latitude": 37.7877, "longitude": -122.4205}
      
    • partitions - Number of partitions to create for the quickstart-locations topic. This parameter is not needed for topics that exist already.
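
    As noted above, you can also declare a stream over a topic that already exists. The following is a sketch, assuming a pre-existing topic named "existing-locations" (a hypothetical name). Because the topic already exists, the partitions property is omitted:

    CREATE STREAM existingLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
      WITH (kafka_topic='existing-locations', value_format='json');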

Step 3: Create materialized views

The goal of this quick start is to keep track of the latest location of the riders by using a materialized view.

  • You create a currentLocation table by running a CREATE TABLE AS SELECT statement over the previously created stream. The table is updated incrementally as new rider location data arrives. The LATEST_BY_OFFSET aggregate function specifies that the application is interested only in the latest location of each rider.

  • To make the example more interesting, the application materializes a derived table, named ridersNearMountainView, that captures the distance of riders from a given location or city. The ROUND(distance, -1) expression buckets each distance to the nearest multiple of 10, so riders at similar distances are grouped together.

  1. In the query editor, clear the previous query and run the following statement to create the currentLocation table.

    -- Create the currentLocation table
    CREATE TABLE currentLocation AS
      SELECT profileId,
             LATEST_BY_OFFSET(latitude) AS la,
             LATEST_BY_OFFSET(longitude) AS lo
      FROM riderLocations
      GROUP BY profileId
      EMIT CHANGES;
    

    Your output should resemble:

    {
      "@type": "currentStatus",
      "statementText": "CREATE TABLE CURRENTLOCATION WITH (CLEANUP_POLICY='compact', KAFKA_TOPIC='pksqlc-j3yd58CURRENTLOCATION', PARTITIONS=1, REPLICAS=3, RETENTION_MS=604800000) AS SELECT\n  RIDERLOCATIONS.PROFILEID PROFILEID,\n  LATEST_BY_OFFSET(RIDERLOCATIONS.LATITUDE) LA,\n  LATEST_BY_OFFSET(RIDERLOCATIONS.LONGITUDE) LO\nFROM RIDERLOCATIONS RIDERLOCATIONS\nGROUP BY RIDERLOCATIONS.PROFILEID\nEMIT CHANGES;",
      "commandId": "table/`CURRENTLOCATION`/create",
      "commandStatus": {
        "status": "SUCCESS",
        "message": "Created query with ID CTAS_CURRENTLOCATION_3",
        "queryId": "CTAS_CURRENTLOCATION_3"
      },
      "commandSequenceNumber": 4,
      "warnings": []
    }
    

    The CREATE TABLE AS SELECT statement created a persistent query, named CTAS_CURRENTLOCATION_3, that runs continuously on the ksqlDB server. The query’s name is prefixed with “CTAS”, which stands for “CREATE TABLE AS SELECT”.
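
    To see this persistent query, and any others running on your cluster, you can run the SHOW QUERIES statement, which lists each query's ID, status, and sink:

    SHOW QUERIES;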

  2. In the query editor, clear the previous query and run the following statement to create the ridersNearMountainView table.

    -- Create the ridersNearMountainView table
    CREATE TABLE ridersNearMountainView AS
      SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
             COLLECT_LIST(profileId) AS riders,
             COUNT(*) AS count
      FROM currentLocation
      GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);
    

    Your output should resemble:

    {
      "@type": "currentStatus",
      "statementText": "CREATE TABLE RIDERSNEARMOUNTAINVIEW WITH (CLEANUP_POLICY='compact', KAFKA_TOPIC='pksqlc-j3yd58RIDERSNEARMOUNTAINVIEW', PARTITIONS=1, REPLICAS=3, RETENTION_MS=604800000) AS SELECT\n  ROUND(GEO_DISTANCE(CURRENTLOCATION.LA, CURRENTLOCATION.LO, 37.4133, -122.1162), -1) DISTANCEINMILES,\n  COLLECT_LIST(CURRENTLOCATION.PROFILEID) RIDERS,\n  COUNT(*) COUNT\nFROM CURRENTLOCATION CURRENTLOCATION\nGROUP BY ROUND(GEO_DISTANCE(CURRENTLOCATION.LA, CURRENTLOCATION.LO, 37.4133, -122.1162), -1)\nEMIT CHANGES;",
      "commandId": "table/`RIDERSNEARMOUNTAINVIEW`/create",
      "commandStatus": {
        "status": "SUCCESS",
        "message": "Created query with ID CTAS_RIDERSNEARMOUNTAINVIEW_5",
        "queryId": "CTAS_RIDERSNEARMOUNTAINVIEW_5"
      },
      "commandSequenceNumber": 6,
      "warnings": []
    }
    

Step 4: Run a push query over the stream

In this step, you create a push query that filters rows in the riderLocations stream.

This query outputs all rows from the riderLocations stream that have coordinates within 5 miles of Mountain View, California.

This query never returns until you terminate it explicitly. It pushes output rows to the client perpetually as events are written to the riderLocations stream.

In the query editor, clear the previous query and run the following statement.

-- Mountain View lat, long: 37.4133, -122.1162
SELECT * FROM riderLocations
  WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;

The query returns no output yet, because no events have been written to the riderLocations stream.

For now, leave this query running in the current session. In the next step, you write some mock data into the riderLocations stream so that the query produces output.

Step 5: Populate the stream with events

Because the session from the previous step is busy waiting for output from the push query, start another session that you can use to write some data into ksqlDB.

  1. Press the Ctrl key and click Editor to open a new query editor window.

  2. In the new editor session, copy the following INSERT statements into the editor and click Run query.

    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);
    

    No output is shown from the INSERT INTO VALUES statements, but the rows have been written to the riderLocations stream.

  3. In the browser, click the tab for the previous editor session. The push query that you started in Step 4 now shows the following output:

    {"PROFILEID":"4a7c7b41","LATITUDE":37.4049,"LONGITUDE":-122.0822}
    {"PROFILEID":"8b6eae59","LATITUDE":37.3944,"LONGITUDE":-122.0813}
    {"PROFILEID":"4ab5cbad","LATITUDE":37.3952,"LONGITUDE":-122.0813}
    

    Only the rows that have riders who are within 5 miles of Mountain View are displayed.

Step 6: Run a pull query against the materialized view

In this step, you run a pull query against the materialized view to retrieve all of the riders who are currently within 10 miles of Mountain View.

In contrast to the previous push query, which runs continuously, the pull query follows a traditional request-response model and retrieves the latest result from the materialized view.

In the second editor session, clear the previous query and run the following statement.

SELECT * FROM ridersNearMountainView WHERE distanceInMiles <= 10;

Your output should resemble:

{"DISTANCEINMILES":10.0,"RIDERS":["18f4ea86"],"COUNT":1}
{"DISTANCEINMILES":0.0,"RIDERS":["4ab5cbad","8b6eae59","4a7c7b41"],"COUNT":3}
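
Pull queries can also look up a single row by key. For example, because the currentLocation table is keyed by profileId, the following statement, which uses one of the profile IDs inserted in Step 5, retrieves just that rider's latest coordinates:

SELECT * FROM currentLocation WHERE profileId = '4ab5cbad';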

Step 7: Clean up

When you’re done with the quick start, close the second browser tab.

  1. Click the browser tab that has the first editor session.

  2. Click Stop query to end the push query.

  3. Click Persistent queries to view the two queries that you created in Step 3.

  4. For each of the two queries, click Terminate and type the query’s name in the dialog to confirm termination.

  5. Click Tables to view the tables that were created by the queries in Step 3.

  6. Click the RIDERSNEARMOUNTAINVIEW table to open the details view.

  7. Click DROP TABLE and type the table’s name in the dialog to confirm deletion.

    The corresponding Kafka topic is also deleted.

  8. Repeat the deletion steps for the CURRENTLOCATION table.
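
If you prefer SQL to the Cloud Console buttons, you can perform the same cleanup from the editor. The following is a sketch, assuming the query IDs shown in this walkthrough; the numeric suffixes may differ in your cluster, so check SHOW QUERIES for the exact names. The DELETE TOPIC clause also removes the underlying Kafka topic:

TERMINATE CTAS_RIDERSNEARMOUNTAINVIEW_5;
TERMINATE CTAS_CURRENTLOCATION_3;
DROP TABLE ridersNearMountainView DELETE TOPIC;
DROP TABLE currentLocation DELETE TOPIC;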