Kafka Streams デモアプリケーション

このデモでは、Apache Kafka® Streams API(ソースコード)および ksqlDB(ブログ記事「Hands on: Building a Streaming Application with KSQL」とビデオ「Demo: Build a Streaming Application with ksqlDB」を参照)の実際の動作を紹介します。

このミュージックアプリケーションでは、音楽ジャンルごとにトップ 5 の楽曲を継続的にリアルタイムで計算する、シンプルなミュージックチャートアプリケーションの構築方法を示します。Kafka の 対話型クエリ 機能と REST API を使用して、最新の処理結果である最新のチャートを表示します。アプリケーションの入力データは Avro 形式で、そのソースとなるのは、再生イベントのストリーム("楽曲 X が再生された" など)と、楽曲のメタデータのストリーム("楽曲 X の作曲者はアーティスト Y")の 2 つです。

次の スクリーンキャスト は、このミュージックデモアプリケーションが動作しているところを紹介したものです。

前提条件

Confluent Platform は、さまざまなオペレーティングシステムおよびソフトウェアバージョンでサポートされています(詳細については「サポートされているバージョンおよび相互運用性」を参照)。このサンプルは、以下に説明する特定の構成で検証されています。この例の Windows での実行は正式にサポートされていませんが、GitHub のサンプルコードを変更して symlink .envconfig.env の内容で置き換えると、Windows でも動作する可能性があります。

  • macOS 10.15.3
  • Confluent Platform 6.0.6
  • Java 11.0.6 2020-01-14 LTS
  • bash バージョン 3.2.57
  • jq 1.6
  • (Docker ベースのサンプル)Docker バージョン 19.03.8
  • (Docker ベースのサンプル)Docker Compose docker-compose バージョン 1.25.4

ミュージックアプリケーションの開始

このデモを実行するには、次の手順を実行します。

  1. Docker の詳細 設定 で、Docker 専用のメモリーを 8 GB 以上に増やします(デフォルトは 2 GB)。

  2. Confluent の examples リポジトリのクリーンを作成します。

    git clone https://github.com/confluentinc/examples.git
    
  3. examples/music/ ディレクトリに移動し、Confluent Platform のリリースブランチに切り替えます。

    cd examples/music/
    git checkout 6.0.6-post
    
  4. すべての Docker コンテナーを起動する単一のコマンドを実行して、デモを開始します。これが完了するまでには 2 分ほどかかります。

    docker-compose up -d
    
  5. Confluent Control Center のログを表示し、実行されていることを確認します。

    docker-compose logs -f control-center | grep -i "Started NetworkTrafficServerConnector"
    
  6. Confluent Control Center のログで次の行を確認します。

    INFO Started NetworkTrafficServerConnector@5533dc72{HTTP/1.1,[http/1.1]}{0.0.0.0:9021} (org.eclipse.jetty.server.AbstractConnector)
    
  7. コンテナー内およびホストマシンから、Kafka ブローカー、Confluent Schema Registry、ZooKeeper の利用可能なエンドポイントを確認します。

    エンドポイント パラメーター 値(コンテナー内から) 値(ホストマシンから)
    Kafka ブローカー bootstrap.servers kafka:29092 localhost:9092
    Confluent Schema Registry schema.registry.url http://schema-registry:8081 http://localhost:8081
    ZooKeeper zookeeper.connect zookeeper:2181 localhost:2181

Kafka トピック内のメッセージの表示

docker-compose.yml ファイルはいくつかのコンテナーを立ち上げます。その 1 つに kafka-music-data-generator があります。このコンテナーは、2 つの Kafka トピックに Avro 形式のデータを書き込んで、ミュージックアプリケーションの入力データを継続的に生成します。これにより、Kafka ミュージックアプリケーションをテストするときに、リアルタイムのライブデータを表示することができます。

  • play-events : 再生イベントのストリーム("楽曲 X が再生された")
  • song-feed : 楽曲のメタデータのストリーム("楽曲 X の作曲者はアーティスト Y")
../../../_images/ksql-music-demo-source-data-explore.jpg
  1. ウェブブラウザーで Confluent Control Center に移動します。

  2. Topics をクリックし、メッセージを表示するトピックを選択します。

    ../../../_images/inspect_topic.png
  3. Confluent Control Center で ksqlDB クエリエディターを使用して、メッセージを表示することもできます。たとえば、play-events 内の Kafka メッセージを表示するには、ksqlDB をクリックし、エディターに次の ksqlDB クエリを入力します。

    PRINT "play-events";
    
  4. 以下のように出力されることを確認します。

    ../../../_images/topic_ksql_play_events.png
  5. エディターに次の ksqlDB クエリを入力して、song-feed 内の Kafka メッセージを表示します。

    PRINT "song-feed" FROM BEGINNING;
    
  6. コマンドラインツールを使用して Kafka トピック内のメッセージを表示することもできます。play-events トピック内のメッセージを表示するには、次のコマンドを実行します。

    docker-compose exec schema-registry \
        kafka-avro-console-consumer \
            --bootstrap-server kafka:29092 \
            --topic play-events
    
  7. 以下のように出力されることを確認します。

    {"song_id":11,"duration":60000}
    {"song_id":10,"duration":60000}
    {"song_id":12,"duration":60000}
    {"song_id":2,"duration":60000}
    {"song_id":1,"duration":60000}
    
  8. song-feed トピック内のメッセージを表示します。

    docker-compose exec schema-registry \
        kafka-avro-console-consumer \
            --bootstrap-server kafka:29092 \
            --topic song-feed \
            --from-beginning
    
  9. 以下のように出力されることを確認します。

    {"id":1,"album":"Fresh Fruit For Rotting Vegetables","artist":"Dead Kennedys","name":"Chemical Warfare","genre":"Punk"}
    {"id":2,"album":"We Are the League","artist":"Anti-Nowhere League","name":"Animal","genre":"Punk"}
    {"id":3,"album":"Live In A Dive","artist":"Subhumans","name":"All Gone Dead","genre":"Punk"}
    {"id":4,"album":"PSI","artist":"Wheres The Pope?","name":"Fear Of God","genre":"Punk"}
    

Kafka Streams アプリケーションの検証

Kafka ミュージックアプリケーションには、Docker コンテナー kafka-music-application で動作する REST API が実装されており、curl を使用して対話的にクエリを実行できます。

  1. Kafka ミュージックアプリケーションの実行中のアプリケーションインスタンスの一覧を取得します。

    curl -sXGET http://localhost:7070/kafka-music/instances | jq .
    
  2. 以下のように出力されることを確認します。

    [
      {
        "host": "kafka-music-application",
        "port": 7070,
        "storeNames": [
          "all-songs",
          "song-play-count",
          "top-five-songs",
          "top-five-songs-by-genre"
        ]
      }
    ]
    
  3. すべての音楽ジャンルを合わせた最新のトップ 5 の楽曲を取得します。

    curl -sXGET http://localhost:7070/kafka-music/charts/top-five | jq .
    
  4. 以下のように出力されることを確認します。

    [
      {
        "artist": "Jello Biafra And The Guantanamo School Of Medicine",
        "album": "The Audacity Of Hype",
        "name": "Three Strikes",
        "plays": 70
      },
      {
        "artist": "Hilltop Hoods",
        "album": "The Calling",
        "name": "The Calling",
        "plays": 67
      },
      ...
    ]
    
  5. Kafka ミュージックアプリケーション が公開する REST API では、他の操作もサポートしています。詳細については、ソースコードの上部に記述されている説明 を参照してください。

ksqlDB アプリケーションの作成

このセクションでは、Kafka Streams と同等の ksqlDB クエリを作成します。

../../../_images/ksql-music-demo-overview.jpg

これを進めるには、次の 2 つの方法があります。

  • 手動: チュートリアルに従って、各 ksqlDB コマンドを 1 つずつ順番に作成します。
  • 自動: ksqlDB の SCRIPT コマンドを使用して、すべての ksqlDB コマンド を送信します。

手動

ksqlDB のストリームとテーブルの名前には ksql_ というプレフィックスを付けます。これは必須ではありませんが、そうすることで ksqlDB クエリと Kafka Streams API バージョンのミュージックデモの名前の競合が回避され、両方を同時に実行できるようになります。

  1. play-events トピックから、ksql_playevents という新しいストリームを作成します。ksqlDB アプリケーションで Add a stream を選択します。

    ../../../_images/add_a_stream.png
  2. play-events トピックを選択し、各フィールドに次のように入力します。Confluent Control Center は Confluent Schema Registry に統合されているため、ksqlDB は自動的に song_id および duration のフィールドとそれぞれのデータ型を検出します。

    ../../../_images/ksql_playevents.png
  3. 新しく作成されたストリーム ksql_playevents に対して基本的なフィルター処理を実行します。例として、30 秒以上再生された楽曲を取得します。ksqlDB クエリエディターで、次のように入力します。

    SELECT * FROM ksql_playevents WHERE DURATION > 30000 EMIT CHANGES;
    
  4. 上記のクエリは永続的ではなく、この画面を閉じると終了します。クエリを永続化し、明示的に終了されるまで動作し続けるようにするには、上記のクエリの先頭に CREATE STREAM ... AS を追加します。ksqlDB クエリエディターで、次のように入力します。

    CREATE STREAM ksql_playevents_min_duration AS SELECT * FROM ksql_playevents WHERE DURATION > 30000;
    
  5. この永続的なクエリが Running Queries タブに表示されることを確認します。

  6. 元の Kafka のトピック song-feed には、Long 型のキーが含まれています。これは ksqlDB の BIGINT sql 型にマップされ、ID フィールドにキーのコピーが格納されます。元の Kafka のトピック song-feed から TABLE を作成します。

    CREATE TABLE ksql_song (SONG_ID BIGINT PRIMARY KEY) WITH (KAFKA_TOPIC='song-feed', VALUE_FORMAT='AVRO');
    
  7. テーブルの内容を表示して、この ksqlDB テーブル内のエントリに、楽曲の文字列 ID と一致する ROWKEY が含まれていることを確認します。

    SELECT * FROM ksql_song EMIT CHANGES limit 5;
    
  8. テーブルについて DESCRIBE を実行して、このトピックの関連付けられたフィールドを表示し、ID フィールドの型が BIGINT であることを確認します。

    ../../../_images/describe_songfeed.png
  9. ここまでの手順で、フィルター処理された再生イベントのストリーム ksql_playevents_min_duration と、楽曲のメタデータのテーブル ksql_song が作成されました。ストリームとテーブルの JOIN を使用して、再生イベントのストリームに楽曲のメタデータの情報を追加します。この結果として、それぞれの再生イベントに楽曲のタイトルなどの説明的な情報が組み合わされた、新しい再生イベントのストリームが作成されます。

    CREATE STREAM ksql_songplays AS SELECT plays.SONG_ID AS ID, ALBUM, ARTIST, NAME, GENRE, DURATION FROM ksql_playevents_min_duration plays LEFT JOIN ksql_song songs ON plays.SONG_ID = songs.SONG_ID;
    
  10. 1 AS KEYCOL という句が追加されていることに注目してください。これにより、各行に KEYCOL という新しいフィールドが作成され、値が 1 に設定されます。KEYCOL は、派生された他のストリームやテーブルで、後からグローバルに集約を実行するときに使用できます。

  11. ここまで来たら、最も再生回数の多い楽曲を表示するトップミュージックチャートを作成できます。上記で作成した ksql_songplays ストリームに対して COUNT 関数を使用します。最新 30 秒間の統計を確認できればいいので、すべての楽曲の再生イベントのカウントを 30 秒間隔で取得する WINDOW 句を追加します。

    CREATE TABLE ksql_songplaycounts30 WITH (PARTITIONS=1) AS SELECT ID AS K1, NAME AS K2, GENRE AS K3, AS_VALUE(ID) AS ID, AS_VALUE(NAME) AS NAME, AS_VALUE(GENRE) AS GENRE, COUNT(*) AS COUNT FROM ksql_songplays WINDOW TUMBLING (size 30 second) GROUP BY ID, NAME, GENRE;
    
  12. これで、データをリアルタイムで処理するストリーミングアプリケーションが構築されました。アプリケーションでは、再生イベントのストリームに楽曲のメタデータを追加し、トップカウントを生成しました。ダウンロードシステムでは、これらの ksqlDB クエリの結果を消費してさらに処理することができます。既に SQL セマンティクスに慣れている方には、このチュートリアルはそれほど難しくないと思います。

    SELECT * FROM ksql_songplaycounts30 EMIT CHANGES;
    
    ../../../_images/counts_results.png

自動

  1. ksqlDB の statements.sql を確認します。

    --The STREAM and TABLE names are prefixed with `ksql_` to enable you to run this demo
    --concurrently with the Kafka Streams Music Demo java application, to avoid conflicting names
    
    
    --The play-events Kafka topic is a feed of song plays, generated by KafkaMusicExampleDriver
    CREATE STREAM ksql_playevents WITH (KAFKA_TOPIC='play-events', VALUE_FORMAT='AVRO');
    
    --Filter the play events to only accept events where the duration is >= 30 seconds
    CREATE STREAM ksql_playevents_min_duration AS SELECT * FROM ksql_playevents WHERE DURATION > 30000;
    
    --The song-feed Kafka topic contains all of the songs available in the streaming service, generated by KafkaMusicExampleDriver
    CREATE TABLE ksql_song (SONG_ID BIGINT PRIMARY KEY) WITH (KAFKA_TOPIC='song-feed', VALUE_FORMAT='AVRO');
    
    --Join the plays with song as we will use it later for charting
    CREATE STREAM ksql_songplays AS SELECT plays.SONG_ID AS ID, ALBUM, ARTIST, NAME, GENRE, DURATION FROM ksql_playevents_min_duration plays LEFT JOIN ksql_song songs ON plays.SONG_ID = songs.SONG_ID;
    
    --Track song play counts in 30 second intervals, with a single partition for global view across multiple partitions (https://github.com/confluentinc/ksql/issues/1053)
    CREATE TABLE ksql_songplaycounts30 WITH (PARTITIONS=1) AS SELECT ID AS K1, NAME AS K2, GENRE AS K3, AS_VALUE(ID) AS ID, AS_VALUE(NAME) AS NAME, AS_VALUE(GENRE) AS GENRE, COUNT(*) AS COUNT FROM ksql_songplays WINDOW TUMBLING (size 30 second) GROUP BY ID, NAME, GENRE;
    --Convert TABLE to STREAM
    CREATE STREAM ksql_songplaycounts30stream (ID BIGINT, NAME VARCHAR, GENRE VARCHAR, COUNT BIGINT) WITH (kafka_topic='KSQL_SONGPLAYCOUNTS30', value_format='AVRO');
    
    --Track song play counts for all time, with a single partition for global view across multiple partitions (https://github.com/confluentinc/ksql/issues/1053)
    CREATE TABLE ksql_songplaycounts WITH (PARTITIONS=1) AS SELECT ID AS K1, NAME AS K2, GENRE AS K3, AS_VALUE(ID) AS ID, AS_VALUE(NAME) AS NAME, AS_VALUE(GENRE) AS GENRE, COUNT(*) AS COUNT FROM ksql_songplays GROUP BY ID, NAME, GENRE;
    --Convert TABLE to STREAM
    CREATE STREAM ksql_songplaycountsstream (ID BIGINT, NAME VARCHAR, GENRE VARCHAR, COUNT BIGINT) WITH (kafka_topic='KSQL_SONGPLAYCOUNTS', value_format='AVRO');
    
    --Top Five song counts for all time based on ksql_songplaycountsstream
    --At this time, `TOPK` does not support sorting by one column and selecting the value of another column (https://github.com/confluentinc/ksql/issues/403)
    --So the results are just counts but not names of the songs associated with the counts
    CREATE TABLE ksql_top5 AS SELECT 1 AS KEYCOL, TOPK(COUNT,5) FROM ksql_songplaycountsstream GROUP BY 1;
    --Top Five songs for each genre based on each WINDOW of ksql_songplaycounts
    CREATE TABLE ksql_top5bygenre AS SELECT GENRE, TOPK(COUNT,5) FROM ksql_songplaycountsstream GROUP BY GENRE;
    
    
  2. ksqlDB CLI を起動します。

    docker-compose exec ksqldb-cli ksql http://ksqldb-server:8088
    
  3. ksqlDB ステートメントを実行するスクリプト statements.sql を実行します。

    RUN SCRIPT '/tmp/statements.sql';
    

    出力には、次のように、空白のメッセージまたは Executing statement が表示されます。

     Message
    ---------
     Executing statement
    ---------
    

    RUN SCRIPT コマンドが完了したら、CTRL+D コマンドで ksqldb-cli を終了します。

ミュージックアプリケーションの停止

  1. 完了したら、忘れずにデモを終了します。

    docker-compose down
    

トラブルシューティング

  1. Docker コンテナーのステータスが Up ステートを示していることを確認します。

    docker-compose ps
    

    出力は以下のようになります。

                  Name                            Command                  State                            Ports
    -----------------------------------------------------------------------------------------------------------------------------------
    control-center                     /etc/confluent/docker/run        Up             0.0.0.0:9021->9021/tcp
    kafka                              /etc/confluent/docker/run        Up             0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp
    kafka-music-application            bash -c echo Waiting for K ...   Up             0.0.0.0:7070->7070/tcp
    kafka-music-data-generator         bash -c echo Waiting for K ...   Up             7070/tcp
    ksqldb-cli                         /bin/sh                          Up
    ksqldb-server                      /etc/confluent/docker/run        Up (healthy)   0.0.0.0:8088->8088/tcp
    schema-registry                    /etc/confluent/docker/run        Up             0.0.0.0:8081->8081/tcp
    zookeeper                          /etc/confluent/docker/run        Up             0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
    
  2. Confluent Control Center では、トピック、ストリーム、テーブルからのメッセージが新しく到着するたびに表示されます。このデモでは、kafka-music-data-generator という Docker コンテナーで動作するアプリケーションからデータが供給されます。Confluent Control Center でメッセージが表示されない場合は、このアプリケーションを再起動してみることをお勧めします。

    docker-compose restart kafka-music-data-generator