Create a Join Pipeline for Stream Designer on Confluent Cloud¶
The following steps show how to create a pipeline in Stream Designer that joins data from a stream and a table.
Step 1: Create a pipeline project¶
A Stream Designer pipeline project defines all the components that are deployed for an application. In this step, you create a pipeline project and a canvas for designing the component graph.
Log in to the Confluent Cloud Console and open the Cluster Overview page for the cluster you want to use for creating pipelines.
In the navigation menu, click Stream Designer.
Click Create pipeline.
The Create a new pipeline page opens.
Step 2: Create a pageviews connector definition¶
Your pipeline starts with data produced by the Datagen source connector. In this step, you create a pipeline definition for a connector that produces mock pageview data to a Kafka topic.
Click Start with Connector and then click Start building.
The Stream Designer canvas opens, with the Source Connector details view visible.
In the Source Connector page, click the Datagen Source tile to open the Configuration page.
In the Topic textbox, type “pageviews_topic”.
Click Continue to open the Kafka credentials page.
Ensure that the Global access tile is selected and click Generate API key & download to create the API key for the Datagen connector.
A text file containing the newly generated API key and secret is downloaded to your local machine.
Click Continue to configure the connector’s output.
In the Select output record value format section, click JSON_SR, and in the Select a template section, click Pageviews.
Click Continue to open the Sizing page.
In the Connector sizing section, leave the minimum number of tasks at 1.
Click Continue to open the Review and launch page.
In the Connector name textbox, enter “Datagen_pageviews” and click Continue.
The Datagen source connector is configured and appears on the canvas with a corresponding topic component. The topic component is configured with the name you provided during connector configuration. Also, a stream is registered on the topic.
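If you prefer to read the configuration as text, the connector you just defined corresponds roughly to a ksqlDB CREATE SOURCE CONNECTOR statement like the sketch below. The property names are assumptions based on the fully managed Datagen Source connector, and <API_KEY> and <API_SECRET> are placeholders for the credentials you downloaded, not values from this tutorial.

```sql
-- Sketch only: approximate SQL form of the Datagen_pageviews connector.
-- Property names are assumed from the managed Datagen Source connector;
-- <API_KEY> and <API_SECRET> are placeholders, not real values.
CREATE SOURCE CONNECTOR "Datagen_pageviews" WITH (
  "connector.class"    = 'DatagenSource',
  "kafka.auth.mode"    = 'KAFKA_API_KEY',
  "kafka.api.key"      = '<API_KEY>',
  "kafka.api.secret"   = '<API_SECRET>',
  "kafka.topic"        = 'pageviews_topic',
  "output.data.format" = 'JSON_SR',
  "quickstart"         = 'PAGEVIEWS',
  "tasks.max"          = '1'
);
```

The statement that Stream Designer actually generates may differ in quoting and in how it references secrets.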
Step 3: Register a stream on the pageviews topic¶
Stream Designer enables registering a stream on an underlying topic.
In the Stream component, click Configure to open the stream configuration dialog.
In the Name textbox, enter “pageviews_stream”.
In the Value Format dropdown, select JSON_SR.
Click Save.
The Topic component updates with the stream name.
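Registering the stream in the dialog is comparable to running a ksqlDB CREATE STREAM statement over the existing topic. The following is a minimal sketch, assuming that the JSON_SR schema for pageviews_topic is already available in Schema Registry so that the value columns can be inferred rather than declared.

```sql
-- Sketch only: register a stream on the existing pageviews topic.
-- No column list is given because, with JSON_SR, ksqlDB can infer the
-- value columns from the schema stored in Schema Registry (assumption:
-- the Datagen connector has already registered that schema).
CREATE STREAM pageviews_stream WITH (
  KAFKA_TOPIC  = 'pageviews_topic',
  VALUE_FORMAT = 'JSON_SR'
);
```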
Step 4: Create a users connector definition¶
In this step, you create a pipeline definition for a connector that produces mock user data to a Kafka topic.
In the Components menu, click Source Connector.
In the Source Connector component, click Configure.
The Source Connector page opens.
In the search box, enter “datagen”.
Click the Datagen Source tile to open the Configuration page.
In the Topic textbox, type “users_topic”.
Click Continue.
The Kafka credentials page opens.
Ensure that the Global access tile is selected and click Generate API key & download to create the API key for the Datagen connector.
A text file containing the newly generated API key and secret is downloaded to your local machine.
Click Continue.
In the Select output record value format section, click JSON, and in the Select a template section, click Users.
Click Continue.
In the Connector sizing section, leave the minimum number of tasks at 1 and click Continue.
In the Connector name textbox, enter “Datagen_users” and click Continue.
The Datagen source connector is configured and appears on the canvas with a corresponding topic component. The topic component is configured with the name you provided during connector configuration.
Step 5: Register a table on the users topic¶
Stream Designer enables registering a table on an underlying topic. In this step, you create a table named “users_table” that corresponds with “users_topic”.
Right-click the Stream component and click Remove.
Hover over the Topic component and click the + icon that appears near the center.
A context menu opens.
In the context menu, click Table.
A Table component appears within the Topic component, and the Table Configuration dialog opens.
In the Table Configuration dialog, enter “users_table” in the Name textbox.
In the Value Format dropdown, select JSON.
In the Columns for the table textbox, enter the following SQL:
id VARCHAR PRIMARY KEY
Click Add Columns for the table and in the textbox, enter the following SQL:
REGISTERTIME BIGINT
Repeat the previous step for the following column definitions:
USERID STRING
REGIONID STRING
GENDER STRING
Your table configuration should resemble:
Click Save.
The Topic component updates with the table name.
The start of the pipeline is defined, with a pageviews stream and a users table fed by corresponding Datagen source connectors.
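Taken together, the table settings from this step can be sketched as a single ksqlDB CREATE TABLE statement. The column definitions are the ones entered in the dialog. The statement layout is an assumption about how they combine, not the exact SQL that Stream Designer generates.

```sql
-- Sketch only: the users table from Step 5 as one statement.
-- Column definitions are copied from the dialog entries above.
CREATE TABLE users_table (
  id           VARCHAR PRIMARY KEY,  -- key of the table, used later in the join
  REGISTERTIME BIGINT,
  USERID       STRING,
  REGIONID     STRING,
  GENDER       STRING
) WITH (
  KAFKA_TOPIC  = 'users_topic',
  VALUE_FORMAT = 'JSON'
);
```

Unlike the pageviews stream, this table declares its columns explicitly, because plain JSON has no schema in Schema Registry to infer them from.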
Step 6: Join the stream and table¶
With pageviews_stream and users_table defined, you can join them to produce an enriched stream in which each pageview message carries details about the user who generated it.
In the Components menu, click Join.
A Join query appears on the canvas, and the Configuration dialog opens.
From users_table, drag an arrow to the Join component.
From pageviews_stream, drag an arrow to the Join component.
In the Join component, click Configure.
The Join Configuration dialog opens.
In the Reference to the left input source dropdown, select pageviews_stream.
In the Alias of the left input source field, enter “p”.
In the Reference to the input source dropdown, select users_table.
In the Alias of the input source field, enter “u”.
In the Join Type dropdown, select LEFT OUTER.
In the Join on clause textbox, enter the following SQL:
p.userid = u.id
Your join configuration should resemble:
Click Save.
The query component displays a red error triangle because it requires a stream, table, or another query component for its output. In the next step, you add a sink topic with a corresponding stream for the join output.
Step 7: Define the join topic¶
The join query requires a sink topic for the joined messages. In this step, you define a user_pageviews stream and its underlying topic for the query results.
Hover over the Join component and click +.
A context menu appears showing the components that accept join results as an input.
In the context menu, click Stream.
A Topic component appears, and the stream configuration dialog opens.
Name the stream “user_pageviews” and click Save.
Click the topic component to open the configuration dialog.
Name the topic “user_pageviews_topic”. Click Save.
The join pipeline is ready to activate.
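The join and its sink, as configured in Steps 6 and 7, are comparable to one ksqlDB CREATE STREAM AS SELECT statement. The sketch below reuses the aliases, join type, and ON clause from the Join Configuration dialog. The selected columns are an assumption for illustration: pageid is assumed to come from the Datagen Pageviews template, while regionid and gender come from the table defined in Step 5.

```sql
-- Sketch only: stream-table join written to the sink stream and topic.
-- Aliases (p, u), LEFT OUTER, and the ON clause match the dialog settings.
-- The projected columns are illustrative; pageid is an assumed field of
-- the Pageviews template.
CREATE STREAM user_pageviews
  WITH (KAFKA_TOPIC = 'user_pageviews_topic') AS
  SELECT
    p.userid   AS userid,    -- join key; must be part of the projection
    p.pageid   AS pageid,
    u.regionid AS regionid,
    u.gender   AS gender
  FROM pageviews_stream p
  LEFT OUTER JOIN users_table u
    ON p.userid = u.id
  EMIT CHANGES;
```

Because the join type is LEFT OUTER, every pageview is emitted even when no matching row exists in users_table. In that case, the user columns in the output are null.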
Step 8: Activate the pipeline¶
When all components show the Activated state, click the user_pageviews topic, and in the details view, click Messages.
The joined messages appear, showing the pageviews stream enriched with data from the users table.
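As an alternative to the Messages view, you can inspect a few joined records with a push query. This is a sketch that assumes you have access to a query editor for the ksqlDB cluster backing the pipeline, which this tutorial does not otherwise cover.

```sql
-- Sketch only: read five records from the joined stream, then stop.
-- Assumes the stream is registered under the name user_pageviews.
SELECT * FROM user_pageviews EMIT CHANGES LIMIT 5;
```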
Step 9: Deactivate the pipeline¶
To avoid incurring costs, deactivate the pipeline when you are done with it.
When you deactivate a pipeline, you have the option of retaining or deleting topics in the pipeline.
Click the settings icon.
The Pipeline Settings dialog opens.
Click Deactivate pipeline to delete all resources created by the pipeline.
The Revert pipeline to draft? dialog appears. Click the dropdowns to delete or retain the listed topics. For this example, keep the Delete settings.
Click Confirm and revert to draft to deactivate the pipeline and delete topics.
Step 10: Delete the pipeline¶
When all components have completed deactivation, you can delete the pipeline safely.
Click the settings icon.
The Pipeline Settings dialog opens.
Click Delete pipeline. In the Delete pipeline dialog, enter “confirm” and click Confirm.
The pipeline and associated resources are deleted. You are returned to the Pipelines list.