ksqlDB および Confluent Control Center を使用した Apache Kafka® に対するストリーミングクエリの作成¶
Confluent Control Center で ksqlDB を使用して Kafka のメッセージに対するストリーミングクエリを作成することができます。
前提条件 :
- Confluent Platform がインストールおよび実行されている。このインストールには Kafka ブローカー、ksqlDB、Control Center、ZooKeeper、Schema Registry、REST Proxy および Connect が含まれています。
- TAR または ZIP を使用して Confluent Platform をインストールした場合は、インストールディレクトリまで移動します。このチュートリアル全体で使用するパスおよびコマンドは、カレントディレクトリがこのインストールディレクトリ $CONFLUENT_HOME であると想定しています。
- Confluent Platform のローカルインストールを開始する際には、Confluent CLI の インストール を検討します。
- Java: バージョン 1.8 以降。Oracle Java JRE または JDK 1.8 以降をローカルマシンにインストールする必要があります。
トピックの作成とデータの生成¶
Kafka のトピック pageviews
および users
を作成し、データを生成します。これらの手順では、Confluent Platform の ksqlDB datagen ツールを使用します。
新しいターミナルウィンドウを開いて以下のコマンドを実行し、データジェネレーターを使用して
pageviews
トピックを作成しデータを生成します。以下の例は、DELIMITED フォーマットのデータを継続的に生成します。$CONFLUENT_HOME/bin/ksql-datagen quickstart=pageviews format=avro topic=pageviews msgRate=5
別のターミナルウィンドウを開き、以下のコマンドを実行して、データジェネレーターを使用して
users
トピックの Kafka データを生成します。以下の例は、DELIMITED フォーマットのデータを継続的に生成します。$CONFLUENT_HOME/bin/ksql-datagen quickstart=users format=avro topic=users msgRate=1
ちなみに
Confluent Platform にある kafka-console-producer
CLI を使用して Kafka データを生成することもできます。
ksqlDB CLI の起動¶
新しいターミナルウィンドウを開いて以下のコマンドを実行し、LOG_DIR
環境変数を設定して ksqlDB CLI を起動します。
LOG_DIR=./ksql_logs $CONFLUENT_HOME/bin/ksql
このコマンドは、CLI のログを ./ksql_logs
ディレクトリへルーティングします。ディレクトリのパスはカレントディレクトリからの相対パスです。デフォルトでは http://localhost:8088
で実行されている ksqlDB サーバーを CLI が検索します。
Control Center を使用したトピックの調査¶
ブラウザーを開いて http://localhost:9021/ へ移動します。Confluent Control Center が開き、クラスターの ホーム ページが表示されます。ナビゲーションバーで、ksqlDB で使用するクラスターをクリックします。
ナビゲーションメニューで Topics をクリックして、既に作成した
pageviews
トピックおよびusers
トピックを表示します。
Control Center での ksqlDB を使用したトピックの調査¶
ナビゲーションメニューで ksqlDB をクリックして ksqlDB クラスターのページを開き、表示されている ksqlDB アプリケーションをクリックして ksqlDB Editor を開きます。
編集用のウィンドウで、SHOW TOPICS ステートメントを使用して Kafka クラスター上で使用可能なトピックを表示します。Run query をクリックしてクエリを開始します。
SHOW TOPICS;
Query Results ウィンドウで、ページの末尾までスクロールして、既に作成した
pageviews
トピックおよびusers
トピックを表示します。出力は以下のようになります。{ "name": "pageviews", "replicaInfo": [ 1 ] }, { "name": "users", "replicaInfo": [ 1 ] }
コンシューマーとコンシューマーグループの数を表示するには、SHOW TOPICS EXTENDED コマンドを使用します。
編集用のウィンドウで、PRINT TOPIC ステートメントを使用して
users
トピックのレコードを調べます。Run query をクリックしてクエリを開始します。PRINT 'users' FROM BEGINNING;
出力は以下のようになります。
このクエリは、明示的に終了するまで処理を続けます。Stop をクリックしてクエリを終了します。
ストリームおよびテーブルの作成¶
pageviews
トピックと users
トピックに対するストリーミングクエリを作成するために、これらのトピックをストリームとテーブルとして ksqlDB に登録します。ksqlDB エディターで CREATE STREAM ステートメントと CREATE TABLE ステートメントを使用できます。また、Control Center UI を使用することもできます。
この例は、以下のスキーマを使用した pageviews
トピックと users
トピックからのクエリレコードです。

ksqlDB エディターでストリームを作成する方法¶
ksqlDB エディターで CREATE STREAM ステートメントと CREATE TABLE ステートメントを使用して、ksqlDB CLI の場合とほぼ同じ方法でストリームやテーブルを作成できます。
以下のコードを編集用のウィンドウにコピーして Run をクリックします。
CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED');
出力は以下のようになります。
Streams をクリックして、作成した
pageviews_original
ストリームを調べます。
ksqlDB エディターでテーブルを作成する方法¶
CREATE TABLE ステートメントを使用して、トピックのテーブルを登録します。
以下のコードをエディターのウィンドウにコピーして Run query をクリックします。
CREATE TABLE users (userid VARCHAR PRIMARY KEY, registertime BIGINT, id VARCHAR, regionid VARCHAR, gender VARCHAR) WITH (KAFKA_TOPIC='users', VALUE_FORMAT='DELIMITED');
出力は以下のようになります。
エディターのウィンドウで SELECT クエリを使用して
users
テーブルのレコードを調べます。SELECT * FROM users EMIT CHANGES;
出力は以下のようになります。
このクエリは、明示的に終了するまで処理を続けます。Stop をクリックしてクエリを終了します。
永続的なクエリの作成¶
ストリームとして登録した pageviews
トピックとテーブルとして登録した users
トピックに対して、TERMINATE ステートメントで終了するまで実行を続けるストリーミングクエリを作成できます。
以下のコードを編集用のウィンドウにコピーして Run をクリックします。
CREATE STREAM pageviews_enriched AS SELECT users.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users ON pageviews_original.userid = users.userid EMIT CHANGES;
出力は以下のようになります。
永続的なクエリを調べるために、Running Queries ページに移動します。前のクエリで作成した
pageviews_enriched
ストリームの詳細情報が表示されます。Explain をクリックすると、永続的なクエリのスキーマとプロパティが表示されます。
永続的なクエリのモニタリング¶
Confluent Control Center を使用して、永続的なクエリを視覚的な方法でモニタリングできます。
ナビゲーションメニューで Consumers をクリックし、
pageviews_enriched
クエリに対応するコンシューマーグループを見つけます。名前は_confluent-ksql-default_query_CSAS_PAGEVIEWS_ENRICHED_
で始まります。Consumer lag ページが開きます。Consumption をクリックして、
pageviews_enriched
クエリがレコードを消費するレートを表示します。Last four hours をクリックして、リストから Last 30 minutes と Apply を選択します。出力は以下のようになります。
クエリのプロパティ¶
クエリを実行する前に ksqlDB Editor でプロパティを割り当てることができます。
ナビゲーションメニューで ksqlDB をクリックして ksqlDB クラスターのページを開き、デフォルトのアプリケーションをクリックして ksqlDB Editor を開きます。
Add query properties をクリックし
auto.offset.reset
フィールドを Earliest に設定します。以下のコードを編集用のウィンドウにコピーして Run をクリックします。
CREATE STREAM pageviews_female AS SELECT * FROM pageviews_enriched WHERE gender = 'FEMALE' EMIT CHANGES;
pageviews_female
ストリームはpageviews
トピックの最も古いレコードから処理を開始します。したがって、トピックの先頭から使用可能なすべてのレコードを消費します。auto.offset.reset
プロパティがpageviews_female
ストリームに適用されていることを確認します。ナビゲーションメニューで Consumers をクリックし、pageviews_female
ストリームに対応するコンシューマーグループを見つけます。名前は_confluent-ksql-default_query_CSAS_PAGEVIEWS_FEMALE_
で始まります。Consumption をクリックして
pageviews_female
クエリがレコードを消費するレートを表示します。必ずタイムスケールを Last 30 minutes に設定します。グラフは 100 パーセントになっています。これは
pageviews_female
ストリームが開始されすべてのレコードが消費されたためです。
ストリームおよびテーブルの表示¶
すべての永続的なクエリ、ストリーム、テーブルを、統合された 1 つの画面で見ることができます。
ナビゲーションメニューで ksqlDB をクリックして ksqlDB クラスターのページを開き、デフォルトのアプリケーションをクリックして ksqlDB Editor を開きます。
ページの右側にある All available streams and tables セクションを確認します。
KSQL_PROCESSING_LOG をクリックしてストリームを開きます。ストリームのスキーマが表示され、ネストされたデータ構造も含まれています。
選択したレコードのダウンロード¶
クエリ結果のウィンドウでレコードを選択し、JSON ファイルとしてダウンロードすることができます。
以下のコードを編集用のウィンドウにコピーして Run をクリックします。
SELECT * FROM PAGEVIEWS_FEMALE EMIT CHANGES;
クエリ結果のウィンドウで一時停止のボタンをクリックし、レコードを選択して Download をクリックします。
フロービューを使用してトポロジーを調べる方法¶
Control Center により、イベントが ksqlDB アプリケーションをどのように流れるかを表示することができます。
ksqlDB ページで Flow をクリックします。
グラフ内の PAGEVIEWS_ENRICHED ノードをクリックすると、現在のメッセージやスキーマをはじめとする
PAGEVIEWS_ENRICHED
ストリームの詳細が表示されます。ksqlDB アプリケーションのトポロジーに関する詳細を参照するには、グラフ内の他のノードをクリックします。
クリーンアップ¶
シャットダウンとクリーンアップのタスクを実行します。
- それぞれのコマンドウィンドウで Ctrl + C キーを押すことにより、実行中の各プロデューサー(
users
トピックやpageviews
トピックへデータを送信する)を停止することができます。 - Confluent Platform を停止するには、「
confluent local services stop
」と入力します。 - 別のテストを最初から実行する前に既存のデータ(トピック、スキーマ、メッセージ)を削除する場合は、「
confluent local destroy
」と入力します。