Pull queries with Confluent Cloud ksqlDB

Pull queries are a relatively new but integral feature offered by ksqlDB. In contrast to push queries, which perpetually stream incremental query results to clients, pull queries follow a traditional request/response model, which means that a pull query retrieves a finite result from the ksqlDB server and terminate on completion, similar to the way queries work with traditional databases.

Pull queries are available to all Confluent Cloud ksqlDB users. The remainder of this document provides details about using pull queries in Confluent Cloud and describes the limitations of pull queries.

Limitations

Pull queries in Confluent Cloud ksqlDB have the following limitations:

  • Consistency: Pull query results use an eventually consistent consistency model, which means that some writes to the table being queried may not yet appear in a pull query’s result, even if these writes occurred before the pull query request. In practice, this window of “inconsistency” is very narrow, but you should account for this in your applications.
  • Performance impact: Because pull queries are processed by the ksqlDB application, pull queries do have the potential to reduce the throughput of push queries that are being run by the same application. Any potential performance impact depends strongly on the underlying workload, but in general we recommend that you perform realistic benchmarks to understand how a pull query workload will affect the performance of your specific ksqlDB workload.
  • Bandwidth: Pull queries can consume up to 48MB per hour, per CSU of bandwidth. Requests that exceed this limit are throttled. For example, a 4-CSU ksqlDB application can process approximately 190MB of pull query responses per hour. If you’ve reached the 190MB of throttle limit within a sliding window over the last 1 hour, ksqlDB won’t process any new query requests until the bandwidth used by pull queries in the last hour has dropped below 190MB.

CLI configuration

You can issue pull queries directly from the Confluent Cloud ksqlDB web interface, with no additional configuration.

If you want to use pull queries from a CLI session, or via HTTP requests, you must create a ksqlDB API key to enable your client to connect with your application.

Note

ksqlDB API keys are different from Kafka API keys and must be created for your specific ksqlDB application.

  1. Log in to Confluent Cloud by using the Confluent CLI:

    confluent login
    
  2. Use the following command to list your ksqlDB applications:

    confluent ksql app list
    

    Your output should resemble:

           Id      |    Name     | Topic Prefix |   Kafka   | Storage |                       Endpoint                        | Status
    +--------------+-------------+--------------+-----------+---------+-------------------------------------------------------+--------+
      lksqlc-ok1yo | ksqldb-app1 | pksqlc-e8816 | lkc-2ry82 |     500 | https://pksqlc-e8816.us-west2.gcp.confluent.cloud:443 | UP
    

    Note the Id, Kafka, and Endpoint values, which in this example are lksqlc-ok1yo, lkc-2ry82, and https://pksqlc-e8816.us-west2.gcp.confluent.cloud:443.

    • The application ID is used to create an application-specific API key and secret that clients use to connect to the application.
    • Kafka specifies the ID of your Kafka cluster.
    • The application Endpoint is used to connect to your application.
  3. Run the following command to specify the Kafka cluster that your CLI session should use:

    confluent kafka cluster use <kafka-cluster-id>
    
  4. Obtain your Confluent Cloud service account ID to use for security configuration. You can find it in the Id column of the following command’s output:

    confluent iam service-account list
    
  5. After you get your service account ID, run the following command to grant the required permissions for the topic:

    confluent kafka acl create --allow --service-account <service-account-id> --operation READ --operation WRITE --operation CREATE --topic 'pq_' --prefix
    

    The --topic 'pq_' --prefix option applies the ACLs to all topics that have a name that is prefixed with pq_.

  6. Run the following command to create an application-specific API key and secret by passing the application ID as the --resource.

    confluent api-key create --resource <ksqldb-application-id>
    
  7. Use the key and secret returned by the previous command to connect to your application by using the ksqlDB CLI. Pass the key as the username, and the secret as the password.

    $CONFLUENT_HOME/bin/ksql -u <api-key> -p <api-key-secret> <endpoint>
    

Example workload

In this section, you build an example workload that enables experimenting with pull queries in Confluent Cloud.

  1. In the ksqlDB CLI, run the following statement to create an input stream.

    CREATE STREAM pq_pageviews (user_id INTEGER KEY, url STRING, status INTEGER) WITH
     (kafka_topic='pq_pageviews', value_format='json', partitions=1);
    
  2. Create a materialized view over this input stream. The materialized view aggregates events from the pageviews stream, grouped on the events’ url field:

    CREATE TABLE pq_pageviews_metrics AS
     SELECT url, COUNT(*) AS num_views
      FROM pq_pageviews
      GROUP BY url
      EMIT CHANGES;
    
  3. Populate the materialized view by writing events to the pageviews input stream.

    INSERT INTO pq_pageviews (user_id, url, status) VALUES (0, 'https://confluent.io', 200);
    INSERT INTO pq_pageviews (user_id, url, status) VALUES (1, 'https://confluent.io/blog', 200);
    
  4. Now that the materialized view contains some data, issue a pull query against it to retrieve the latest count for a given URL.

    SELECT * FROM pq_pageviews_metrics WHERE url = 'https://confluent.io';
    

    Your output should resemble:

    +-------------------------------------------------+-------------------------------------------------+
    |URL                                              |NUM_VIEWS                                        |
    +-------------------------------------------------+-------------------------------------------------+
    |https://confluent.io                             |1                                                |
    

    This pull query should return precisely one row. Each time the pull query runs, it will return the latest value for the targeted row.