Kafka Streams Demo Application

This demo showcases Apache Kafka® Streams API (source code) and ksqlDB (see blog post Hands on: Building a Streaming Application with KSQL and video Demo: Build a Streaming Application with ksqlDB).

The music application demonstrates how to build a simple music charts application that continuously computes, in real-time, the latest charts such as Top 5 songs per music genre. It exposes its latest processing results – the latest charts – via the Kafka Interactive Queries feature and a REST API. The application’s input data is in Avro format and comes from two sources: a stream of play events (think: “song X was played”) and a stream of song metadata (“song X was written by artist Y”).

The following screencast shows a live bit of the music demo application:

Prerequisites

Confluent Platform is supported in various operating systems and software versions (see Supported Versions and Interoperability for details). This example has been validated with the specific configuration described below. If you are running the example in Windows, which is not officially supported, the example may still work if you update the example code in GitHub, replacing the symlink .env with the contents of config.env.

  • macOS 10.15.3
  • Confluent Platform 6.0.0
  • Java 11.0.6 2020-01-14 LTS
  • bash version 3.2.57
  • jq 1.6
  • (Docker-based examples) Docker version 19.03.8
  • (Docker-based examples) Docker Compose docker-compose version 1.25.4

Start the music application

To run this demo, complete the following steps:

  1. In Docker’s advanced settings, increase the memory dedicated to Docker to at least 8GB (default is 2GB).

  2. Clone the Confluent examples repository:

    git clone https://github.com/confluentinc/examples.git
    
  3. Navigate to the examples/music/ directory and switch to the Confluent Platform release branch:

    cd examples/music/
    git checkout 6.0.0-post
    
  4. Start the demo by running a single command that brings up all the Docker containers. This takes about 2 minutes to complete.

    docker-compose up -d
    
  5. View the Confluent Control Center logs and validate that it is running.

    docker-compose logs -f control-center | grep -i "Started NetworkTrafficServerConnector"
    
  6. Verify that you see in the Confluent Control Center logs:

    INFO Started NetworkTrafficServerConnector@5533dc72{HTTP/1.1,[http/1.1]}{0.0.0.0:9021} (org.eclipse.jetty.server.AbstractConnector)
    
  7. Note the available endpoints of the Kafka brokers, Confluent Schema Registry, and ZooKeeper, from within the containers and from your host machine:

    Endpoint Parameter Value (from within containers) Value (from host machine)
    Kafka brokers bootstrap.servers kafka:29092 localhost:9092
    Confluent Schema Registry schema.registry.url http://schema-registry:8081 http://localhost:8081
    ZooKeeper zookeeper.connect zookeeper:2181 localhost:2181

View messages in Kafka topics

The docker-compose.yml file spins up a few containers, one of which is kafka-music-data-generator, which is continuously generating input data for the music application by writing into two Kafka topics in Avro format. This allows you to look at live, real-time data when testing the Kafka music application.

  • play-events : stream of play events (“song X was played”)
  • song-feed : stream of song metadata (“song X was written by artist Y”)
../../../_images/ksql-music-demo-source-data-explore.jpg
  1. From your web browser, navigate to Confluent Control Center.

  2. Click on Topics and select any topic to view its messages.

    ../../../_images/inspect_topic.png
  3. You may also use the ksqlDB query editor in Confluent Control Center to view messages. For example, to see the Kafka messages in play-events, click on ksqlDB enter the following ksqlDB query into the editor:

    PRINT "play-events";
    
  4. Verify your output resembles:

    ../../../_images/topic_ksql_play_events.png
  5. Enter the following ksqlDB query into the editor to view the Kafka messages in song-feed.

    PRINT "song-feed" FROM BEGINNING;
    
  6. You can also use command line tools to view messages in the Kafka topics. View the messages in the topic play-events.

    docker-compose exec schema-registry \
        kafka-avro-console-consumer \
            --bootstrap-server kafka:29092 \
            --topic play-events
    
  7. Verify your output resembles:

    {"song_id":11,"duration":60000}
    {"song_id":10,"duration":60000}
    {"song_id":12,"duration":60000}
    {"song_id":2,"duration":60000}
    {"song_id":1,"duration":60000}
    
  8. View the messages in the topic song-feed.

    docker-compose exec schema-registry \
        kafka-avro-console-consumer \
            --bootstrap-server kafka:29092 \
            --topic song-feed \
            --from-beginning
    
  9. Verify your output resembles:

    {"id":1,"album":"Fresh Fruit For Rotting Vegetables","artist":"Dead Kennedys","name":"Chemical Warfare","genre":"Punk"}
    {"id":2,"album":"We Are the League","artist":"Anti-Nowhere League","name":"Animal","genre":"Punk"}
    {"id":3,"album":"Live In A Dive","artist":"Subhumans","name":"All Gone Dead","genre":"Punk"}
    {"id":4,"album":"PSI","artist":"Wheres The Pope?","name":"Fear Of God","genre":"Punk"}
    

Validate the Kafka Streams application

The Kafka music application has a REST API, run in the Docker container kafka-music-application, that you can interactively query using curl.

  1. List all running application instances of the Kafka Music application.

    curl -sXGET http://localhost:7070/kafka-music/instances | jq .
    
  2. Verify your output resembles:

    [
      {
        "host": "kafka-music-application",
        "port": 7070,
        "storeNames": [
          "all-songs",
          "song-play-count",
          "top-five-songs",
          "top-five-songs-by-genre"
        ]
      }
    ]
    
  3. Get the latest Top 5 songs across all music genres

    curl -sXGET http://localhost:7070/kafka-music/charts/top-five | jq .
    
  4. Verify your output resembles:

    [
      {
        "artist": "Jello Biafra And The Guantanamo School Of Medicine",
        "album": "The Audacity Of Hype",
        "name": "Three Strikes",
        "plays": 70
      },
      {
        "artist": "Hilltop Hoods",
        "album": "The Calling",
        "name": "The Calling",
        "plays": 67
      },
      ...
    ]
    
  5. The REST API exposed by the Kafka Music application supports further operations. See the top-level instructions in its source code for details.

Create the ksqlDB application

In this section, create ksqlDB queries that are the equivalent to the Kafka Streams.

../../../_images/ksql-music-demo-overview.jpg

You have two options to proceed:

  • manually: step through the tutorial, creating each ksqlDB command one at a time
  • automatically: submits all the ksqlDB commands via the ksqlDB SCRIPT command:

Manually

Prefix the names of the ksqlDB streams and tables with ksql_. This is not required but do it so that you can run these ksqlDB queries alongside the Kafka Streams API version of this music demo and avoid naming conflicts.

  1. Create a new stream called ksql_playevents from the play-events topic. From the ksqlDB application, select Add a stream.

    ../../../_images/add_a_stream.png
  2. Select the topic play-events and then fill out the fields as shown below. Because Confluent Control Center integrates with Confluent Schema Registry, ksqlDB automatically detects the fields song_id and duration and their respective data types.

    ../../../_images/ksql_playevents.png
  3. Do some basic filtering on the newly created stream ksql_playevents, e.g. to qualify songs that were played for at least 30 seconds. From the ksqlDB query editor:

    SELECT * FROM ksql_playevents WHERE DURATION > 30000 EMIT CHANGES;
    
  4. The above query is not persistent. It stops if this screen is closed. To make the query persistent and stay running until explicitly terminated, prepend the previous query with CREATE STREAM ... AS. From the ksqlDB query editor:

    CREATE STREAM ksql_playevents_min_duration AS SELECT * FROM ksql_playevents WHERE DURATION > 30000;
    
  5. Verify this persistent query shows up in the Running Queries tab.

  6. The original Kafka topic song-feed has a key of type Long, which maps to ksqlDB’s BIGINT sql type, and the ID field stores a copy of the key. Create a TABLE from the original Kafka topic song-feed:

    CREATE TABLE ksql_song (SONG_ID BIGINT PRIMARY KEY) WITH (KAFKA_TOPIC='song-feed', VALUE_FORMAT='AVRO');
    
  7. View the contents of this table and confirm that the entries in this ksqlDB table have a ROWKEY that matches the String ID of the song.

    SELECT * FROM ksql_song EMIT CHANGES limit 5;
    
  8. DESCRIBE the table to see the fields associated with this topic and notice that the field ID is of type BIGINT.

    ../../../_images/describe_songfeed.png
  9. At this point we have created a stream of filtered play events called ksql_playevents_min_duration and a table of song metadata called ksql_song. Enrich the stream of play events with song metadata using a Stream-Table JOIN. This results in a new stream of play events enriched with descriptive song information like song title along with each play event.

    
    
  10. Notice the addition of a clause 1 AS KEYCOL. For every row, this creates a new field KEYCOL that has a value of 1. KEYCOL can be later used in other derived streams and tables to do aggregations on a global basis.

  11. Now you can create a top music chart for all time to see which songs get played the most. Use the COUNT function on the stream ksql_songplays that we created above.

    
    
  12. While the all-time greatest hits are cool, it would also be good to see stats for just the last 30 seconds. Create another query, adding in a WINDOW clause, which gives counts of play events for all songs, in 30-second intervals.

    --Convert TABLE to STREAM
    
  13. Congratulations, you built a streaming application that processes data in real-time! The application enriched a stream of play events with song metadata and generated top counts. Any downstream systems can consume results from your ksqlDB queries for further processing. If you were already familiar with SQL semantics, hopefully this tutorial wasn’t too hard to follow.

    SELECT * FROM ksql_songplaycounts30 EMIT CHANGES;
    
    ../../../_images/counts_results.png

Automatically

  1. View the ksqlDB statements.sql.

    --The STREAM and TABLE names are prefixed with `ksql_` to enable you to run this demo
    --concurrently with the Kafka Streams Music Demo java application, to avoid conflicting names
    
    
    --The play-events Kafka topic is a feed of song plays, generated by KafkaMusicExampleDriver
    CREATE STREAM ksql_playevents WITH (KAFKA_TOPIC='play-events', VALUE_FORMAT='AVRO');
    
    --Filter the play events to only accept events where the duration is >= 30 seconds
    CREATE STREAM ksql_playevents_min_duration AS SELECT * FROM ksql_playevents WHERE DURATION > 30000;
    
    --The song-feed Kafka topic contains all of the songs available in the streaming service, generated by KafkaMusicExampleDriver
    CREATE TABLE ksql_song (SONG_ID BIGINT PRIMARY KEY) WITH (KAFKA_TOPIC='song-feed', VALUE_FORMAT='AVRO');
    
    --Join the plays with song as we will use it later for charting
    CREATE STREAM ksql_songplays AS SELECT plays.SONG_ID AS ID, ALBUM, ARTIST, NAME, GENRE, DURATION FROM ksql_playevents_min_duration plays LEFT JOIN ksql_song songs ON plays.SONG_ID = songs.SONG_ID;
    
    --Track song play counts in 30 second intervals, with a single partition for global view across multiple partitions (https://github.com/confluentinc/ksql/issues/1053)
    CREATE TABLE ksql_songplaycounts30 WITH (PARTITIONS=1) AS SELECT ID AS K1, NAME AS K2, GENRE AS K3, AS_VALUE(ID) AS ID, AS_VALUE(NAME) AS NAME, AS_VALUE(GENRE) AS GENRE, COUNT(*) AS COUNT FROM ksql_songplays WINDOW TUMBLING (size 30 second) GROUP BY ID, NAME, GENRE;
    --Convert TABLE to STREAM
    CREATE STREAM ksql_songplaycounts30stream (ID BIGINT, NAME VARCHAR, GENRE VARCHAR, COUNT BIGINT) WITH (kafka_topic='KSQL_SONGPLAYCOUNTS30', value_format='AVRO');
    
    --Track song play counts for all time, with a single partition for global view across multiple partitions (https://github.com/confluentinc/ksql/issues/1053)
    CREATE TABLE ksql_songplaycounts WITH (PARTITIONS=1) AS SELECT ID AS K1, NAME AS K2, GENRE AS K3, AS_VALUE(ID) AS ID, AS_VALUE(NAME) AS NAME, AS_VALUE(GENRE) AS GENRE, COUNT(*) AS COUNT FROM ksql_songplays GROUP BY ID, NAME, GENRE;
    --Convert TABLE to STREAM
    CREATE STREAM ksql_songplaycountsstream (ID BIGINT, NAME VARCHAR, GENRE VARCHAR, COUNT BIGINT) WITH (kafka_topic='KSQL_SONGPLAYCOUNTS', value_format='AVRO');
    
    --Top Five song counts for all time based on ksql_songplaycountsstream
    --At this time, `TOPK` does not support sorting by one column and selecting the value of another column (https://github.com/confluentinc/ksql/issues/403)
    --So the results are just counts but not names of the songs associated with the counts
    CREATE TABLE ksql_top5 AS SELECT 1 AS KEYCOL, TOPK(COUNT,5) FROM ksql_songplaycountsstream GROUP BY 1;
    --Top Five songs for each genre based on each WINDOW of ksql_songplaycounts
    CREATE TABLE ksql_top5bygenre AS SELECT GENRE, TOPK(COUNT,5) FROM ksql_songplaycountsstream GROUP BY GENRE;
    
    
  2. Launch the ksqlDB CLI:

    docker-compose exec ksqldb-cli ksql http://ksqldb-server:8088
    
  3. Run the script statements.sql that executes the ksqlDB statements.

    RUN SCRIPT '/tmp/statements.sql';
    

    The output shows either a blank message, or Executing statement, similar to this:

     Message
    ---------
     Executing statement
    ---------
    

    After the RUN SCRIPT command completes, exit out of the ksqldb-cli with a CTRL+D command

Stop the music application

  1. When you are done, make sure to stop the demo.

    docker-compose down
    

Troubleshooting

  1. Verify the status of the Docker containers show Up state.

    docker-compose ps
    

    Your output should resemble:

                  Name                            Command                  State                            Ports
    -----------------------------------------------------------------------------------------------------------------------------------
    control-center                     /etc/confluent/docker/run        Up             0.0.0.0:9021->9021/tcp
    kafka                              /etc/confluent/docker/run        Up             0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp
    kafka-music-application            bash -c echo Waiting for K ...   Up             0.0.0.0:7070->7070/tcp
    kafka-music-data-generator         bash -c echo Waiting for K ...   Up             7070/tcp
    ksqldb-cli                         /bin/sh                          Up
    ksqldb-server                      /etc/confluent/docker/run        Up (healthy)   0.0.0.0:8088->8088/tcp
    schema-registry                    /etc/confluent/docker/run        Up             0.0.0.0:8081->8081/tcp
    zookeeper                          /etc/confluent/docker/run        Up             0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
    
  2. Confluent Control Center displays messages from topics, streams, and tables as new messages arrive. In this demo the data is sourced from an application running in a Docker container called kafka-music-data-generator. If you notice that Confluent Control Center is not displaying messages, you can try restarting this application.

    docker-compose restart kafka-music-data-generator