Compare Current and Previous Values in a Data Stream with Confluent Cloud for Apache Flink

Confluent Cloud for Apache Flink® provides the built-in LAG function, which enables you to access data from a previous row in the same result set without a self-join. It lets you analyze the differences between consecutive rows or build more complex calculations based on previous row values. This is particularly useful when you need to compare the current row's data with the previous row's data, for example, to calculate the difference in sales from one day to the next.
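In Flink SQL, LAG is used as an OVER window function. The following sketch shows its general form; the offset and default arguments are optional, and the offset defaults to 1:

```sql
-- Returns the value of `expr` from the row `offset` rows before the current
-- row within the partition, or `default` (NULL if unspecified) when no such
-- row exists. `key` and `time_attr` are placeholder column names.
LAG(expr [, offset [, default]])
  OVER (PARTITION BY key ORDER BY time_attr)
```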

In this topic, you create a streaming source of mock data that models player scores in a game room, and you use the LAG function to see how the players' scores change over time.

Prerequisites

You need the following prerequisites to use Confluent Cloud for Apache Flink:

  • Access to Confluent Cloud.

  • The organization ID, environment ID, and compute pool ID for your organization.

  • The OrganizationAdmin, EnvironmentAdmin, or FlinkAdmin role for creating compute pools, or the FlinkDeveloper role if you already have a compute pool. If you don’t have the appropriate role, reach out to your OrganizationAdmin or EnvironmentAdmin.

  • The Confluent CLI. To use the Flink SQL shell, update to the latest version of the Confluent CLI by running the following command:

    confluent update --yes

    If you used Homebrew to install the Confluent CLI, update the CLI by running the brew upgrade command instead of confluent update.

    For more information, see Confluent CLI.

Step 1: Create a streaming data source for video gaming data

The streaming data for this topic is produced by a Datagen Source connector that is configured with the Gaming Player Activity data. It produces mock data to an Apache Kafka® topic named gaming_player_activity.

  1. Log in to the Confluent Cloud Console and select the environment where you’re using Flink SQL. If you don’t have an environment set up, you can review Flink SQL Quick Start with Confluent Cloud Console or Flink SQL Shell Quick Start.

  2. Select the cluster in which you will be using Flink SQL.

  3. Select Connectors from the left side navigation menu.

    The Connectors page opens.

  4. Click Add connector.

    The Connector Plugins page opens.

  5. In the Search connectors box, enter “datagen”.

  6. From the search results, select the Datagen Source connector.

    (Screenshot: search results for the Datagen connector)
  7. On the Add Datagen Source Connector screen, complete the following steps:

     1. Click Add new topic, and in the Topic name field, enter "gaming_player_activity".

     2. Click Create with defaults. Confluent Cloud creates the Kafka topic that the connector produces records to.


    When you’re in a Confluent Cloud environment that has Flink SQL, creating a Kafka topic automatically creates a corresponding Flink SQL table.
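To confirm that the table was created, you can inspect it from your workspace or the Flink SQL shell. For example, assuming the topic name used above:

```sql
-- List the tables in the current catalog and database.
SHOW TABLES;

-- Inspect the schema that Flink registered for the new topic.
DESCRIBE gaming_player_activity;
```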

Step 2: Compare current and previous values

  1. In your Confluent Cloud Console workspace or in the Flink SQL shell, run the following statement to start a query that uses the LAG function to compare each player's current and previous scores.

    Remember to select your catalog and database before running the query. You can do this from the top-right controls in the Confluent Cloud workspace, or by running USE statements in the Flink SQL shell.

    SELECT $rowtime AS row_time
      , player_id
      , game_room_id
      , points
      , LAG(points, 1) OVER (PARTITION BY player_id ORDER BY $rowtime) AS previous_points_value
    FROM gaming_player_activity;

    Your output should resemble:

    row_time                player_id game_room_id points previous_points_value
    2024-01-11 15:42:00.557 1014      3409         424    88
    2024-01-11 15:42:01.079 1014      2472         243    424
    2024-01-11 15:42:01.391 1014      2910         343    243
    2024-01-11 15:42:01.482 1014      3742         113    343
    2024-01-11 15:42:01.681 1014      4226         78     113
    2024-01-11 15:42:01.910 1014      1531         354    78
  2. Compare the points column with the previous_points_value column. Each value in the points column appears in the previous_points_value column of the next row, because the LAG function in the SELECT statement reads the points value from the previous row.
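As a next step, you can subtract the previous value directly to compute the change in points per event. The following sketch extends the query above; the point_change alias is an assumption, and its value is NULL for each player's first event, because LAG has no previous row to read from:

```sql
SELECT $rowtime AS row_time
  , player_id
  , game_room_id
  , points
  -- Difference between the current score and the previous score for the
  -- same player; NULL on the player's first event.
  , points - LAG(points, 1) OVER (
      PARTITION BY player_id ORDER BY $rowtime
    ) AS point_change
FROM gaming_player_activity;
```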