Kafka Streams デモアプリケーション¶
このデモでは、Apache Kafka® Streams API(ソースコード)および ksqlDB(ブログ記事「Hands on: Building a Streaming Application with KSQL」とビデオ「Demo: Build a Streaming Application with ksqlDB」を参照)の実際の動作を紹介します。
このミュージックアプリケーションでは、音楽ジャンルごとにトップ 5 の楽曲を継続的にリアルタイムで計算する、シンプルなミュージックチャートアプリケーションの構築方法を示します。Kafka の 対話型クエリ 機能と REST API を使用して、最新の処理結果である最新のチャートを表示します。アプリケーションの入力データは Avro 形式で、そのソースとなるのは、再生イベントのストリーム("楽曲 X が再生された" など)と、楽曲のメタデータのストリーム("楽曲 X の作曲者はアーティスト Y")の 2 つです。
次の スクリーンキャスト は、このミュージックデモアプリケーションが動作しているところを紹介したものです。
前提条件¶
Confluent Platform は、さまざまなオペレーティングシステムおよびソフトウェアバージョンでサポートされています(詳細については「サポートされているバージョンおよび相互運用性」を参照)。このサンプルは、以下に説明する特定の構成で検証されています。この例の Windows での実行は正式にサポートされていませんが、GitHub のサンプルコードを変更して symlink .env
を config.env の内容で置き換えると、Windows でも動作する可能性があります。
- macOS 10.15.3
- Confluent Platform 6.0.6
- Java 11.0.6 2020-01-14 LTS
- bash バージョン 3.2.57
- jq 1.6
- (Docker ベースのサンプル)Docker バージョン 19.03.8
- (Docker ベースのサンプル)Docker Compose docker-compose バージョン 1.25.4
ミュージックアプリケーションの開始¶
このデモを実行するには、次の手順を実行します。
Docker の詳細 設定 で、Docker 専用のメモリーを 8 GB 以上に増やします(デフォルトは 2 GB)。
Confluent の examples リポジトリのクリーンを作成します。
git clone https://github.com/confluentinc/examples.git
examples/music/
ディレクトリに移動し、Confluent Platform のリリースブランチに切り替えます。cd examples/music/ git checkout 6.0.6-post
すべての Docker コンテナーを起動する単一のコマンドを実行して、デモを開始します。これが完了するまでには 2 分ほどかかります。
docker-compose up -d
Confluent Control Center のログを表示し、実行されていることを確認します。
docker-compose logs -f control-center | grep -i "Started NetworkTrafficServerConnector"
Confluent Control Center のログで次の行を確認します。
INFO Started NetworkTrafficServerConnector@5533dc72{HTTP/1.1,[http/1.1]}{0.0.0.0:9021} (org.eclipse.jetty.server.AbstractConnector)
コンテナー内およびホストマシンから、Kafka ブローカー、Confluent Schema Registry、ZooKeeper の利用可能なエンドポイントを確認します。
エンドポイント パラメーター 値(コンテナー内から) 値(ホストマシンから) Kafka ブローカー bootstrap.servers
kafka:29092
localhost:9092
Confluent Schema Registry schema.registry.url
http://schema-registry:8081
http://localhost:8081
ZooKeeper zookeeper.connect
zookeeper:2181
localhost:2181
Kafka トピック内のメッセージの表示¶
docker-compose.yml ファイルはいくつかのコンテナーを立ち上げます。その 1 つに kafka-music-data-generator
があります。このコンテナーは、2 つの Kafka トピックに Avro 形式のデータを書き込んで、ミュージックアプリケーションの入力データを継続的に生成します。これにより、Kafka ミュージックアプリケーションをテストするときに、リアルタイムのライブデータを表示することができます。
play-events
: 再生イベントのストリーム("楽曲 X が再生された")song-feed
: 楽曲のメタデータのストリーム("楽曲 X の作曲者はアーティスト Y")
ウェブブラウザーで Confluent Control Center に移動します。
Topics
をクリックし、メッセージを表示するトピックを選択します。Confluent Control Center で ksqlDB クエリエディターを使用して、メッセージを表示することもできます。たとえば、
play-events
内の Kafka メッセージを表示するには、ksqlDB
をクリックし、エディターに次の ksqlDB クエリを入力します。PRINT "play-events";
以下のように出力されることを確認します。
エディターに次の ksqlDB クエリを入力して、
song-feed
内の Kafka メッセージを表示します。PRINT "song-feed" FROM BEGINNING;
コマンドラインツールを使用して Kafka トピック内のメッセージを表示することもできます。
play-events
トピック内のメッセージを表示するには、次のコマンドを実行します。docker-compose exec schema-registry \ kafka-avro-console-consumer \ --bootstrap-server kafka:29092 \ --topic play-events
以下のように出力されることを確認します。
{"song_id":11,"duration":60000} {"song_id":10,"duration":60000} {"song_id":12,"duration":60000} {"song_id":2,"duration":60000} {"song_id":1,"duration":60000}
song-feed
トピック内のメッセージを表示します。docker-compose exec schema-registry \ kafka-avro-console-consumer \ --bootstrap-server kafka:29092 \ --topic song-feed \ --from-beginning
以下のように出力されることを確認します。
{"id":1,"album":"Fresh Fruit For Rotting Vegetables","artist":"Dead Kennedys","name":"Chemical Warfare","genre":"Punk"} {"id":2,"album":"We Are the League","artist":"Anti-Nowhere League","name":"Animal","genre":"Punk"} {"id":3,"album":"Live In A Dive","artist":"Subhumans","name":"All Gone Dead","genre":"Punk"} {"id":4,"album":"PSI","artist":"Wheres The Pope?","name":"Fear Of God","genre":"Punk"}
Kafka Streams アプリケーションの検証¶
Kafka ミュージックアプリケーションには、Docker コンテナー kafka-music-application
で動作する REST API が実装されており、curl
を使用して対話的にクエリを実行できます。
Kafka ミュージックアプリケーションの実行中のアプリケーションインスタンスの一覧を取得します。
curl -sXGET http://localhost:7070/kafka-music/instances | jq .
以下のように出力されることを確認します。
[ { "host": "kafka-music-application", "port": 7070, "storeNames": [ "all-songs", "song-play-count", "top-five-songs", "top-five-songs-by-genre" ] } ]
すべての音楽ジャンルを合わせた最新のトップ 5 の楽曲を取得します。
curl -sXGET http://localhost:7070/kafka-music/charts/top-five | jq .
以下のように出力されることを確認します。
[ { "artist": "Jello Biafra And The Guantanamo School Of Medicine", "album": "The Audacity Of Hype", "name": "Three Strikes", "plays": 70 }, { "artist": "Hilltop Hoods", "album": "The Calling", "name": "The Calling", "plays": 67 }, ... ]
Kafka ミュージックアプリケーション が公開する REST API では、他の操作もサポートしています。詳細については、ソースコードの上部に記述されている説明 を参照してください。
ksqlDB アプリケーションの作成¶
このセクションでは、Kafka Streams と同等の ksqlDB クエリを作成します。
これを進めるには、次の 2 つの方法があります。
- 手動: チュートリアルに従って、各 ksqlDB コマンドを 1 つずつ順番に作成します。
- 自動: ksqlDB の
SCRIPT
コマンドを使用して、すべての ksqlDB コマンド を送信します。
手動¶
ksqlDB のストリームとテーブルの名前には ksql_
というプレフィックスを付けます。これは必須ではありませんが、そうすることで ksqlDB クエリと Kafka Streams API バージョンのミュージックデモの名前の競合が回避され、両方を同時に実行できるようになります。
play-events
トピックから、ksql_playevents
という新しいストリームを作成します。ksqlDB アプリケーションで Add a stream を選択します。play-events
トピックを選択し、各フィールドに次のように入力します。Confluent Control Center は Confluent Schema Registry に統合されているため、ksqlDB は自動的にsong_id
およびduration
のフィールドとそれぞれのデータ型を検出します。新しく作成されたストリーム
ksql_playevents
に対して基本的なフィルター処理を実行します。例として、30 秒以上再生された楽曲を取得します。ksqlDB クエリエディターで、次のように入力します。SELECT * FROM ksql_playevents WHERE DURATION > 30000 EMIT CHANGES;
上記のクエリは永続的ではなく、この画面を閉じると終了します。クエリを永続化し、明示的に終了されるまで動作し続けるようにするには、上記のクエリの先頭に
CREATE STREAM ... AS
を追加します。ksqlDB クエリエディターで、次のように入力します。CREATE STREAM ksql_playevents_min_duration AS SELECT * FROM ksql_playevents WHERE DURATION > 30000;
この永続的なクエリが
Running Queries
タブに表示されることを確認します。元の Kafka のトピック
song-feed
には、Long
型のキーが含まれています。これは ksqlDB のBIGINT
sql 型にマップされ、ID フィールドにキーのコピーが格納されます。元の Kafka のトピックsong-feed
からTABLE
を作成します。CREATE TABLE ksql_song (SONG_ID BIGINT PRIMARY KEY) WITH (KAFKA_TOPIC='song-feed', VALUE_FORMAT='AVRO');
テーブルの内容を表示して、この ksqlDB テーブル内のエントリに、楽曲の文字列 ID と一致する
ROWKEY
が含まれていることを確認します。SELECT * FROM ksql_song EMIT CHANGES limit 5;
テーブルについて
DESCRIBE
を実行して、このトピックの関連付けられたフィールドを表示し、ID
フィールドの型がBIGINT
であることを確認します。ここまでの手順で、フィルター処理された再生イベントのストリーム
ksql_playevents_min_duration
と、楽曲のメタデータのテーブルksql_song
が作成されました。ストリームとテーブルのJOIN
を使用して、再生イベントのストリームに楽曲のメタデータの情報を追加します。この結果として、それぞれの再生イベントに楽曲のタイトルなどの説明的な情報が組み合わされた、新しい再生イベントのストリームが作成されます。CREATE STREAM ksql_songplays AS SELECT plays.SONG_ID AS ID, ALBUM, ARTIST, NAME, GENRE, DURATION FROM ksql_playevents_min_duration plays LEFT JOIN ksql_song songs ON plays.SONG_ID = songs.SONG_ID;
1 AS KEYCOL
という句が追加されていることに注目してください。これにより、各行にKEYCOL
という新しいフィールドが作成され、値が 1 に設定されます。KEYCOL
は、派生された他のストリームやテーブルで、後からグローバルに集約を実行するときに使用できます。ここまで来たら、最も再生回数の多い楽曲を表示するトップミュージックチャートを作成できます。上記で作成した
ksql_songplays
ストリームに対してCOUNT
関数を使用します。最新 30 秒間の統計を確認できればいいので、すべての楽曲の再生イベントのカウントを 30 秒間隔で取得するWINDOW
句を追加します。CREATE TABLE ksql_songplaycounts30 WITH (PARTITIONS=1) AS SELECT ID AS K1, NAME AS K2, GENRE AS K3, AS_VALUE(ID) AS ID, AS_VALUE(NAME) AS NAME, AS_VALUE(GENRE) AS GENRE, COUNT(*) AS COUNT FROM ksql_songplays WINDOW TUMBLING (size 30 second) GROUP BY ID, NAME, GENRE;
これで、データをリアルタイムで処理するストリーミングアプリケーションが構築されました。アプリケーションでは、再生イベントのストリームに楽曲のメタデータを追加し、トップカウントを生成しました。ダウンロードシステムでは、これらの ksqlDB クエリの結果を消費してさらに処理することができます。既に SQL セマンティクスに慣れている方には、このチュートリアルはそれほど難しくないと思います。
SELECT * FROM ksql_songplaycounts30 EMIT CHANGES;
自動¶
ksqlDB の statements.sql を確認します。
--The STREAM and TABLE names are prefixed with `ksql_` to enable you to run this demo --concurrently with the Kafka Streams Music Demo java application, to avoid conflicting names --The play-events Kafka topic is a feed of song plays, generated by KafkaMusicExampleDriver CREATE STREAM ksql_playevents WITH (KAFKA_TOPIC='play-events', VALUE_FORMAT='AVRO'); --Filter the play events to only accept events where the duration is >= 30 seconds CREATE STREAM ksql_playevents_min_duration AS SELECT * FROM ksql_playevents WHERE DURATION > 30000; --The song-feed Kafka topic contains all of the songs available in the streaming service, generated by KafkaMusicExampleDriver CREATE TABLE ksql_song (SONG_ID BIGINT PRIMARY KEY) WITH (KAFKA_TOPIC='song-feed', VALUE_FORMAT='AVRO'); --Join the plays with song as we will use it later for charting CREATE STREAM ksql_songplays AS SELECT plays.SONG_ID AS ID, ALBUM, ARTIST, NAME, GENRE, DURATION FROM ksql_playevents_min_duration plays LEFT JOIN ksql_song songs ON plays.SONG_ID = songs.SONG_ID; --Track song play counts in 30 second intervals, with a single partition for global view across multiple partitions (https://github.com/confluentinc/ksql/issues/1053) CREATE TABLE ksql_songplaycounts30 WITH (PARTITIONS=1) AS SELECT ID AS K1, NAME AS K2, GENRE AS K3, AS_VALUE(ID) AS ID, AS_VALUE(NAME) AS NAME, AS_VALUE(GENRE) AS GENRE, COUNT(*) AS COUNT FROM ksql_songplays WINDOW TUMBLING (size 30 second) GROUP BY ID, NAME, GENRE; --Convert TABLE to STREAM CREATE STREAM ksql_songplaycounts30stream (ID BIGINT, NAME VARCHAR, GENRE VARCHAR, COUNT BIGINT) WITH (kafka_topic='KSQL_SONGPLAYCOUNTS30', value_format='AVRO'); --Track song play counts for all time, with a single partition for global view across multiple partitions (https://github.com/confluentinc/ksql/issues/1053) CREATE TABLE ksql_songplaycounts WITH (PARTITIONS=1) AS SELECT ID AS K1, NAME AS K2, GENRE AS K3, AS_VALUE(ID) AS ID, AS_VALUE(NAME) AS NAME, AS_VALUE(GENRE) AS GENRE, COUNT(*) AS COUNT FROM ksql_songplays GROUP BY ID, NAME, GENRE; --Convert TABLE to STREAM CREATE STREAM ksql_songplaycountsstream (ID BIGINT, NAME VARCHAR, GENRE VARCHAR, COUNT BIGINT) WITH (kafka_topic='KSQL_SONGPLAYCOUNTS', value_format='AVRO'); --Top Five song counts for all time based on ksql_songplaycountsstream --At this time, `TOPK` does not support sorting by one column and selecting the value of another column (https://github.com/confluentinc/ksql/issues/403) --So the results are just counts but not names of the songs associated with the counts CREATE TABLE ksql_top5 AS SELECT 1 AS KEYCOL, TOPK(COUNT,5) FROM ksql_songplaycountsstream GROUP BY 1; --Top Five songs for each genre based on each WINDOW of ksql_songplaycounts CREATE TABLE ksql_top5bygenre AS SELECT GENRE, TOPK(COUNT,5) FROM ksql_songplaycountsstream GROUP BY GENRE;
ksqlDB CLI を起動します。
docker-compose exec ksqldb-cli ksql http://ksqldb-server:8088
ksqlDB ステートメントを実行するスクリプト statements.sql を実行します。
RUN SCRIPT '/tmp/statements.sql';
出力には、次のように、空白のメッセージまたは
Executing statement
が表示されます。Message --------- Executing statement ---------
RUN SCRIPT
コマンドが完了したら、CTRL+D
コマンドでksqldb-cli
を終了します。
ミュージックアプリケーションの停止¶
完了したら、忘れずにデモを終了します。
docker-compose down
トラブルシューティング¶
Docker コンテナーのステータスが
Up
ステートを示していることを確認します。docker-compose ps
出力は以下のようになります。
Name Command State Ports ----------------------------------------------------------------------------------------------------------------------------------- control-center /etc/confluent/docker/run Up 0.0.0.0:9021->9021/tcp kafka /etc/confluent/docker/run Up 0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp kafka-music-application bash -c echo Waiting for K ... Up 0.0.0.0:7070->7070/tcp kafka-music-data-generator bash -c echo Waiting for K ... Up 7070/tcp ksqldb-cli /bin/sh Up ksqldb-server /etc/confluent/docker/run Up (healthy) 0.0.0.0:8088->8088/tcp schema-registry /etc/confluent/docker/run Up 0.0.0.0:8081->8081/tcp zookeeper /etc/confluent/docker/run Up 0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
Confluent Control Center では、トピック、ストリーム、テーブルからのメッセージが新しく到着するたびに表示されます。このデモでは、
kafka-music-data-generator
という Docker コンテナーで動作するアプリケーションからデータが供給されます。Confluent Control Center でメッセージが表示されない場合は、このアプリケーションを再起動してみることをお勧めします。docker-compose restart kafka-music-data-generator