Confluent Cloud ksqlDB によるプルクエリのプレビュー

プルクエリ は、ksqlDB が提供する機能としては比較的新しいものの、不可欠な機能です。クライアントに対してクエリ結果を継続的かつインクリメンタルにストリーミングするプッシュクエリとは対照的に、プルクエリは従来のリクエスト/レスポンスモデルに従っています。つまりプルクエリは、ksqlDB サーバーから有限の結果を取得して完了時に終了します。これは、従来のデータベースにおけるクエリと同様に動作するということです。

プルクエリは、すべての Confluent Cloud ksqlDB ユーザーが プレビュー機能 として利用できます。機能がプレビューである間は、本稼働環境のワークロードに使用することはお勧めしません。このドキュメントの残りの部分では、Confluent Cloud でのプルクエリの使用の詳細を説明し、プレビュー状態でのプルクエリに関する制限などについて説明します。

制限

Confluent Cloud ksqlDB でのプレビューのプルクエリには、以下の制限があります。

  • スループット : プルクエリのリクエストスループットには、CSU あたり 1 秒間に約 41 個のクエリというレートの制限があります。たとえば、4 CSU の ksqlDB アプリケーションは、1 秒当たり約 167 のプルクエリを処理することができます。このレート制限は、プルクエリが一般提供可能になると変更される可能性があります。
  • 整合性 : プルクエリの結果は、最終整合性のある整合性モデルを使用しています。つまり、プルクエリリクエストの前にクエリ先のテーブルへの書き込みが発生していても、これらの書き込みがプルクエリの結果にまだ表示されないことがあります。実際にはこの「不整合性」のウィンドウは非常に狭いものですが、アプリケーションではこれを考慮する必要があります。
  • パフォーマンスへの影響 : プルクエリは ksqlDB アプリケーションにより処理されるため、同じアプリケーションにより実行されているプッシュクエリのスループットを低下させる可能性があります。パフォーマンスに影響を及ぼす可能性は基盤となるワークロードに強く依存しますが、一般的には、現実的なベンチマークを実行し、プルクエリのワークロードが ksqlDB の特定ワークロードのパフォーマンスにどのように影響するかを把握するようお勧めします。
  • SLA: プルクエリは、プレビューの間は Confluent Cloud ksqlDB SLA の適用対象外となります。

料金

プルクエリは、プレビューの間は無料です。プルクエリが一般提供される場合は、該当する消費量ベースの費用がかかります。これは、プルクエリが帯域幅を消費するためです。

CLI の構成

プルクエリは Confluent Cloud ksqlDB のウェブインターフェイスから直接発行することができ、追加の構成は不要です。

CLI セッションから、または HTTP リクエスト経由でプルクエリを使用するには、ksqlDB API キーを作成してクライアントがアプリケーションに接続できるようにする必要があります。

注釈

ksqlDB API キーは Kafka API キーとは異なり、ksqlDB アプリケーションごとに作成する必要があります。

  1. ccloud CLI を使用して Confluent Cloud にログインします。

    ccloud login
    
    Copy
  2. 以下のコマンドを使用して ksqlDB アプリケーションのリストを作成します。

    ccloud ksql app list
    
    Copy

    出力は以下のようになります。

           Id      |    Name     | Topic Prefix |   Kafka   | Storage |                       Endpoint                        | Status
    +--------------+-------------+--------------+-----------+---------+-------------------------------------------------------+--------+
      lksqlc-ok1yo | ksqldb-app1 | pksqlc-e8816 | lkc-2ry82 |     500 | https://pksqlc-e8816.us-west2.gcp.confluent.cloud:443 | UP
    
    Copy

    なお IdKafka および Endpoint の値は、この例では lksqlc-ok1yolkc-2ry82 および https://pksqlc-e8816.us-west2.gcp.confluent.cloud:443 です。

    • アプリケーション ID は、アプリケーション固有の API キーおよびシークレットの作成に使用します。これらは、クライアントがアプリケーションに接続する際に使用します。
    • Kafka は Kafka クラスターの ID を指定します。
    • アプリケーションの Endpoint は、アプリケーションへの接続に使用します。
  3. 以下のコマンドを実行して、CLI セッションが使用する Kafka クラスターを指定します。

    ccloud kafka cluster use <kafka-cluster-id>
    
    Copy
  4. セキュリティの構成に使用する Confluent Cloud サービスアカウント ID を取得します。これは、以下のコマンドの出力の Id 列で確認できます。

    ccloud service-account list
    
    Copy
  5. サービスアカウント ID を取得したら、以下のコマンドを実行してトピックに必要なアクセス許可を付与します。

    ccloud kafka acl create --allow --service-account <service-account-id> --operation READ --operation WRITE --operation CREATE --topic 'pq_' --prefix
    
    Copy

    --topic 'pq_' --prefix オプションは、プレフィックスが pq_ の名前を持つすべてのトピックに ACL を適用します。

  6. 以下のコマンドを実行してアプリケーション ID を --resource として渡すことにより、アプリケーション固有の API キーおよびシークレットを作成します。

    ccloud api-key create --resource <ksqldb-application-id>
    
    Copy
  7. 前述のコマンドにより返されたキーとシークレットを使用し、ksqlDB CLI を使用してアプリケーションに接続します。キーをユーザー名として渡し、シークレットをパスワードとして渡します。

    $CONFLUENT_HOME/bin/ksql -u <api-key> -p <api-key-secret> <endpoint>
    
    Copy

ワークロードの例

このセクションでは Confluent Cloud でプルクエリを試用できるように、サンプルのワークロードを構築します。

  1. ksqlDB CLI で以下のステートメントを実行して入力ストリームを作成します。

    CREATE STREAM pq_pageviews (user_id INTEGER KEY, url STRING, status INTEGER) WITH
     (kafka_topic='pq_pageviews', value_format='json', partitions=1);
    
    Copy
  2. この入力ストリームのマテリアライズドビューを作成します。マテリアライズドビューは pageviews ストリームからイベントを集約し、それらのイベントを url フィールドによりグループ化します。

    CREATE TABLE pq_pageviews_metrics AS
     SELECT url, COUNT(*) AS num_views
      FROM pq_pageviews
      GROUP BY url
      EMIT CHANGES;
    
    Copy
  3. pageviews 入力ストリームへイベントを書き込むことにより、マテリアライズドビューに入力します。

    INSERT INTO pq_pageviews (user_id, url, status) VALUES (0, 'https://confluent.io', 200);
    INSERT INTO pq_pageviews (user_id, url, status) VALUES (1, 'https://confluent.io/blog', 200);
    
    Copy
  4. これでマテリアライズドビューにはなんらかのデータが格納されているので、これに対してプルクエリを発行し、指定した URL について最新のカウントを取得します。

    SELECT * FROM pq_pageviews_metrics WHERE url = 'https://confluent.io';
    
    Copy

    出力は以下のようになります。

    +-------------------------------------------------+-------------------------------------------------+
    |URL                                              |NUM_VIEWS                                        |
    +-------------------------------------------------+-------------------------------------------------+
    |https://confluent.io                             |1                                                |
    
    Copy

    このプルクエリは正確に 1 つの行のみを返します。プルクエリを実行するたびに、ターゲットである列について最新の値が返されます。