Create an Aggregation Pipeline for Stream Designer on Confluent Cloud

The following steps show how to create a pipeline in Stream Designer that aggregates clickstream data and reports errors in a table.

Step 1: Create a pipeline project

A Stream Designer pipeline project defines all the components that are deployed for an application. In this step, you create a pipeline project and a canvas for designing the component graph.

  1. Log in to the Confluent Cloud Console and open the Cluster Overview page for the cluster you want to use for creating pipelines.

  2. In the navigation menu, click Stream Designer.

  3. Click Create pipeline.

    The Create a new pipeline page opens.

    Stream Designer Create New Pipeline page in Confluent Cloud Console

Step 2: Create a connector definition

Your pipeline starts with data produced by the Datagen source connector. In this step, you create a pipeline definition for a connector that produces mock clickstream data to a Kafka topic.

  1. Click Start with Connector and click Start building.

    The Stream Designer canvas opens, with the Source Connector details view visible.

  2. In the Source Connector page, click the Datagen Source tile to open the Configuration page.

  3. In the Topic textbox, type “clicks” and click Continue to open the Kafka credentials page.

  4. Ensure that the Global access tile is selected and click Generate API key & download to create the API key for the Datagen connector.

    A text file containing the newly generated API key and secret is downloaded to your local machine.

  5. Click Continue to configure the connector’s output.

  6. In the Select output record value format section, click JSON_SR, and in the Select a template section, click Clickstream.

  7. Click Continue to open the Sizing page.

  8. In the Connector sizing section, leave the Minimum number of tasks setting at 1.

  9. Click Continue to open the Review and launch page.

  10. In the Connector name textbox, enter “Datagen_clickstream” and click Continue.

    The Datagen source connector is configured and appears on the canvas with a corresponding topic component. The topic component is configured with the name you provided during connector configuration. Also, a stream is registered on the topic.

    Stream Designer with Datagen source connector and topic components in Confluent Cloud Console

Step 3: Register a stream on the topic

Stream Designer enables registering a stream on an underlying topic. In this step, you register a stream named “CLICKSTREAM” on the “clicks” topic.

  1. Click the Stream component to open the Stream Configuration dialog.

  2. In the Name textbox, enter “CLICKSTREAM”.

  3. In the Value Format dropdown, select JSON_SR.

  4. Click Save.

    The Topic component updates with the stream name.

    Stream Designer with topic and stream components in Confluent Cloud Console
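
    Registering the stream is roughly equivalent to the following ksqlDB DDL. This is a sketch for illustration only; Stream Designer generates the actual statement when you activate the pipeline, and the column definitions are inferred from the topic’s Schema Registry schema, so they are omitted here.

    ```sql
    -- Hypothetical ksqlDB equivalent of registering the CLICKSTREAM
    -- stream on the "clicks" topic with the JSON_SR value format.
    CREATE STREAM CLICKSTREAM WITH (
      KAFKA_TOPIC  = 'clicks',
      VALUE_FORMAT = 'JSON_SR'
    );
    ```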

Step 4: Create a filter query definition

Stream Designer enables defining queries on your streams. In this step, you create the definition for a filter component and specify its behavior by using a simple SQL statement. The filter produces a stream of click events that have a status code greater than 400.

  1. Hover over the CLICKSTREAM component and click +.

    A context menu appears showing the components that accept a stream as input.

    Stream Designer and stream context menu
  2. In the context menu, click Filter.

    A Query component appears and the filter configuration dialog opens.

    Note

    You can also add a filter by clicking Filter in the Components menu and manually connecting the components on the canvas.

  3. In the Filter Expression field, enter the following SQL:

    (CAST(CLICKSTREAM.STATUS AS INT) > 400)
    
    Stream Designer and filter configuration in Confluent Cloud Console
  4. Click Save to create the filter definition.

    Stream Designer and filter configuration in Confluent Cloud Console

    The query component has an error state because it needs a sink topic or another query for its output, which you provide in the next step.
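
    As a standalone query, the filter stage corresponds to ksqlDB SQL along these lines (a sketch; the statement Stream Designer generates on activation may differ):

    ```sql
    -- Keep only click events whose status code indicates an error.
    -- STATUS is a string in the Clickstream template, so it is cast
    -- to INT before the comparison.
    SELECT *
    FROM CLICKSTREAM
    WHERE (CAST(CLICKSTREAM.STATUS AS INT) > 400)
    EMIT CHANGES;
    ```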

Step 5: Create a GROUP BY query definition

The next stage of the pipeline groups click messages by their status code within a 60-second hopping window.

  1. Hover over the filter component and click +.

    A context menu appears showing the components that accept a stream as input.

    Stream Designer and GROUP BY context menu in Confluent Cloud Console
  2. In the context menu, click Group By.

    A Group By component appears in the query, and the configuration dialog opens.

    Note

    You can also add a GROUP BY clause by clicking Group By in the Components menu and manually connecting the components on the canvas.

  3. In the Group by Expression textbox, enter “CLICKSTREAM.STATUS”.

  4. Click the Window dropdown and select HOPPING.

    Fields for the hopping window parameters appear.

  5. Configure the window with the following values:

    • Window Size Duration: 60
    • Window Size Unit: seconds
    • Advance By Duration: 20
    • Advance By Unit: seconds

    Your configuration should resemble:

    Stream Designer and GROUP BY configuration in Confluent Cloud Console
  6. Click Save.
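
    In ksqlDB terms, this configuration corresponds to a windowed aggregation like the following sketch. Because the window size is 60 seconds and a new window starts every 20 seconds, the windows overlap and each event is counted in three windows.

    ```sql
    -- Sketch: group error clicks by status code over a hopping window
    -- that is 60 seconds wide and advances every 20 seconds.
    SELECT STATUS, COUNT(*) AS ERRORS
    FROM CLICKSTREAM
    WINDOW HOPPING (SIZE 60 SECONDS, ADVANCE BY 20 SECONDS)
    WHERE (CAST(CLICKSTREAM.STATUS AS INT) > 400)
    GROUP BY STATUS
    EMIT CHANGES;
    ```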

Step 6: Create a HAVING query definition

The next stage of the pipeline keeps only the aggregated groups that have more than five errors with the same status code within the time window.

  1. Hover over the Group By component and click +.

    A context menu appears showing the components that accept grouped input.

    Stream Designer and GROUP BY context menu in Confluent Cloud Console
  2. In the context menu, click Having.

    A Having component appears in the query, and the configuration dialog opens.

    Note

    You can also add a HAVING clause by clicking Having in the Components menu and manually connecting the components on the canvas.

  3. In the Having Expression textbox, enter the following SQL:

    (COUNT(*) > 5) AND (COUNT(*) IS NOT NULL)
    

    Your configuration should resemble:

    Stream Designer and HAVING configuration in Confluent Cloud Console
  4. Click Save.

Step 7: Create a mapping definition

In this step, you define a projection that maps columns to an output table.

  1. Hover over the Having component and click +.

    A context menu appears showing the components that accept HAVING input.

    Stream Designer and HAVING context menu in Confluent Cloud Console
  2. In the context menu, click Project.

    A Project component appears in the query, and the configuration dialog opens.

    Note

    You can also add a projection by clicking Project in the Components menu and manually connecting the components on the canvas.

  3. In the Projection Expression textbox, enter the following SQL, which defines the output table’s key column to be the status code.

    STATUS AS K1
    
  4. Click Add Projection Expression and in the textbox, enter the following SQL:

    AS_VALUE(STATUS) AS STATUS
    
  5. Click Add Projection Expression and in the textbox, enter the following SQL:

    COUNT(*) AS ERRORS
    
  6. Click Add Projection Expression and in the textbox, enter the following SQL:

    WINDOWSTART AS EVENT_TS
    

    Your configuration should resemble:

    Stream Designer and mapping configuration in Confluent Cloud Console
  7. Click Save.

    Stream Designer and built query in Confluent Cloud Console

    The query definition is complete. The query component shows an error state because it needs a sink topic for its output, which you provide in the next step.

Step 8: Create an error summary table definition

The last step is to create a table from the projected columns that reports the error counts within the window.

  1. Hover over the Project component and click +.

    A context menu appears showing the components that accept mapping input.

    Stream Designer and mapping context menu in Confluent Cloud Console
  2. In the context menu, click Table.

    A Table component appears on the canvas, and the configuration dialog opens.

    Note

    You can also add a table by clicking Table in the Components menu and manually connecting the components on the canvas.

  3. In the Name textbox, enter “errors_per_minute_alert” and click Save.

    The aggregation pipeline is ready to activate.

    Stream Designer and aggregation pipeline in Confluent Cloud Console
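
    Taken together, the filter, grouping, HAVING, and projection stages assemble a single persistent query, sketched below in ksqlDB SQL. This is an approximation for reference; Stream Designer generates the actual statement when you activate the pipeline. Note that the grouping column becomes the table’s key (aliased here as K1), and AS_VALUE copies it into the value schema so it also appears as a regular column.

    ```sql
    -- Sketch of the complete persistent query behind the pipeline.
    CREATE TABLE ERRORS_PER_MINUTE_ALERT AS
      SELECT
        STATUS           AS K1,        -- key column
        AS_VALUE(STATUS) AS STATUS,    -- copy of the key in the value
        COUNT(*)         AS ERRORS,    -- errors per window
        WINDOWSTART      AS EVENT_TS   -- window start timestamp
      FROM CLICKSTREAM
      WINDOW HOPPING (SIZE 60 SECONDS, ADVANCE BY 20 SECONDS)
      WHERE (CAST(CLICKSTREAM.STATUS AS INT) > 400)
      GROUP BY STATUS
      HAVING (COUNT(*) > 5) AND (COUNT(*) IS NOT NULL)
      EMIT CHANGES;
    ```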

Step 9: Activate the pipeline

In this step, you enable security for the pipeline and activate it.

  1. Click Activate to deploy the pipeline components.

    The Pipeline activation dialog opens.

  2. In the ksqlDB Cluster dropdown, select the ksqlDB cluster to use for your pipeline logic.

    Note

    If you don’t have a ksqlDB cluster yet, click Create new ksqlDB cluster to open the ksqlDB Cluster page and then click Add cluster. When you’ve finished creating the ksqlDB cluster, return to the Create new pipeline dialog and click the refresh button to see your new ksqlDB cluster in the dropdown.

  3. In the Activation privileges section, click Grant privileges.

  4. Click Confirm to activate your pipeline.

    After a few seconds, the state of each component goes from Activating to Activated.

    Note

    If the filter component reports an error like Did not find any value schema for the topic, wait for the Datagen source connector to provision completely and activate the pipeline again.

  5. When all components show the Activated state, click the errors_per_minute_alert topic, and in the details view, click Messages.

    The aggregated status messages appear, showing the number of errors and the timestamp of the window in which the messages were aggregated.

    Stream Designer and aggregated messages in Confluent Cloud Console

Step 10: Deactivate the pipeline

To avoid incurring costs, deactivate the pipeline when you finish, which deletes all resources the pipeline created.

When you deactivate a pipeline, you have the option of retaining or deleting topics in the pipeline.

  1. Click the settings icon (settings-icon).

    The Pipeline Settings dialog opens.

  2. Click Deactivate pipeline to delete all resources created by the pipeline.

    The Revert pipeline to draft? dialog appears. Click the dropdowns to delete or retain the listed topics. For this example, keep the Delete settings.

    Stream Designer showing the Revert dialog in Confluent Cloud Console
  3. Click Confirm and revert to draft to deactivate the pipeline and delete topics.

Step 11: Delete the pipeline

When all components have completed deactivation, you can delete the pipeline safely.

  1. Click the settings icon.

    The Pipeline Settings dialog opens.

    Stream Designer showing the Pipeline Settings dialog in Confluent Cloud Console
  2. Click Delete pipeline. In the Delete pipeline dialog, enter “confirm” and click Confirm.

    The pipeline and associated resources are deleted, and you are returned to the Pipelines list.