Aggregate a Stream in a Tumbling Window

Aggregation over windows is central to processing streaming data. Confluent Cloud for Apache Flink® supports Windowing Table-Valued Functions (Windowing TVFs), a SQL-standard syntax for splitting an infinite stream into windows of finite size and computing aggregations within each window. Windowing is often used to find the minimum, maximum, or average within a group, to find the first or last record, or to calculate totals.

In this topic, you create a streaming source of mock data that models player scores in a game room, and you run a statement that computes the sum, maximum, and minimum scores over ten-second TUMBLE windows. Tumbling windows have a fixed size, 10 seconds in this case, and every record is assigned to exactly one window.
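
As a sketch of the syntax used later in this topic, a tumbling-window aggregation has the following general shape. The table, column, and aggregate in this sketch are placeholders, not objects created in this topic:

    -- General shape of a TUMBLE windowing TVF query.
    -- my_table, time_attr, and record_count are placeholder names.
    SELECT
      window_start,
      window_end,
      COUNT(*) AS record_count
    FROM TABLE(
        TUMBLE(TABLE my_table, DESCRIPTOR(time_attr), INTERVAL '10' SECOND))
    GROUP BY window_start, window_end;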

This topic shows the following steps:

  • Step 1: Create a streaming data source

  • Step 2: View aggregated results in a tumbling window

Prerequisites

You need the following prerequisites to use Confluent Cloud for Apache Flink.

  • Access to Confluent Cloud.

  • The organization ID, environment ID, and compute pool ID for your organization.

  • The OrganizationAdmin, EnvironmentAdmin, or FlinkAdmin role for creating compute pools, or the FlinkDeveloper role if you already have a compute pool. If you don’t have the appropriate role, reach out to your OrganizationAdmin or EnvironmentAdmin.

  • The Confluent CLI. To use the Flink SQL shell, update to the latest version of the Confluent CLI by running the following command:

    confluent update --yes
    

    If you used Homebrew to install the Confluent CLI, update the CLI by using the brew upgrade command instead of confluent update.

    For more information, see Confluent CLI.
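
    After updating, you can open the Flink SQL shell. A typical invocation looks like the following; the compute pool and environment IDs are placeholders for your own values:

    confluent flink shell --compute-pool <compute-pool-id> --environment <environment-id>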

Step 1: Create a streaming data source

The streaming data for this topic is produced by a Datagen Source connector that’s configured with the Gaming player activity template. The connector produces mock player-score records, randomly generated from the gaming_player_activity.avro file, to an Apache Kafka® topic named gaming_player_activity_source.

  1. Log in to the Confluent Cloud Console and navigate to the environment that hosts Flink SQL.

  2. In the navigation menu, select Connectors.

    The Connectors page opens.

  3. Click Add connector.

    The Connector Plugins page opens.

  4. In the Search connectors box, enter “datagen”.

  5. From the search results, select the Datagen Source connector.

    [Screenshot: search results for the Datagen connector]
  6. At the Add Datagen Source Connector screen, complete the following steps:

    1. Click Additional configuration.

    2. Click Add new topic, and in the Topic name field, enter “gaming_player_activity_source”.

    3. Click Create with defaults. Confluent Cloud creates the Kafka topic that the connector produces records to.

    Note

    When you’re in a Confluent Cloud environment that has Flink SQL, a SQL table is created automatically when you create a Kafka topic.
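
    To confirm that the table was created, you can describe it from the Flink SQL shell; the columns in the output depend on the connector’s schema:

    DESCRIBE gaming_player_activity_source;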

Step 2: View aggregated results in a tumbling window

  1. Run the following statement in your Confluent CLI Flink SQL shell to start a windowed query over the records in the gaming_player_activity_source table. The $rowtime column referenced in the DESCRIPTOR is the system-provided event-time attribute that Confluent Cloud for Apache Flink exposes on every table.

    SELECT
      window_start,
      window_end,
      SUM(points) AS total,
      MIN(points) AS min_points,
      MAX(points) AS max_points
    FROM TABLE(TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '10' SECOND))
    GROUP BY window_start, window_end;
    

    Your output should resemble:

    window_start            window_end              total min_points max_points
    2023-09-12 08:54:20.000 2023-09-12 08:54:30.000 4375  12         493
    2023-09-12 08:54:30.000 2023-09-12 08:54:40.000 4285  13         487
    2023-09-12 08:54:40.000 2023-09-12 08:54:50.000 4329  29         495
    ...
    
  2. Because the connector produces data continuously, you can shorten the window to 1 second to get more frequent results. Adjust the statement as shown below.
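
    For reference, the same statement with a 1-second window differs only in the interval:

    SELECT
      window_start,
      window_end,
      SUM(points) AS total,
      MIN(points) AS min_points,
      MAX(points) AS max_points
    FROM TABLE(TUMBLE(TABLE gaming_player_activity_source, DESCRIPTOR($rowtime), INTERVAL '1' SECOND))
    GROUP BY window_start, window_end;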

The Flink job defined by this query computes, for each 10-second tumbling window, the total points across all players, along with the minimum and maximum point values.