ksqlDB を使用した Apache Kafka® に対するストリーミングクエリの作成(ローカル)

このチュートリアルでは、ksqlDB を使用して Kafka のメッセージに対するストリーミングクエリを作成する、単純なワークフローについて紹介します。

開始するには、ZooKeeper および Kafka ブローカーを含む Kafka クラスターを起動する必要があります。その後 ksqlDB が、この Kafka クラスターからのメッセージに対してクエリを行います。ksqlDB は、デフォルトで Confluent Platform にインストールされています。

前提条件 :

  • Confluent Platform がインストールおよび実行されている。このインストールには、Kafka ブローカー、ksqlDB、Control Center、ZooKeeper、Schema Registry、REST Proxy および Connect が含まれています。
  • TAR または ZIP を使用して Confluent Platform をインストールした場合は、インストールディレクトリまで移動します。このチュートリアル全体で使用するパスおよびコマンドは、カレントディレクトリがこのインストールディレクトリであると想定しています。
  • Confluent Platform のローカルインストールを開始する際には、Confluent CLI の インストール を検討します。
  • Java: バージョン 1.8 以降。Oracle Java JRE または JDK 1.8 以降をローカルマシンにインストールする必要があります。

トピックの作成とデータの生成

Kafka のトピック pageviews および users を作成し、データを生成します。これらの手順では、Confluent Platform の ksqlDB datagen ツールを使用します。

  1. 新しいターミナルウィンドウを開いて以下のコマンドを実行し、データジェネレーターを使用して pageviews トピックを作成しデータを生成します。以下の例は、DELIMITED フォーマットのデータを継続的に生成します。

    $CONFLUENT_HOME/bin/ksql-datagen quickstart=pageviews format=delimited topic=pageviews msgRate=5
    
  2. 別のターミナルウィンドウを開き、以下のコマンドを実行して、データジェネレーターを使用して users トピックの Kafka データを生成します。以下の例は、DELIMITED フォーマットのデータを継続的に生成します。

    $CONFLUENT_HOME/bin/ksql-datagen quickstart=users format=delimited topic=users msgRate=1
    

ちなみに

Confluent Platform にある kafka-console-producer CLI を使用して Kafka データを生成することもできます。

ksqlDB CLI の起動

新しいターミナルウィンドウを開いて以下のコマンドを実行し、LOG_DIR 環境変数を設定して ksqlDB CLI を起動します。

LOG_DIR=./ksql_logs $CONFLUENT_HOME/bin/ksql

このコマンドは、CLI のログを ./ksql_logs ディレクトリへルーティングします。ディレクトリのパスはカレントディレクトリからの相対パスです。デフォルトでは http://localhost:8088 で実行されている ksqlDB サーバーを CLI が検索します。

重要

デフォルトで、ksqlDB は ksql 実行可能ファイルの場所に対応する logs と呼ばれるディレクトリにそのログを保管しようとします。たとえば、 ksql/usr/local/bin/ksql にインストールされると、/usr/local/logs にそのログを保管しようとします。ksql を Confluent Platform のデフォルトのロケーションである $CONFLUENT_HOME/bin から実行している場合は、LOG_DIR 変数を使用して、このデフォルトの動作をオーバーライドする必要があります。

ksqlDB 起動後のターミナルはこのようになります。

                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =  Event Streaming Database purpose-built =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2020 Confluent Inc.

CLI v6.0.6, Server v6.0.6 located at http://localhost:8088

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>

SHOW および PRINT ステートメントを使用した Kafka トピックの調査

ksqlDB により、 Kafka のトピックとメッセージをリアルタイムで調査できます。

  • SHOW TOPICS ステートメントを使用して、Kafka クラスター上で使用可能なトピックをリストします。
  • PRINT ステートメントを使用して、トピックに到着したメッセージをそのつど表示します。

ksqlDB CLI で、以下のステートメントを実行します。

SHOW TOPICS;

出力は以下のようになります。

 Kafka Topic                 | Partitions | Partition Replicas
--------------------------------------------------------------
 default_ksql_processing_log | 1          | 1
 pageviews                   | 1          | 1
 users                       | 1          | 1
--------------------------------------------------------------

デフォルトでは、ksqlDB の内部トピックおよびシステムトピックは非表示になっています。SHOW ALL TOPICS ステートメントを使用して、Kafka クラスター上で使用可能な全トピックのリストを表示します。

SHOW ALL TOPICS;

出力は以下のようになります。

 Kafka Topic                            | Partitions | Partition Replicas
--------------------------------------------------------------------------
 __confluent.support.metrics            | 1          | 1
 _confluent-ksql-default__command_topic | 1          | 1
 _confluent-license                     | 1          | 1
 _confluent-metrics                     | 12         | 1
 default_ksql_processing_log            | 1          | 1
 pageviews                              | 1          | 1
 users                                  | 1          | 1
--------------------------------------------------------------------------

PRINT ステートメントを使用した users トピックの調査

PRINT users;

注釈

PRINT ステートメントは、ksqlDB のコマンドの中でも大文字と小文字が区別される数少ないコマンドの 1 つであり、トピック名が引用符で囲まれていない場合でも区別されます。

出力は以下のようになります。

Key format: KAFKA_STRING
Value format: AVRO
rowtime: 10/30/18 10:15:51 PM GMT, key: User_1, value: {"registertime":1516754966866,"userid":"User_1","regionid":"Region_9","gender":"MALE"}
rowtime: 10/30/18 10:15:51 PM GMT, key: User_3, value: {"registertime":1491558386780,"userid":"User_3","regionid":"Region_2","gender":"MALE"}
rowtime: 10/30/18 10:15:53 PM GMT, key: User_7, value: {"registertime":1514374073235,"userid":"User_7","regionid":"Region_2","gender":"OTHER"}
^Crowtime: 10/30/18 10:15:59 PM GMT, key: User_4, value: {"registertime":1510034151376,"userid":"User_4","regionid":"Region_8","gender":"FEMALE"}
Topic printing ceased

CTRL + C を押して、メッセージのプリントを停止します。

PRINT ステートメントを使用した pageviews トピックの調査

PRINT pageviews;

出力は以下のようになります。

Key format: KAFKA_INTEGER
Format: KAFKA_STRING
rowtime: 10/23/18 12:24:03 AM PSD, key: 1540254243183, value: 1540254243183,User_9,Page_20
rowtime: 10/23/18 12:24:03 AM PSD, key: 1540254243617, value: 1540254243617,User_7,Page_47
rowtime: 10/23/18 12:24:03 AM PSD, key: 1540254243888, value: 1540254243888,User_4,Page_27
^Crowtime: 10/23/18 12:24:05 AM PSD, key: 1540254245161, value: 1540254245161,User_9,Page_62
Topic printing ceased

CTRL + C を押して、メッセージのプリントを停止します。

詳細については、「ksqlDB 構文リファレンス」を参照してください。

ストリームおよびテーブルの作成

この例では、pageviews および users と呼ばれる Kafka トピックのメッセージをクエリで取得するために、以下のスキーマを使用します。

../../_images/ksql-quickstart-schemas.ja.jpg
  1. Kafka のトピック pageviews から pageviews_original という名前のストリームを作成し、value_format の値には DELIMITED を指定します。

    CREATE STREAM pageviews_original (rowkey bigint key, userid varchar, pageid varchar)
    WITH (kafka_topic='pageviews', value_format='DELIMITED');
    

    出力は以下のようになります。

     Message
    ---------------
     Stream created
    ---------------
    

    ちなみに

    DESCRIBE pageviews_original; を実行すると、ストリームのスキーマを見ることができます。ksqlDB が作成した ROWTIME という列が追加されていることに注意してください。これは Kafka メッセージのタイムスタンプに対応しています。

  2. Kafka のトピック users から users_original という名前のテーブルを作成し、value_format の値には AVRO を指定します。

    CREATE TABLE users_original WITH (kafka_topic='users', value_format='AVRO', key = 'userid');
    

    出力は以下のようになります。

     Message
    ---------------
     Table created
    ---------------
    

    ちなみに

    DESCRIBE users_original; を実行すると、テーブルのスキーマを見ることができます。

    注釈

    CREATE TABLE ステートメントは、CREATE STREAM ステートメントのように列のセットを定義していないことがわかります。これは、値のフォーマットが Avro であり、DataGen ツールが Schema Registry へ Avro スキーマをパブリッシュしているためです。ksqlDB はそのスキーマを Schema Registry から取得し、テーブルの SQL スキーマの構築に使用します。必要であれば、スキーマを指定することもできます。Github issue #4462 が完了するまで、スキーマの推測はこの例のようにデータのキーが STRING である場合にのみ利用可能です。

    注釈

    生成されたデータでは、Kafka レコードのキーの値が userId フィールドの値になります。上記では、WITH 句に key='userId' と指定することにより、このことを ksqlDB に知らせています。ksqlDB がこの情報を使用することにより、テーブルに対する結合が可能になり、ROWKEY よりもわかりやすい userId 列名を使用することができます。どちらに結合しても同じ結果が得られます。データの値の中にキーのコピーがなければ、ROWKEY で結合することができます。

  3. (省略可): すべてのストリームおよびテーブルを表示します。

    ksql> SHOW STREAMS;
    
     Stream Name         | Kafka Topic                 | Format
    ---------------------------------------------------------------
     KSQL_PROCESSING_LOG | default_ksql_processing_log | JSON
     PAGEVIEWS_ORIGINAL  | pageviews                   | DELIMITED
    ---------------------------------------------------------------
    
    ksql> SHOW TABLES;
    
     Table Name     | Kafka Topic | Format | Windowed
    --------------------------------------------------
     USERS_ORIGINAL | users       | AVRO   | false
    --------------------------------------------------
    

    ちなみに

    SHOW STREAMS の出力に KSQL_PROCESSING_LOG ストリームがリストされていることに注意してください。ksqlDB では、データの処理中になんらかの問題が発生すると、それについて説明するメッセージが追加されます。期待どおりに機能しないものがある場合は、このストリームの内容を見て ksqlDB にデータエラーが発生してないかを確認します。

データの表示

  1. SELECT を使用して、TABLE からデータを返すクエリを作成します。このクエリには、クエリの結果で返される行数を制限する LIMIT キーワードと、結果をストリームにより返すよう指示する EMIT CHANGES キーワードが含まれます。これがいわゆる プルクエリ です。各種クエリの解説については、「クエリ」を参照してください。なお、正確なデータ出力はデータ生成のランダム性により変動することがあります。

    SELECT * from users_original emit changes limit 5;
    

    出力は以下のようになります。

    +--------------------+--------------+--------------+---------+----------+-------------+
    |ROWTIME             |ROWKEY        |REGISTERTIME  |GENDER   |REGIONID  |USERID       |
    +--------------------+--------------+--------------+---------+----------+-------------+
    |1581077558655       |User_9        |1513529638461 |OTHER    |Region_1  |User_9       |
    |1581077561454       |User_7        |1489408314958 |OTHER    |Region_2  |User_7       |
    |1581077561654       |User_3        |1511291005264 |MALE     |Region_2  |User_3       |
    |1581077561857       |User_4        |1496797956753 |OTHER    |Region_1  |User_4       |
    |1581077562858       |User_8        |1489169082491 |FEMALE   |Region_8  |User_8       |
    Limit Reached
    Query terminated
    

    注釈

    テーブルに対するプッシュクエリは、Kafka の changelog トピックに保管されているテーブルの全履歴を出力します。つまり、過去のデータを出力した後に、テーブルに対する更新のストリームを出力します。したがって、テーブル内の既存の行が更新されると、その行と一致する ROWKEY を持つ行が出力される可能性があります。

  2. 以下のプッシュクエリを発行して、pageviews_original ストリームのデータを表示します。

    SELECT viewtime, userid, pageid FROM pageviews_original emit changes LIMIT 3;
    

    出力は以下のようになります。

    +--------------+--------------+--------------+
    |VIEWTIME      |USERID        |PAGEID        |
    +--------------+--------------+--------------+
    |1581078296791 |User_1        |Page_54       |
    |1581078297792 |User_8        |Page_93       |
    |1581078298792 |User_6        |Page_26       |
    Limit Reached
    Query terminated
    

    注釈

    デフォルトでは、ストリームに対するプッシュクエリはクエリ開始後に発生した変更のみを出力します。つまり、過去のデータは含まれません。過去のデータを表示する場合は、set 'auto.offset.reset'='earliest'; を実行してセッションのプロパティを更新します。

クエリの書き込み

この例では、ksqlDB を使用してクエリを書き込みます。

注記 : デフォルトでは、ksqlDB はストリームおよびテーブルのトピックを最新のオフセットから読み取ります。

  1. users テーブルからユーザーの genderregionid を読み取って pageviews データを拡張するクエリを作成します。以下のクエリは、users_original テーブルの userid 列に対して LEFT JOIN を行うことにより、pageviews_original ストリームを拡張します。

    SELECT users_original.userid AS userid, pageid, regionid, gender
     FROM pageviews_original
     LEFT JOIN users_original
       ON pageviews_original.userid = users_original.userid
     EMIT CHANGES
     LIMIT 5;
    

    出力は以下のようになります。

    +-------------------+-------------------+-------------------+-------------------+
    |USERID             |PAGEID             |REGIONID           |GENDER             |
    +-------------------+-------------------+-------------------+-------------------+
    |User_7             |Page_23            |Region_2           |OTHER              |
    |User_3             |Page_42            |Region_2           |MALE               |
    |User_7             |Page_87            |Region_2           |OTHER              |
    |User_2             |Page_57            |Region_5           |FEMALE             |
    |User_9             |Page_59            |Region_1           |OTHER              |
    Limit Reached
    Query terminated
    

    注釈

    users テーブルに対する結合は userid 列に対して行われます。これは、CREATE TABLE ステートメントでテーブルのプライマリキー ROWKEY のエイリアスとして識別されています。userIdROWKEY は、テーブルに対する JOIN 基準として相互に入れ替えて使用できます。ただしストリーム側の userid のデータはストリームのキーと一致しないため、ksqlDB は結合前に userId 列でストリームを内部的に再分割します。

  2. CREATE STREAM キーワードを使用して、SELECT ステートメントに先行する永続的なクエリを作成します。クエリの結果は PAGEVIEWS_ENRICHED Kafka トピックに書き込まれます。以下のクエリは、users_original テーブルのユーザー ID に対して LEFT JOIN を行うことにより、pageviews_original ストリームを拡張します。

    CREATE STREAM pageviews_enriched AS
    SELECT users_original.userid AS userid, pageid, regionid, gender
    FROM pageviews_original
    LEFT JOIN users_original
      ON pageviews_original.userid = users_original.userid
    EMIT CHANGES;
    

    出力は以下のようになります。

     Message
    ----------------------------------------------------------------------------------------------------------
     Stream PAGEVIEWS_ENRICHED created and running. Created by query with query ID: CSAS_PAGEVIEWS_ENRICHED_0
    ----------------------------------------------------------------------------------------------------------
    

    ちなみに

    DESCRIBE pageviews_enriched; を実行すると、ストリームを記述できます。

  3. SELECT を使用して、クエリ結果の到着ごとに表示します。クエリ結果の表示を停止するには Ctrl + C を押します。これによりコンソールへのプリントは停止しますが、クエリそのものは停止しません。基盤となる ksqlDB アプリケーションではクエリが実行され続けます。

    SELECT * FROM pageviews_enriched emit changes;
    

    出力は以下のようになります。

        +-------------+------------+------------+------------+------------+------------+
        |ROWTIME      |ROWKEY      |USERID      |PAGEID      |REGIONID    |GENDER      |
        +-------------+------------+------------+------------+------------+------------+
        |1581079706741|User_5      |User_5      |Page_53     |Region_3    |FEMALE      |
        |1581079707742|User_2      |User_2      |Page_86     |Region_5    |OTHER       |
        |1581079708745|User_9      |User_9      |Page_75     |Region_1    |OTHER       |
    
        ^CQuery terminated
    
    Use Ctrl+C to terminate the query.
    
  4. WHERE を使用して、条件によりストリームのコンテンツを制限する新しい永続的なクエリを作成します。クエリの結果は、PAGEVIEWS_FEMALE と呼ばれる Kafka トピックに書き込まれます。

    CREATE STREAM pageviews_female AS
      SELECT * FROM pageviews_enriched
      WHERE gender = 'FEMALE'
      EMIT CHANGES;
    

    出力は以下のようになります。

     Message
    -------------------------------------------------------------------------------------------------------
     Stream PAGEVIEWS_FEMALE created and running. Created by query with query ID: CSAS_PAGEVIEWS_FEMALE_11
    -------------------------------------------------------------------------------------------------------
    

    ちなみに

    DESCRIBE pageviews_female; を実行すると、ストリームを記述できます。

  5. LIKE を使用して、別の条件を満たす新しい永続的なクエリを作成します。クエリの結果は pageviews_enriched_r8_r9 Kafka トピックに書き込まれます。

    CREATE STREAM pageviews_female_like_89
      WITH (kafka_topic='pageviews_enriched_r8_r9') AS
      SELECT * FROM pageviews_female
      WHERE regionid LIKE '%_8' OR regionid LIKE '%_9'
      EMIT CHANGES;
    

    出力は以下のようになります。

     Message
    -----------------------------------------------------------------------------------------------------------------------
     Stream PAGEVIEWS_FEMALE_LIKE_89 created and running. Created by query with query ID: CSAS_PAGEVIEWS_FEMALE_LIKE_89_13
    -----------------------------------------------------------------------------------------------------------------------
    
  6. カウントが 1 より大きい場合に 30 秒の タンブリングウィンドウ で地域と性別の組み合わせごとにページビューをカウントする新しい永続的なクエリを作成します。クエリの結果は PAGEVIEWS_REGIONS Kafka トピックに Avro フォーマットで書き込まれます。ksqlDB は、PAGEVIEWS_REGIONS トピックへ最初のメッセージを書き込む際に、構成済みの Schema Registry で Avro スキーマを登録します。

    CREATE TABLE pageviews_regions
      WITH (VALUE_FORMAT='avro') AS
    SELECT gender, regionid , COUNT(*) AS numusers
    FROM pageviews_enriched
      WINDOW TUMBLING (size 30 second)
    GROUP BY gender, regionid
    EMIT CHANGES;
    

    出力は以下のようになります。

     Message
    --------------------------------------------------------------------------------------------------------
     Table PAGEVIEWS_REGIONS created and running. Created by query with query ID: CTAS_PAGEVIEWS_REGIONS_15
    --------------------------------------------------------------------------------------------------------
    

    ちなみに

    DESCRIBE pageviews_regions; を実行すると、テーブルを記述できます。

  7. (省略可): プッシュクエリを使用して、上記のクエリの結果を表示します。

    SELECT * FROM pageviews_regions EMIT CHANGES LIMIT 5;
    

    出力は以下のようになります。

    +---------------+-----------------+---------------+---------------+---------------+---------------+---------------+
    |ROWTIME        |ROWKEY           |WINDOWSTART    |WINDOWEND      |GENDER         |REGIONID       |NUMUSERS       |
    +---------------+-----------------+---------------+---------------+---------------+---------------+---------------+
    |1581080500530  |OTHER|+|Region_9 |1581080490000  |1581080520000  |OTHER          |Region_9       |1              |
    |1581080501530  |OTHER|+|Region_5 |1581080490000  |1581080520000  |OTHER          |Region_5       |2              |
    |1581080510532  |MALE|+|Region_7  |1581080490000  |1581080520000  |MALE           |Region_7       |4              |
    |1581080513532  |FEMALE|+|Region_1|1581080490000  |1581080520000  |FEMALE         |Region_1       |2              |
    |1581080516533  |MALE|+|Region_2  |1581080490000  |1581080520000  |MALE           |Region_2       |3              |
    Limit Reached
    Query terminated
    

    注釈

    WINDOWSTART 列と WINDOWEND 列の追加に注目してください。これらが利用可能な理由は、pageviews_regions が 30 秒あたりの ウィンドウ でデータを集計しているためです。ksqlDB は、ウィンドウ化された集計結果に応じてこれらのシステム列を自動的に追加します。

  8. (省略可): プルクエリを使用して、前のクエリの結果を表示します。

    CREATE TABLE ステートメントに GROUP BY 句が含まれている場合、ksqlDB は集計結果を保管するテーブルを内部的に構築します。ksqlDB はこのような集計結果に対するプルクエリをサポートしています。

    前のステップで使用した結果のストリームを プッシュ するプッシュクエリとは異なり、プルクエリは結果セットを プル して自動的に終了します。

    プルクエリには EMIT CHANGES 句はありません。

    プルクエリを使用して、特定の性別と地域について利用可能なすべてのウィンドウおよびユーザーカウントを表示します。

    SELECT * FROM pageviews_regions WHERE ROWKEY='OTHER|+|Region_9';
    

    出力は以下のようになります。

    +------------------+------------------+------------------+------------------+------------------+------------------+------------------+
    |ROWKEY            |WINDOWSTART       |WINDOWEND         |ROWTIME           |GENDER            |REGIONID          |NUMUSERS          |
    +------------------+------------------+------------------+------------------+------------------+------------------+------------------+
    |OTHER|+|Region_9  |1581080490000     |1581080520000     |1581080500530     |OTHER             |Region_9          |1                 |
    |OTHER|+|Region_9  |1581080550000     |1581080580000     |1581080576526     |OTHER             |Region_9          |4                 |
    |OTHER|+|Region_9  |1581080580000     |1581080610000     |1581080606525     |OTHER             |Region_9          |4                 |
    |OTHER|+|Region_9  |1581080610000     |1581080640000     |1581080622524     |OTHER             |Region_9          |3                 |
    |OTHER|+|Region_9  |1581080640000     |1581080670000     |1581080667528     |OTHER             |Region_9          |6                 |
    

    pageviews_regions などのウィンドウ化されたテーブルに対するプルクエリは、単一ウィンドウの結果に対するクエリもサポートしています。

    SELECT NUMUSERS FROM pageviews_regions WHERE ROWKEY='OTHER|+|Region_9' AND WINDOWSTART=1581080550000;
    

    注釈

    前の SQL の WINDOWSTART の値は、データのウィンドウ境界のいずれかと一致するように変更する必要があります。そうしなければ、結果は返されません。

    出力は以下のようになります。

    +----------+
    |NUMUSERS  |
    +----------+
    |4         |
    Query terminated
    

    ウィンドウの範囲を照会するクエリを実行します。

    SELECT WINDOWSTART, WINDOWEND, NUMUSERS FROM pageviews_regions WHERE ROWKEY='OTHER|+|Region_9' AND 1581080550000 <= WINDOWSTART AND WINDOWSTART <= 1581080610000;
    

    注釈

    前の SQL の WINDOWSTART の値は、データのウィンドウ境界のいずれかと一致するように変更する必要があります。そうしなければ、結果は返されません。

    出力は以下のようになります。

    +----------------------------+----------------------------+----------------------------+
    |WINDOWSTART                 |WINDOWEND                   |NUMUSERS                    |
    +----------------------------+----------------------------+----------------------------+
    |1581080550000               |1581080580000               |4                           |
    |1581080580000               |1581080610000               |4                           |
    |1581080610000               |1581080640000               |3                           |
    Query terminated
    
  9. (省略可): すべての永続的なクエリを表示します。

    SHOW QUERIES;
    

    出力は以下のようになります。

    Query ID                         | Status  | Sink Name                | Sink Kafka Topic         | Query String
    -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    CTAS_PAGEVIEWS_REGIONS_15        | RUNNING | PAGEVIEWS_REGIONS        | PAGEVIEWS_REGIONS        | CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro') AS SELECT  PAGEVIEWS_ENRICHED.GENDER GENDER,  PAGEVIEWS_ENRICHED.REGIONID REGIONID,  COUNT(*) NUMUSERSFROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHEDWINDOW TUMBLING ( SIZE 30 SECONDS ) GROUP BY PAGEVIEWS_ENRICHED.GENDER, PAGEVIEWS_ENRICHED.REGIONIDEMIT CHANGES;
    CSAS_PAGEVIEWS_FEMALE_LIKE_89_13 | RUNNING | PAGEVIEWS_FEMALE_LIKE_89 | pageviews_enriched_r8_r9 | CREATE STREAM PAGEVIEWS_FEMALE_LIKE_89 WITH (KAFKA_TOPIC='pageviews_enriched_r8_r9', PARTITIONS=1, REPLICAS=1) AS SELECT *FROM PAGEVIEWS_FEMALE PAGEVIEWS_FEMALEWHERE ((PAGEVIEWS_FEMALE.REGIONID LIKE '%_8') OR (PAGEVIEWS_FEMALE.REGIONID LIKE '%_9'))EMIT CHANGES;
    CSAS_PAGEVIEWS_ENRICHED_0        | RUNNING | PAGEVIEWS_ENRICHED       | PAGEVIEWS_ENRICHED       | CREATE STREAM PAGEVIEWS_ENRICHED WITH (KAFKA_TOPIC='PAGEVIEWS_ENRICHED', PARTITIONS=1, REPLICAS=1) AS SELECT  USERS_ORIGINAL.USERID USERID,  PAGEVIEWS_ORIGINAL.PAGEID PAGEID,  USERS_ORIGINAL.REGIONID REGIONID,  USERS_ORIGINAL.GENDER GENDERFROM PAGEVIEWS_ORIGINAL PAGEVIEWS_ORIGINALLEFT OUTER JOIN USERS_ORIGINAL USERS_ORIGINAL ON ((PAGEVIEWS_ORIGINAL.USERID = USERS_ORIGINAL.USERID))EMIT CHANGES;
    CSAS_PAGEVIEWS_FEMALE_11         | RUNNING | PAGEVIEWS_FEMALE         | PAGEVIEWS_FEMALE         | CREATE STREAM PAGEVIEWS_FEMALE WITH (KAFKA_TOPIC='PAGEVIEWS_FEMALE', PARTITIONS=1, REPLICAS=1) AS SELECT *FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHEDWHERE (PAGEVIEWS_ENRICHED.GENDER = 'FEMALE')EMIT CHANGES;
    

    クエリの詳細が必要な場合は、EXPLAIN <Query ID>; を実行します。

  10. (省略可): クエリ実行時間のメトリクスや詳細を調べます。ターゲットである Kafka トピックが利用可能であることや、処理されているメッセージのスループットを示す数値などの情報を確認します。

    DESCRIBE EXTENDED PAGEVIEWS_REGIONS;
    

    出力は以下のようになります。

    Name                 : PAGEVIEWS_REGIONS
    Type                 : TABLE
    Key field            :
    Timestamp field      : Not set - using <ROWTIME>
    Key format           : KAFKA
    Value format         : AVRO
    Kafka topic          : PAGEVIEWS_REGIONS (partitions: 1, replication: 1)
    Statement            : CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='json') AS SELECT
      PAGEVIEWS_ENRICHED.GENDER GENDER,
      PAGEVIEWS_ENRICHED.REGIONID REGIONID,
      COUNT(*) NUMUSERS
    FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED
    WINDOW TUMBLING ( SIZE 30 SECONDS )
    GROUP BY PAGEVIEWS_ENRICHED.GENDER, PAGEVIEWS_ENRICHED.REGIONID
    EMIT CHANGES;
    
     Field    | Type
    --------------------------------------------------------------
     ROWTIME  | BIGINT           (system)
     ROWKEY   | VARCHAR(STRING)  (system) (Window type: TUMBLING)
     GENDER   | VARCHAR(STRING)
     REGIONID | VARCHAR(STRING)
     NUMUSERS | BIGINT
    --------------------------------------------------------------
    
    Queries that write from this TABLE
    -----------------------------------
    CTAS_PAGEVIEWS_REGIONS_15 (RUNNING) : CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='json') AS SELECT  PAGEVIEWS_ENRICHED.GENDER GENDER,  PAGEVIEWS_ENRICHED.REGIONID REGIONID,  COUNT(*) NUMUSERSFROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHEDWINDOW TUMBLING ( SIZE 30 SECONDS ) GROUP BY PAGEVIEWS_ENRICHED.GENDER, PAGEVIEWS_ENRICHED.REGIONIDEMIT CHANGES;
    
    For query topology and execution plan please run: EXPLAIN <QueryId>
    
    Local runtime statistics
    ------------------------
    messages-per-sec:      0.90   total-messages:       498     last-message: 2020-02-07T13:10:32.033Z
    

ksqlDB でのネスト化されたスキーマ(STRUCT)の使用

Struct のサポートにより、JSON と Avro のどちらでも Kafka のトピック内のネスト化データのモデリングとアクセスが可能になります。

ここでは ksql-datagen ツールを使用して、ネスト化された address フィールドを含むサンプルデータをいくつか作成します。これを新しいウィンドウで実行し、そのままにしておきます。

<path-to-confluent>/bin/ksql-datagen  \
     quickstart=orders \
     format=avro \
     topic=orders

ksqlDB のコマンドプロンプトから、そのトピックを ksqlDB に登録します。

CREATE STREAM ORDERS
  (
    ROWKEY INT KEY,
    ORDERTIME BIGINT,
    ORDERID INT,
    ITEMID STRING,
    ORDERUNITS DOUBLE,
    ADDRESS STRUCT<CITY STRING, STATE STRING, ZIPCODE BIGINT>
  )
   WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='json', key='orderid');

出力は以下のようになります。

 Message
----------------
 Stream created
----------------

DESCRIBE 関数を使用してスキーマを調べると、STRUCT が含まれています。

DESCRIBE ORDERS;

出力は以下のようになります。

Name                 : ORDERS
 Field      | Type
----------------------------------------------------------------------------------
 ROWTIME    | BIGINT           (system)
 ROWKEY     | INT              (system)
 ORDERTIME  | BIGINT
 ORDERID    | INTEGER
 ITEMID     | VARCHAR(STRING)
 ORDERUNITS | DOUBLE
 ADDRESS    | STRUCT<CITY VARCHAR(STRING), STATE VARCHAR(STRING), ZIPCODE BIGINT>
----------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql>

データのクエリを実行し、-> 表記を使用して Struct の内容にアクセスします。

SELECT ORDERID, ADDRESS->CITY FROM ORDERS EMIT CHANGES LIMIT 5;

出力は以下のようになります。

+-----------------------------------+-----------------------------------+
|ORDERID                            |ADDRESS__CITY                      |
+-----------------------------------+-----------------------------------+
|1188                               |City_95                            |
|1189                               |City_24                            |
|1190                               |City_57                            |
|1191                               |City_37                            |
|1192                               |City_82                            |
Limit Reached
Query terminated

ストリーム同士の結合

ストリーム同士の結合を使用することにより、共通のキーについて 2 つの ストリーム(イベントのストリーム)を結合できます。一例として、注文イベントのストリームと出荷イベントのストリームが挙げられます。これらを注文のキーで結合することにより、注文と併せて出荷の情報を見ることができるようになります。

ksqlDB CLI で 2 つのストリームを新たに作成します。どちらのストリームもそれぞれの注文 ID を ROWKEY に保管します。

<path-to-confluent>/bin/kafka-console-producer \
      --broker-list localhost:9092 \
      --topic new_orders \
      --property "parse.key=true" \
      --property "key.separator=:"<<EOF
1:{"order_id":1,"total_amount":10.50,"customer_name":"Bob Smith"}
2:{"order_id":2,"total_amount":3.32,"customer_name":"Sarah Black"}
3:{"order_id":3,"total_amount":21.00,"customer_name":"Emma Turner"}
EOF

<path-to-confluent>/bin/kafka-console-producer \
      --broker-list localhost:9092 \
      --topic shipments \
      --property "parse.key=true" \
      --property "key.separator=:"<<EOF
1:{"order_id":1,"shipment_id":42,"warehouse":"Nashville"}
3:{"order_id":3,"shipment_id":43,"warehouse":"Palo Alto"}
EOF

ちなみに

なお、上記のステートメントを実行すると以下の警告メッセージが表示される場合がありますが、このメッセージは無視しても安全です。

Error while fetching metadata with correlation id 1 : {new_orders=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
Error while fetching metadata with correlation id 1 : {shipments=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
CREATE STREAM NEW_ORDERS (ROWKEY INT KEY, TOTAL_AMOUNT DOUBLE, CUSTOMER_NAME VARCHAR)
WITH (KAFKA_TOPIC='new_orders', VALUE_FORMAT='JSON', PARTITIONS=2);

CREATE STREAM SHIPMENTS (ROWKEY INT KEY, SHIPMENT_ID INT, WAREHOUSE VARCHAR)
WITH (KAFKA_TOPIC='shipments', VALUE_FORMAT='JSON', PARTITIONS=2);

注釈

ksqlDB は、これらのステートメントが実行される際に基盤となるトピックを Kafka で作成します。また、REPLICAS のカウントを指定できます。

2 つの CREATE STREAM ステートメントの後の出力は以下のようになります。

 Message
----------------
 Stream created
----------------

INSERT VALUES ステートメントを使用して、ストリームにサンプルデータを入力します。

-- Insert values in NEW_ORDERS:
-- insert supplying the list of columns to insert:
INSERT INTO NEW_ORDERS (ROWKEY, CUSTOMER_NAME, TOTAL_AMOUNT)
VALUES (1, 'Bob Smith', 10.50);

-- shorthand syntax can be used when inserting values for all columns (except ROWTIME), in column order:
INSERT INTO NEW_ORDERS  VALUES (2, 3.32, 'Sarah Black');
INSERT INTO NEW_ORDERS  VALUES (3, 21.00, 'Emma Turner');

-- Insert values in SHIPMENTS:
INSERT INTO SHIPMENTS VALUES (1, 42, 'Nashville');
INSERT INTO SHIPMENTS VALUES (3, 43, 'Palo Alto');

データのクエリを実行してトピックにそのデータが存在することを確認します。

ちなみに

以下を実行して、各ストリームの beginning から読み取るよう ksqlDB に指示します。

SET 'auto.offset.reset' = 'earliest';

現在の ksqlDB CLI セッションで既に実行している場合はスキップできます。

NEW_ORDERS トピックについて以下を実行します。

SELECT * FROM NEW_ORDERS EMIT CHANGES LIMIT 3;

出力は以下のようになります。

+-------------------------+-------------------------+-------------------------+-------------------------+
|ROWTIME                  |ROWKEY                   |TOTAL_AMOUNT             |CUSTOMER_NAME            |
+-------------------------+-------------------------+-------------------------+-------------------------+
|1581083057609            |1                        |10.5                     |Bob Smith                |
|1581083178418            |2                        |3.32                     |Sarah Black              |
|1581083210494            |3                        |21.0                     |Emma Turner              |
Limit Reached
Query terminated

SHIPMENTS トピックについて以下を実行します。

SELECT * FROM SHIPMENTS EMIT CHANGES LIMIT 2;

出力は以下のようになります。

+-------------------------+-------------------------+-------------------------+-------------------------+
|ROWTIME                  |ROWKEY                   |SHIPMENT_ID              |WAREHOUSE                |
+-------------------------+-------------------------+-------------------------+-------------------------+
|1581083340711            |1                        |42                       |Nashville                |
|1581083384229            |3                        |43                       |Palo Alto                |
Limit Reached
Query terminated

以下のクエリを実行します。これにより、1 時間の結合ウィンドウを基に、注文とそれに伴う出荷が表示されます。

SELECT O.ROWKEY AS ORDER_ID, O.TOTAL_AMOUNT, O.CUSTOMER_NAME, S.SHIPMENT_ID, S.WAREHOUSE
FROM NEW_ORDERS O
INNER JOIN SHIPMENTS S
  WITHIN 1 HOURS
  ON O.ROWKEY = S.ROWKEY
EMIT CHANGES;

出力は以下のようになります。

+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+
|ORDER_ID                  |TOTAL_AMOUNT              |CUSTOMER_NAME             |SHIPMENT_ID               |WAREHOUSE                 |
+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+
|1                         |10.5                      |Bob Smith                 |42                        |Nashville                 |
|3                         |21.0                      |Emma Turner               |43                        |Palo Alto                 |

なお、ORDER_ID=2 のメッセージには、対応する SHIPMENT_ID または WAREHOUSE がありません。これは、指定された時間ウィンドウ内の shipments ストリームに対応する行がないためです。

以下を実行して、2 つ目のウィンドウで ksqlDB CLI を起動します。

LOG_DIR=./ksql_logs $CONFLUENT_HOME/bin/ksql

以下の INSERT VALUES ステートメントを入力して、注文 ID 2 の出荷を挿入します。

最初の ksqlDB CLI ウィンドウに切り替えます。これで 3 行目が出力されています。

+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+
|ORDER_ID                  |TOTAL_AMOUNT              |CUSTOMER_NAME             |SHIPMENT_ID               |WAREHOUSE                 |
+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+
|1                         |10.5                      |Bob Smith                 |42                        |Nashville                 |
|3                         |21.0                      |Emma Turner               |43                        |Palo Alto                 |
|2                         |3.32                      |Sarah Black               |49                        |London                    |

Ctrl + C を押して SELECT クエリをキャンセルし、 ksqlDB のプロンプトに戻ります。

テーブル同士の結合

テーブル同士の結合を使用することにより、共通のキーについて 2 つの テーブル を結合できます。ksqlDB のテーブルは、指定した キー について最新の を提供します。結合は キー に対してのみ可能であり、1 対多(1:N)の結合は現在のセマンティックモデルではサポートされていません。

この例では、あるシステムから得た倉庫の位置データがあり、別のシステムからの倉庫の規模に関するデータによりこれを拡張します。

ksqlDB CLI で、両方のトピックを ksqlDB のテーブルとして登録します。なおこの例では、倉庫 ID はキーと WAREHOUSE_ID フィールドの値の両方に保管されています。

<path-to-confluent>/bin/kafka-console-producer \
      --broker-list localhost:9092 \
      --topic warehouse_location \
      --property "parse.key=true" \
      --property "key.separator=:"<<EOF

出力は以下のようになります。

1:{"warehouse_id":1,"city":"Leeds","country":"UK"}
2:{"warehouse_id":2,"city":"Sheffield","country":"UK"}
3:{"warehouse_id":3,"city":"Berlin","country":"Germany"}
EOF
<path-to-confluent>/bin/kafka-console-producer \
      --broker-list localhost:9092 \
      --topic warehouse_size \
      --property "parse.key=true" \
      --property "key.separator=:"<<EOF

出力は以下のようになります。

1:{"warehouse_id":1,"square_footage":16000}
2:{"warehouse_id":2,"square_footage":42000}
3:{"warehouse_id":3,"square_footage":94000}
EOF
CREATE TABLE WAREHOUSE_LOCATION
   (ROWKEY INT KEY, WAREHOUSE_ID INT, CITY VARCHAR, COUNTRY VARCHAR)
   WITH (KAFKA_TOPIC='warehouse_location',
      VALUE_FORMAT='JSON',
      KEY='WAREHOUSE_ID',
      PARTITIONS=2);

CREATE TABLE WAREHOUSE_SIZE
   (ROWKEY INT KEY, WAREHOUSE_ID INT, SQUARE_FOOTAGE DOUBLE)
   WITH (KAFKA_TOPIC='warehouse_size',
      VALUE_FORMAT='JSON',
      KEY='WAREHOUSE_ID',
      PARTITIONS=2);

2 つの CREATE TABLE ステートメントの後の出力は以下のようになります。

 Message
---------------
 Table created
---------------
-- note: ksqlDB will automatically populate ROWKEY with the same value as WAREHOUSE_ID:
INSERT INTO WAREHOUSE_LOCATION (WAREHOUSE_ID, CITY, COUNTRY) VALUES (1, 'Leeds', 'UK');
INSERT INTO WAREHOUSE_LOCATION (WAREHOUSE_ID, CITY, COUNTRY) VALUES (2, 'Sheffield', 'UK');
INSERT INTO WAREHOUSE_LOCATION (WAREHOUSE_ID, CITY, COUNTRY) VALUES (3, 'Berlin', 'Germany');

INSERT INTO WAREHOUSE_SIZE (WAREHOUSE_ID, SQUARE_FOOTAGE) VALUES (1, 16000);
INSERT INTO WAREHOUSE_SIZE (WAREHOUSE_ID, SQUARE_FOOTAGE) VALUES (2, 42000);
INSERT INTO WAREHOUSE_SIZE (WAREHOUSE_ID, SQUARE_FOOTAGE) VALUES (3, 94000);

WAREHOUSE_LOCATION テーブルを調べます。

SELECT ROWKEY, WAREHOUSE_ID FROM WAREHOUSE_LOCATION EMIT CHANGES LIMIT 3;

出力は以下のようになります。

+---------------------------------------+---------------------------------------+
|ROWKEY                                 |WAREHOUSE_ID                           |
+---------------------------------------+---------------------------------------+
|2                                      |2                                      |
|1                                      |1                                      |
|3                                      |3                                      |
Limit Reached
Query terminated

WAREHOUSE_SIZE テーブルを調べます。

SELECT ROWKEY, WAREHOUSE_ID FROM WAREHOUSE_SIZE EMIT CHANGES LIMIT 3;

出力は以下のようになります。

+---------------------------------------+---------------------------------------+
|ROWKEY                                 |WAREHOUSE_ID                           |
+---------------------------------------+---------------------------------------+
|2                                      |2                                      |
|1                                      |1                                      |
|3                                      |3                                      |
Limit Reached
Query terminated

ここで 2 つのテーブルを結合します。

SELECT WL.WAREHOUSE_ID, WL.CITY, WL.COUNTRY, WS.SQUARE_FOOTAGE
FROM WAREHOUSE_LOCATION WL
  LEFT JOIN WAREHOUSE_SIZE WS
    ON WL.WAREHOUSE_ID=WS.WAREHOUSE_ID
EMIT CHANGES
LIMIT 3;

出力は以下のようになります。

+------------------+------------------+------------------+------------------+
|WL_WAREHOUSE_ID   |CITY              |COUNTRY           |SQUARE_FOOTAGE    |
+------------------+------------------+------------------+------------------+
|1                 |Leeds             |UK                |16000.0           |
|1                 |Leeds             |UK                |16000.0           |
|2                 |Sheffield         |UK                |42000.0           |
Limit Reached
Query terminated

INSERT INTO

INSERT INTO 構文は、複数ストリームのコンテンツのマージに使用することができます。例として、同じイベントタイプを複数の異なる送信元から取得する場合などが挙げられます。

それぞれが異なるトピックへ書き込む 2 つの datagen プロセスを実行し、ローカルインストールとサードパーティの双方から到着する注文データをシミュレートします。

ちなみに

これらのコマンドは、それぞれ個別のウィンドウで実行する必要があります。演習が完了したら、Ctrl + C を押してそれらを終了します。

<path-to-confluent>/bin/ksql-datagen \
     quickstart=orders \
     format=json \
     topic=orders_local \
     msgRate=2

<path-to-confluent>/bin/ksql-datagen \
     quickstart=orders \
     format=json \
     topic=orders_3rdparty \
     msgRate=2

ksqlDB CLI で、それぞれのソーストピックを登録します。

CREATE STREAM ORDERS_SRC_LOCAL
 (
   ROWKEY INT KEY,
   ORDERTIME BIGINT,
   ORDERID INT,
   ITEMID STRING,
   ORDERUNITS DOUBLE,
   ADDRESS STRUCT<CITY STRING, STATE STRING, ZIPCODE BIGINT>
 )
  WITH (KAFKA_TOPIC='orders_local', VALUE_FORMAT='JSON');

CREATE STREAM ORDERS_SRC_3RDPARTY
 (
   ROWKEY INT KEY,
   ORDERTIME BIGINT,
   ORDERID INT,
   ITEMID STRING,
   ORDERUNITS DOUBLE,
   ADDRESS STRUCT<CITY STRING, STATE STRING, ZIPCODE BIGINT>
 )
  WITH (KAFKA_TOPIC='orders_3rdparty', VALUE_FORMAT='JSON');

CREATE STREAM ステートメントの後に、メッセージが表示されます。

 Message
----------------
 Stream created
----------------

標準の CREATE STREAM AS 構文を使用して出力ストリームを作成します。複数の送信元のデータが共通のターゲットに結合されるため、データの流れの情報を追加すると有用です。SELECT の一部に含めるだけでこれを実行できます。

CREATE STREAM ALL_ORDERS AS SELECT 'LOCAL' AS SRC, * FROM ORDERS_SRC_LOCAL EMIT CHANGES;

出力は以下のようになります。

 Message
-------------------------------------------------------------------------------------------
 Stream ALL_ORDERS created and running. Created by query with query ID: CSAS_ALL_ORDERS_17
-------------------------------------------------------------------------------------------

DESCRIBE コマンドを使用してターゲットストリームのスキーマを調べます。

DESCRIBE ALL_ORDERS;

出力は以下のようになります。

Name                 : ALL_ORDERS
 Field      | Type
----------------------------------------------------------------------------------
 ROWTIME    | BIGINT           (system)
 ROWKEY     | INTEGER          (system)
 SRC        | VARCHAR(STRING)
 ORDERTIME  | BIGINT
 ORDERID    | INTEGER
 ITEMID     | VARCHAR(STRING)
 ORDERUNITS | DOUBLE
 ADDRESS    | STRUCT<CITY VARCHAR(STRING), STATE VARCHAR(STRING), ZIPCODE BIGINT>
----------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

サードパーティの注文のストリームを既存の出力ストリームに追加します。

INSERT INTO ALL_ORDERS SELECT '3RD PARTY' AS SRC, * FROM ORDERS_SRC_3RDPARTY EMIT CHANGES;

出力は以下のようになります。

 Message
------------------------------------------------------------
 Insert Into query is running with query ID: INSERTQUERY_43
------------------------------------------------------------

出力ストリームのクエリを実行して、各送信元からのデータが書き込まれていることを検証します。

SELECT * FROM ALL_ORDERS EMIT CHANGES;

この出力は以下のようになります。両方のソーストピックからのメッセージが含まれていることに注目します(それぞれ LOCAL3RD PARTY により示されています)。

+--------------+----------+-----------+--------------+----------+-------------+----------------------+---------------------------------------------+
|ROWTIME       |ROWKEY    |SRC        |ORDERTIME     |ORDERID   |ITEMID       |ORDERUNITS            |ADDRESS                                      |
+--------------+----------+-----------+--------------+----------+-------------+----------------------+---------------------------------------------+
|1581085344272 |510       |3RD PARTY  |1503198352036 |510       |Item_643     |1.653210222047296     |{CITY=City_94, STATE=State_72, ZIPCODE=61274}|
|1581085344293 |546       |LOCAL      |1498476865306 |546       |Item_234     |9.284691223615178     |{CITY=City_44, STATE=State_29, ZIPCODE=84678}|
|1581085344776 |511       |3RD PARTY  |1489945722538 |511       |Item_264     |8.213163488516212     |{CITY=City_36, STATE=State_13, ZIPCODE=44821}|
…

Ctrl + C を押して SELECT クエリをキャンセルし ksqlDB のプロンプトに戻ります。

SHOW QUERIES を使用して、実行中の 2 つのクエリを表示できます。

SHOW QUERIES;

出力は以下のようになります。

 Query ID                         | Status  | Sink Name                | Sink Kafka Topic         | Query String
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 INSERTQUERY_43                   | RUNNING | ALL_ORDERS               | ALL_ORDERS               | INSERT INTO ALL_ORDERS SELECT '3RD PARTY' AS SRC, * FROM ORDERS_SRC_3RDPARTY EMIT CHANGES;
 CSAS_ALL_ORDERS_17               | RUNNING | ALL_ORDERS               | ALL_ORDERS               | CREATE STREAM ALL_ORDERS WITH (KAFKA_TOPIC='ALL_ORDERS', PARTITIONS=1, REPLICAS=1) AS SELECT  'LOCAL' SRC,  *FROM ORDERS_SRC_LOCAL ORDERS_SRC_LOCALEMIT CHANGES;
…

終了

ksqlDB

注釈

永続的なクエリは、手動で終了するまで ksqlDB アプリケーションとして継続的に実行されます。ksqlDB CLI を終了しても永続的なクエリは終了されません。

  1. SHOW QUERIES; の出力から、終了するクエリ ID を特定します。たとえばクエリ ID CTAS_PAGEVIEWS_REGIONS_15 を終了するには、以下を実行します。

    TERMINATE CTAS_PAGEVIEWS_REGIONS_15;
    

    ちなみに

    実行されているクエリの実際の名前は異なる場合があります。 SHOW QUERIES; の出力を参照してください。

  2. exit コマンドを実行して ksqlDB CLI を終了します。

    ksql> exit
    Exiting ksqlDB.
    

Confluent CLI

CLI を使用して Confluent Platform を実行している場合は、このコマンドで停止させることができます。

<path-to-confluent>/bin/confluent local stop