Important
You are viewing documentation for an older version of Confluent Platform. For the latest, click here.
|ak| Streams Demo Application¶
This demo showcases |ak-tm| Streams API (source code) and ksqlDB (see blog post Hands on: Building a Streaming Application with KSQL and video Demo: Build a Streaming Application with ksqlDB).
The music application demonstrates how to build a simple music charts application that continuously computes, in real-time, the latest charts such as Top 5 songs per music genre. It exposes its latest processing results – the latest charts – via the |ak| Interactive Queries feature and a REST API. The application’s input data is in Avro format and comes from two sources: a stream of play events (think: “song X was played”) and a stream of song metadata (“song X was written by artist Y”).
The following screencast shows a live bit of the music demo application:
Prerequisites¶
Start the music application¶
To run this demo, complete the following steps:
In Docker’s advanced settings, increase the memory dedicated to Docker to at least 8GB (default is 2GB).
Clone the Confluent examples repository:
git clone https://github.com/confluentinc/examples.git
Navigate to the
examples/music/
directory and switch to the Confluent Platform release branch:cd examples/music/ git checkout |release_post_branch|
Start the demo by running a single command that brings up all the Docker containers. This takes about 2 minutes to complete.
docker-compose up -d
View the Confluent Control Center logs and validate that it is running.
docker-compose logs -f control-center | grep -i "Started NetworkTrafficServerConnector"
Verify that you see in the Confluent Control Center logs:
INFO Started NetworkTrafficServerConnector@5533dc72{HTTP/1.1,[http/1.1]}{0.0.0.0:9021} (org.eclipse.jetty.server.AbstractConnector)
Note the available endpoints of the |ak| brokers, Confluent Schema Registry, and ZooKeeper, from within the containers and from your host machine:
Endpoint Parameter Value (from within containers) Value (from host machine) |ak| brokers bootstrap.servers
kafka:29092
localhost:9092
Confluent Schema Registry schema.registry.url
http://schema-registry:8081
http://localhost:8081
ZooKeeper zookeeper.connect
zookeeper:2181
localhost:2181
View messages in |ak| topics¶
The :devx-examples:`docker-compose.yml|music/docker-compose.yml` file spins up a few containers, one of which is kafka-music-data-generator
, which is continuously generating input data for the music application by writing into two |ak| topics in Avro format.
This allows you to look at live, real-time data when testing the |ak| music application.
play-events
: stream of play events (“song X was played”)song-feed
: stream of song metadata (“song X was written by artist Y”)
From your web browser, navigate to Confluent Control Center.
Click on
Topics
and select any topic to view its messages.You may also use the ksqlDB query editor in Confluent Control Center to view messages. For example, to see the |ak| messages in
play-events
, click onksqlDB
enter the following ksqlDB query into the editor:PRINT "play-events";
Verify your output resembles:
Enter the following ksqlDB query into the editor to view the |ak| messages in
song-feed
.PRINT "song-feed" FROM BEGINNING;
You can also use command line tools to view messages in the |ak| topics. View the messages in the topic
play-events
.docker-compose exec schema-registry \ kafka-avro-console-consumer \ --bootstrap-server kafka:29092 \ --topic play-events
Verify your output resembles:
{"song_id":11,"duration":60000} {"song_id":10,"duration":60000} {"song_id":12,"duration":60000} {"song_id":2,"duration":60000} {"song_id":1,"duration":60000}
View the messages in the topic
song-feed
.docker-compose exec schema-registry \ kafka-avro-console-consumer \ --bootstrap-server kafka:29092 \ --topic song-feed \ --from-beginning
Verify your output resembles:
{"id":1,"album":"Fresh Fruit For Rotting Vegetables","artist":"Dead Kennedys","name":"Chemical Warfare","genre":"Punk"} {"id":2,"album":"We Are the League","artist":"Anti-Nowhere League","name":"Animal","genre":"Punk"} {"id":3,"album":"Live In A Dive","artist":"Subhumans","name":"All Gone Dead","genre":"Punk"} {"id":4,"album":"PSI","artist":"Wheres The Pope?","name":"Fear Of God","genre":"Punk"}
Validate the |kstreams| application¶
The |ak| music application has a REST API, run in the Docker container kafka-music-application
, that you can interactively query using curl
.
List all running application instances of the |ak| Music application.
curl -sXGET http://localhost:7070/kafka-music/instances | jq .
Verify your output resembles:
[ { "host": "kafka-music-application", "port": 7070, "storeNames": [ "all-songs", "song-play-count", "top-five-songs", "top-five-songs-by-genre" ] } ]
Get the latest Top 5 songs across all music genres
curl -sXGET http://localhost:7070/kafka-music/charts/top-five | jq .
Verify your output resembles:
[ { "artist": "Jello Biafra And The Guantanamo School Of Medicine", "album": "The Audacity Of Hype", "name": "Three Strikes", "plays": 70 }, { "artist": "Hilltop Hoods", "album": "The Calling", "name": "The Calling", "plays": 67 }, ... ]
The REST API exposed by the Kafka Music application supports further operations. See the top-level instructions in its source code for details.
Create the ksqlDB application¶
In this section, create ksqlDB queries that are the equivalent to the |kstreams|.
You have two options to proceed:
- manually: step through the tutorial, creating each ksqlDB command one at a time
- automatically: submits all the :devx-examples:`ksqlDB commands|music/statements.sql` via the ksqlDB
SCRIPT
command:
Manually¶
Prefix the names of the ksqlDB streams and tables with ksql_
. This is not required but do it so that you can run these ksqlDB queries alongside the |kstreams| API version of this music demo and avoid naming conflicts.
Create a new stream called
ksql_playevents
from theplay-events
topic. From the ksqlDB application, select Add a stream.Select the topic
play-events
and then fill out the fields as shown below. Because Confluent Control Center integrates with Confluent Schema Registry, ksqlDB automatically detects the fieldssong_id
andduration
and their respective data types.Do some basic filtering on the newly created stream
ksql_playevents
, e.g. to qualify songs that were played for at least 30 seconds. From the ksqlDB query editor:SELECT * FROM ksql_playevents WHERE DURATION > 30000 EMIT CHANGES;
The above query is not persistent. It stops if this screen is closed. To make the query persistent and stay running until explicitly terminated, prepend the previous query with
CREATE STREAM ... AS
. From the ksqlDB query editor:Verify this persistent query shows up in the
Running Queries
tab.The original Kafka topic
song-feed
has a key of typeLong
, which maps to ksqlDB’sBIGINT
sql type, and the ID field stores a copy of the key. Create aTABLE
from the original Kafka topicsong-feed
:View the contents of this table and confirm that the entries in this ksqlDB table have a
ROWKEY
that matches the String ID of the song.SELECT * FROM ksql_song EMIT CHANGES limit 5;
DESCRIBE
the table to see the fields associated with this topic and notice that the fieldID
is of typeBIGINT
.At this point we have created a stream of filtered play events called
ksql_playevents_min_duration
and a table of song metadata calledksql_song
. Enrich the stream of play events with song metadata using a Stream-TableJOIN
. This results in a new stream of play events enriched with descriptive song information like song title along with each play event.Notice the addition of a clause
1 AS KEYCOL
. For every row, this creates a new fieldKEYCOL
that has a value of 1.KEYCOL
can be later used in other derived streams and tables to do aggregations on a global basis.Now you can create a top music chart for all time to see which songs get played the most. Use the
COUNT
function on the streamksql_songplays
that we created above, and since it would also be good to see stats for just the last 30 seconds, add in aWINDOW
clause which gives counts of play events for all songs, in 30-second intervals.Congratulations, you built a streaming application that processes data in real-time! The application enriched a stream of play events with song metadata and generated top counts. Any downstream systems can consume results from your ksqlDB queries for further processing. If you were already familiar with SQL semantics, hopefully this tutorial wasn’t too hard to follow.
SELECT * FROM ksql_songplaycounts30 EMIT CHANGES;
Automatically¶
View the :devx-examples:`ksqlDB statements.sql|music/statements.sql`.
Launch the ksqlDB CLI:
docker-compose exec ksqldb-cli ksql http://ksqldb-server:8088
Run the script :devx-examples:`statements.sql|music/statements.sql` that executes the ksqlDB statements.
RUN SCRIPT '/tmp/statements.sql';
The output shows either a blank message, or
Executing statement
, similar to this:Message --------- Executing statement ---------
After the
RUN SCRIPT
command completes, exit out of theksqldb-cli
with aCTRL+D
command
Stop the music application¶
When you are done, make sure to stop the demo.
docker-compose down
Troubleshooting¶
Verify the status of the Docker containers show
Up
state.docker-compose ps
Your output should resemble:
Name Command State Ports ----------------------------------------------------------------------------------------------------------------------------------- control-center /etc/confluent/docker/run Up 0.0.0.0:9021->9021/tcp kafka /etc/confluent/docker/run Up 0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp kafka-music-application bash -c echo Waiting for K ... Up 0.0.0.0:7070->7070/tcp kafka-music-data-generator bash -c echo Waiting for K ... Up 7070/tcp ksqldb-cli /bin/sh Up ksqldb-server /etc/confluent/docker/run Up (healthy) 0.0.0.0:8088->8088/tcp schema-registry /etc/confluent/docker/run Up 0.0.0.0:8081->8081/tcp zookeeper /etc/confluent/docker/run Up 0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
Confluent Control Center displays messages from topics, streams, and tables as new messages arrive. In this demo the data is sourced from an application running in a Docker container called
kafka-music-data-generator
. If you notice that Confluent Control Center is not displaying messages, you can try restarting this application.docker-compose restart kafka-music-data-generator