ksqlDB を使用した Apache Kafka® に対するストリーミングクエリの作成(ローカル)¶
このチュートリアルでは、ksqlDB を使用して Kafka のメッセージに対するストリーミングクエリを作成する、単純なワークフローについて紹介します。
開始するには、ZooKeeper および Kafka ブローカーを含む Kafka クラスターを起動する必要があります。その後 ksqlDB が、この Kafka クラスターからのメッセージに対してクエリを行います。ksqlDB は、デフォルトで Confluent Platform にインストールされています。
前提条件 :
- Confluent Platform がインストールおよび実行されている。このインストールには、Kafka ブローカー、ksqlDB、Control Center、ZooKeeper、Schema Registry、REST Proxy および Connect が含まれています。
- TAR または ZIP を使用して Confluent Platform をインストールした場合は、インストールディレクトリまで移動します。このチュートリアル全体で使用するパスおよびコマンドは、カレントディレクトリがこのインストールディレクトリであると想定しています。
- Confluent Platform のローカルインストールを開始する際には、Confluent CLI の インストール を検討します。
- Java: バージョン 1.8 以降。Oracle Java JRE または JDK 1.8 以降をローカルマシンにインストールする必要があります。
トピックの作成とデータの生成¶
Kafka のトピック pageviews
および users
を作成し、データを生成します。これらの手順では、Confluent Platform の ksqlDB datagen ツールを使用します。
新しいターミナルウィンドウを開いて以下のコマンドを実行し、データジェネレーターを使用して
pageviews
トピックを作成しデータを生成します。以下の例は、DELIMITED フォーマットのデータを継続的に生成します。$CONFLUENT_HOME/bin/ksql-datagen quickstart=pageviews format=delimited topic=pageviews msgRate=5
別のターミナルウィンドウを開き、以下のコマンドを実行して、データジェネレーターを使用して
users
トピックの Kafka データを生成します。以下の例は、DELIMITED フォーマットのデータを継続的に生成します。$CONFLUENT_HOME/bin/ksql-datagen quickstart=users format=delimited topic=users msgRate=1
ちなみに
Confluent Platform にある kafka-console-producer
CLI を使用して Kafka データを生成することもできます。
ksqlDB CLI の起動¶
新しいターミナルウィンドウを開いて以下のコマンドを実行し、LOG_DIR
環境変数を設定して ksqlDB CLI を起動します。
LOG_DIR=./ksql_logs $CONFLUENT_HOME/bin/ksql
このコマンドは、CLI のログを ./ksql_logs
ディレクトリへルーティングします。ディレクトリのパスはカレントディレクトリからの相対パスです。デフォルトでは http://localhost:8088
で実行されている ksqlDB サーバーを CLI が検索します。
重要
デフォルトで、ksqlDB は ksql
実行可能ファイルの場所に対応する logs
と呼ばれるディレクトリにそのログを保管しようとします。たとえば、 ksql
が /usr/local/bin/ksql
にインストールされると、/usr/local/logs
にそのログを保管しようとします。ksql
を Confluent Platform のデフォルトのロケーションである $CONFLUENT_HOME/bin
から実行している場合は、LOG_DIR
変数を使用して、このデフォルトの動作をオーバーライドする必要があります。
ksqlDB 起動後のターミナルはこのようになります。
===========================================
= _ _ ____ ____ =
= | | _____ __ _| | _ \| __ ) =
= | |/ / __|/ _` | | | | | _ \ =
= | <\__ \ (_| | | |_| | |_) | =
= |_|\_\___/\__, |_|____/|____/ =
= |_| =
= Event Streaming Database purpose-built =
= for stream processing apps =
===========================================
Copyright 2017-2020 Confluent Inc.
CLI v6.0.6, Server v6.0.6 located at http://localhost:8088
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
SHOW および PRINT ステートメントを使用した Kafka トピックの調査¶
ksqlDB により、 Kafka のトピックとメッセージをリアルタイムで調査できます。
- SHOW TOPICS ステートメントを使用して、Kafka クラスター上で使用可能なトピックをリストします。
- PRINT ステートメントを使用して、トピックに到着したメッセージをそのつど表示します。
ksqlDB CLI で、以下のステートメントを実行します。
SHOW TOPICS;
出力は以下のようになります。
Kafka Topic | Partitions | Partition Replicas
--------------------------------------------------------------
default_ksql_processing_log | 1 | 1
pageviews | 1 | 1
users | 1 | 1
--------------------------------------------------------------
デフォルトでは、ksqlDB の内部トピックおよびシステムトピックは非表示になっています。SHOW ALL TOPICS ステートメントを使用して、Kafka クラスター上で使用可能な全トピックのリストを表示します。
SHOW ALL TOPICS;
出力は以下のようになります。
Kafka Topic | Partitions | Partition Replicas
--------------------------------------------------------------------------
__confluent.support.metrics | 1 | 1
_confluent-ksql-default__command_topic | 1 | 1
_confluent-license | 1 | 1
_confluent-metrics | 12 | 1
default_ksql_processing_log | 1 | 1
pageviews | 1 | 1
users | 1 | 1
--------------------------------------------------------------------------
PRINT ステートメントを使用した users
トピックの調査
PRINT users;
注釈
PRINT ステートメントは、ksqlDB のコマンドの中でも大文字と小文字が区別される数少ないコマンドの 1 つであり、トピック名が引用符で囲まれていない場合でも区別されます。
出力は以下のようになります。
Key format: KAFKA_STRING
Value format: AVRO
rowtime: 10/30/18 10:15:51 PM GMT, key: User_1, value: {"registertime":1516754966866,"userid":"User_1","regionid":"Region_9","gender":"MALE"}
rowtime: 10/30/18 10:15:51 PM GMT, key: User_3, value: {"registertime":1491558386780,"userid":"User_3","regionid":"Region_2","gender":"MALE"}
rowtime: 10/30/18 10:15:53 PM GMT, key: User_7, value: {"registertime":1514374073235,"userid":"User_7","regionid":"Region_2","gender":"OTHER"}
^Crowtime: 10/30/18 10:15:59 PM GMT, key: User_4, value: {"registertime":1510034151376,"userid":"User_4","regionid":"Region_8","gender":"FEMALE"}
Topic printing ceased
CTRL + C を押して、メッセージのプリントを停止します。
PRINT ステートメントを使用した pageviews
トピックの調査
PRINT pageviews;
出力は以下のようになります。
Key format: KAFKA_INTEGER
Format: KAFKA_STRING
rowtime: 10/23/18 12:24:03 AM PSD, key: 1540254243183, value: 1540254243183,User_9,Page_20
rowtime: 10/23/18 12:24:03 AM PSD, key: 1540254243617, value: 1540254243617,User_7,Page_47
rowtime: 10/23/18 12:24:03 AM PSD, key: 1540254243888, value: 1540254243888,User_4,Page_27
^Crowtime: 10/23/18 12:24:05 AM PSD, key: 1540254245161, value: 1540254245161,User_9,Page_62
Topic printing ceased
CTRL + C を押して、メッセージのプリントを停止します。
詳細については、「ksqlDB 構文リファレンス」を参照してください。
ストリームおよびテーブルの作成¶
この例では、pageviews
および users
と呼ばれる Kafka トピックのメッセージをクエリで取得するために、以下のスキーマを使用します。

Kafka のトピック
pageviews
からpageviews_original
という名前のストリームを作成し、value_format
の値にはDELIMITED
を指定します。CREATE STREAM pageviews_original (rowkey bigint key, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED');
出力は以下のようになります。
Message --------------- Stream created ---------------
ちなみに
DESCRIBE pageviews_original;
を実行すると、ストリームのスキーマを見ることができます。ksqlDB が作成したROWTIME
という列が追加されていることに注意してください。これは Kafka メッセージのタイムスタンプに対応しています。Kafka のトピック
users
からusers_original
という名前のテーブルを作成し、value_format
の値にはAVRO
を指定します。CREATE TABLE users_original WITH (kafka_topic='users', value_format='AVRO', key = 'userid');
出力は以下のようになります。
Message --------------- Table created ---------------
ちなみに
DESCRIBE users_original;
を実行すると、テーブルのスキーマを見ることができます。注釈
CREATE TABLE ステートメントは、CREATE STREAM ステートメントのように列のセットを定義していないことがわかります。これは、値のフォーマットが Avro であり、DataGen ツールが Schema Registry へ Avro スキーマをパブリッシュしているためです。ksqlDB はそのスキーマを Schema Registry から取得し、テーブルの SQL スキーマの構築に使用します。必要であれば、スキーマを指定することもできます。Github issue #4462 が完了するまで、スキーマの推測はこの例のようにデータのキーが STRING である場合にのみ利用可能です。
注釈
生成されたデータでは、Kafka レコードのキーの値が
userId
フィールドの値になります。上記では、WITH 句にkey='userId'
と指定することにより、このことを ksqlDB に知らせています。ksqlDB がこの情報を使用することにより、テーブルに対する結合が可能になり、ROWKEY
よりもわかりやすいuserId
列名を使用することができます。どちらに結合しても同じ結果が得られます。データの値の中にキーのコピーがなければ、ROWKEY
で結合することができます。(省略可): すべてのストリームおよびテーブルを表示します。
ksql> SHOW STREAMS; Stream Name | Kafka Topic | Format --------------------------------------------------------------- KSQL_PROCESSING_LOG | default_ksql_processing_log | JSON PAGEVIEWS_ORIGINAL | pageviews | DELIMITED --------------------------------------------------------------- ksql> SHOW TABLES; Table Name | Kafka Topic | Format | Windowed -------------------------------------------------- USERS_ORIGINAL | users | AVRO | false --------------------------------------------------
ちなみに
SHOW STREAMS の出力に
KSQL_PROCESSING_LOG
ストリームがリストされていることに注意してください。ksqlDB では、データの処理中になんらかの問題が発生すると、それについて説明するメッセージが追加されます。期待どおりに機能しないものがある場合は、このストリームの内容を見て ksqlDB にデータエラーが発生してないかを確認します。
データの表示¶
SELECT を使用して、TABLE からデータを返すクエリを作成します。このクエリには、クエリの結果で返される行数を制限する LIMIT キーワードと、結果をストリームにより返すよう指示する EMIT CHANGES キーワードが含まれます。これがいわゆる プルクエリ です。各種クエリの解説については、「クエリ」を参照してください。なお、正確なデータ出力はデータ生成のランダム性により変動することがあります。
SELECT * from users_original emit changes limit 5;
出力は以下のようになります。
+--------------------+--------------+--------------+---------+----------+-------------+ |ROWTIME |ROWKEY |REGISTERTIME |GENDER |REGIONID |USERID | +--------------------+--------------+--------------+---------+----------+-------------+ |1581077558655 |User_9 |1513529638461 |OTHER |Region_1 |User_9 | |1581077561454 |User_7 |1489408314958 |OTHER |Region_2 |User_7 | |1581077561654 |User_3 |1511291005264 |MALE |Region_2 |User_3 | |1581077561857 |User_4 |1496797956753 |OTHER |Region_1 |User_4 | |1581077562858 |User_8 |1489169082491 |FEMALE |Region_8 |User_8 | Limit Reached Query terminated
注釈
テーブルに対するプッシュクエリは、Kafka の changelog トピックに保管されているテーブルの全履歴を出力します。つまり、過去のデータを出力した後に、テーブルに対する更新のストリームを出力します。したがって、テーブル内の既存の行が更新されると、その行と一致する
ROWKEY
を持つ行が出力される可能性があります。以下のプッシュクエリを発行して、pageviews_original ストリームのデータを表示します。
SELECT viewtime, userid, pageid FROM pageviews_original emit changes LIMIT 3;
出力は以下のようになります。
+--------------+--------------+--------------+ |VIEWTIME |USERID |PAGEID | +--------------+--------------+--------------+ |1581078296791 |User_1 |Page_54 | |1581078297792 |User_8 |Page_93 | |1581078298792 |User_6 |Page_26 | Limit Reached Query terminated
注釈
デフォルトでは、ストリームに対するプッシュクエリはクエリ開始後に発生した変更のみを出力します。つまり、過去のデータは含まれません。過去のデータを表示する場合は、
set 'auto.offset.reset'='earliest';
を実行してセッションのプロパティを更新します。
クエリの書き込み¶
この例では、ksqlDB を使用してクエリを書き込みます。
注記 : デフォルトでは、ksqlDB はストリームおよびテーブルのトピックを最新のオフセットから読み取ります。
users
テーブルからユーザーのgender
とregionid
を読み取ってpageviews
データを拡張するクエリを作成します。以下のクエリは、users_original
テーブルのuserid
列に対して LEFT JOIN を行うことにより、pageviews_original
ストリームを拡張します。SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid EMIT CHANGES LIMIT 5;
出力は以下のようになります。
+-------------------+-------------------+-------------------+-------------------+ |USERID |PAGEID |REGIONID |GENDER | +-------------------+-------------------+-------------------+-------------------+ |User_7 |Page_23 |Region_2 |OTHER | |User_3 |Page_42 |Region_2 |MALE | |User_7 |Page_87 |Region_2 |OTHER | |User_2 |Page_57 |Region_5 |FEMALE | |User_9 |Page_59 |Region_1 |OTHER | Limit Reached Query terminated
注釈
users
テーブルに対する結合はuserid
列に対して行われます。これは、CREATE TABLE ステートメントでテーブルのプライマリキーROWKEY
のエイリアスとして識別されています。userId
とROWKEY
は、テーブルに対する JOIN 基準として相互に入れ替えて使用できます。ただしストリーム側のuserid
のデータはストリームのキーと一致しないため、ksqlDB は結合前にuserId
列でストリームを内部的に再分割します。CREATE STREAM
キーワードを使用して、SELECT
ステートメントに先行する永続的なクエリを作成します。クエリの結果はPAGEVIEWS_ENRICHED
Kafka トピックに書き込まれます。以下のクエリは、users_original
テーブルのユーザー ID に対してLEFT JOIN
を行うことにより、pageviews_original
ストリームを拡張します。CREATE STREAM pageviews_enriched AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid EMIT CHANGES;
出力は以下のようになります。
Message ---------------------------------------------------------------------------------------------------------- Stream PAGEVIEWS_ENRICHED created and running. Created by query with query ID: CSAS_PAGEVIEWS_ENRICHED_0 ----------------------------------------------------------------------------------------------------------
ちなみに
DESCRIBE pageviews_enriched;
を実行すると、ストリームを記述できます。SELECT
を使用して、クエリ結果の到着ごとに表示します。クエリ結果の表示を停止するには Ctrl + C を押します。これによりコンソールへのプリントは停止しますが、クエリそのものは停止しません。基盤となる ksqlDB アプリケーションではクエリが実行され続けます。SELECT * FROM pageviews_enriched emit changes;
出力は以下のようになります。
+-------------+------------+------------+------------+------------+------------+ |ROWTIME |ROWKEY |USERID |PAGEID |REGIONID |GENDER | +-------------+------------+------------+------------+------------+------------+ |1581079706741|User_5 |User_5 |Page_53 |Region_3 |FEMALE | |1581079707742|User_2 |User_2 |Page_86 |Region_5 |OTHER | |1581079708745|User_9 |User_9 |Page_75 |Region_1 |OTHER | ^CQuery terminated Use Ctrl+C to terminate the query.
WHERE
を使用して、条件によりストリームのコンテンツを制限する新しい永続的なクエリを作成します。クエリの結果は、PAGEVIEWS_FEMALE
と呼ばれる Kafka トピックに書き込まれます。CREATE STREAM pageviews_female AS SELECT * FROM pageviews_enriched WHERE gender = 'FEMALE' EMIT CHANGES;
出力は以下のようになります。
Message ------------------------------------------------------------------------------------------------------- Stream PAGEVIEWS_FEMALE created and running. Created by query with query ID: CSAS_PAGEVIEWS_FEMALE_11 -------------------------------------------------------------------------------------------------------
ちなみに
DESCRIBE pageviews_female;
を実行すると、ストリームを記述できます。LIKE を使用して、別の条件を満たす新しい永続的なクエリを作成します。クエリの結果は
pageviews_enriched_r8_r9
Kafka トピックに書き込まれます。CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9' EMIT CHANGES;
出力は以下のようになります。
Message ----------------------------------------------------------------------------------------------------------------------- Stream PAGEVIEWS_FEMALE_LIKE_89 created and running. Created by query with query ID: CSAS_PAGEVIEWS_FEMALE_LIKE_89_13 -----------------------------------------------------------------------------------------------------------------------
カウントが 1 より大きい場合に 30 秒の タンブリングウィンドウ で地域と性別の組み合わせごとにページビューをカウントする新しい永続的なクエリを作成します。クエリの結果は
PAGEVIEWS_REGIONS
Kafka トピックに Avro フォーマットで書き込まれます。ksqlDB は、PAGEVIEWS_REGIONS
トピックへ最初のメッセージを書き込む際に、構成済みの Schema Registry で Avro スキーマを登録します。CREATE TABLE pageviews_regions WITH (VALUE_FORMAT='avro') AS SELECT gender, regionid , COUNT(*) AS numusers FROM pageviews_enriched WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid EMIT CHANGES;
出力は以下のようになります。
Message -------------------------------------------------------------------------------------------------------- Table PAGEVIEWS_REGIONS created and running. Created by query with query ID: CTAS_PAGEVIEWS_REGIONS_15 --------------------------------------------------------------------------------------------------------
ちなみに
DESCRIBE pageviews_regions;
を実行すると、テーブルを記述できます。(省略可): プッシュクエリを使用して、上記のクエリの結果を表示します。
SELECT * FROM pageviews_regions EMIT CHANGES LIMIT 5;
出力は以下のようになります。
+---------------+-----------------+---------------+---------------+---------------+---------------+---------------+ |ROWTIME |ROWKEY |WINDOWSTART |WINDOWEND |GENDER |REGIONID |NUMUSERS | +---------------+-----------------+---------------+---------------+---------------+---------------+---------------+ |1581080500530 |OTHER|+|Region_9 |1581080490000 |1581080520000 |OTHER |Region_9 |1 | |1581080501530 |OTHER|+|Region_5 |1581080490000 |1581080520000 |OTHER |Region_5 |2 | |1581080510532 |MALE|+|Region_7 |1581080490000 |1581080520000 |MALE |Region_7 |4 | |1581080513532 |FEMALE|+|Region_1|1581080490000 |1581080520000 |FEMALE |Region_1 |2 | |1581080516533 |MALE|+|Region_2 |1581080490000 |1581080520000 |MALE |Region_2 |3 | Limit Reached Query terminated
注釈
WINDOWSTART 列と WINDOWEND 列の追加に注目してください。これらが利用可能な理由は、
pageviews_regions
が 30 秒あたりの ウィンドウ でデータを集計しているためです。ksqlDB は、ウィンドウ化された集計結果に応じてこれらのシステム列を自動的に追加します。(省略可): プルクエリを使用して、前のクエリの結果を表示します。
CREATE TABLE ステートメントに GROUP BY 句が含まれている場合、ksqlDB は集計結果を保管するテーブルを内部的に構築します。ksqlDB はこのような集計結果に対するプルクエリをサポートしています。
前のステップで使用した結果のストリームを プッシュ するプッシュクエリとは異なり、プルクエリは結果セットを プル して自動的に終了します。
プルクエリには EMIT CHANGES 句はありません。
プルクエリを使用して、特定の性別と地域について利用可能なすべてのウィンドウおよびユーザーカウントを表示します。
SELECT * FROM pageviews_regions WHERE ROWKEY='OTHER|+|Region_9';
出力は以下のようになります。
+------------------+------------------+------------------+------------------+------------------+------------------+------------------+ |ROWKEY |WINDOWSTART |WINDOWEND |ROWTIME |GENDER |REGIONID |NUMUSERS | +------------------+------------------+------------------+------------------+------------------+------------------+------------------+ |OTHER|+|Region_9 |1581080490000 |1581080520000 |1581080500530 |OTHER |Region_9 |1 | |OTHER|+|Region_9 |1581080550000 |1581080580000 |1581080576526 |OTHER |Region_9 |4 | |OTHER|+|Region_9 |1581080580000 |1581080610000 |1581080606525 |OTHER |Region_9 |4 | |OTHER|+|Region_9 |1581080610000 |1581080640000 |1581080622524 |OTHER |Region_9 |3 | |OTHER|+|Region_9 |1581080640000 |1581080670000 |1581080667528 |OTHER |Region_9 |6 |
pageviews_regions などのウィンドウ化されたテーブルに対するプルクエリは、単一ウィンドウの結果に対するクエリもサポートしています。
SELECT NUMUSERS FROM pageviews_regions WHERE ROWKEY='OTHER|+|Region_9' AND WINDOWSTART=1581080550000;
注釈
前の SQL の
WINDOWSTART
の値は、データのウィンドウ境界のいずれかと一致するように変更する必要があります。そうしなければ、結果は返されません。出力は以下のようになります。
+----------+ |NUMUSERS | +----------+ |4 | Query terminated
ウィンドウの範囲を照会するクエリを実行します。
SELECT WINDOWSTART, WINDOWEND, NUMUSERS FROM pageviews_regions WHERE ROWKEY='OTHER|+|Region_9' AND 1581080550000 <= WINDOWSTART AND WINDOWSTART <= 1581080610000;
注釈
前の SQL の
WINDOWSTART
の値は、データのウィンドウ境界のいずれかと一致するように変更する必要があります。そうしなければ、結果は返されません。出力は以下のようになります。
+----------------------------+----------------------------+----------------------------+ |WINDOWSTART |WINDOWEND |NUMUSERS | +----------------------------+----------------------------+----------------------------+ |1581080550000 |1581080580000 |4 | |1581080580000 |1581080610000 |4 | |1581080610000 |1581080640000 |3 | Query terminated
(省略可): すべての永続的なクエリを表示します。
SHOW QUERIES;
出力は以下のようになります。
Query ID | Status | Sink Name | Sink Kafka Topic | Query String ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- CTAS_PAGEVIEWS_REGIONS_15 | RUNNING | PAGEVIEWS_REGIONS | PAGEVIEWS_REGIONS | CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro') AS SELECT PAGEVIEWS_ENRICHED.GENDER GENDER, PAGEVIEWS_ENRICHED.REGIONID REGIONID, COUNT(*) NUMUSERSFROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHEDWINDOW TUMBLING ( SIZE 30 SECONDS ) GROUP BY PAGEVIEWS_ENRICHED.GENDER, PAGEVIEWS_ENRICHED.REGIONIDEMIT CHANGES; CSAS_PAGEVIEWS_FEMALE_LIKE_89_13 | RUNNING | PAGEVIEWS_FEMALE_LIKE_89 | pageviews_enriched_r8_r9 | CREATE STREAM PAGEVIEWS_FEMALE_LIKE_89 WITH (KAFKA_TOPIC='pageviews_enriched_r8_r9', PARTITIONS=1, REPLICAS=1) AS SELECT *FROM PAGEVIEWS_FEMALE PAGEVIEWS_FEMALEWHERE ((PAGEVIEWS_FEMALE.REGIONID LIKE '%_8') OR (PAGEVIEWS_FEMALE.REGIONID LIKE '%_9'))EMIT CHANGES; CSAS_PAGEVIEWS_ENRICHED_0 | RUNNING | PAGEVIEWS_ENRICHED | PAGEVIEWS_ENRICHED | CREATE STREAM PAGEVIEWS_ENRICHED WITH (KAFKA_TOPIC='PAGEVIEWS_ENRICHED', PARTITIONS=1, REPLICAS=1) AS SELECT USERS_ORIGINAL.USERID USERID, PAGEVIEWS_ORIGINAL.PAGEID PAGEID, USERS_ORIGINAL.REGIONID REGIONID, USERS_ORIGINAL.GENDER GENDERFROM PAGEVIEWS_ORIGINAL PAGEVIEWS_ORIGINALLEFT OUTER JOIN USERS_ORIGINAL USERS_ORIGINAL ON ((PAGEVIEWS_ORIGINAL.USERID = USERS_ORIGINAL.USERID))EMIT CHANGES; CSAS_PAGEVIEWS_FEMALE_11 | RUNNING | PAGEVIEWS_FEMALE | PAGEVIEWS_FEMALE | CREATE STREAM PAGEVIEWS_FEMALE WITH (KAFKA_TOPIC='PAGEVIEWS_FEMALE', PARTITIONS=1, REPLICAS=1) AS SELECT *FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHEDWHERE (PAGEVIEWS_ENRICHED.GENDER = 'FEMALE')EMIT CHANGES;
クエリの詳細が必要な場合は、
EXPLAIN <Query ID>;
を実行します。(省略可): クエリ実行時間のメトリクスや詳細を調べます。ターゲットである Kafka トピックが利用可能であることや、処理されているメッセージのスループットを示す数値などの情報を確認します。
DESCRIBE EXTENDED PAGEVIEWS_REGIONS;
出力は以下のようになります。
Name : PAGEVIEWS_REGIONS Type : TABLE Key field : Timestamp field : Not set - using <ROWTIME> Key format : KAFKA Value format : AVRO Kafka topic : PAGEVIEWS_REGIONS (partitions: 1, replication: 1) Statement : CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='json') AS SELECT PAGEVIEWS_ENRICHED.GENDER GENDER, PAGEVIEWS_ENRICHED.REGIONID REGIONID, COUNT(*) NUMUSERS FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED WINDOW TUMBLING ( SIZE 30 SECONDS ) GROUP BY PAGEVIEWS_ENRICHED.GENDER, PAGEVIEWS_ENRICHED.REGIONID EMIT CHANGES; Field | Type -------------------------------------------------------------- ROWTIME | BIGINT (system) ROWKEY | VARCHAR(STRING) (system) (Window type: TUMBLING) GENDER | VARCHAR(STRING) REGIONID | VARCHAR(STRING) NUMUSERS | BIGINT -------------------------------------------------------------- Queries that write from this TABLE ----------------------------------- CTAS_PAGEVIEWS_REGIONS_15 (RUNNING) : CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='json') AS SELECT PAGEVIEWS_ENRICHED.GENDER GENDER, PAGEVIEWS_ENRICHED.REGIONID REGIONID, COUNT(*) NUMUSERSFROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHEDWINDOW TUMBLING ( SIZE 30 SECONDS ) GROUP BY PAGEVIEWS_ENRICHED.GENDER, PAGEVIEWS_ENRICHED.REGIONIDEMIT CHANGES; For query topology and execution plan please run: EXPLAIN <QueryId> Local runtime statistics ------------------------ messages-per-sec: 0.90 total-messages: 498 last-message: 2020-02-07T13:10:32.033Z
ksqlDB でのネスト化されたスキーマ(STRUCT)の使用¶
Struct のサポートにより、JSON と Avro のどちらでも Kafka のトピック内のネスト化データのモデリングとアクセスが可能になります。
ここでは ksql-datagen
ツールを使用して、ネスト化された address
フィールドを含むサンプルデータをいくつか作成します。これを新しいウィンドウで実行し、そのままにしておきます。
<path-to-confluent>/bin/ksql-datagen \
quickstart=orders \
format=avro \
topic=orders
ksqlDB のコマンドプロンプトから、そのトピックを ksqlDB に登録します。
CREATE STREAM ORDERS
(
ROWKEY INT KEY,
ORDERTIME BIGINT,
ORDERID INT,
ITEMID STRING,
ORDERUNITS DOUBLE,
ADDRESS STRUCT<CITY STRING, STATE STRING, ZIPCODE BIGINT>
)
WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='json', key='orderid');
出力は以下のようになります。
Message
----------------
Stream created
----------------
DESCRIBE
関数を使用してスキーマを調べると、STRUCT
が含まれています。
DESCRIBE ORDERS;
出力は以下のようになります。
Name : ORDERS
Field | Type
----------------------------------------------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | INT (system)
ORDERTIME | BIGINT
ORDERID | INTEGER
ITEMID | VARCHAR(STRING)
ORDERUNITS | DOUBLE
ADDRESS | STRUCT<CITY VARCHAR(STRING), STATE VARCHAR(STRING), ZIPCODE BIGINT>
----------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql>
データのクエリを実行し、->
表記を使用して Struct の内容にアクセスします。
SELECT ORDERID, ADDRESS->CITY FROM ORDERS EMIT CHANGES LIMIT 5;
出力は以下のようになります。
+-----------------------------------+-----------------------------------+
|ORDERID |ADDRESS__CITY |
+-----------------------------------+-----------------------------------+
|1188 |City_95 |
|1189 |City_24 |
|1190 |City_57 |
|1191 |City_37 |
|1192 |City_82 |
Limit Reached
Query terminated
ストリーム同士の結合¶
ストリーム同士の結合を使用することにより、共通のキーについて 2 つの ストリーム(イベントのストリーム)を結合できます。一例として、注文イベントのストリームと出荷イベントのストリームが挙げられます。これらを注文のキーで結合することにより、注文と併せて出荷の情報を見ることができるようになります。
ksqlDB CLI で 2 つのストリームを新たに作成します。どちらのストリームもそれぞれの注文 ID を ROWKEY に保管します。
<path-to-confluent>/bin/kafka-console-producer \
--broker-list localhost:9092 \
--topic new_orders \
--property "parse.key=true" \
--property "key.separator=:"<<EOF
1:{"order_id":1,"total_amount":10.50,"customer_name":"Bob Smith"}
2:{"order_id":2,"total_amount":3.32,"customer_name":"Sarah Black"}
3:{"order_id":3,"total_amount":21.00,"customer_name":"Emma Turner"}
EOF
<path-to-confluent>/bin/kafka-console-producer \
--broker-list localhost:9092 \
--topic shipments \
--property "parse.key=true" \
--property "key.separator=:"<<EOF
1:{"order_id":1,"shipment_id":42,"warehouse":"Nashville"}
3:{"order_id":3,"shipment_id":43,"warehouse":"Palo Alto"}
EOF
ちなみに
なお、上記のステートメントを実行すると以下の警告メッセージが表示される場合がありますが、このメッセージは無視しても安全です。
Error while fetching metadata with correlation id 1 : {new_orders=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
Error while fetching metadata with correlation id 1 : {shipments=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
CREATE STREAM NEW_ORDERS (ROWKEY INT KEY, TOTAL_AMOUNT DOUBLE, CUSTOMER_NAME VARCHAR)
WITH (KAFKA_TOPIC='new_orders', VALUE_FORMAT='JSON', PARTITIONS=2);
CREATE STREAM SHIPMENTS (ROWKEY INT KEY, SHIPMENT_ID INT, WAREHOUSE VARCHAR)
WITH (KAFKA_TOPIC='shipments', VALUE_FORMAT='JSON', PARTITIONS=2);
注釈
ksqlDB は、これらのステートメントが実行される際に基盤となるトピックを Kafka で作成します。また、REPLICAS
のカウントを指定できます。
2 つの CREATE STREAM
ステートメントの後の出力は以下のようになります。
Message
----------------
Stream created
----------------
INSERT VALUES ステートメントを使用して、ストリームにサンプルデータを入力します。
-- Insert values in NEW_ORDERS:
-- insert supplying the list of columns to insert:
INSERT INTO NEW_ORDERS (ROWKEY, CUSTOMER_NAME, TOTAL_AMOUNT)
VALUES (1, 'Bob Smith', 10.50);
-- shorthand syntax can be used when inserting values for all columns (except ROWTIME), in column order:
INSERT INTO NEW_ORDERS VALUES (2, 3.32, 'Sarah Black');
INSERT INTO NEW_ORDERS VALUES (3, 21.00, 'Emma Turner');
-- Insert values in SHIPMENTS:
INSERT INTO SHIPMENTS VALUES (1, 42, 'Nashville');
INSERT INTO SHIPMENTS VALUES (3, 43, 'Palo Alto');
データのクエリを実行してトピックにそのデータが存在することを確認します。
ちなみに
以下を実行して、各ストリームの beginning
から読み取るよう ksqlDB に指示します。
SET 'auto.offset.reset' = 'earliest';
現在の ksqlDB CLI セッションで既に実行している場合はスキップできます。
NEW_ORDERS
トピックについて以下を実行します。
SELECT * FROM NEW_ORDERS EMIT CHANGES LIMIT 3;
出力は以下のようになります。
+-------------------------+-------------------------+-------------------------+-------------------------+
|ROWTIME |ROWKEY |TOTAL_AMOUNT |CUSTOMER_NAME |
+-------------------------+-------------------------+-------------------------+-------------------------+
|1581083057609 |1 |10.5 |Bob Smith |
|1581083178418 |2 |3.32 |Sarah Black |
|1581083210494 |3 |21.0 |Emma Turner |
Limit Reached
Query terminated
SHIPMENTS
トピックについて以下を実行します。
SELECT * FROM SHIPMENTS EMIT CHANGES LIMIT 2;
出力は以下のようになります。
+-------------------------+-------------------------+-------------------------+-------------------------+
|ROWTIME |ROWKEY |SHIPMENT_ID |WAREHOUSE |
+-------------------------+-------------------------+-------------------------+-------------------------+
|1581083340711 |1 |42 |Nashville |
|1581083384229 |3 |43 |Palo Alto |
Limit Reached
Query terminated
以下のクエリを実行します。これにより、1 時間の結合ウィンドウを基に、注文とそれに伴う出荷が表示されます。
SELECT O.ROWKEY AS ORDER_ID, O.TOTAL_AMOUNT, O.CUSTOMER_NAME, S.SHIPMENT_ID, S.WAREHOUSE
FROM NEW_ORDERS O
INNER JOIN SHIPMENTS S
WITHIN 1 HOURS
ON O.ROWKEY = S.ROWKEY
EMIT CHANGES;
出力は以下のようになります。
+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+
|ORDER_ID |TOTAL_AMOUNT |CUSTOMER_NAME |SHIPMENT_ID |WAREHOUSE |
+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+
|1 |10.5 |Bob Smith |42 |Nashville |
|3 |21.0 |Emma Turner |43 |Palo Alto |
なお、ORDER_ID=2
のメッセージには、対応する SHIPMENT_ID
または WAREHOUSE
がありません。これは、指定された時間ウィンドウ内の shipments
ストリームに対応する行がないためです。
以下を実行して、2 つ目のウィンドウで ksqlDB CLI を起動します。
LOG_DIR=./ksql_logs $CONFLUENT_HOME/bin/ksql
以下の INSERT VALUES ステートメントを入力して、注文 ID 2 の出荷を挿入します。
最初の ksqlDB CLI ウィンドウに切り替えます。これで 3 行目が出力されています。
+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+
|ORDER_ID |TOTAL_AMOUNT |CUSTOMER_NAME |SHIPMENT_ID |WAREHOUSE |
+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+
|1 |10.5 |Bob Smith |42 |Nashville |
|3 |21.0 |Emma Turner |43 |Palo Alto |
|2 |3.32 |Sarah Black |49 |London |
Ctrl + C を押して SELECT
クエリをキャンセルし、 ksqlDB のプロンプトに戻ります。
テーブル同士の結合¶
テーブル同士の結合を使用することにより、共通のキーについて 2 つの テーブル を結合できます。ksqlDB のテーブルは、指定した キー について最新の 値 を提供します。結合は キー に対してのみ可能であり、1 対多(1:N)の結合は現在のセマンティックモデルではサポートされていません。
この例では、あるシステムから得た倉庫の位置データがあり、別のシステムからの倉庫の規模に関するデータによりこれを拡張します。
ksqlDB CLI で、両方のトピックを ksqlDB のテーブルとして登録します。なおこの例では、倉庫 ID はキーと WAREHOUSE_ID フィールドの値の両方に保管されています。
<path-to-confluent>/bin/kafka-console-producer \
--broker-list localhost:9092 \
--topic warehouse_location \
--property "parse.key=true" \
--property "key.separator=:"<<EOF
出力は以下のようになります。
1:{"warehouse_id":1,"city":"Leeds","country":"UK"}
2:{"warehouse_id":2,"city":"Sheffield","country":"UK"}
3:{"warehouse_id":3,"city":"Berlin","country":"Germany"}
EOF
<path-to-confluent>/bin/kafka-console-producer \
--broker-list localhost:9092 \
--topic warehouse_size \
--property "parse.key=true" \
--property "key.separator=:"<<EOF
出力は以下のようになります。
1:{"warehouse_id":1,"square_footage":16000}
2:{"warehouse_id":2,"square_footage":42000}
3:{"warehouse_id":3,"square_footage":94000}
EOF
CREATE TABLE WAREHOUSE_LOCATION
(ROWKEY INT KEY, WAREHOUSE_ID INT, CITY VARCHAR, COUNTRY VARCHAR)
WITH (KAFKA_TOPIC='warehouse_location',
VALUE_FORMAT='JSON',
KEY='WAREHOUSE_ID',
PARTITIONS=2);
CREATE TABLE WAREHOUSE_SIZE
(ROWKEY INT KEY, WAREHOUSE_ID INT, SQUARE_FOOTAGE DOUBLE)
WITH (KAFKA_TOPIC='warehouse_size',
VALUE_FORMAT='JSON',
KEY='WAREHOUSE_ID',
PARTITIONS=2);
2 つの CREATE TABLE
ステートメントの後の出力は以下のようになります。
Message
---------------
Table created
---------------
-- note: ksqlDB will automatically populate ROWKEY with the same value as WAREHOUSE_ID:
INSERT INTO WAREHOUSE_LOCATION (WAREHOUSE_ID, CITY, COUNTRY) VALUES (1, 'Leeds', 'UK');
INSERT INTO WAREHOUSE_LOCATION (WAREHOUSE_ID, CITY, COUNTRY) VALUES (2, 'Sheffield', 'UK');
INSERT INTO WAREHOUSE_LOCATION (WAREHOUSE_ID, CITY, COUNTRY) VALUES (3, 'Berlin', 'Germany');
INSERT INTO WAREHOUSE_SIZE (WAREHOUSE_ID, SQUARE_FOOTAGE) VALUES (1, 16000);
INSERT INTO WAREHOUSE_SIZE (WAREHOUSE_ID, SQUARE_FOOTAGE) VALUES (2, 42000);
INSERT INTO WAREHOUSE_SIZE (WAREHOUSE_ID, SQUARE_FOOTAGE) VALUES (3, 94000);
WAREHOUSE_LOCATION テーブルを調べます。
SELECT ROWKEY, WAREHOUSE_ID FROM WAREHOUSE_LOCATION EMIT CHANGES LIMIT 3;
出力は以下のようになります。
+---------------------------------------+---------------------------------------+
|ROWKEY |WAREHOUSE_ID |
+---------------------------------------+---------------------------------------+
|2 |2 |
|1 |1 |
|3 |3 |
Limit Reached
Query terminated
WAREHOUSE_SIZE テーブルを調べます。
SELECT ROWKEY, WAREHOUSE_ID FROM WAREHOUSE_SIZE EMIT CHANGES LIMIT 3;
出力は以下のようになります。
+---------------------------------------+---------------------------------------+
|ROWKEY |WAREHOUSE_ID |
+---------------------------------------+---------------------------------------+
|2 |2 |
|1 |1 |
|3 |3 |
Limit Reached
Query terminated
ここで 2 つのテーブルを結合します。
SELECT WL.WAREHOUSE_ID, WL.CITY, WL.COUNTRY, WS.SQUARE_FOOTAGE
FROM WAREHOUSE_LOCATION WL
LEFT JOIN WAREHOUSE_SIZE WS
ON WL.WAREHOUSE_ID=WS.WAREHOUSE_ID
EMIT CHANGES
LIMIT 3;
出力は以下のようになります。
+------------------+------------------+------------------+------------------+
|WL_WAREHOUSE_ID |CITY |COUNTRY |SQUARE_FOOTAGE |
+------------------+------------------+------------------+------------------+
|1 |Leeds |UK |16000.0 |
|1 |Leeds |UK |16000.0 |
|2 |Sheffield |UK |42000.0 |
Limit Reached
Query terminated
INSERT INTO¶
INSERT INTO
構文は、複数ストリームのコンテンツのマージに使用することができます。例として、同じイベントタイプを複数の異なる送信元から取得する場合などが挙げられます。
それぞれが異なるトピックへ書き込む 2 つの datagen プロセスを実行し、ローカルインストールとサードパーティの双方から到着する注文データをシミュレートします。
ちなみに
これらのコマンドは、それぞれ個別のウィンドウで実行する必要があります。演習が完了したら、Ctrl + C を押してそれらを終了します。
<path-to-confluent>/bin/ksql-datagen \
quickstart=orders \
format=json \
topic=orders_local \
msgRate=2
<path-to-confluent>/bin/ksql-datagen \
quickstart=orders \
format=json \
topic=orders_3rdparty \
msgRate=2
ksqlDB CLI で、それぞれのソーストピックを登録します。
CREATE STREAM ORDERS_SRC_LOCAL
(
ROWKEY INT KEY,
ORDERTIME BIGINT,
ORDERID INT,
ITEMID STRING,
ORDERUNITS DOUBLE,
ADDRESS STRUCT<CITY STRING, STATE STRING, ZIPCODE BIGINT>
)
WITH (KAFKA_TOPIC='orders_local', VALUE_FORMAT='JSON');
CREATE STREAM ORDERS_SRC_3RDPARTY
(
ROWKEY INT KEY,
ORDERTIME BIGINT,
ORDERID INT,
ITEMID STRING,
ORDERUNITS DOUBLE,
ADDRESS STRUCT<CITY STRING, STATE STRING, ZIPCODE BIGINT>
)
WITH (KAFKA_TOPIC='orders_3rdparty', VALUE_FORMAT='JSON');
各 CREATE STREAM
ステートメントの後に、メッセージが表示されます。
Message
----------------
Stream created
----------------
標準の CREATE STREAM … AS
構文を使用して出力ストリームを作成します。複数の送信元のデータが共通のターゲットに結合されるため、データの流れの情報を追加すると有用です。SELECT
の一部に含めるだけでこれを実行できます。
CREATE STREAM ALL_ORDERS AS SELECT 'LOCAL' AS SRC, * FROM ORDERS_SRC_LOCAL EMIT CHANGES;
出力は以下のようになります。
Message
-------------------------------------------------------------------------------------------
Stream ALL_ORDERS created and running. Created by query with query ID: CSAS_ALL_ORDERS_17
-------------------------------------------------------------------------------------------
DESCRIBE
コマンドを使用してターゲットストリームのスキーマを調べます。
DESCRIBE ALL_ORDERS;
出力は以下のようになります。
Name : ALL_ORDERS
Field | Type
----------------------------------------------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | INTEGER (system)
SRC | VARCHAR(STRING)
ORDERTIME | BIGINT
ORDERID | INTEGER
ITEMID | VARCHAR(STRING)
ORDERUNITS | DOUBLE
ADDRESS | STRUCT<CITY VARCHAR(STRING), STATE VARCHAR(STRING), ZIPCODE BIGINT>
----------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
サードパーティの注文のストリームを既存の出力ストリームに追加します。
INSERT INTO ALL_ORDERS SELECT '3RD PARTY' AS SRC, * FROM ORDERS_SRC_3RDPARTY EMIT CHANGES;
出力は以下のようになります。
Message
------------------------------------------------------------
Insert Into query is running with query ID: INSERTQUERY_43
------------------------------------------------------------
出力ストリームのクエリを実行して、各送信元からのデータが書き込まれていることを検証します。
SELECT * FROM ALL_ORDERS EMIT CHANGES;
この出力は以下のようになります。両方のソーストピックからのメッセージが含まれていることに注目します(それぞれ LOCAL
と 3RD PARTY
により示されています)。
+--------------+----------+-----------+--------------+----------+-------------+----------------------+---------------------------------------------+
|ROWTIME |ROWKEY |SRC |ORDERTIME |ORDERID |ITEMID |ORDERUNITS |ADDRESS |
+--------------+----------+-----------+--------------+----------+-------------+----------------------+---------------------------------------------+
|1581085344272 |510 |3RD PARTY |1503198352036 |510 |Item_643 |1.653210222047296 |{CITY=City_94, STATE=State_72, ZIPCODE=61274}|
|1581085344293 |546 |LOCAL |1498476865306 |546 |Item_234 |9.284691223615178 |{CITY=City_44, STATE=State_29, ZIPCODE=84678}|
|1581085344776 |511 |3RD PARTY |1489945722538 |511 |Item_264 |8.213163488516212 |{CITY=City_36, STATE=State_13, ZIPCODE=44821}|
…
Ctrl + C を押して SELECT
クエリをキャンセルし ksqlDB のプロンプトに戻ります。
SHOW QUERIES
を使用して、実行中の 2 つのクエリを表示できます。
SHOW QUERIES;
出力は以下のようになります。
Query ID | Status | Sink Name | Sink Kafka Topic | Query String
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
INSERTQUERY_43 | RUNNING | ALL_ORDERS | ALL_ORDERS | INSERT INTO ALL_ORDERS SELECT '3RD PARTY' AS SRC, * FROM ORDERS_SRC_3RDPARTY EMIT CHANGES;
CSAS_ALL_ORDERS_17 | RUNNING | ALL_ORDERS | ALL_ORDERS | CREATE STREAM ALL_ORDERS WITH (KAFKA_TOPIC='ALL_ORDERS', PARTITIONS=1, REPLICAS=1) AS SELECT 'LOCAL' SRC, *FROM ORDERS_SRC_LOCAL ORDERS_SRC_LOCALEMIT CHANGES;
…
終了¶
ksqlDB¶
注釈
永続的なクエリは、手動で終了するまで ksqlDB アプリケーションとして継続的に実行されます。ksqlDB CLI を終了しても永続的なクエリは終了されません。
SHOW QUERIES;
の出力から、終了するクエリ ID を特定します。たとえばクエリ IDCTAS_PAGEVIEWS_REGIONS_15
を終了するには、以下を実行します。TERMINATE CTAS_PAGEVIEWS_REGIONS_15;
ちなみに
実行されているクエリの実際の名前は異なる場合があります。
SHOW QUERIES;
の出力を参照してください。exit
コマンドを実行して ksqlDB CLI を終了します。ksql> exit Exiting ksqlDB.
Confluent CLI¶
CLI を使用して Confluent Platform を実行している場合は、このコマンドで停止させることができます。
<path-to-confluent>/bin/confluent local stop