Create an Aggregation Pipeline for Stream Designer on Confluent Cloud¶
The following steps show how to create a pipeline in Stream Designer that aggregates clickstream data and reports errors in a table.
Step 1: Create a pipeline project¶
A Stream Designer pipeline project defines all the components that are deployed for an application. In this step, you create a pipeline project and a canvas for designing the component graph.
Log in to the Confluent Cloud Console and open the Cluster Overview page for the cluster you want to use for creating pipelines.
In the navigation menu, click Stream Designer.
Click Create pipeline.
The Create a new pipeline page opens.
Step 2: Create a connector definition¶
Your pipeline starts with data produced by the Datagen source connector. In this step, you create a pipeline definition for a connector that produces mock clickstream data to a Kafka topic.
Click Start with Connector and click Start building.
The Stream Designer canvas opens, with the Source Connector details view visible.
On the Source Connector page, click the Datagen Source tile to open the Configuration page.
In the Topic textbox, type “clicks” and click Continue to open the Kafka credentials page.
Ensure that the Global access tile is selected and click Generate API key & download to create the API key for the Datagen connector.
A text file containing the newly generated API key and secret is downloaded to your local machine.
Click Continue to configure the connector’s output.
In the Select output record value format section, click JSON_SR, and in the Select a template section, click Clickstream.
Click Continue to open the Sizing page.
In the Connector sizing section, leave the Minimum number of tasks setting at 1. Click Continue to open the Review and launch page.
In the Connector name textbox, enter “Datagen_clickstream” and click Continue.
The Datagen source connector is configured and appears on the canvas with a corresponding topic component. The topic component is configured with the name you provided during connector configuration. Also, a stream is registered on the topic.
Step 3: Register a stream on the topic¶
Stream Designer enables registering a stream on an underlying topic. In this step, you create a stream named “CLICKSTREAM” that corresponds to the “clicks” topic.
Click the Stream component to open the Stream Configuration dialog.
In the Name textbox, enter “CLICKSTREAM”.
In the Value Format dropdown, select JSON_SR.
Click Save.
The Topic component updates with the stream name.
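For reference, registering the stream this way corresponds roughly to the following ksqlDB statement. This is a sketch based on the names and formats entered above, and it assumes the value schema for the “clicks” topic is already registered in Schema Registry; the statement that Stream Designer actually generates may differ.
-- Sketch: register a stream over the "clicks" topic, inferring columns
-- from the registered JSON_SR value schema.
CREATE STREAM CLICKSTREAM WITH (
  KAFKA_TOPIC = 'clicks',
  VALUE_FORMAT = 'JSON_SR'
);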
Step 4: Create a filter query definition¶
Stream Designer enables defining queries on your streams. In this step, you create the definition for a filter component and specify its behavior by using a simple SQL statement. The filter produces a stream of click events that have a status code greater than 400.
Hover over the CLICKSTREAM component and click +.
A context menu appears showing the components that accept a stream as input.
In the context menu, click Filter.
A Query component appears and the filter configuration dialog opens.
Note
You can also add a filter by clicking Filter in the Components menu and manually connecting the components on the canvas.
In the Filter Expression field, enter the following SQL:
(CAST(CLICKSTREAM.STATUS AS INT) > 400)
Click Save to create the filter definition.
The query component has an error state because it needs a sink topic or another query for its output, which you provide in the next step.
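If you want to preview what the filter passes through, a transient push query like the following approximates its behavior. This is a sketch to run in the ksqlDB editor, not part of the Stream Designer configuration, and it assumes the CLICKSTREAM stream registered in the previous step.
-- Sketch: preview click events whose status code is greater than 400.
SELECT *
FROM CLICKSTREAM
WHERE (CAST(CLICKSTREAM.STATUS AS INT) > 400)
EMIT CHANGES;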
Step 5: Create a GROUP BY query definition¶
The next stage of the pipeline groups click messages by their status code, within a 60-second hopping window.
Hover over the filter component and click +.
A context menu appears showing the components that accept a stream as input.
In the context menu, click Group By.
A Group By component appears in the query, and the configuration dialog opens.
Note
You can also add a GROUP BY clause by clicking Group By in the Components menu and manually connecting the components on the canvas.
In the Group by Expression textbox, enter “CLICKSTREAM.STATUS”.
Click the Window dropdown and select HOPPING.
Fields for the hopping window parameters appear.
Configure the window with the following values:
- Window Size Duration: 60
- Window Size Unit: seconds
- Advance By Duration: 20
- Advance By Unit: seconds
Click Save.
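The window settings correspond roughly to a WINDOW HOPPING clause in ksqlDB. The following push query is a sketch of the aggregation at this stage, before the HAVING clause and projection are added in the next steps; the column names come from the Clickstream template, and the exact SQL that Stream Designer generates may differ.
-- Sketch: count filtered clicks per status code in a 60-second hopping
-- window that advances every 20 seconds.
SELECT STATUS, COUNT(*) AS ERRORS
FROM CLICKSTREAM
WINDOW HOPPING (SIZE 60 SECONDS, ADVANCE BY 20 SECONDS)
WHERE (CAST(CLICKSTREAM.STATUS AS INT) > 400)
GROUP BY STATUS
EMIT CHANGES;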
Step 6: Create a HAVING query definition¶
The next stage of the pipeline extracts clicks from an aggregation that has more than 5 of the same type of error within the time window.
Hover over the Group By component and click +.
A context menu appears showing the components that accept grouped input.
In the context menu, click Having.
A Having component appears in the query, and the configuration dialog opens.
Note
You can also add a HAVING clause by clicking Having in the Components menu and manually connecting the components on the canvas.
In the Having Expression textbox, enter the following SQL:
(COUNT(*) > 5) AND (COUNT(*) IS NOT NULL)
Click Save.
Step 7: Create a mapping definition¶
In this step, you define a projection that maps columns to an output table.
Hover over the Having component and click +.
A context menu appears showing the components that accept HAVING input.
In the context menu, click Project.
A Project component appears in the query, and the configuration dialog opens.
Note
You can also add a projection by clicking Project in the Components menu and manually connecting the components on the canvas.
In the Projection Expression textbox, enter the following SQL, which defines the output table’s key column to be the status code.
STATUS AS K1
Click Add Projection Expression and in the textbox, enter the following SQL:
AS_VALUE(STATUS) AS STATUS
Click Add Projection Expression and in the textbox, enter the following SQL:
COUNT(*) AS ERRORS
Click Add Projection Expression and in the textbox, enter the following SQL:
WINDOWSTART AS EVENT_TS
Click Save.
The query definition is complete. The query component shows an error state because it needs a sink topic for its output, which you provide in the next step.
Step 8: Create an error summary table definition¶
The last step is to create a table from the projected columns that reports the error counts within the window.
Hover over the Project component and click +.
A context menu appears showing the components that accept mapping input.
In the context menu, click Table.
A Table component appears on the canvas, and the configuration dialog opens.
Note
You can also add a table by clicking Table in the Components menu and manually connecting the components on the canvas.
In the Name textbox, enter “errors_per_minute_alert” and click Save.
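Taken together, the filter, grouping, HAVING clause, projection, and table components from Steps 4 through 8 correspond roughly to a persistent CREATE TABLE AS SELECT statement like the following sketch. The table, stream, and column names come from the steps above; the sink topic name here is an assumption, and the statement that Stream Designer generates on activation may differ.
-- Sketch: materialize the windowed error counts as a table backed by a
-- sink topic (the topic name is an assumption).
CREATE TABLE ERRORS_PER_MINUTE_ALERT
  WITH (KAFKA_TOPIC = 'errors_per_minute_alert', VALUE_FORMAT = 'JSON_SR') AS
  SELECT
    STATUS AS K1,
    AS_VALUE(STATUS) AS STATUS,
    COUNT(*) AS ERRORS,
    WINDOWSTART AS EVENT_TS
  FROM CLICKSTREAM
  WINDOW HOPPING (SIZE 60 SECONDS, ADVANCE BY 20 SECONDS)
  WHERE (CAST(CLICKSTREAM.STATUS AS INT) > 400)
  GROUP BY STATUS
  HAVING (COUNT(*) > 5) AND (COUNT(*) IS NOT NULL);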
The aggregation pipeline is ready to activate.
Step 9: Activate the pipeline¶
In this step, you enable security for the pipeline and activate it.
Click Activate to deploy the pipeline components.
The Pipeline activation dialog opens.
In the ksqlDB Cluster dropdown, select the ksqlDB cluster to use for your pipeline logic.
Note
If you don’t have a ksqlDB cluster yet, click Create new ksqlDB cluster to open the ksqlDB Cluster page and then click Add cluster. When you’ve finished creating the ksqlDB cluster, return to the Create new pipeline dialog and click the refresh button to see your new ksqlDB cluster in the dropdown.
In the Activation privileges section, click Grant privileges.
Click Confirm to activate your pipeline.
After a few seconds, the state of each component goes from Activating to Activated.
Note
If the filter component reports an error like “Did not find any value schema for the topic”, wait for the Datagen source connector to finish provisioning, and then activate the pipeline again.
When all components show the Activated state, click the errors_per_minute_alert topic, and in the details view, click Messages.
The aggregated status messages appear, showing the number of errors and the timestamp of the window in which the messages were aggregated.
Step 10: Deactivate the pipeline¶
To avoid incurring costs, deactivate the pipeline, which deletes all resources created by the pipeline.
When you deactivate a pipeline, you have the option of retaining or deleting topics in the pipeline.
Click the settings icon.
The Pipeline Settings dialog opens.
Click Deactivate pipeline to delete all resources created by the pipeline.
The Revert pipeline to draft? dialog appears. Click the dropdowns to delete or retain the listed topics. For this example, keep the Delete settings.
Click Confirm and revert to draft to deactivate the pipeline and delete topics.
Step 11: Delete the pipeline¶
When all components have completed deactivation, you can delete the pipeline safely.
Click the settings icon.
The Pipeline Settings dialog opens.
Click Delete pipeline. In the Delete pipeline dialog, enter “confirm” and click Confirm.
The pipeline and associated resources are deleted. You are returned to the Pipelines list.