ksqlDB を使用した Apache Kafka® に対するストリーミングクエリの作成(ローカル)¶
このチュートリアルでは、ksqlDB を使用して Kafka のメッセージに対するストリーミングクエリを作成する、単純なワークフローについて紹介します。
開始するには、ZooKeeper および Kafka ブローカーを含む Kafka クラスターを起動する必要があります。その後 ksqlDB が、この Kafka クラスターからのメッセージに対してクエリを行います。ksqlDB は、デフォルトで Confluent Platform にインストールされています。
前提条件 :
- Confluent Platform がインストールおよび実行されている。このインストールには、Kafka ブローカー、ksqlDB、Control Center、ZooKeeper、Schema Registry、REST Proxy および Connect が含まれています。
- TAR または ZIP を使用して Confluent Platform をインストールした場合は、インストールディレクトリまで移動します。このチュートリアル全体で使用するパスおよびコマンドは、カレントディレクトリがこのインストールディレクトリであると想定しています。
- Confluent Platform のローカルインストールを開始する際には、Confluent CLI の インストール を検討します。
- Java: バージョン 1.8 以降。Oracle Java JRE または JDK 1.8 以降をローカルマシンにインストールする必要があります。
トピックの作成とデータの生成¶
Kafka のトピック pageviews
および users
を作成し、データを生成します。これらの手順では、Confluent Platform の ksqlDB datagen ツールを使用します。
新しいターミナルウィンドウを開いて以下のコマンドを実行し、データジェネレーターを使用して
pageviews
トピックを作成しデータを生成します。以下の例は、DELIMITED フォーマットのデータを継続的に生成します。$CONFLUENT_HOME/bin/ksql-datagen quickstart=pageviews format=avro topic=pageviews msgRate=5
別のターミナルウィンドウを開き、以下のコマンドを実行して、データジェネレーターを使用して
users
トピックの Kafka データを生成します。以下の例は、DELIMITED フォーマットのデータを継続的に生成します。$CONFLUENT_HOME/bin/ksql-datagen quickstart=users format=avro topic=users msgRate=1
ちなみに
Confluent Platform にある kafka-console-producer
CLI を使用して Kafka データを生成することもできます。
ksqlDB CLI の起動¶
新しいターミナルウィンドウを開いて以下のコマンドを実行し、LOG_DIR
環境変数を設定して ksqlDB CLI を起動します。
LOG_DIR=./ksql_logs $CONFLUENT_HOME/bin/ksql
このコマンドは、CLI のログを ./ksql_logs
ディレクトリへルーティングします。ディレクトリのパスはカレントディレクトリからの相対パスです。デフォルトでは http://localhost:8088
で実行されている ksqlDB サーバーを CLI が検索します。
重要
デフォルトで、ksqlDB は ksql
実行可能ファイルの場所に対応する logs
と呼ばれるディレクトリにそのログを保管しようとします。たとえば、 ksql
が /usr/local/bin/ksql
にインストールされると、/usr/local/logs
にそのログを保管しようとします。ksql
を Confluent Platform のデフォルトのロケーションである $CONFLUENT_HOME/bin
から実行している場合は、LOG_DIR
変数を使用して、このデフォルトの動作をオーバーライドする必要があります。
ksqlDB 起動後のターミナルはこのようになります。
===========================================
= _ _ ____ ____ =
= | | _____ __ _| | _ \| __ ) =
= | |/ / __|/ _` | | | | | _ \ =
= | <\__ \ (_| | | |_| | |_) | =
= |_|\_\___/\__, |_|____/|____/ =
= |_| =
= The Database purpose-built =
= for stream processing apps =
===========================================
Copyright 2017-2020 Confluent Inc.
CLI v6.1.5, Server v6.1.5 located at http://localhost:8088
Server Status: RUNNING
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
SHOW および PRINT ステートメントを使用した Kafka トピックの調査¶
ksqlDB により、 Kafka のトピックとメッセージをリアルタイムで調査できます。
- SHOW TOPICS ステートメントを使用して、Kafka クラスター上で使用可能なトピックをリストします。
- PRINT ステートメントを使用して、トピックに到着したメッセージをそのつど表示します。
ksqlDB CLI で、以下のステートメントを実行します。
SHOW TOPICS;
出力は以下のようになります。
Kafka Topic | Partitions | Partition Replicas
--------------------------------------------------------------
default_ksql_processing_log | 1 | 1
pageviews | 1 | 1
users | 1 | 1
--------------------------------------------------------------
デフォルトでは、ksqlDB の内部トピックおよびシステムトピックは非表示になっています。SHOW ALL TOPICS ステートメントを使用して、Kafka クラスター上で使用可能な全トピックのリストを表示します。
SHOW ALL TOPICS;
出力は以下のようになります。
Kafka Topic | Partitions | Partition Replicas
---------------------------------------------------------------------------------------------------------------------------------
_confluent-command | 1 | 1
_confluent-controlcenter-... | 2 | 1
...
_confluent-ksql-default__command_topic | 1 | 1
_confluent-license | 1 | 1
_confluent-metrics | 12 | 1
_confluent-monitoring | 2 | 1
_confluent-telemetry-metrics | 12 | 1
_confluent_balancer_api_state | 1 | 1
_confluent_balancer_broker_samples | 32 | 1
_confluent_balancer_partition_samples | 32 | 1
_schemas | 1 | 1
connect-configs | 1 | 1
connect-offsets | 25 | 1
connect-statuses | 5 | 1
default_ksql_processing_log | 1 | 1
pageviews | 1 | 1
users | 1 | 1
---------------------------------------------------------------------------------------------------------------------------------
注釈
実際の出力には、_confluent-controlcenter
トピックが多数表示されます。わかりやすくするためにここでは削除されています。
PRINT ステートメントを使用した users
トピックの調査
PRINT users;
注釈
PRINT ステートメントは、ksqlDB のコマンドの中でも大文字と小文字が区別される数少ないコマンドの 1 つであり、トピック名が引用符で囲まれていない場合でも区別されます。
出力は以下のようになります。
Key format: KAFKA_STRING
Value format: KAFKA_STRING
rowtime: 2021/01/27 18:06:22.057 Z, key: User_6, value: 1505521983750,User_6,Region_4,OTHER
rowtime: 2021/01/27 18:06:23.057 Z, key: User_8, value: 1518180723778,User_8,Region_1,OTHER
rowtime: 2021/01/27 18:06:24.057 Z, key: User_8, value: 1517994971847,User_8,Region_5,MALE
^CTopic printing ceased
CTRL + C を押して、メッセージのプリントを停止します。
PRINT ステートメントを使用した pageviews
トピックの調査
PRINT pageviews;
出力は以下のようになります。
Key format: KAFKA_BIGINT or KAFKA_DOUBLE
Value format: KAFKA_STRING
rowtime: 2021/01/27 18:08:50.961 Z, key: 1611770930961, value: 1611770930961,User_4,Page_32
rowtime: 2021/01/27 18:08:51.161 Z, key: 1611770931161, value: 1611770931161,User_5,Page_81
rowtime: 2021/01/27 18:08:51.361 Z, key: 1611770931361, value: 1611770931361,User_1,Page_74
^CTopic printing ceased
CTRL + C を押して、メッセージのプリントを停止します。
詳細については、「ksqlDB 構文リファレンス」を参照してください。
ストリームおよびテーブルの作成¶
この例では、pageviews
および users
と呼ばれる Kafka トピックのメッセージをクエリで取得するために、以下のスキーマを使用します。

Kafka のトピック
pageviews
からpageviews_original
という名前のストリームを作成し、value_format
の値にはAVRO
を指定します。CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='AVRO');
出力は以下のようになります。
Message --------------- Stream created ---------------
ちなみに
DESCRIBE pageviews_original;
を実行すると、ストリームのスキーマを見ることができます。ksqlDB が作成したROWTIME
という列が追加されていることに注意してください。これは Kafka メッセージのタイムスタンプに対応しています。Kafka のトピック
users
からusers_original
という名前のテーブルを作成し、value_format
の値にはAVRO
を指定します。CREATE TABLE users_original (id VARCHAR PRIMARY KEY) WITH (kafka_topic='users', value_format='AVRO');
出力は以下のようになります。
Message --------------- Table created ---------------
ちなみに
DESCRIBE users_original;
を実行すると、テーブルのスキーマを見ることができます。注釈
CREATE TABLE ステートメントは、CREATE STREAM ステートメントのように列のセットを定義していないことがわかります。これは、値のフォーマットが Avro であり、DataGen ツールが Schema Registry へ Avro スキーマをパブリッシュしているためです。ksqlDB はそのスキーマを Schema Registry から取得し、テーブルの SQL スキーマの構築に使用します。必要であれば、スキーマを指定することもできます。
(省略可): すべてのストリームおよびテーブルを表示します。
ksql> SHOW STREAMS; Stream Name | Kafka Topic | Key Format | Value Format | Windowed ------------------------------------------------------------------------------------------ KSQL_PROCESSING_LOG | default_ksql_processing_log | KAFKA | JSON | false PAGEVIEWS_ORIGINAL | pageviews | KAFKA | DELIMITED | false ------------------------------------------------------------------------------------------ ksql> SHOW TABLES; Table Name | Kafka Topic | Key Format | Value Format | Windowed --------------------------------------------------------------------- USERS_ORIGINAL | users | KAFKA | AVRO | false ---------------------------------------------------------------------
ちなみに
SHOW STREAMS の出力に
KSQL_PROCESSING_LOG
ストリームがリストされていることに注意してください。ksqlDB では、データの処理中になんらかの問題が発生すると、それについて説明するメッセージが追加されます。期待どおりに機能しないものがある場合は、このストリームの内容を見て ksqlDB にデータエラーが発生してないかを確認します。
データの表示¶
SELECT を使用して、TABLE からデータを返すクエリを作成します。このクエリには、クエリの結果で返される行数を制限する LIMIT キーワードと、結果をストリームにより返すよう指示する EMIT CHANGES キーワードが含まれます。これがいわゆる プルクエリ です。各種クエリの解説については、「クエリ」を参照してください。なお、正確なデータ出力はデータ生成のランダム性により変動することがあります。
SELECT * from users_original emit changes limit 5;
出力は以下のようになります。
+---------------+---------------+---------------+---------------+---------------+ |ID |REGISTERTIME |USERID |REGIONID |GENDER | +---------------+---------------+---------------+---------------+---------------+ |User_2 |1502155111606 |User_2 |Region_8 |OTHER | |User_1 |1499783711681 |User_1 |Region_3 |OTHER | |User_9 |1504556621362 |User_9 |Region_5 |FEMALE | |User_6 |1488869543103 |User_6 |Region_4 |OTHER | |User_3 |1512248344223 |User_3 |Region_9 |FEMALE | Limit Reached Query terminated
注釈
テーブルに対するプッシュクエリは、Kafka の changelog トピックに保管されているテーブルの全履歴を出力します。つまり、過去のデータを出力した後に、テーブルに対するアップデートのストリームを出力します。したがって、テーブル内の既存の行がアップデートされると、その行と一致する
ID
を持つ行が出力される可能性があります。以下のプッシュクエリを発行して、pageviews_original ストリームのデータを表示します。
SELECT viewtime, userid, pageid FROM pageviews_original emit changes LIMIT 3;
出力は以下のようになります。
+--------------+--------------+--------------+ |VIEWTIME |USERID |PAGEID | +--------------+--------------+--------------+ |1581078296791 |User_1 |Page_54 | |1581078297792 |User_8 |Page_93 | |1581078298792 |User_6 |Page_26 | Limit Reached Query terminated
注釈
デフォルトでは、ストリームに対するプッシュクエリはクエリ開始後に発生した変更のみを出力します。つまり、過去のデータは含まれません。過去のデータを表示する場合は、
set 'auto.offset.reset'='earliest';
を実行してセッションのプロパティを更新します。
クエリの書き込み¶
この例では、ksqlDB を使用してクエリを書き込みます。
注記 : デフォルトでは、ksqlDB はストリームおよびテーブルのトピックを最新のオフセットから読み取ります。
users
テーブルからユーザーのgender
とregionid
を読み取ってpageviews
データを拡張するクエリを作成します。以下のクエリは、users_original
テーブルのuserid
列に対して LEFT JOIN を行うことにより、pageviews_original
ストリームを拡張します。SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.id EMIT CHANGES LIMIT 5;
出力は以下のようになります。
+-------------------+-------------------+-------------------+-------------------+ |ID |PAGEID |REGIONID |GENDER | +-------------------+-------------------+-------------------+-------------------+ |User_7 |Page_23 |Region_2 |OTHER | |User_3 |Page_42 |Region_2 |MALE | |User_7 |Page_87 |Region_2 |OTHER | |User_2 |Page_57 |Region_5 |FEMALE | |User_9 |Page_59 |Region_1 |OTHER | Limit Reached Query terminated
CREATE STREAM
キーワードを使用して、SELECT
ステートメントに先行する永続的なクエリを作成します。クエリの結果はPAGEVIEWS_ENRICHED
Kafka トピックに書き込まれます。以下のクエリは、users_original
テーブルのユーザー ID に対してLEFT JOIN
を行うことにより、pageviews_original
ストリームを拡張します。CREATE STREAM pageviews_enriched AS SELECT users_original.id AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.id EMIT CHANGES;
出力は以下のようになります。
Message -------------------------------------------------- Created query with ID CSAS_PAGEVIEWS_ENRICHED_33 --------------------------------------------------
ちなみに
DESCRIBE pageviews_enriched;
を実行すると、ストリームを記述できます。SELECT
を使用して、クエリ結果の到着ごとに表示します。クエリ結果の表示を停止するには Ctrl + C を押します。これによりコンソールへのプリントは停止しますが、クエリそのものは停止しません。基盤となる ksqlDB アプリケーションではクエリが実行され続けます。SELECT * FROM pageviews_enriched emit changes;
出力は以下のようになります。
+---------------------+---------------------+---------------------+---------------------+ |ID |PAGEID |REGIONID |GENDER | +---------------------+---------------------+---------------------+---------------------+ |User_8 |Page_41 |Region_4 |FEMALE | |User_2 |Page_87 |Region_3 |OTHER | |User_3 |Page_84 |Region_8 |FEMALE | ^CQuery terminated
Ctrl + C キーを押してクエリを停止します。
WHERE
を使用して、条件によりストリームのコンテンツを制限する新しい永続的なクエリを作成します。クエリの結果は、PAGEVIEWS_FEMALE
という名前の Kafka トピックに書き込まれます。CREATE STREAM pageviews_female AS SELECT * FROM pageviews_enriched WHERE gender = 'FEMALE' EMIT CHANGES;
出力は以下のようになります。
Message ------------------------------------------------ Created query with ID CSAS_PAGEVIEWS_FEMALE_35 ------------------------------------------------
ちなみに
DESCRIBE pageviews_female;
を実行すると、ストリームを記述できます。LIKE を使用して、別の条件を満たす新しい永続的なクエリを作成します。クエリの結果は
pageviews_enriched_r8_r9
Kafka トピックに書き込まれます。CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9' EMIT CHANGES;
出力は以下のようになります。
Message -------------------------------------------------------- Created query with ID CSAS_PAGEVIEWS_FEMALE_LIKE_89_37 --------------------------------------------------------
カウントが 1 より大きい場合に 30 秒の タンブリングウィンドウ で地域と性別の組み合わせごとにページビューをカウントする新しい永続的なクエリを作成します。クエリの結果は
PAGEVIEWS_REGIONS
Kafka トピックに Avro フォーマットで書き込まれます。ksqlDB は、PAGEVIEWS_REGIONS
トピックへ最初のメッセージを書き込む際に、構成済みの Schema Registry で Avro スキーマを登録します。CREATE TABLE pageviews_regions WITH (VALUE_FORMAT='avro') AS SELECT gender, regionid , COUNT(*) AS numusers FROM pageviews_enriched WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid EMIT CHANGES;
出力は以下のようになります。
Message ------------------------------------------------- Created query with ID CTAS_PAGEVIEWS_REGIONS_39 -------------------------------------------------
ちなみに
DESCRIBE pageviews_regions;
を実行すると、テーブルを記述できます。(省略可): プッシュクエリを使用して、上記のクエリの結果を表示します。
SELECT * FROM pageviews_regions EMIT CHANGES LIMIT 5;
出力は以下のようになります。
+----------------------+----------------------+----------------------+----------------------+ |KSQL_COL_0 |WINDOWSTART |WINDOWEND |NUMUSERS | +----------------------+----------------------+----------------------+----------------------+ |FEMALE|+|Region_2 |1611774900000 |1611774930000 |1 | |FEMALE|+|Region_3 |1611774900000 |1611774930000 |2 | |FEMALE|+|Region_4 |1611774900000 |1611774930000 |3 | |FEMALE|+|Region_8 |1611774900000 |1611774930000 |2 | |MALE|+|Region_4 |1611774900000 |1611774930000 |3 | Limit Reached Query terminated
注釈
WINDOWSTART 列と WINDOWEND 列の追加に注目してください。これらが利用可能な理由は、
pageviews_regions
が 30 秒あたりの ウィンドウ でデータを集計しているためです。ksqlDB は、ウィンドウ化された集計結果に応じてこれらのシステム列を自動的に追加します。(省略可): プルクエリを使用して、前のクエリの結果を表示します。
CREATE TABLE ステートメントに GROUP BY 句が含まれている場合、ksqlDB は集計結果を保管するテーブルを内部で構築します。ksqlDB はこのような集計結果に対するプルクエリをサポートしています。
前のステップで使用した結果のストリームを プッシュ するプッシュクエリとは異なり、プルクエリは結果セットを プル して自動的に終了します。
プルクエリには EMIT CHANGES 句はありません。
プルクエリを使用して、特定の性別と地域について利用可能なすべてのウィンドウおよびユーザーカウントを表示します。
SELECT * FROM pageviews_regions WHERE KSQL_COL_0='FEMALE|+|Region_4';
出力は以下のようになります。
+----------------------+----------------------+----------------------+----------------------+ |KSQL_COL_0 |WINDOWSTART |WINDOWEND |NUMUSERS | +----------------------+----------------------+----------------------+----------------------+ |FEMALE|+|Region_4 |1611774900000 |1611774930000 |3 | |FEMALE|+|Region_4 |1611774930000 |1611774960000 |2 | |FEMALE|+|Region_4 |1611774990000 |1611775020000 |3 | Query terminated
pageviews_regions
などのウィンドウ化されたテーブルに対するプルクエリは、単一ウィンドウの結果に対するクエリもサポートしています。SELECT NUMUSERS FROM pageviews_regions WHERE KSQL_COL_0='FEMALE|+|Region_4' AND WINDOWSTART=1611774900000;
注釈
前の SQL の
WINDOWSTART
の値は、データのウィンドウ境界のいずれかと一致するように変更する必要があります。そうしなければ、結果は返されません。出力は以下のようになります。
+----------+ |NUMUSERS | +----------+ |4 | Query terminated
ウィンドウの範囲を照会するクエリを実行します。
SELECT WINDOWSTART, WINDOWEND, NUMUSERS FROM pageviews_regions WHERE KSQL_COL_0='OTHER|+|Region_9' AND 1611774900000 <= WINDOWSTART AND WINDOWSTART <= 1611775020000;
注釈
前の SQL の
WINDOWSTART
の値は、データのウィンドウ境界のいずれかと一致するように変更する必要があります。そうしなければ、結果は返されません。出力は以下のようになります。
+------------------------------+------------------------------+------------------------------+ |WINDOWSTART |WINDOWEND |NUMUSERS | +------------------------------+------------------------------+------------------------------+ |1611774930000 |1611774960000 |8 | |1611774960000 |1611774990000 |1 | |1611774990000 |1611775020000 |17 | |1611775020000 |1611775050000 |22 | Query terminated
(省略可): すべての永続的なクエリを表示します。
SHOW QUERIES;
出力は以下のようになります。
Query ID | Query Type | Status | Sink Name | Sink Kafka Topic | Query String ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- CSAS_PAGEVIEWS_ENRICHED_33 | PERSISTENT | RUNNING:1 | PAGEVIEWS_ENRICHED | PAGEVIEWS_ENRICHED | CREATE STREAM PAGEVIEWS_ENRICHED WITH (KAFKA_TOPIC='PAGEVIEWS_ENRICHED', PARTITIONS=1, REPLICAS=1) AS SELECT USERS_ORIGINAL.ID ID, PAGEVIEWS_ORIGINAL.PAGEID PAGEID, USERS_ORIGINAL.REGIONID REGIONID, USERS_ORIGINAL.GENDER GENDER FROM PAGEVIEWS_ORIGINAL PAGEVIEWS_ORIGINAL LEFT OUTER JOIN USERS_ORIGINAL USERS_ORIGINAL ON ((PAGEVIEWS_ORIGINAL.USERID = USERS_ORIGINAL.ID)) EMIT CHANGES; CSAS_PAGEVIEWS_FEMALE_35 | PERSISTENT | RUNNING:1 | PAGEVIEWS_FEMALE | PAGEVIEWS_FEMALE | CREATE STREAM PAGEVIEWS_FEMALE WITH (KAFKA_TOPIC='PAGEVIEWS_FEMALE', PARTITIONS=1, REPLICAS=1) AS SELECT * FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED WHERE (PAGEVIEWS_ENRICHED.GENDER = 'FEMALE') EMIT CHANGES; CTAS_PAGEVIEWS_REGIONS_39 | PERSISTENT | RUNNING:1 | PAGEVIEWS_REGIONS | PAGEVIEWS_REGIONS | CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro') AS SELECT PAGEVIEWS_ENRICHED.GENDER GENDER, PAGEVIEWS_ENRICHED.REGIONID REGIONID, COUNT(*) NUMUSERS FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED WINDOW TUMBLING ( SIZE 30 SECONDS ) GROUP BY PAGEVIEWS_ENRICHED.GENDER, PAGEVIEWS_ENRICHED.REGIONID EMIT CHANGES; CSAS_PAGEVIEWS_FEMALE_LIKE_89_37 | PERSISTENT | RUNNING:1 | PAGEVIEWS_FEMALE_LIKE_89 | pageviews_enriched_r8_r9 | CREATE STREAM PAGEVIEWS_FEMALE_LIKE_89 WITH (KAFKA_TOPIC='pageviews_enriched_r8_r9', PARTITIONS=1, REPLICAS=1) AS SELECT * FROM PAGEVIEWS_FEMALE PAGEVIEWS_FEMALE WHERE ((PAGEVIEWS_FEMALE.REGIONID LIKE '%_8') OR (PAGEVIEWS_FEMALE.REGIONID LIKE '%_9')) EMIT CHANGES; ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- For detailed information on a Query run: EXPLAIN <Query ID>;
(省略可): クエリ実行時間のメトリクスや詳細を調べます。ターゲットの Kafka トピックなどの情報が使用できます。また、処理されているメッセージのスループットを示す数値も使用できます。
DESCRIBE EXTENDED PAGEVIEWS_REGIONS;
出力は以下のようになります。
Name : PAGEVIEWS_REGIONS Type : TABLE Timestamp field : Not set - using <ROWTIME> Key format : KAFKA Value format : AVRO Kafka topic : PAGEVIEWS_REGIONS (partitions: 1, replication: 1) Statement : CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro') AS SELECT PAGEVIEWS_ENRICHED.GENDER GENDER, PAGEVIEWS_ENRICHED.REGIONID REGIONID, COUNT(*) NUMUSERS FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED WINDOW TUMBLING ( SIZE 30 SECONDS ) GROUP BY PAGEVIEWS_ENRICHED.GENDER, PAGEVIEWS_ENRICHED.REGIONID EMIT CHANGES; Field | Type --------------------------------------------------------------------- KSQL_COL_0 | VARCHAR(STRING) (primary key) (Window type: TUMBLING) NUMUSERS | BIGINT --------------------------------------------------------------------- Queries that write from this TABLE ----------------------------------- CTAS_PAGEVIEWS_REGIONS_39 (RUNNING) : CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro') AS SELECT PAGEVIEWS_ENRICHED.GENDER GENDER, PAGEVIEWS_ENRICHED.REGIONID REGIONID, COUNT(*) NUMUSERS FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED WINDOW TUMBLING ( SIZE 30 SECONDS ) GROUP BY PAGEVIEWS_ENRICHED.GENDER, PAGEVIEWS_ENRICHED.REGIONID EMIT CHANGES; For query topology and execution plan please run: EXPLAIN <QueryId> Local runtime statistics ------------------------ messages-per-sec: 2.89 total-messages: 3648 last-message: 2021-01-27T19:36:11.197Z (Statistics of the local KSQL server interaction with the Kafka topic PAGEVIEWS_REGIONS) Consumer Groups summary: Consumer Group : _confluent-ksql-default_query_CTAS_PAGEVIEWS_REGIONS_39 Kafka topic : PAGEVIEWS_ENRICHED Max lag : 5 Partition | Start Offset | End Offset | Offset | Lag ------------------------------------------------------ 0 | 0 | 7690 | 7685 | 5 ------------------------------------------------------ Kafka topic : _confluent-ksql-default_query_CTAS_PAGEVIEWS_REGIONS_39-Aggregate-GroupBy-repartition Max lag : 5 Partition | Start Offset | End Offset | Offset | Lag ------------------------------------------------------ 0 | 6224 | 6229 | 6224 | 5 ------------------------------------------------------
ksqlDB でのネスト化されたスキーマ(STRUCT)の使用¶
Struct のサポートにより、JSON と Avro のどちらでも Kafka のトピック内のネスト化データのモデリングとアクセスが可能になります。
ここでは ksql-datagen
ツールを使用して、ネスト化された address
フィールドを含むサンプルデータをいくつか作成します。これを新しいウィンドウで実行し、そのままにしておきます。
$CONFLUENT_HOME/bin/ksql-datagen \
quickstart=orders \
format=avro \
topic=orders \
msgRate=1
ksqlDB のコマンドプロンプトから、そのトピックを ksqlDB に登録します。
CREATE STREAM orders
(
ordertime BIGINT,
orderid INT,
itemid STRING,
orderunits DOUBLE,
address STRUCT<city STRING, state STRING, zipcode BIGINT>
)
WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='avro');
出力は以下のようになります。
Message
----------------
Stream created
----------------
DESCRIBE
関数を使用してスキーマを調べると、STRUCT
が含まれています。
DESCRIBE orders;
出力は以下のようになります。
Field | Type
----------------------------------------------------------------------------------
ORDERTIME | BIGINT
ORDERID | INTEGER
ITEMID | VARCHAR(STRING)
ORDERUNITS | DOUBLE
ADDRESS | STRUCT<CITY VARCHAR(STRING), STATE VARCHAR(STRING), ZIPCODE BIGINT>
----------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
->
表記を使用してデータのクエリを実行し、Struct の内容にアクセスします。
SELECT ORDERID, ADDRESS->CITY FROM ORDERS EMIT CHANGES LIMIT 5;
出力は以下のようになります。
+-----------------------------------+-----------------------------------+
|ORDERID |ADDRESS__CITY |
+-----------------------------------+-----------------------------------+
|1188 |City_95 |
|1189 |City_24 |
|1190 |City_57 |
|1191 |City_37 |
|1192 |City_82 |
Limit Reached
Query terminated
ストリーム同士の結合¶
ストリーム同士の結合を使用することにより、共通のキーについて 2 つの ストリーム(イベントのストリーム)を結合できます。一例として、注文イベントのストリームと出荷イベントのストリームが挙げられます。これらを注文のキーで結合することにより、注文と併せて出荷の情報を見ることができるようになります。
ksqlDB CLI で 2 つのストリームを新たに作成します。kafka-console-producer
コマンドを使用して、Kafka トピックとして new_orders
と shipments
を作成します。
CREATE STREAM ステートメントを使用して、new_orders
と shipments
トピックにストリームを登録します。
CREATE STREAM new_orders (order_id INT, total_amount DOUBLE, customer_name VARCHAR)
WITH (KAFKA_TOPIC='new_orders', VALUE_FORMAT='JSON', PARTITIONS=2);
CREATE STREAM shipments (order_id INT, shipment_id INT, warehouse VARCHAR)
WITH (KAFKA_TOPIC='shipments', VALUE_FORMAT='JSON', PARTITIONS=2);
注釈
ksqlDB は、これらのステートメントが実行される際に基盤となるトピックを Kafka で作成します。また、REPLICAS
のカウントを指定できます。
2 つの CREATE STREAM
ステートメントの後の出力は以下のようになります。
Message
----------------
Stream created
----------------
INSERT VALUES ステートメントを使用して、ストリームにサンプルデータを入力します。
-- Insert values in NEW_ORDERS:
-- insert supplying the list of columns to insert:
INSERT INTO new_orders (order_id, customer_name, total_amount)
VALUES (1, 'Bob Smith', 10.50);
-- shorthand syntax can be used when inserting values for all columns (except ROWTIME), in column order:
INSERT INTO new_orders VALUES (2, 3.32, 'Sarah Black');
INSERT INTO new_orders VALUES (3, 21.00, 'Emma Turner');
-- Insert values in SHIPMENTS:
INSERT INTO shipments VALUES (1, 42, 'Nashville');
INSERT INTO shipments VALUES (3, 43, 'Palo Alto');
データのクエリを実行してトピックにそのデータが存在することを確認します。
ちなみに
以下を実行して、各ストリームの beginning
から読み取るよう ksqlDB に指示します。
SET 'auto.offset.reset' = 'earliest';
現在の ksqlDB CLI セッションで既に実行している場合はスキップできます。
new_orders
トピックについて以下を実行します。
SELECT * FROM new_orders EMIT CHANGES LIMIT 3;
出力は以下のようになります。
+-------------------------+-------------------------+-------------------------+
|ORDER_ID |TOTAL_AMOUNT |CUSTOMER_NAME |
+-------------------------+-------------------------+-------------------------+
|1 |10.5 |Bob Smith |
|2 |3.32 |Sarah Black |
|3 |21.0 |Emma Turner |
Limit Reached
Query terminated
shipments
トピックについて以下を実行します。
SELECT * FROM shipments EMIT CHANGES LIMIT 2;
出力は以下のようになります。
+-------------------------+-------------------------+-------------------------+
|ORDER_ID |SHIPMENT_ID |WAREHOUSE |
+-------------------------+-------------------------+-------------------------+
|1 |42 |Nashville |
|3 |43 |Palo Alto |
Limit Reached
Query terminated
以下のクエリを実行します。これにより、1 時間の結合ウィンドウを基に、注文とそれに伴う出荷が表示されます。
SELECT o.order_id, o.total_amount, o.customer_name, s.shipment_id, s.warehouse
FROM new_orders o
INNER JOIN shipments s
WITHIN 2 HOURS
ON o.order_id = s.order_id
EMIT CHANGES;
出力は以下のようになります。
+--------------+--------------+--------------+--------------+--------------+
|O_ORDER_ID |TOTAL_AMOUNT |CUSTOMER_NAME |SHIPMENT_ID |WAREHOUSE |
+--------------+--------------+--------------+--------------+--------------+
|1 |10.5 |Bob Smith |42 |Nashville |
|3 |21.0 |Emma Turner |43 |Palo Alto |
なお、ORDER_ID=2
のメッセージには、対応する SHIPMENT_ID
または WAREHOUSE
がありません。これは、指定された時間ウィンドウ内の shipments
ストリームに対応する行がないためです。
他のターミナルで、ksqlDB CLI を起動します。
LOG_DIR=./ksql_logs $CONFLUENT_HOME/bin/ksql
以下の INSERT VALUES ステートメントを入力して、注文 ID 2 の出荷を挿入します。
最初の ksqlDB CLI ウィンドウに切り替えます。これで 3 行目が出力されています。
+--------------+--------------+--------------+--------------+--------------+
|O_ORDER_ID |TOTAL_AMOUNT |CUSTOMER_NAME |SHIPMENT_ID |WAREHOUSE |
+--------------+--------------+--------------+--------------+--------------+
|1 |10.5 |Bob Smith |42 |Nashville |
|2 |3.32 |Sarah Black |49 |London |
|3 |21.0 |Emma Turner |43 |Palo Alto |
^CQuery terminated
Ctrl + C キーを押して SELECT クエリをキャンセルし、ksqlDB のプロンプトに戻ります。
テーブル同士の結合¶
テーブル同士の結合を使用することにより、共通のキーについて 2 つの テーブル を結合できます。ksqlDB のテーブルは、指定した キー について最新の 値 を提供します。結合は キー に対してのみ可能であり、1 対多(1:N)の結合は現在のセマンティックモデルではサポートされていません。
この例では、あるシステムから得た倉庫の位置データがあり、別のシステムからの倉庫の規模に関するデータによりこれを拡張します。
ksqlDB CLI で、両方のトピックを ksqlDB のテーブルとして登録します。なおこの例では、倉庫 ID はキーと WAREHOUSE_ID フィールドの値の両方に保管されています。
CREATE TABLE warehouse_location (warehouse_id INT PRIMARY KEY, city VARCHAR, country VARCHAR)
WITH (KAFKA_TOPIC='warehouse_location', VALUE_FORMAT='JSON', PARTITIONS=2);
CREATE TABLE warehouse_size (warehouse_id INT PRIMARY KEY, square_footage DOUBLE)
WITH (KAFKA_TOPIC='warehouse_size', VALUE_FORMAT='JSON', PARTITIONS=2);
2 つの CREATE TABLE
ステートメントの後の出力は以下のようになります。
Message
---------------
Table created
---------------
INSERT INTO warehouse_location (warehouse_id, city, country) VALUES (1, 'Leeds', 'UK');
INSERT INTO warehouse_location (warehouse_id, city, country) VALUES (2, 'Sheffield', 'UK');
INSERT INTO warehouse_location (warehouse_id, city, country) VALUES (3, 'Berlin', 'Germany');
INSERT INTO warehouse_size (warehouse_id, square_footage) VALUES (1, 16000);
INSERT INTO warehouse_size (warehouse_id, square_footage) VALUES (2, 42000);
INSERT INTO warehouse_size (warehouse_id, square_footage) VALUES (3, 94000);
warehouse_location
テーブルを調べます。
SELECT * FROM warehouse_location EMIT CHANGES LIMIT 3;
出力は以下のようになります。
+-------------------------+-------------------------+-------------------------+
|WAREHOUSE_ID |CITY |COUNTRY |
+-------------------------+-------------------------+-------------------------+
|1 |Leeds |UK |
|2 |Sheffield |UK |
|3 |Berlin |Germany |
Limit Reached
Query terminated
warehouse_size
テーブルを調べます。
SELECT * FROM warehouse_size EMIT CHANGES LIMIT 3;
出力は以下のようになります。
+---------------------------------------+---------------------------------------+
|WAREHOUSE_ID |SQUARE_FOOTAGE |
+---------------------------------------+---------------------------------------+
|1 |16000.0 |
|2 |42000.0 |
|3 |94000.0 |
Limit Reached
Query terminated
ここで 2 つのテーブルを結合します。
SELECT wl.warehouse_id, wl.city, wl.country, ws.square_footage
FROM warehouse_location wl
LEFT JOIN warehouse_size ws
ON wl.warehouse_id=ws.warehouse_id
EMIT CHANGES
LIMIT 3;
出力は以下のようになります。
+------------------+------------------+------------------+------------------+
|WL_WAREHOUSE_ID |CITY |COUNTRY |SQUARE_FOOTAGE |
+------------------+------------------+------------------+------------------+
|1 |Leeds |UK |16000.0 |
|2 |Sheffield |UK |42000.0 |
|3 |Berlin |Germany |94000.0 |
Limit Reached
Query terminated
INSERT INTO¶
INSERT INTO 構文を使用して、複数ストリームのコンテンツをマージすることができます。例として、同じイベントタイプを複数の異なる送信元から取得する場合などが挙げられます。
それぞれが異なるトピックへ書き込む 2 つの datagen プロセスを実行し、ローカルインストールとサードパーティの双方から到着する注文データをシミュレートします。
ちなみに
これらのコマンドは、それぞれ個別のウィンドウで実行する必要があります。演習が完了したら、Ctrl + C を押してそれらを終了します。
$CONFLUENT_HOME/bin/ksql-datagen \
quickstart=orders \
format=json \
topic=orders_local \
msgRate=2
$CONFLUENT_HOME/bin/ksql-datagen \
quickstart=orders \
format=json \
topic=orders_3rdparty \
msgRate=2
ksqlDB CLI で、それぞれのソーストピックを登録します。
CREATE STREAM orders_src_local
(
ordertime BIGINT,
orderid INT,
itemid STRING,
orderunits DOUBLE,
address STRUCT<city STRING, state STRING, zipcode BIGINT>
)
WITH (KAFKA_TOPIC='orders_local', VALUE_FORMAT='JSON');
CREATE STREAM orders_src_3rdparty
(
ordertime BIGINT,
orderid INT,
itemid STRING,
orderunits DOUBLE,
address STRUCT<city STRING, state STRING, zipcode BIGINT>
)
WITH (KAFKA_TOPIC='orders_3rdparty', VALUE_FORMAT='JSON');
各 CREATE STREAM ステートメントの後に、メッセージが表示されます。
Message
----------------
Stream created
----------------
標準の CREATE STREAM … AS
構文を使用して出力ストリームを作成します。複数の送信元のデータが共通のターゲットに結合されるため、データの流れの情報を追加すると有用です。SELECT
の一部に含めるだけでこれを実行できます。
CREATE STREAM all_orders AS SELECT 'LOCAL' AS SRC, * FROM orders_src_local EMIT CHANGES;
出力は以下のようになります。
Message
------------------------------------------
Created query with ID CSAS_ALL_ORDERS_71
------------------------------------------
DESCRIBE
コマンドを使用してターゲットストリームのスキーマを調べます。
DESCRIBE all_orders;
出力は以下のようになります。
Name : ALL_ORDERS
Field | Type
----------------------------------------------------------------------------------
SRC | VARCHAR(STRING)
ORDERTIME | BIGINT
ORDERID | INTEGER
ITEMID | VARCHAR(STRING)
ORDERUNITS | DOUBLE
ADDRESS | STRUCT<CITY VARCHAR(STRING), STATE VARCHAR(STRING), ZIPCODE BIGINT>
----------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
サードパーティの注文のストリームを既存の出力ストリームに追加します。
INSERT INTO all_orders SELECT '3RD PARTY' AS SRC, * FROM orders_src_3rdparty EMIT CHANGES;
出力は以下のようになります。
Message
--------------------------------------
Created query with ID INSERTQUERY_73
--------------------------------------
出力ストリームのクエリを実行して、各送信元からのデータが書き込まれていることを検証します。
SELECT * FROM all_orders EMIT CHANGES;
この出力は以下のようになります。両方のソーストピックからのメッセージが含まれていることに注目します(それぞれ LOCAL
と 3RD PARTY
により示されています)。
+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|SRC |ORDERTIME |ORDERID |ITEMID |ORDERUNITS |ADDRESS |
+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|3RD PARTY |1491006356222 |1583 |Item_169 |2.091966572094054 |{CITY=City_91, STATE=|
| | | | | |State_51, ZIPCODE=184|
| | | | | |74} |
|LOCAL |1504382324241 |1630 |Item_410 |0.6462658185260942 |{CITY=City_55, STATE=|
| | | | | |State_38, ZIPCODE=372|
| | | | | |44} |
|3RD PARTY |1512567250385 |1584 |Item_357 |7.205193136057381 |{CITY=City_91, STATE=|
| | | | | |State_19, ZIPCODE=457|
| | | | | |45} |
^CQuery terminated
Ctrl + C を押して SELECT
クエリをキャンセルし ksqlDB のプロンプトに戻ります。
終了¶
ksqlDB¶
注釈
永続的なクエリは、手動で終了するまで ksqlDB アプリケーションとして継続的に実行されます。ksqlDB CLI を終了しても永続的なクエリは終了されません。
SHOW QUERIES;
の出力から、終了するクエリ ID を特定します。たとえばクエリ IDCTAS_PAGEVIEWS_REGIONS_15
を終了するには、以下を実行します。TERMINATE CTAS_PAGEVIEWS_REGIONS_15;
ちなみに
実行されているクエリの実際の名前は異なる場合があります。
SHOW QUERIES;
の出力を参照してください。exit
コマンドを実行して ksqlDB CLI を終了します。ksql> exit Exiting ksqlDB.
Confluent CLI¶
CLI を使用して Confluent Platform を実行している場合は、このコマンドで停止させることができます。
$CONFLUENT_HOME/bin/confluent local services stop