Compare Current and Previous Values in a Data Stream¶
Confluent Cloud for Apache Flink® provides the built-in LAG function, which enables you to access data from a previous row in the same result set without a self-join. It gives you the ability to analyze the differences between consecutive rows or to create more complex calculations based on previous row values. This is particularly useful when you need to compare the current row’s data with the previous row’s data, such as calculating the difference in sales from one day to the next.
In this topic, you create a streaming source of mock data that models player scores in a game room, and you use the LAG function to see how the players’ scores change over time.
This topic shows the following steps:

Step 1: Create a streaming data source for video gaming data

Step 2: View aggregated results
Prerequisites¶
You need the following prerequisites to use Confluent Cloud for Apache Flink.
Access to Confluent Cloud.
The organization ID, environment ID, and compute pool ID for your organization.
The OrganizationAdmin, EnvironmentAdmin, or FlinkAdmin role for creating compute pools, or the FlinkDeveloper role if you already have a compute pool. If you don’t have the appropriate role, reach out to your OrganizationAdmin or EnvironmentAdmin.
The Confluent CLI. To use the Flink SQL shell, update to the latest version of the Confluent CLI by running the following command:
confluent update --yes
If you used Homebrew to install the Confluent CLI, update the CLI by using the brew upgrade command, instead of confluent update. For more information, see Confluent CLI.
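With the CLI up to date, you can open a Flink SQL shell against your compute pool. The following is a minimal sketch; the environment and compute pool IDs are placeholders that you must replace with your own values.

```shell
# Log in to Confluent Cloud.
confluent login

# Start the Flink SQL shell against a specific compute pool and environment.
# env-123456 and lfcp-abc123 are placeholder IDs: substitute your own.
confluent flink shell --compute-pool lfcp-abc123 --environment env-123456
```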
Step 1: Create a streaming data source for video gaming data¶
The streaming data for this topic is produced by a Datagen Source connector that is configured with the Gaming Player Activity data. It produces mock data to an Apache Kafka® topic named gaming_player_activity.
Log in to the Confluent Cloud Console and select the environment where you’re using Flink SQL. If you don’t have an environment set up, you can review Flink Quick Start with Confluent Cloud Console or Flink Quick Start with the SQL Shell.
Select the cluster in which you will be using Flink SQL.
Select Connectors from the left side navigation menu.
The Connectors page opens.
Click Add connector.
The Connector Plugins page opens.
In the Search connectors box, enter “datagen”.
From the search results, select the Datagen Source connector.
At the Add Datagen Source Connector screen, complete the following:
Click Add new topic, and in the Topic name field, enter “gaming_player_activity”.
Click Create with defaults. Confluent Cloud creates the Kafka topic that the connector produces records to.
Note
When you’re in a Confluent Cloud environment that has Flink SQL, a SQL table is created automatically when you create a Kafka topic.
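Once the topic exists, you can confirm that the corresponding Flink SQL table was created. A quick sketch, run from a Cloud Console workspace or the Flink SQL shell:

```sql
-- List the tables in the current database (each is backed by a Kafka topic)
SHOW TABLES;

-- Inspect the schema that Flink associates with the topic
DESCRIBE gaming_player_activity;
```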
Select the way you want to provide Kafka Cluster credentials. You can choose one of the following options:
- Global access: Allows your connector to access everything you have access to. With global access, connector access is linked to your account. This option is not recommended for production.
- Granular access: Limits the access for your connector. You can manage connector access through a service account. This option is recommended for production.
- Use an existing API key: Allows you to enter an API key and secret that you have stored. You can also generate these in the Cloud Console.

Confluent recommends using Global access as long as you don’t have any other requirements.

Click Continue.
On the Configuration page, select AVRO for the output record value format.
Selecting AVRO configures the connector to associate a schema with the gaming_player_activity topic and register it with Schema Registry.

In the Select a template section, click Show more options, click the Gaming player activity tile, and click Continue.
For Connector sizing, leave the slider at the default of 1 task and click Continue.
In the Connector name box, select the text and replace it with “gaming_player_activity_connector”.
Click Continue to start the connector.
The status of your new connector reads Provisioning, which lasts for a few seconds. When the status of the new connector changes from Provisioning to Running, you have a producer sending an event stream to your topic in the Confluent Cloud cluster.
The default configuration of the Datagen connector sends only one message per second, and for this topic, you need a faster data rate. Click your connector, go to Settings, and click Edit for the Advanced configuration. Change the value of Max interval between messages (ms) to 10, which makes the connector send about 100 messages per second, a rate better suited for analyzing and exploring streaming data.
In your Confluent Cloud Console workspace or in the Flink SQL shell, run the following command to see the data flowing in.
SELECT * FROM gaming_player_activity;
If you add $rowtime to the SELECT statement, you can see the creation time of each data point:

SELECT $rowtime, * FROM gaming_player_activity;
Step 2: View aggregated results¶
In your Confluent Cloud Console workspace or in the Flink SQL shell, run the following statement to start a query using the LAG function, so you can compare the current and previous scores of a game player.
Remember to select your catalog and database before running the query. You can do this from the top-right controls in the Confluent Cloud workspace, or by running USE statements in the Flink SQL shell.
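For example, in the Flink SQL shell the catalog and database selection looks like the following sketch. The names here are placeholders; substitute your own environment (catalog) and Kafka cluster (database) names.

```sql
-- The catalog maps to your Confluent Cloud environment,
-- and the database maps to your Kafka cluster.
USE CATALOG my_environment;
USE my_kafka_cluster;
```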
SELECT
  $rowtime AS row_time,
  player_id,
  game_room_id,
  points,
  LAG(points, 1) OVER (PARTITION BY player_id ORDER BY $rowtime) AS previous_points_value
FROM gaming_player_activity;
Your output should resemble:
row_time                 player_id  game_room_id  points  previous_points_value
2024-01-11 15:42:00.557  1014       3409          424     88
2024-01-11 15:42:01.079  1014       2472          243     424
2024-01-11 15:42:01.391  1014       2910          343     243
2024-01-11 15:42:01.482  1014       3742          113     343
2024-01-11 15:42:01.681  1014       4226          78      113
2024-01-11 15:42:01.910  1014       1531          354     78
...
Compare the points column with the previous_points_value column. Each value in the points column appears in the next row of the previous_points_value column, thanks to the LAG function in the SELECT statement.
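Because LAG gives you the previous value directly, you can also compute the change between consecutive scores in the same query. The following is a sketch; the points_change column name is illustrative. Note that LAG returns NULL for the first row of each player, so points_change is NULL until a previous value exists.

```sql
SELECT
  $rowtime AS row_time,
  player_id,
  points,
  -- Difference between this score and the player's previous score.
  -- NULL on the first row for each player, because there is no prior row.
  points - LAG(points, 1) OVER (PARTITION BY player_id ORDER BY $rowtime)
    AS points_change
FROM gaming_player_activity;
```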