Aggregate a Stream in a Tumbling Window¶
Important
Confluent Cloud for Apache Flink®️ is currently available for Preview. A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing Preview releases of the Preview features at any time in Confluent’s sole discretion. Check out Getting Help for questions, feedback and requests.
For SQL features and limitations in the preview program, see Notable Limitations in Public Preview.
Aggregation over windows is central to processing streaming data. Confluent Cloud for Apache Flink®️ supports Windowing Table-Valued Functions (Windowing TVFs), a SQL-standard syntax for splitting an infinite stream into windows of finite size and computing aggregations within each window. This is often used to find the min/max/average within a group, find the first or last record, or calculate totals.
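As a sketch of the general shape, a tumbling-window aggregation with the TUMBLE function looks like the following, where my_table, my_time_column, and the COUNT aggregate are placeholders for your own table, time attribute, and aggregation:

-- my_table and my_time_column are placeholders; my_time_column must be a time attribute
SELECT window_start, window_end, COUNT(*) AS record_count
FROM TABLE(
    TUMBLE(TABLE my_table, DESCRIPTOR(my_time_column), INTERVAL '10' SECOND))
GROUP BY window_start, window_end;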
In this topic, you create a streaming source of mock data that models player scores in a game room, and you run a statement that computes the sum, maximum, and minimum scores with a ten-second TUMBLE window. Tumbling windows have a fixed size - in this case 10 seconds - and every record is assigned to one window.
This topic shows the following steps:
Step 1: Create a streaming data source
Step 2: View aggregated results in a tumbling window
Prerequisites¶
You need the following prerequisites to use Confluent Cloud for Apache Flink.
Access to Confluent Cloud.
The organization ID, environment ID, and compute pool ID for your organization.
The OrganizationAdmin, EnvironmentAdmin, or FlinkAdmin role for creating compute pools, or the FlinkDeveloper role if you already have a compute pool. If you don’t have the appropriate role, reach out to your OrganizationAdmin or EnvironmentAdmin.
The Confluent CLI. To use the SQL shell, update to the latest version of the Confluent CLI by running the following command:
confluent update --yes
If you used Homebrew to install the Confluent CLI, update the CLI by using the brew upgrade command instead of confluent update. For more information, see Confluent CLI.
Step 1: Create a streaming data source¶
The streaming data for this topic is produced by a Datagen Source connector that’s configured with the Gaming player activity template. It produces mock data to an Apache Kafka® topic named gaming_player_activity_source. The connector produces player score records that are randomly generated from the gaming_player_activity.avro file.
Log in to the Confluent Cloud Console and navigate to the environment that hosts SQL.
In the navigation menu, select Connectors.
The Connectors page opens.
Click Add connector.
The Connector Plugins page opens.
In the Search connectors box, enter “datagen”.
From the search results, select the Datagen Source connector.
At the Add Datagen Source Connector screen, complete the following:
Click Add new topic, and in the Topic name field, enter “gaming_player_activity_source”.
Click Create with defaults. Confluent Cloud creates the Kafka topic that the connector produces records to.
Note
When you’re in a Confluent Cloud environment that has SQL, a SQL table is created automatically when you create a Kafka topic.
Select the way you want to provide Kafka Cluster credentials. You can choose one of the following options:
- Global Access: Allows your connector to access everything you have access to. With global access, connector access is linked to your account. This option is not recommended for production.
- Granular access: Limits the access for your connector. You can manage connector access through a service account. This option is recommended for production.
- Use an existing API key: Allows you to enter an API key and secret that you have stored. You can enter an API key and secret, or generate these in the Cloud Console.
We recommend going with Global Access unless you have other requirements.
Click Continue.
On the Configuration page, select AVRO for the output record value format. Selecting AVRO configures the connector to associate a schema with the gaming_player_activity_source topic and register it with Schema Registry.
In the Select a template section, click Show more options, and click the Gaming player activity tile.
Click Continue.
For Connector sizing, leave the slider at the default of 1 task and click Continue.
In the Connector name box, select the text and replace it with “gaming_player_activity_source_connector”.
Click Continue to start the connector.
The status of your new connector reads Provisioning, which lasts for a few seconds. When the status of the new connector changes from Provisioning to Running, you have a producer sending an event stream to your topic in the Confluent Cloud cluster.
Because the default configuration of the Datagen connector sends only one message per second, tweak the settings a bit. Click your connector, go to Settings, and click Edit for the Advanced configuration. Change the value of Max interval between messages (ms) to 10, which makes the connector send about 100 messages per second and creates a data stream that’s dense enough for exploring streaming windows.
Run the following statement in your Confluent CLI Flink shell to see the data flowing in.

SELECT * FROM gaming_player_activity_source;

If you add $rowtime to the SELECT statement, you can see the creation time of each record:

SELECT $rowtime, * FROM gaming_player_activity_source;
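Recall that a SQL table named gaming_player_activity_source was created automatically along with the Kafka topic. As an optional check, you can list and inspect it from the same shell, for example:

-- list available tables, then show the columns derived from the registered Avro schema
SHOW TABLES;
DESCRIBE gaming_player_activity_source;

The DESCRIBE output includes the points field, which the next step aggregates over.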
Step 2: View aggregated results in a tumbling window¶
Run the following statement in your Confluent CLI Flink shell to start a windowed query on the gaming_player_activity_source records.

SELECT window_start,
       window_end,
       SUM(points) AS total,
       MIN(points) AS min_points,
       MAX(points) AS max_points
FROM TABLE(
    TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' SECOND))
GROUP BY window_start, window_end;
Your output should resemble:
window_start             window_end               total  min_points  max_points
2023-09-12 08:54:20.000  2023-09-12 08:54:30.000  4375   12          493
2023-09-12 08:54:30.000  2023-09-12 08:54:40.000  4285   13          487
2023-09-12 08:54:40.000  2023-09-12 08:54:50.000  4329   29          495
...
Because we are now producing more data, we could also shrink the window to 1 second. Adjust the INTERVAL in the statement accordingly, as shown below.
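For example, the adjusted statement with a 1-second window looks like this:

SELECT window_start,
       window_end,
       SUM(points) AS total,
       MIN(points) AS min_points,
       MAX(points) AS max_points
FROM TABLE(
    TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '1' SECOND))
GROUP BY window_start, window_end;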
What does this output show? The Flink job defined by the query sums up the points of all players for every 10-second window (a so-called tumbling window), and also computes the minimum and maximum points within each window.
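As a variation, and assuming the Gaming player activity template also emits a player_id field (adjust the column name to match your schema), you can extend the grouping to compute the same aggregates per player within each window:

-- player_id is assumed from the Gaming player activity template; adjust if your schema differs
SELECT window_start,
       window_end,
       player_id,
       SUM(points) AS total,
       MIN(points) AS min_points,
       MAX(points) AS max_points
FROM TABLE(
    TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' SECOND))
GROUP BY window_start, window_end, player_id;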
Congratulations. Using windowing table-valued functions in streaming SQL brings you one step closer to being a streaming SQL expert.