Confluent Platform のクイックスタート

Confluent Platform といくつかの SQL ステートメントを使用して、サンプルのデータストリームを処理するリアルタイムアプリケーションを構築します。

Confluent Control Center 内の ksqlDB Flow ビュー

このクイックスタートでは、以下のことを行います。

すべて終えると、よく使われる SQL ステートメントを使用して、データストリームを消費し、処理を行うリアルタイムアプリができあがります。

ちなみに

  • 以下の手順では Docker または tar アーカイブを使用しますが、Confluent Platform をインストールするには、パッケージマネージャーや systemd、Kubernetes、Ansible など、他にも方法があります。詳細については「オンプレミスのデプロイ」を参照してください。
  • フルマネージド型の環境で Confluent を実行する場合は、Confluent Cloud のクイックスタート をお試しください。

前提条件

Docker と tar アーカイブのどちらを使用するかによって、Confluent Platform のインストールに関する考慮事項や要件に違いがあります。

以下の手順では、Docker および Docker Compose を使用して Confluent Platform をダウンロードし、実行します。

考慮事項
  • ダウンロードに比較的長い時間(最大で数分程度)がかかります。
  • 最短のステップ数でインストールと起動ができます。
  • dockerdocker-compose が必要です。
  • アンインストールには docker system prune を使用します。
前提条件
  • Docker
    • Docker バージョン 1.11 またはそれ以降が インストールされ動作している
    • Docker Compose が インストール済みである 。Docker Compose は、Docker for Mac および Docker for Windows でデフォルトでインストールされます。
    • Mac: Docker のメモリーが少なくとも 6 GB 割り当てられていること(Mac)。Docker Desktop for Mac を使用しているとき、Docker メモリーの割り当てはデフォルトで 2 GB です。Docker Desktop アプリでデフォルトの割り当てを 6 GB に変更するには、PreferencesResourcesAdvanced の順に移動します。
    • Windows: Docker Desktop for Windows は WSL 2 上で実行されます。詳細については、『How to Run Confluent on Windows in Minutes』を参照してください。
  • インターネット接続
  • Confluent Platform で現在サポートされる オペレーティングシステム
  • Docker でのネットワークと Kafka
    • 内部コンポーネントと外部コンポーネントが Docker ネットワークに通信できるように、ホストとポートを構成します。詳細については、こちらの記事 を参照してください。
  • curl のインストール(任意)
    • 以下の手順では、Docker Compose ファイルをダウンロードします。ダウンロードにはさまざまな方法がありますが、この手順では、ファイルのダウンロードに使用できる具体的なコマンドとして curl を説明します。

ステップ 1: Confluent Platform のダウンロードと起動

このステップでは、Docker イメージまたはアーカイブファイルの形式で Confluent Platform のバイナリをダウンロードします。

  1. 次のように、Confluent Platform all-in-one Docker Compose ファイル の内容をダウンロードするか、コピーします。

    curl --silent --output docker-compose.yml \
      https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.1.1-post/cp-all-in-one/docker-compose.yml
    
  2. -d オプションを指定して Confluent Platform スタックを起動し、デタッチモードで実行します。

    docker-compose up -d
    

    Confluent Platform の各コンポーネントはそれぞれ別のコンテナーで起動されます。出力は以下のようになります。

    Creating network "cp-all-in-one_default" with the default driver
    Creating zookeeper ... done
    Creating broker    ... done
    Creating schema-registry ... done
    Creating rest-proxy      ... done
    Creating connect         ... done
    Creating ksql-datagen    ... done
    Creating ksqldb-server   ... done
    Creating control-center  ... done
    Creating ksqldb-cli      ... done
    
  3. サービスが稼働中であるかを検証します。

    docker-compose ps
    

    出力は以下のようになります。

         Name                    Command               State                Ports
    ------------------------------------------------------------------------------------------
    broker            /etc/confluent/docker/run        Up      0.0.0.0:29092->29092/tcp,
                                                               0.0.0.0:9092->9092/tcp
    connect           /etc/confluent/docker/run        Up      0.0.0.0:8083->8083/tcp,
                                                               9092/tcp
    control-center    /etc/confluent/docker/run        Up      0.0.0.0:9021->9021/tcp
    ksqldb-cli        /bin/sh                          Up
    ksql-datagen      bash -c echo Waiting for K ...   Up
    ksqldb-server     /etc/confluent/docker/run        Up      0.0.0.0:8088->8088/tcp
    rest-proxy        /etc/confluent/docker/run        Up      0.0.0.0:8082->8082/tcp
    schema-registry   /etc/confluent/docker/run        Up      0.0.0.0:8081->8081/tcp
    zookeeper         /etc/confluent/docker/run        Up      0.0.0.0:2181->2181/tcp,
                                                               2888/tcp, 3888/tcp
    

    数分後、ステートが Up になっていないコンポーネントがある場合は、再度 docker-compose up -d コマンドを実行するか、docker-compose restart <image-name> を試します。以下に例を示します。

    docker-compose restart control-center
    

ステップ 2: データを保管する Kafka トピックの作成

Confluent Platform では、リアルタイムストリーミングイベントは Kafka トピックに保管されます。トピックとは、基本的には追加のみのログです。詳細については、Apache Kafka の概要 を参照してください。

このステップでは、Confluent Control Center を使用して 2 つのトピックを作成します。Control Center は、本稼働環境のデータパイプラインおよびイベントストリーミングアプリケーションを構築およびモニタリングするための機能を備えています。

トピック名は pageviewsusers です。この後の手順では、これらのトピックに対してデータを生成するデータジェネレーターを作成します。

pageviews トピックの作成

Confluent Control Center では、UI で数回クリックするだけでトピックを作成できます。

  1. Control Center (http://localhost:9021)に移動します。Control Center が起動し、読み込みが完了するまでに 1 ~ 2 分かかる場合があります。

  2. controlcenter.cluster タイルをクリックします。

    Confluent Control Center のクラスタータイル
  3. ナビゲーションメニューで Topics をクリックしてトピックのリストを開きます。Add a topic をクリックして pageviews トピックの作成を開始します。

    Confluent Control Center の Topics ページ
  4. Topic name フィールドに pageviews と入力し、Create with defaults をクリックします。トピック名では大文字と小文字が区別されます。**

    Confluent Control Center での Kafka トピックの作成

users トピックの作成

前のステップと同じ手順を実行して、users トピックを作成します。

  1. ナビゲーションメニューで Topics をクリックしてトピックのリストを開きます。Add a topic をクリックして users トピックの作成を開始します。

  2. Topic name フィールドに users と入力し、Create with defaults をクリックします。

  3. users ページで、Configuration をクリックして users トピックの詳細を確認します。

    Confluent Control Center のトピックの Configuration ページ

ステップ 3: 模擬データの生成

In Confluent Platform, you get events from an external source by using a connector, which enables streaming large volumes of data to and from your cluster. Confluent publishes many connectors for integrating with external systems, like MongoDb and Elasticsearch. For more information, see Connect Overview.

このステップでは、Datagen Source Connector を実行して模擬データを生成します。模擬データは、前のステップで作成した pageviews トピックおよび users トピックに保管されます。

  1. ナビゲーションメニューで Connect をクリックします。

  2. Connect clusters リストで connect-default クラスターをクリックします。

  3. Add connector をクリックして、pageviews データ用のコネクターを作成します。

  4. DatagenConnector タイルを選択します。

    ちなみに

    ソースコネクターのみを表示するには、Filter by category をクリックし、Sources を選択します。

  5. 名前 フィールドで、コネクターの名前として datagen-pageviews を入力します。

  6. 次の構成値を入力します。

    • Key converter class: org.apache.kafka.connect.storage.StringConverter
    • kafka.topic: pageviews
    • max.interval: 100
    • quickstart: pageviews
  7. Next をクリックして、コネクターの構成を確認します。設定に問題がない場合は、Launch をクリックします。

    Confluent Control Center でのコネクターの構成の確認

Datagen Source Connector コネクターの 2 つ目のインスタンスを実行し、users トピックに対して模擬データを生成します。

  1. ナビゲーションメニューで Connect をクリックします。

  2. Connect clusters リストで connect-default をクリックします。

  3. Add connector をクリックします。

  4. DatagenConnector タイルを選択します。

  5. 名前 フィールドで、コネクターの名前として datagen-users を入力します。

  6. 次の構成値を入力します。

    • Key converter class: org.apache.kafka.connect.storage.StringConverter
    • kafka.topic: users
    • max.interval: 1000
    • quickstart: users
  7. Next をクリックして、コネクターの構成を確認します。設定に問題がない場合は、Launch をクリックします。

  8. ナビゲーションメニューで Topics をクリックし、リストにある users をクリックします。

  9. Messages をクリックして、datagen-users コネクターが users トピックに対してデータを生成していることを確認します。

    Confluent Control Center の Topics ページに表示される受信メッセージ

トピックのスキーマの検査

デフォルトでは、Datagen Source Connector では Avro フォーマットでデータが生成されます。このフォーマットで pageviews および users メッセージのスキーマが定義されます。

Schema Registry により、クラスターに送信されたメッセージのスキーマが正しいことが確認されます。詳細については、「Schema Registry の概要」を参照してください。

  1. ナビゲーションメニューで Topics をクリックし、トピックリストにある pageviews をクリックします。

  2. Schema をクリックして、pageviews メッセージ値に適用される Avro スキーマを確認します。

    出力は以下のようになります。

    {
      "connect.name": "ksql.pageviews",
      "fields": [
        {
          "name": "viewtime",
          "type": "long"
        },
        {
          "name": "userid",
          "type": "string"
        },
        {
          "name": "pageid",
          "type": "string"
        }
      ],
      "name": "pageviews",
      "namespace": "ksql",
      "type": "record"
    }
    

ステップ 4: SQL ステートメントを使用したストリームおよびテーブルの作成

このステップでは、よく使われる SQL 構文を使用して、pageviews トピックのストリームおよび users トピックのテーブルを作成します。トピックにストリームまたはテーブルを登録すると、そのストリームまたはテーブルを SQL ステートメントで使用できます。

注釈

ストリーム は、これまでに発生した一連のファクト(イベント)を表す、変更不可で追加のみのコレクションです。一度ストリームに挿入された行を変更することはできません。新しい行をストリームの末尾に追加することができますが、既存の行をアップグレードまたは削除することはできません。

テーブル は、時間の経過に伴う変更をモデル化した、変更可能なコレクションです。キーごとの最新のデータを表示するには、行キーを使用します。各キーの最新の行を除くすべての行が定期的に削除されます。また、各行にタイムスタンプがあるため、"ウィンドウ化" されたテーブルを定義できます。これにより、集約や結合などのステートフルな操作で、同じキーを持つレコードを期間ごとにグループ化する方法を制御できます。ウィンドウはレコードキーで追跡されます。

ストリームとテーブルを組み合わせることでデータベースが実現されます。詳細については、『Stream Processing』を参照してください。

ストリーム処理アプリケーション用のデータベースである ksqlDB に SQL エンジンが実装されています。以下のステップでは、よく使われる SQL 構文を使用してリアルタイムデータを処理できる ksqlDB アプリケーションを作成する方法を説明します。

このサンプルアプリでは、次のストリーム処理操作を示します。

  • pageviews ストリームと users テーブルを 結合 して、拡張されたページビューイベントのストリームを作成します。

  • 拡張されたストリームに region フィールドの フィルター を適用します。

  • フィルターを適用したストリームの ウィンドウ化されたビュー を作成して、最近更新された行のみが表示されるようにします。ウィンドウの サイズ は 30 秒とします。

    ちなみに

    これらの処理ステップは、3 つの SQL ステートメントのみで実装されます。

以下のステップでは、CREATE STREAMCREATE TABLE のステートメントを使用して、pageviews トピックおよび users トピックのテーブルにストリームを登録する方法を説明します。ストリームまたはテーブルをトピックに登録すると、トピックのデータに対して SQL クエリを実行できます。

  1. ナビゲーションメニューで ksqlDB をクリックします。

  2. ksqlDB アプリケーションをクリックして、ksqlDB ページを開きます。SQL ステートメントの編集用、および作成したストリームとテーブルのモニタリング用のタブがあります。

  3. 以下の SQL をコピーして、エディターウィンドウに貼り付けます。このステートメントは、pageviews トピックに pageviews_stream という名前のストリームを登録します。ストリーム名およびテーブル名では、大文字と小文字が区別されません。

    CREATE STREAM pageviews_stream
      WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='AVRO');
    
    Confluent Control Center に表示された ksqlDB クエリの結果
  4. Run query をクリックしてステートメントを実行します。結果ウィンドウの出力は次のようになります。

    {
      "@type": "currentStatus",
      "statementText": "CREATE STREAM PAGEVIEWS_STREAM (VIEWTIME BIGINT, USERID STRING, PAGEID STRING) WITH (KAFKA_TOPIC='pageviews', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO', VALUE_SCHEMA_ID=2);",
      "commandId": "stream/`PAGEVIEWS_STREAM`/create",
      "commandStatus": {
        "status": "SUCCESS",
        "message": "Stream created",
        "queryId": null
      },
      "commandSequenceNumber": 2,
      "warnings": []
    }
    
  5. SELECT クエリを使用して、データがストリームを移動していることを確認します。以下の SQL をエディターにコピーして Run query をクリックします。

    SELECT * FROM pageviews_stream EMIT CHANGES;
    

    出力は以下のようになります。

    Confluent Control Center に表示された ksqlDB の SELECT クエリの結果
  6. Stop をクリックして SELECT クエリを終了します。

    重要

    SELECT クエリを停止しても、ストリーム内のデータの移動は停止しません。

  7. 以下の SQL をエディターウィンドウにコピーして Run query をクリックします。このステートメントは、users トピックに users_table という名前のテーブルを登録します。

    CREATE TABLE users_table (id VARCHAR PRIMARY KEY)
      WITH (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO');
    

    出力は以下のようになります。

    {
      "@type": "currentStatus",
      "statementText": "CREATE TABLE USERS_TABLE (ID STRING PRIMARY KEY, REGISTERTIME BIGINT, USERID STRING, REGIONID STRING, GENDER STRING) WITH (KAFKA_TOPIC='users', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO', VALUE_SCHEMA_ID=2);",
      "commandId": "table/`USERS_TABLE`/create",
      "commandStatus": {
         "status": "SUCCESS",
         "message": "Table created",
         "queryId": null
      },
      "commandSequenceNumber": 4,
      "warnings": []
    }
    

    テーブルを登録する際には、PRIMARY KEY を指定する必要があります。ksqlDB のテーブルは、他の SQL システムのテーブルと同様です。テーブルにはゼロ行以上の行があり、各行は PRIMARY KEY で識別されます。

ストリームおよびテーブルのスキーマの確認

Schema Registry は Confluent Platform とともにインストールされ、スタックで実行されます。そのため、CREATE STREAM および CREATE TABLE のステートメントでメッセージスキーマを指定する必要はありません。Avro、JSON_SR、および Protobuf フォーマットでは、Schema Registry により自動的にスキーマが推測されます。

  1. Streams をクリックして、現在登録されているストリームを確認します。リストで PAGEVIEWS_STREAM をクリックして、ストリームの詳細を確認します。

    Schema セクションで、datagen-pageviews コネクターで生成されたメッセージ値のフィールド名とデータ型を確認できます。

    Confluent Control Center の ksqlDB ストリームの詳細
  2. Tables をクリックして、現在登録されているテーブルを確認します。リストで USERS_TABLE をクリックして、テーブルの詳細を確認します。

    Schema セクションで、datagen-users コネクターで生成されたメッセージ値のフィールド名とデータ型を確認できます。

    Confluent Control Center の ksqlDB テーブルの詳細

データを処理するクエリの作成

このステップでは、ページビューおよびユーザー行を検査して処理する SQL クエリを記述します。さまざまな種類のクエリを作成できます。

  • 一時クエリ: 永続的でない、クライアント側のクエリです。LIMIT 句を使用するか、手動で終了します。一時クエリでは新しいトピックは作成されません。
  • 永続的なクエリ: サーバー側のクエリであり、新しいストリームまたはテーブルが出力され、これらには新しいトピックが使用されます。TERMINATE ステートメントを発行するまで実行されます。永続的なクエリの構文では、CREATE STREAM AS SELECT または CREATE TABLE AS SELECT のステートメントが使用されます。
  • プッシュクエリ: サブスクリプションに対して継続的に結果を生成するクエリです。プッシュクエリの構文では、EMIT CHANGES キーワードが使用されます。プッシュクエリには、一時的なものと永続的なものがあります。
  • プルクエリ: 従来型のリレーショナルデータベースに対するクエリのように、「現在」時点の結果を取得するクエリです。プルクエリは 1 回ずつ実行され、テーブルの現在の状態が返されます。テーブルは新しいイベントの到着ごとに順次更新されるため、プルクエリは低レイテンシで実行されることが予想されます。プルクエリはすべて一時クエリです。

pageviews のクエリ

  1. Editor をクリックしてクエリエディターに戻ります。

  2. 以下の SQL をエディターにコピーして Run query をクリックします。このステートメントでは、pageviews_stream の 3 行を返す一時クエリを作成します。

    SELECT pageid FROM pageviews_stream EMIT CHANGES LIMIT 3;
    

    出力は以下のようになります。

    Confluent Control Center に表示された ksqlDB クエリの結果

    Card view アイコンまたは Table view アイコンをクリックして出力のレイアウトを変更します。

ストリームとテーブルの結合

このステップでは、pageviews_streamusers_tableuserid キーで結合する永続的なクエリを使用して user_pageviews という名前のストリームを作成します。この結合により、ページビューデータが拡張され、ページを参照したユーザーの情報が付加されます。結合された行は、新しい "シンク" トピックに書き込まれます。デフォルトでは、このシンクトピックの名前は新しいストリームと同じです。

ちなみに

WITH 句で KAFKA_TOPIC キーワードを使用すると、シンクトピックの名前を指定できます。

以下のステップでは、ストリームとテーブルを結合し、結合後のストリームの出力を表示する方法を説明します。

  1. 以下の SQL をエディターにコピーして Run query をクリックします。

    CREATE STREAM user_pageviews
      AS SELECT users_table.id AS userid, pageid, regionid, gender
        FROM pageviews_stream
        LEFT JOIN users_table ON pageviews_stream.userid = users_table.id
    EMIT CHANGES;
    

    出力は以下のようになります。

    {
      "@type": "currentStatus",
      "statementText": "CREATE STREAM USER_PAGEVIEWS WITH (KAFKA_TOPIC='USER_PAGEVIEWS', PARTITIONS=1, REPLICAS=1) AS SELECT\n  USERS_TABLE.ID USERID,\n  PAGEVIEWS_STREAM.PAGEID PAGEID,\n  USERS_TABLE.REGIONID REGIONID,\n  USERS_TABLE.GENDER GENDER\nFROM PAGEVIEWS_STREAM PAGEVIEWS_STREAM\nLEFT OUTER JOIN USERS_TABLE USERS_TABLE ON ((PAGEVIEWS_STREAM.USERID = USERS_TABLE.ID))\nEMIT CHANGES;",
      "commandId": "stream/`USER_PAGEVIEWS`/create",
      "commandStatus": {
         "status": "SUCCESS",
         "message": "Created query with ID CSAS_USER_PAGEVIEWS_5",
         "queryId": "CSAS_USER_PAGEVIEWS_5"
      },
      "commandSequenceNumber": 6,
      "warnings": []
    }
    

    ちなみに

    • 出力で強調表示されている行で、クエリの内部識別子が CSAS_USER_PAGEVIEWS_5 であることがわかります。先頭に "CSAS" が付いており、これは CREATE STREAM AS SELECT の頭文字を取ったものです。
    • テーブルの識別子には、先頭に "CTAS" が付いており、これは CREATE TABLE AS SELECT の頭文字を取ったものです。
  2. Streams をクリックして、アクセスできるストリームのリストを開きます。

  3. USER_PAGEVIEWS を選択し、Query stream をクリックします。

    エディターが開いて SELECT 一時クエリが表示され、user_pageviews ストリームのストリーミング出力が結果ウィンドウに表示されます。結合されたストリームには、pageviews_streamusers_table のすべてのフィールドが含まれます。

    注釈

    クエリでは EMIT CHANGES 構文が使用されており、これが "プッシュクエリ" であることがわかります。プッシュクエリでは、ストリームまたはテーブルに対するクエリを実行して、結果に対するサブスクリプションを使用できます。停止するまで継続されます。詳細については、プッシュクエリ を参照してください。

    Confluent Control Center に表示された ksqlDB の結合クエリの結果
  4. Stop をクリックして一時的なプッシュクエリを終了します。

ストリームへのフィルターの適用

このステップでは、pageviews_region_like_89 という名前のストリームを作成します。このストリームは、末尾が 8 または 9regionid 値を含む user_pageviews 行で構成されます。このクエリの結果は pageviews_filtered_r8_r9 という名前の新しいトピックに書き込まれます。トピック名は、KAFKA_TOPIC キーワードを使用してクエリ内で明示的に指定されています。

  1. 以下の SQL をエディターにコピーして Run query をクリックします。

    CREATE STREAM pageviews_region_like_89
      WITH (KAFKA_TOPIC='pageviews_filtered_r8_r9', VALUE_FORMAT='AVRO')
        AS SELECT * FROM user_pageviews
        WHERE regionid LIKE '%_8' OR regionid LIKE '%_9'
    EMIT CHANGES;
    

    出力は以下のようになります。

    {
      "@type": "currentStatus",
      "statementText": "CREATE STREAM PAGEVIEWS_REGION_LIKE_89 WITH (KAFKA_TOPIC='pageviews_filtered_r8_r9', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='AVRO') AS SELECT *\nFROM USER_PAGEVIEWS USER_PAGEVIEWS\nWHERE ((USER_PAGEVIEWS.REGIONID LIKE '%_8') OR (USER_PAGEVIEWS.REGIONID LIKE '%_9'))\nEMIT CHANGES;",
      "commandId": "stream/`PAGEVIEWS_REGION_LIKE_89`/create",
      "commandStatus": {
         "status": "SUCCESS",
         "message": "Created query with ID CSAS_PAGEVIEWS_REGION_LIKE_89_7",
         "queryId": "CSAS_PAGEVIEWS_REGION_LIKE_89_7"
      },
      "commandSequenceNumber": 8,
      "warnings": []
    }
    
  2. pageviews_region_like_89 ストリームのフィルターが適用された出力を確認します。以下の SQL をエディターにコピーして Run query をクリックします。

    SELECT * FROM pageviews_region_like_89 EMIT CHANGES;
    

    出力は以下のようになります。

    Confluent Control Center に表示された ksqlDB のフィルタークエリの結果

ウィンドウ化されたビューの作成

このステップでは、pageviews_per_region_89 という名前のテーブルを作成して、SIZE が 30 秒の タンブリングウィンドウ でリージョン 8 および 9 からのページビューの数をカウントします。クエリの結果は、行をカウントしてグループ化する集計であるため、結果はストリームではなくテーブルです。

  1. 以下の SQL をエディターにコピーして Run query をクリックします。

    CREATE TABLE pageviews_per_region_89 WITH (KEY_FORMAT='JSON')
      AS SELECT userid, gender, regionid, COUNT(*) AS numusers
        FROM pageviews_region_like_89
        WINDOW TUMBLING (SIZE 30 SECOND)
        GROUP BY userid, gender, regionid
        HAVING COUNT(*) > 1
    EMIT CHANGES;
    

    出力は以下のようになります。

    {
      "@type": "currentStatus",
      "statementText": "CREATE TABLE PAGEVIEWS_PER_REGION_89 WITH (KAFKA_TOPIC='PAGEVIEWS_PER_REGION_89', KEY_FORMAT='JSON', PARTITIONS=1, REPLICAS=1) AS SELECT\n  PAGEVIEWS_REGION_LIKE_89.GENDER GENDER,\n  PAGEVIEWS_REGION_LIKE_89.REGIONID REGIONID,\n  COUNT(*) NUMUSERS\nFROM PAGEVIEWS_REGION_LIKE_89 PAGEVIEWS_REGION_LIKE_89\nWINDOW TUMBLING ( SIZE 30 SECONDS ) \nGROUP BY PAGEVIEWS_REGION_LIKE_89.GENDER, PAGEVIEWS_REGION_LIKE_89.REGIONID\nHAVING (COUNT(*) > 1)\nEMIT CHANGES;",
      "commandId": "table/`PAGEVIEWS_PER_REGION_89`/create",
      "commandStatus": {
         "status": "SUCCESS",
         "message": "Created query with ID CTAS_PAGEVIEWS_PER_REGION_89_9",
         "queryId": "CTAS_PAGEVIEWS_PER_REGION_89_9"
      },
      "commandSequenceNumber": 10,
      "warnings": []
    }
    
  2. pageviews_per_region_89 テーブルのウィンドウ化された出力を確認します。以下の SQL をエディターにコピーして Run query をクリックします。

    SELECT * FROM pageviews_per_region_89 EMIT CHANGES;
    

    出力は以下のようになります。

    Confluent Control Center に表示された ksqlDB のウィンドウ化されたクエリの結果

    NUMUSERS 列は、30 秒の時間枠内にクリックしたユーザーの数を示しています。

プルクエリを使用したテーブルのスナップショットの作成

プルクエリを使用して、テーブルの現在のステートを取得できます。プルクエリでは、クエリの発行時点での指定のキーの行が返されます。プルクエリは 1 回実行されると終了します。

このステップでは、pageviews_per_region_89 テーブルに対してクエリを実行して、Region_9User_1 を含むすべての行を取得します。

以下の SQL をエディターにコピーして Run query をクリックします。

SELECT * FROM pageviews_per_region_89
  WHERE userid = 'User_1' AND gender='FEMALE' AND regionid='Region_9';

出力は以下のようになります。

Confluent Control Center に表示された ksqlDB のプルクエリの結果

ストリームとテーブルの確認

  1. All available streams and tables ペインには、アクセスできるすべてのストリームおよびテーブルが表示されます。PAGEVIEWS_PER_REGION をクリックして、pageviews_per_region テーブルのフィールドを確認します。

    Confluent Control Center の All available streams and tables ペイン
  2. All available streams and tables セクションで KSQL_PROCESSING_LOG をクリックすると、処理ログのフィールドが表示されます。これには、ネストしたデータ構造も含まれます。処理ログでは、SQL ステートメントの処理中に発生したエラーを確認できます。他のストリームと同様に、処理ログに対してクエリを実行することができます。詳細については、『Processing log』を参照してください。

  3. Persistent queries をクリックして、作成したストリームとテーブルを確認します。

    Confluent Control Center の永続的なクエリの詳細ペイン

    このページでは、クエリが実行されているかどうかの確認、クエリの情報の確認、実行中のクエリの終了ができます。

ステップ 5: アプリのストリームトポロジーの視覚化

作成したストリーミングアプリケーションでは、Datagen コネクターから pageviews トピックおよび users トピックにイベントが流れます。行に対して結合とフィルターの処理が行われ、最後のステップでは、ストリーミングデータのテーブルビューで行が集計されます。

フロービューを使用してシステム全体をエンドツーエンドで確認することができます。

  1. Flow をクリックして、フロービューを開きます。アプリのストリームトポロジーが表示されます。ストリーム、テーブル、それらを作成するために実行したステートメントを確認できます。

  2. USER_PAGEVIEWS をクリックして、結合されたストリームを確認します。

    Confluent Control Center 内の ksqlDB Flow ビュー
  3. 図の中の他のノードをクリックして、アプリ内を流れるデータを確認します。

ステップ 6: アンインストールとクリーンアップ

Confluent Platform の操作が済んだら、簡単な手順で削除できます。削除すると、ストレージやその他のシステムリソースが解放されます。

Docker での作業が完了したら、Docker コンテナーおよびイメージを停止し、削除することができます。

  1. 以下のコマンドを実行して、Confluent の Docker コンテナーを停止します。

    docker-compose stop
    
  2. Docker コンテナーを停止した後に、以下のコマンドを実行して Docker システムを削除します。これらのコマンドを実行すると、コンテナー、ネットワーク、ボリューム、イメージが削除され、ディスク領域が解放されます。

    docker system prune -a --volumes --filter "label=io.confluent.docker"
    

    詳細については、Docker の公式ドキュメントを参照してください。

次のステップ

  • このクイックスタートの自動化バージョン を実行できます。これは、Confluent Platform のローカルインストール向けです。
  • Confluent 開発者: ここに用意されているさまざまなコースで Confluent Platform のプログラミングを学べます。
  • Confluent Education: さまざまなリソースやハンズオントレーニング、認定試験など、トレーニングや認定に関するガイダンスが用意されています。
  • AWS ワークショップ: フルマネージド型の Confluent Cloud on AWS を使用して、リアルタイムにクレジットアプリケーションを処理するサンプルアプリを作成できます。
  • ksqlDB ドキュメント: ストリーミング ETL、リアルタイムモニタリング、異常検出などのユースケースについて、ksqlDB を使用したデータ処理について確認できます。一連の スクリプト化されたデモ により、ksqlDB の使用方法も参照してください。
  • Kafka チュートリアル: ステップバイステップの指示に従って、基本的な Kafka、Kafka Streams、ksqlDB のチュートリアルを試してみることができます。
  • Kafka Streams ドキュメント: ストリーム処理アプリケーションを Java または Scala で構築する方法を確認できます。
  • Kafka Connect ドキュメント: Kafka を他のシステムと連携させ、すぐに使用できるコネクター をダウンロードして、Kafka 内外のデータをリアルタイムで簡単に取り込む方法を確認できます。
  • Kafka クライアントドキュメント: Go、Python、.NET、C/C++ などのプログラミング言語を使用して、Kafka に対してデータの読み取りおよび書き込みを行う方法を確認できます。
  • ビデオ、デモ、および参考文献: Confluent Platform のチュートリアルやサンプルを試したり、デモやスクリーンキャストを参照したり、ホワイトペーパーやブログで調べたりすることができます。