ksqlDB および Confluent Control Center を使用した Apache Kafka® に対するストリーミングクエリの作成

Confluent Control Center で ksqlDB を使用して Kafka のメッセージに対するストリーミングクエリを作成することができます。

前提条件 :

  • Confluent Platform がインストールおよび実行されている。このインストールには Kafka ブローカー、ksqlDB、Control Center、ZooKeeper、Schema Registry、REST Proxy および Connect が含まれています。
  • TAR または ZIP を使用して Confluent Platform をインストールした場合は、インストールディレクトリまで移動します。このチュートリアル全体で使用するパスおよびコマンドは、カレントディレクトリがこのインストールディレクトリ $CONFLUENT_HOME であると想定しています。
  • Confluent Platform のローカルインストールを開始する際には、Confluent CLI の インストール を検討します。
  • Java: バージョン 1.8 以降。Oracle Java JRE または JDK 1.8 以降をローカルマシンにインストールする必要があります。

トピックの作成とデータの生成

Kafka のトピック pageviews および users を作成し、データを生成します。これらの手順では、Confluent Platform の ksqlDB datagen ツールを使用します。

  1. 新しいターミナルウィンドウを開いて以下のコマンドを実行し、データジェネレーターを使用して pageviews トピックを作成しデータを生成します。以下の例は、DELIMITED フォーマットのデータを継続的に生成します。

    $CONFLUENT_HOME/bin/ksql-datagen quickstart=pageviews format=json topic=pageviews msgRate=5
    
  2. 別のターミナルウィンドウを開き、以下のコマンドを実行して、データジェネレーターを使用して users トピックの Kafka データを生成します。以下の例は、DELIMITED フォーマットのデータを継続的に生成します。

    $CONFLUENT_HOME/bin/ksql-datagen quickstart=users format=avro topic=users msgRate=1
    

ちなみに

Confluent Platform にある kafka-console-producer CLI を使用して Kafka データを生成することもできます。

ksqlDB CLI の起動

新しいターミナルウィンドウを開いて以下のコマンドを実行し、LOG_DIR 環境変数を設定して ksqlDB CLI を起動します。

LOG_DIR=./ksql_logs $CONFLUENT_HOME/bin/ksql

このコマンドは、CLI のログを ./ksql_logs ディレクトリへルーティングします。ディレクトリのパスはカレントディレクトリからの相対パスです。デフォルトでは http://localhost:8088 で実行されている ksqlDB サーバーを CLI が検索します。

Control Center を使用したトピックの調査

  1. ブラウザーを開いて http://localhost:9021/ へ移動します。Confluent Control Center が開き、クラスターの ホーム ページが表示されます。ナビゲーションバーで、ksqlDB で使用するクラスターをクリックします。

  2. ナビゲーションメニューで Topics をクリックして、既に作成した pageviews トピックおよび users トピックを表示します。

    Topics ページを示す Confluent Control Center のスクリーンショット

Control Center での ksqlDB を使用したトピックの調査

  1. ナビゲーションメニューで ksqlDB をクリックして ksqlDB クラスターのページを開き、表示されている ksqlDB アプリケーションをクリックして ksqlDB Editor を開きます。

    ksqldDB アプリケーションリストを示す Confluent Control Center のスクリーンショット
  2. 編集用のウィンドウで、SHOW TOPICS ステートメントを使用して Kafka クラスター上で使用可能なトピックを表示します。Run query をクリックしてクエリを開始します。

    SHOW TOPICS;
    
    |ksqldb| Editor を示す Confluent Control Center のスクリーンショット
  3. Query Results ウィンドウで、ページの末尾までスクロールして、既に作成した pageviews トピックおよび users トピックを表示します。出力は以下のようになります。

    {
      "name": "pageviews",
      "replicaInfo": [
        1
      ]
    },
    {
      "name": "users",
      "replicaInfo": [
        1
      ]
    }
    

    コンシューマーとコンシューマーグループの数を表示するには、SHOW TOPICS EXTENDED コマンドを使用します。

  4. 編集用のウィンドウで、PRINT TOPIC ステートメントを使用して users トピックのレコードを調べます。Run query をクリックしてクエリを開始します。

    PRINT 'users' FROM BEGINNING;
    

    出力は以下のようになります。

    Confluent Control Center の ksqlDB SHOW TOPIC ステートメントを示すスクリーンショット
  5. このクエリは、明示的に終了するまで処理を続けます。Stop をクリックしてクエリを終了します。

ストリームおよびテーブルの作成

pageviews トピックと users トピックに対するストリーミングクエリを作成するために、これらのトピックをストリームとテーブルとして ksqlDB に登録します。ksqlDB エディターで CREATE STREAM ステートメントと CREATE TABLE ステートメントを使用できます。また、Control Center UI を使用することもできます。

この例は、以下のスキーマを使用した pageviews トピックと users トピックからのクエリレコードです。

pageviews ストリームおよび共通の userid 列を持つ users テーブルを示す ER 図

ksqlDB エディターでストリームを作成する方法

ksqlDB エディターで CREATE STREAM ステートメントと CREATE TABLE ステートメントを使用して、ksqlDB CLI の場合とほぼ同じ方法でストリームやテーブルを作成できます。

  1. 以下のコードを編集用のウィンドウにコピーして Run をクリックします。

    CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH
      (kafka_topic='pageviews', value_format='JSON');
    

    出力は以下のようになります。

    Confluent Control Center の ksqlDB CREATE STREAM ステートメントのスクリーンショット
  2. Streams をクリックして、作成した pageviews_original ストリームを調べます。

    Confluent Control Center の ksqlDB Streams ページのスクリーンショット

ksqlDB エディターでテーブルを作成する方法

CREATE TABLE ステートメントを使用して、トピックのテーブルを登録します。

  1. 以下のコードをエディターのウィンドウにコピーして Run query をクリックします。

    CREATE TABLE users_original (id VARCHAR PRIMARY KEY) WITH
      (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO');
    

    出力は以下のようになります。

    Confluent Control Center の ksqlDB CREATE TABLE ステートメントのスクリーンショット

    注釈

    CREATE TABLE ステートメントは、CREATE STREAM ステートメントのように列のセットを定義していないことがわかります。これは、値のフォーマットが Avro であり、DataGen ツールが Schema Registry へ Avro スキーマをパブリッシュしているためです。ksqlDB はそのスキーマを Schema Registry から取得し、テーブルの SQL スキーマの構築に使用します。必要であれば、スキーマを指定することもできます。

  2. エディターのウィンドウで SELECT クエリを使用して users_original テーブルのレコードを調べます。

    SELECT * FROM users_original EMIT CHANGES;
    

    出力は以下のようになります。

    Confluent Cloud のテーブルに対する ksqlDB SELECT クエリのスクリーンショット
  3. このクエリは、明示的に終了するまで処理を続けます。Stop をクリックしてクエリを終了します。

永続的なクエリの作成

ストリームとして登録した pageviews トピックとテーブルとして登録した users トピックに対して、TERMINATE ステートメントで終了するまで実行を続けるストリーミングクエリを作成できます。

  1. 以下のコードを編集用のウィンドウにコピーして Run をクリックします。

    CREATE STREAM pageviews_enriched AS
      SELECT users_original.id AS userid, pageid, regionid, gender
      FROM pageviews_original
      LEFT JOIN users_original
        ON pageviews_original.userid = users_original.id
      EMIT CHANGES;
    

    出力は以下のようになります。

    Confluent Control Center の ksqlDB CREATE STREAM AS SELECT ステートメントのスクリーンショット
  2. 実行中のクエリを調べるために、Persistent Queries ページに移動します。前のクエリで作成した pageviews_enriched ストリームの詳細情報が表示されます。

    Confluent Control Center の ksqlDB Running Queries ページのスクリーンショット
  3. Explain をクリックすると、永続的なクエリのスキーマとプロパティが表示されます。

    Confluent Control Center の ksqlDB Explain Query ページのスクリーンショット

永続的なクエリのモニタリング

Confluent Control Center を使用して、永続的なクエリを視覚的な方法でモニタリングできます。

  1. ナビゲーションメニューで Consumers をクリックし、pageviews_enriched クエリに対応するコンシューマーグループを見つけます。名前は _confluent-ksql-default_query_CSAS_PAGEVIEWS_ENRICHED_ で始まります。Consumer lag ページが開きます。

    Confluent Control Center の Consumer Lag ページのスクリーンショット
  2. Consumption をクリックして、pageviews_enriched クエリがレコードを消費するレートを表示します。Last four hours をクリックして、リストから Last 30 minutesApply を選択します。

    出力は以下のようになります。

    Confluent Control Center の Consumption ページのスクリーンショット

クエリのプロパティ

クエリを実行する前に ksqlDB Editor でプロパティを割り当てることができます。

  1. ナビゲーションメニューで ksqlDB をクリックして ksqlDB アプリケーションのページを開き、デフォルトのアプリケーションをクリックして ksqlDB Editor を開きます。

  2. Add query properties をクリックし auto.offset.reset フィールドを Earliest に設定します。

  3. 以下のコードを編集用のウィンドウにコピーして Run をクリックします。

    CREATE STREAM pageviews_female AS
      SELECT * FROM pageviews_enriched
      WHERE gender = 'FEMALE'
      EMIT CHANGES;
    
    ksqlDB Editor ページでクエリプロパティを設定する方法を示すスクリーンショット

    pageviews_female ストリームは pageviews トピックの最も古いレコードから処理を開始します。したがって、トピックの先頭から使用可能なすべてのレコードを消費します。

  4. auto.offset.reset プロパティが pageviews_female ストリームに適用されていることを確認します。ナビゲーションメニューで Consumers をクリックし、pageviews_female ストリームに対応するコンシューマーグループを見つけます。名前は _confluent-ksql-default_query_CSAS_PAGEVIEWS_FEMALE_ で始まります。

    Consumption をクリックして pageviews_female クエリがレコードを消費するレートを表示します。必ずタイムスケールを Last 30 minutes に設定します。

    Confluent Control Center の Consumption ページのスクリーンショット

    グラフは 100 パーセントになっています。これは pageviews_female ストリームが開始されすべてのレコードが消費されたためです。

ストリームおよびテーブルの表示

すべての永続的なクエリ、ストリーム、テーブルを、統合された 1 つの画面で見ることができます。

  1. ナビゲーションメニューで ksqlDB をクリックして ksqlDB クラスターのページを開き、デフォルトのアプリケーションをクリックして ksqlDB Editor を開きます。

  2. ページの右側にある All available streams and tables セクションを確認します。

  3. KSQL_PROCESSING_LOG をクリックしてストリームを開きます。ストリームのスキーマが表示され、ネストされたデータ構造も含まれています。

    Confluent Control Center の統合された ksqlDB ストリームおよびテーブルペインのスクリーンショット

選択したレコードのダウンロード

クエリ結果のウィンドウでレコードを選択し、JSON ファイルとしてダウンロードすることができます。

  1. 以下のコードを編集用のウィンドウにコピーして Run をクリックします。

    SELECT * FROM pageviews_female EMIT CHANGES;
    
  2. クエリ結果のウィンドウで一時停止のボタンをクリックし、レコードを選択してからフォーマットを選んで、Download をクリックします。

    Confluent Control Center で SQL クエリ結果を JSON ファイルにダウンロードする方法を示すスクリーンショット

フロービューを使用してトポロジーを調べる方法

Control Center により、イベントが ksqlDB アプリケーションをどのように流れるかを表示することができます。

  1. ksqlDB ページで Flow をクリックします。

    Confluent Control Center の ksqlDB Flow View を示すスクリーンショット
  2. グラフ内の PAGEVIEWS_ENRICHED ノードをクリックすると、現在のメッセージやスキーマをはじめとする PAGEVIEWS_ENRICHED ストリームの詳細が表示されます。

    Confluent Control Center の ksqlDB Flow View を示すスクリーンショット
  3. ksqlDB アプリケーションのトポロジーに関する詳細を参照するには、グラフ内の他のノードをクリックします。

クリーンアップ

シャットダウンとクリーンアップのタスクを実行します。

  • それぞれのコマンドウィンドウで Ctrl + C キーを押すことにより、実行中の各プロデューサー(users トピックや pageviews トピックへデータを送信する)を停止することができます。
  • Confluent Platform を停止するには、「confluent local services stop」と入力します。
  • 別のテストを最初から実行する前に既存のデータ(トピック、スキーマ、メッセージ)を削除する場合は、「confluent local destroy」と入力します。