Quick Start for Confluent Cloud¶
Confluent Cloud is a resilient, scalable, streaming data service based on Apache Kafka®, delivered as a fully managed service. Confluent Cloud has a web interface called the Confluent Cloud Console, a local command line interface, and REST APIs. You can manage cluster resources, settings, and billing with the Cloud Console. You can use the Confluent CLI and REST APIs to create and manage Kafka topics and more.
This quick start gets you up and running with Confluent Cloud using a Basic Kafka cluster. The first section shows how to use Confluent Cloud to create topics, and produce and consume data to and from the cluster. The second section walks you through how to add ksqlDB to the cluster and perform queries on the data using a SQL-like syntax.
Section 1: Create a cluster, add a topic¶
Follow the steps in this section to set up a Kafka cluster on Confluent Cloud and produce data to Kafka topics on the cluster.
Note
Confluent Cloud Console includes an in-product tutorial that guides you through the basic steps for setting up your environment. To start the tutorial, log in and choose Learn.
- Prerequisites
- Access to Confluent Cloud. To get started for free, see Deploy Free Clusters on Confluent Cloud.
- Internet connectivity
Step 1: Create a Kafka cluster in Confluent Cloud¶
In this step, you create an environment, select a cloud provider, and then create and launch a basic Kafka cluster inside your new environment.
Sign in to Confluent Cloud at https://confluent.cloud.
Click Add cluster.
On the Create cluster page, for the Basic cluster, select Begin configuration.
This example creates a Basic Kafka cluster, which supports single zone availability. For information about other cluster types, see Kafka Cluster Types in Confluent Cloud.
On the Region/zones page, choose a cloud provider and region, and select Single zone availability.
Select Continue.
Note
If you haven’t set up a payment method, you see the Set payment page. Enter a payment method and select Review, or select Skip payment.
Specify a cluster name, review the configuration and cost information, and then select Launch cluster.
Depending on the cloud provider and other settings, it may take a few minutes to provision your cluster. After the cluster has been provisioned, the Cluster Overview page displays.
Now you can get started configuring apps and data on your new cluster.
Log in to Confluent Cloud (or sign up if you don’t have an account):
confluent login
Create a Kafka cluster:
confluent kafka cluster create <name> [flags]
For example:
confluent kafka cluster create quickstart_cluster --cloud "aws" --region "us-west-2"
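Optionally, you can confirm that the cluster is provisioning and look up its ID from the CLI. The cluster ID shown here is a placeholder; use the ID returned for your cluster:
# List Kafka clusters in the current environment and note the cluster ID (lkc-xxxxxx).
confluent kafka cluster list
# Show details for a specific cluster, including its provisioning status and endpoints.
confluent kafka cluster describe lkc-000000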
Create a Kafka cluster.
Request:
POST /cmk/v2/clusters
Host: api.confluent.cloud
{
"spec": {
"display_name": "quickstart_cluster",
"availability": "SINGLE_ZONE",
"cloud": "{provider}",
"region": "{region}",
"config": {
"kind": "Basic"
},
"environment": {
"id": "env-a12b34"
}
}
}
Response:
{
"api_version": "cmk/v2",
"id": "lkc-000000",
"kind": "Cluster",
"metadata": {
"created_at": "2022-11-21T22:50:07.496522Z",
"resource_name": "crn://confluent.cloud/organization=example1-org1-1111-2222-33aabbcc444dd55/environment=env-00000/cloud-cluster=lkc-000000/kafka=lkc-000000",
"self": "https://api.confluent.cloud/cmk/v2/clusters/lkc-000000",
"updated_at": "2022-11-21T22:50:07.497443Z"
},
"spec": {
"api_endpoint": "https://pkac-{00000}.{region}.{provider}.confluent.cloud",
"availability": "SINGLE_ZONE",
"cloud": "{provider}",
"config": {
"kind": "Basic"
},
"display_name": "quickstart_cluster",
"environment": {
"api_version": "org/v2",
"id": "env-a12b34",
"kind": "Environment",
"related": "https://api.confluent.cloud/org/v2/environments/env-a12b34",
"resource_name": "crn://confluent.cloud/organization=example1-org1-1111-2222-33aabbcc444dd55/environment=env-a12b34}"
,
"http_endpoint": "https://pkc-{00000}.{region}.{provider}.confluent.cloud:443",
"kafka_bootstrap_endpoint": "SASL_SSL://pkc-{00000}.{region}.{provider}.confluent.cloud:9092",
"region": "{region}"
},
"status": {
"phase": "PROVISIONING"
}
}
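For reference, a minimal curl sketch of this request follows. It assumes your Cloud API key and secret are exported as CLOUD_API_KEY and CLOUD_API_SECRET (placeholder variable names) and that the request body shown above is saved in a file named create-cluster.json (a hypothetical file name):
# Send the cluster-creation request to the CMK API using the Cloud API key for Basic auth.
curl -s -u "$CLOUD_API_KEY:$CLOUD_API_SECRET" \
  -H "Content-Type: application/json" \
  -d @create-cluster.json \
  https://api.confluent.cloud/cmk/v2/clusters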
Step 2: Create a Kafka topic¶
In this step, you create a users topic by using the Cloud Console. A Kafka topic is a unit of organization for a cluster, and is essentially an append-only log. For more about topics, see What is Apache Kafka.
From the navigation menu, click Topics, and then click Create topic.
In the Topic name field, type “users” and then select Create with defaults.
The users topic is created on the Kafka cluster and is available for use by producers and consumers.
The success message may prompt you to take an action, but you should continue with Step 3: Create a sample producer.
Create a topic:
confluent kafka topic create <name> [flags]
For example:
confluent kafka topic create users --cluster lkc-000000
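Optionally, verify the topic from the CLI (the cluster ID is the placeholder used throughout this guide):
# List topics on the cluster, then show details for the users topic.
confluent kafka topic list --cluster lkc-000000
confluent kafka topic describe users --cluster lkc-000000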
Create a topic:
Request:
POST /kafka/v3/clusters/{cluster_id}/topics
Host: pkc-{00000}.{region}.{provider}.confluent.cloud
{
"topic_name": "users",
"partitions_count": 6,
"replication_factor": 3,
"configs": [{
"name": "cleanup.policy",
"value": "delete"
},
{
"name": "compression.type",
"value": "gzip"
}
]
}
Response:
{
"kind": "KafkaTopic",
"metadata": {
"self": "https://pkc-{00000}.{region}.{provider}.confluent.cloud/kafka/v3/clusters/quickstart/topics/users",
"resource_name": "crn:///kafka=quickstart/topic=users"
},
"cluster_id": "quickstart",
"topic_name": "users",
"is_internal": false,
"replication_factor": 3,
"partitions_count": 1,
"partitions": {
"related": "https://pkc-{00000}.{region}.{provider}.confluent.cloud/kafka/v3/clusters/cluster-1/topics/topic-X/partitions"
},
"configs": {
"related": "https://pkc-{00000}.{region}.{provider}.confluent.cloud/kafka/v3/clusters/cluster-1/topics/topic-X/configs"
},
"partition_reassignments": {
"related": "https://pkc-{00000}.{region}.{provider}.confluent.cloud/kafka/v3/clusters/cluster-1/topics/topic-X/partitions/-/reassignments"
}
}
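A minimal curl sketch of this request follows. It assumes you have already created an API key for the cluster (see Step 3) and exported it as KAFKA_API_KEY and KAFKA_API_SECRET (placeholder names), and that you substitute your cluster's REST endpoint and cluster ID for the placeholders shown:
# Create the topic through the Kafka REST API, authenticating with the cluster API key.
curl -s -u "$KAFKA_API_KEY:$KAFKA_API_SECRET" \
  -H "Content-Type: application/json" \
  -d '{"topic_name": "users", "partitions_count": 6}' \
  "https://pkc-00000.us-west-2.aws.confluent.cloud/kafka/v3/clusters/lkc-000000/topics"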
Step 3: Create a sample producer¶
You can produce example data to your Kafka cluster by using the hosted Datagen Source Connector for Confluent Cloud.
From the navigation menu, select Connectors.
To go directly to the Connectors page in Confluent Cloud, open https://confluent.cloud/go/connectors.
In the Search box, type “datagen”.
From the search results, select the Datagen Source connector.
On the Topic selection pane, select the users topic you created in the previous section and then select Continue.
In the Kafka credentials pane, leave Global access selected, and click Generate API key & download. This creates an API key and secret that allows the connector to access your cluster, and downloads the key and secret to your computer.
The key and secret are required for the connector and also for the Confluent CLI and ksqlDB CLI to access your cluster.
Note
An API key and associated secret apply to the active Kafka cluster. If you add a new cluster, you must create a new API key for producers and consumers on the new Kafka cluster. For more information, see Use API Keys to Control Access in Confluent Cloud.
Enter “users” as the description for the key, and click Continue.
On the Configuration page, select JSON for the output record value format, Users for template, and then click Continue.
For Connector sizing, leave the slider at the default of 1 task and click Continue.
On the Review and launch page, select the text in the Connector name box and replace it with “DatagenSourceConnector_users”.
Click Continue to start the connector.
The status of your new connector should read Provisioning, which lasts for a few seconds. When the status changes to Running, your connector is producing data to the users topic.
Create an API key:
confluent api-key create --resource <cluster_id> [flags]
For example:
confluent api-key create --resource lkc-000000
Example output:
+---------+------------------------------------------------------------------+
| API Key | EXAMPLEERFBSSSLK                                                 |
| Secret  | EXAMPLEEkYXOtOmn+En8397gCaeX05j0szygokwLRk1ypVby1UsgZpZLX7gJGR4G |
+---------+------------------------------------------------------------------+
It may take a couple of minutes for the API key to be ready. Save the API key and secret. The secret is not retrievable later.
Create a JSON file named quick-start.json and copy and paste the following configuration properties into the file:
{
"name" : "DatagenSourceConnector_users",
"connector.class": "DatagenSource",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "[Add your cluster API key here]",
"kafka.api.secret" : "[Add your cluster API secret here]",
"kafka.topic" : "users",
"output.data.format" : "JSON",
"quickstart" : "USERS",
"tasks.max" : "1"
}
Replace [Add your cluster API key here] and [Add your cluster API secret here] with your API key and secret.
Create the sample producer:
confluent connect cluster create --config-file <file-name>.json --cluster <cluster_id>
For example:
confluent connect cluster create --config-file quick-start.json --cluster lkc-000000
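Optionally, check that the connector moves from provisioning to running. The exact subcommand names can vary slightly between Confluent CLI versions, and lcc-aa1234 is a placeholder connector ID:
# List connectors on the cluster, then describe one by its connector ID.
confluent connect cluster list --cluster lkc-000000
confluent connect cluster describe lcc-aa1234 --cluster lkc-000000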
Create a producer.
To create a producer with the Confluent Cloud APIs, you need two API keys:
- Cloud API key added to the header for authorization
- Kafka cluster API key added to the body for access to the cluster
Use the Confluent CLI or the Cloud Console to generate an API key for the Kafka cluster. For more information, see Authentication in the API reference.
Request:
POST /connect/v1/environments/{environment_id}/clusters/{cluster_id}/connectors
Host: api.confluent.cloud
{
"name": "DatagenSourceConnector_users",
"config": {
"name": "DatagenSourceConnector_users"
"connector.class": "DatagenSource",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "[Add your cluster API key here]",
"kafka.api.secret" : "[Add your cluster API secret here]",
"kafka.topic" : "users",
"output.data.format" : "JSON",
"quickstart" : "USERS",
"tasks.max" : "1"
}
}
Replace [Add your cluster API key here] and [Add your cluster API secret here] with your cluster API key and secret.
Response:
{
"name": "DatagenSourceConnector_users",
"type": "source",
"config": {
"cloud.environment": "prod",
"cloud.provider": "{provider}",
"connector.class": "DatagenSource",
"kafka.api.key": "[Your cluster API key]",
"kafka.api.secret": "[Your cluster API secret]",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.endpoint": "SASL_SSL://pkc-{00000}.{region}.{provider}.confluent.cloud:9092",
"kafka.region": "{region}",
"kafka.topic": "users1",
"name": "DatagenSourceConnector_users",
"output.data.format": "JSON",
"quickstart": "USERS",
"tasks.max": "1"
},
"tasks": []
}
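For reference, a minimal curl sketch of the request above. It assumes a Cloud API key pair exported as CLOUD_API_KEY and CLOUD_API_SECRET (placeholder names) for authorization, the request body saved in a file named create-connector.json (a hypothetical file name), and the environment and cluster IDs used in this example:
# Create the connector through the Connect API; the cluster API key travels in the request body.
curl -s -u "$CLOUD_API_KEY:$CLOUD_API_SECRET" \
  -H "Content-Type: application/json" \
  -d @create-connector.json \
  "https://api.confluent.cloud/connect/v1/environments/env-a12b34/clusters/lkc-000000/connectors"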
Step 4: Consume messages¶
Your new users topic is now receiving messages. Use the Confluent Cloud Console to see the data.
From the navigation menu, select Topics to show the list of topics in your cluster.
Select the users topic.
In the users topic detail page, select the Messages tab to view the messages being produced to the topic. The message viewer shows messages produced since the page was loaded, but it doesn’t show a historical view.
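If you prefer the command line, you can also consume from the topic with the Confluent CLI. This assumes the API key you created in Step 3 has been made available to the CLI (for example, with confluent api-key use). Press Ctrl-C to stop consuming:
# Read messages from the users topic, starting from the earliest offset.
confluent kafka topic consume users --from-beginning --cluster lkc-000000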
Step 5: Enable a Stream Governance package¶
Now you will enable a Stream Governance package so that you can track the movement of data through your cluster. This enables you to see sources, sinks, and topics and monitor messages as they move from one to another.
If your organization already has Stream Governance enabled, you can skip to Step 6: Inspect the data stream.
Click Stream Lineage in the navigation menu (it will be grayed out) and click Stream Governance Package in the popup to go to Stream Governance Packages.
To go directly to Stream Governance Packages in Confluent Cloud, open https://confluent.cloud/go/schema-registry.
Under the Essentials package, choose Begin configuration.
On the Enable Stream Governance Essentials page, choose a cloud provider and a free region for that provider, and then click Enable.
For example, choose AWS and Ohio (us-east-2) for $0/hr.
Now, click the cluster you created for the Quick Start.
From the navigation menu in the cluster, click Topics and then click the users topic. In the users topic, click See in Stream Lineage at the top right. The stream lineage for the users topic is shown.
Step 6: Inspect the data stream¶
Use Stream Lineage to track data movement through your cluster.
Click Stream Lineage in the navigation menu.
Click the node labeled DatagenSourceConnector_users, which is the connector that you created in Step 3. The details view opens, showing graphs for total production and other data.
Dismiss the details view and select the topic labeled users. The details view opens, showing graphs for total throughput and other data.
Click the arrow on the left border of the canvas to open the navigation menu.
(optional) Step 7: Delete the connector and topic¶
Skip this step if you plan to move on to Section 2: Add ksqlDB to the cluster and learn how to use SQL statements to query your data.
If you don’t plan to complete Section 2 and you’re ready to quit the Quick Start, delete the resources you created to avoid unexpected charges to your account.
- Delete the connector:
- From the navigation menu, select Connectors.
- Click DatagenSourceConnector_users and choose the Settings tab.
- Click Delete connector, enter the connector name (DatagenSourceConnector_users), and click Confirm.
- Delete the topic:
- From the navigation menu, click Topics, select the users topic, and then choose the Configuration tab.
- Click Delete topic, enter the topic name (users), and select Continue.
Delete the connector:
confluent connect cluster delete <connector-id> [flags]
For example:
confluent connect cluster delete lcc-aa1234 --cluster lkc-000000
Delete the topic:
confluent kafka topic delete <topic name> [flags]
For example:
confluent kafka topic delete users --cluster lkc-000000
Delete a producer.
Request:
DELETE /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}
Host: api.confluent.cloud
Delete a topic.
Request:
DELETE /kafka/v3/clusters/{kafka_cluster_id}/topics/{topic_name}
Host: pkc-{0000}.{region}.{provider}.confluent.cloud
Section 2: Add ksqlDB to the cluster¶
In Section 1, you installed a Datagen connector to produce data to the users topic in your Confluent Cloud cluster.
In this section, you will create a ksqlDB cluster, and a stream and a table in that cluster, and write queries against them.
Note
This section uses the Cloud Console to create a ksqlDB cluster. For an introduction that uses the Confluent CLI exclusively, see ksqlDB Quickstart for Confluent Cloud.
Step 1: Create a ksqlDB cluster in Confluent Cloud¶
To write queries against streams and tables, create a new ksqlDB cluster in Confluent Cloud.
Select the cluster you created in Section 1, and in the navigation menu, click ksqlDB.
Click Create cluster myself.
On the New cluster page, ensure that Global access is selected, and click Continue.
- To enable stricter access control for the new ksqlDB cluster, click Granular access and follow these steps.
- If you see an alert that reads, “IMPORTANT: Confirm that the user or service account has the required privileges to access Schema Registry”, follow the steps in Enable ksqlDB Integration with Schema Registry on Confluent Cloud to configure your ksqlDB cluster to access Schema Registry.
On the Configuration page, enter ksqldb-app1 for the Cluster name. In the Cluster size dropdown, select 1 and keep the configuration options in Default state. For more information on cluster sizes and configuration options, see Manage Billing in Confluent Cloud and Configuration Options.
Click Launch cluster. The ksqlDB clusters page opens, and the new cluster appears in the list. The new cluster has a Provisioning status. It may take a few minutes to provision the ksqlDB cluster. When the ksqlDB cluster is ready, its Status changes from Provisioning to Up.
The new ksqlDB cluster appears in the clusters list.
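Optionally, you can watch the provisioning status from the CLI (on older CLI versions the command group may be named differently):
# List ksqlDB clusters and their status in the current environment.
confluent ksql cluster list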
Step 2: Create the pageviews topic¶
In Section 1, you created the users topic by using the Cloud Console. In this step, you create the pageviews topic the same way.
In the navigation menu, click Topics, and in the Topics page, click Add topic.
In the Topic name field, type “pageviews”. Click Create with defaults.
The pageviews topic is created on the Kafka cluster and is available for use by producers and consumers.
Create a topic:
confluent kafka topic create <name> [flags]
For example:
confluent kafka topic create pageviews --cluster lkc-000000
Create a topic:
Request:
POST /kafka/v3/clusters/{cluster_id}/topics
Host: pkc-{00000}.{region}.{provider}.confluent.cloud
{
"topic_name": "pageviews",
"partitions_count": 6,
"replication_factor": 3,
"configs": [{
"name": "cleanup.policy",
"value": "delete"
},
{
"name": "compression.type",
"value": "gzip"
}
]
}
Response:
{
"kind": "KafkaTopic",
"metadata": {
"self": "https://pkc-{00000}.{region}.{provider}.confluent.cloud/kafka/v3/clusters/quickstart/topics/pageviews",
"resource_name": "crn:///kafka=quickstart/topic=pageviews"
},
"cluster_id": "quickstart",
"topic_name": "pageviews",
"is_internal": false,
"replication_factor": 3,
"partitions_count": 1,
"partitions": {
"related": "https://pkc-{00000}.{region}.{provider}.confluent.cloud/kafka/v3/clusters/cluster-1/topics/topic-X/partitions"
},
"configs": {
"related": "https://pkc-{00000}.{region}.{provider}.confluent.cloud/kafka/v3/clusters/cluster-1/topics/topic-X/configs"
},
"partition_reassignments": {
"related": "https://pkc-{00000}.{region}.{provider}.confluent.cloud/kafka/v3/clusters/cluster-1/topics/topic-X/partitions/-/reassignments"
}
}
Step 3: Produce pageview data to Confluent Cloud¶
In this step, you create a Datagen connector for the pageviews topic, using the same procedure that you used to create DatagenSourceConnector_users.
In the navigation menu, select Connectors.
In the Search connectors box, enter “datagen”.
From the search results, select the Datagen Source connector.
On the Topic selection pane, select the pageviews topic you created in the previous section and click Continue.
In the API credentials pane, leave Global access selected, and click Generate API key & download. This creates an API key and secret that allows the connector to access your cluster, and downloads the key and secret to your computer.
The key and secret are required for the connector and also for the Confluent CLI and ksqlDB CLI to access your cluster.
Note
An API key and associated secret apply to the active Kafka cluster. If you add a new cluster, you must create a new API key for producers and consumers on the new Kafka cluster. For more information, see Use API Keys to Control Access in Confluent Cloud.
Enter “pageviews” as the description for the key, and click Continue.
On the Configuration page, select JSON_SR for the output record value format, Pageviews for template, and then click Continue.
Selecting JSON_SR configures the connector to associate a schema with the pageviews topic and register it with Schema Registry. Currently, importing a topic as a stream works only for the JSON_SR format.
For Connector sizing, leave the slider at the default of 1 task and click Continue.
On the Review and launch page, select the text in the Connector name box and replace it with “DatagenSourceConnector_pageviews”.
Click Continue to start the connector.
The status of your new connector should read Provisioning, which lasts for a few seconds. When the status of the new connector changes from Provisioning to Running, you have two producers sending event streams to topics in your Confluent Cloud cluster.
Create an API key:
confluent api-key create --resource <cluster_id> [flags]
For example:
confluent api-key create --resource lkc-000000
Example output:
+---------+------------------------------------------------------------------+
| API Key | EXAMPLEERFBSSSLK                                                 |
| Secret  | EXAMPLEEkYXOtOmn+En8397gCaeX05j0szygokwLRk1ypVby1UsgZpZLX7gJGR4G |
+---------+------------------------------------------------------------------+
It may take a couple of minutes for the API key to be ready. Save the API key and secret. The secret is not retrievable later.
Create a JSON file named quick-start.json and copy and paste the following configuration properties into the file:
{
"name" : "DatagenSourceConnector_pageviews",
"connector.class": "DatagenSource",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "[Add your cluster API key here]",
"kafka.api.secret" : "[Add your cluster API secret here]",
"kafka.topic" : "pageviews",
"output.data.format" : "JSON_SR",
"quickstart" : "PAGEVIEWS",
"tasks.max" : "1"
}
Replace [Add your cluster API key here] and [Add your cluster API secret here] with your API key and secret.
Create the sample producer:
confluent connect cluster create --config-file <file-name>.json --cluster <cluster_id>
For example:
confluent connect cluster create --config-file quick-start.json --cluster lkc-000000
Create a producer.
To create a producer with the Confluent Cloud APIs, you need two API keys:
- Cloud API key added to the header for authorization
- Kafka cluster API key added to the body for access to the cluster
Use the Confluent CLI or the Cloud Console to generate an API key for the Kafka cluster. For more information, see Authentication in the API reference.
Request:
POST /connect/v1/environments/{environment_id}/clusters/{cluster_id}/connectors
Host: api.confluent.cloud
{
"name": "DatagenSourceConnector_users",
"config": {
"name": "DatagenSourceConnector_users"
"connector.class": "DatagenSource",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "[Add your cluster API key here]",
"kafka.api.secret" : "[Add your cluster API secret here]",
"kafka.topic" : "users",
"output.data.format" : "JSON_SR",
"quickstart" : "PAGEVIEWS",
"tasks.max" : "1"
}
}
Replace [Add your cluster API key here] and [Add your cluster API secret here] with your cluster API key and secret.
Response:
{
"name": "DatagenSourceConnector_users",
"type": "source",
"config": {
"cloud.environment": "prod",
"cloud.provider": "{provider}",
"connector.class": "DatagenSource",
"kafka.api.key": "[Your cluster API key]",
"kafka.api.secret": "[Your cluster API secret]",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.endpoint": "SASL_SSL://pkc-{00000}.{region}.{provider}.confluent.cloud:9092",
"kafka.region": "{region}",
"kafka.topic": "users1",
"name": "DatagenSourceConnector_users",
"output.data.format": "JSON_SR",
"quickstart": "PAGEVIEWS",
"tasks.max": "1"
},
"tasks": []
}
Step 4: Create tables and streams¶
In the next two steps, you create a table for the users topic and a stream for the pageviews topic by using familiar SQL syntax. When you register a stream or a table on a topic, you can use the stream or table in SQL statements.
- A table is a mutable collection that models change over time. Tables work by leveraging the keys of each row. If a sequence of rows shares a key, the last row for a given key represents the most up-to-date information for that key’s identity. A background process periodically runs and deletes all but the newest rows for each key.
- A stream is an immutable append-only collection that represents a series of historical facts, or events. Once a row is inserted into a stream, the row can never change. You can append new rows at the end of the stream, but you can’t update or delete existing rows.
Together, tables and streams comprise a fully realized database. For more information, see Stream processing.
These examples query records from the pageviews and users topics using the schemas defined by the CREATE statements in the following steps.
Step 5: Create a table in the ksqlDB editor¶
You can create a stream or table by using the CREATE STREAM and CREATE TABLE statements in the ksqlDB Editor, similar to how you use them in the ksqlDB CLI.
Use the CREATE TABLE statement to register a table on a topic.
In the navigation menu, click ksqlDB.
In the ksqlDB clusters list, click ksqldb-app1.
Make sure the Editor tab is selected, copy the following code into the editor window, and click Run query.
CREATE TABLE users (
  userid VARCHAR PRIMARY KEY,
  registertime BIGINT,
  gender VARCHAR,
  regionid VARCHAR
) WITH (
  KAFKA_TOPIC='users',
  VALUE_FORMAT='JSON'
);
Your output should resemble:
To create a tab in the editor window on a Mac, press Option+Tab.
Clear the editor window, and use the following SELECT query to inspect records in the users table. Click Run query.
SELECT * FROM users EMIT CHANGES;
Your output should resemble:
The query continues until you end it explicitly. Click Stop to end the query.
Click Tables, and in the list, click USERS to open the details page.
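You can also run ksqlDB statements over the cluster's REST API. The following is a minimal sketch, assuming the ksqlDB endpoint URL and a ksqlDB API key and secret are exported as KSQLDB_ENDPOINT, KSQLDB_API_KEY, and KSQLDB_API_SECRET (placeholder names); the endpoint and API key are available from the ksqlDB cluster settings in the Cloud Console:
# Send a DESCRIBE statement for the users table to the /ksql statement endpoint.
curl -s -u "$KSQLDB_API_KEY:$KSQLDB_API_SECRET" \
  -H "Content-Type: application/vnd.ksql.v1+json" \
  -d '{"ksql": "DESCRIBE users;", "streamsProperties": {}}' \
  "$KSQLDB_ENDPOINT/ksql"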
Step 6: Create a stream in the ksqlDB editor¶
The Cloud Console automates registering a stream on a topic.
Click Streams to view the currently registered streams.
Click Import topics as streams.
The Import dialog opens.
Ensure that pageviews is selected, and then click Import to register a stream on the pageviews topic.
The PAGEVIEWS stream is created and appears in the Streams list.
Note
You can use the CREATE STREAM statement in the editor window to register a stream on a topic manually and specify the stream’s name.
CREATE STREAM pageviews (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  KAFKA_TOPIC='pageviews',
  VALUE_FORMAT='JSON_SR'
);
Click Editor, and clear the editor window.
To create a tab in the editor window on a Mac, press Option+Tab.
Copy the following SELECT query into the editor to inspect records in the PAGEVIEWS stream, and click Run query.
SELECT * FROM PAGEVIEWS EMIT CHANGES;
Your output should resemble:
The query continues until you end it explicitly. Click Stop to end the query.
Step 7: Write a persistent query¶
With the pageviews topic registered as a stream, and the users topic registered as a table, you can write a streaming join query that runs until you end it with the TERMINATE statement.
Click Editor and copy the following code into the editor, clearing its previous contents, and click Run query.
CREATE STREAM pageviews_enriched AS
  SELECT users.userid AS userid, pageid, regionid, gender
  FROM PAGEVIEWS
  LEFT JOIN users ON PAGEVIEWS.userid = users.userid
  EMIT CHANGES;
Your output should resemble:
To create a tab in the editor window on a Mac, press Option+Tab.
To inspect your persistent queries, click the Persistent queries tab, which shows details about the pageviews_enriched stream that you created in the previous query.
Click Explain query to see the schema and query properties for the persistent query.
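Outside the Console, you can list and terminate persistent queries by sending statements to the same /ksql endpoint used in the sketch from Step 5, reusing the same placeholder variables. Replace the query ID with the one shown on the Persistent queries tab:
# List all persistent queries and their query IDs.
curl -s -u "$KSQLDB_API_KEY:$KSQLDB_API_SECRET" \
  -H "Content-Type: application/vnd.ksql.v1+json" \
  -d '{"ksql": "SHOW QUERIES;"}' \
  "$KSQLDB_ENDPOINT/ksql"
# Terminate a persistent query by its ID (placeholder shown; use your own query ID).
curl -s -u "$KSQLDB_API_KEY:$KSQLDB_API_SECRET" \
  -H "Content-Type: application/vnd.ksql.v1+json" \
  -d '{"ksql": "TERMINATE CSAS_PAGEVIEWS_ENRICHED_5;"}' \
  "$KSQLDB_ENDPOINT/ksql"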
Step 8: Inspect data with the Flow view¶
Use the Flow view to monitor the topology of your application and inspect the details of streams, tables, and the SQL statements that create them. You can also track the flow of events through your application in real time, making it easier to identify potential issues and optimize your application's performance.
To visualize data flow in your ksqlDB application, click the Flow tab.
Click the CREATE-STREAM node to see the query that you used to create the PAGEVIEWS_ENRICHED stream.
Click the PAGEVIEWS_ENRICHED node to see the stream’s events and schema.
Step 9: Monitor persistent queries¶
You can monitor your persistent queries visually using the Cloud Console.
In the navigation menu, select Clients and click the Consumer lag tab.
Find the group that corresponds with your pageviews_enriched stream, for example, _confluent-ksql-pksqlc-lgwpnquery_CSAS_PAGEVIEWS_ENRICHED_5. This view shows how well your persistent query is keeping up with the incoming data.
Step 10: Delete the connectors and topics¶
When you are finished with the Quick Start, delete the resources you created to avoid unexpected charges to your account.
- Delete the connectors:
- From the navigation menu, select Connectors.
- Click DatagenSourceConnector_users and choose the Settings tab.
- Click Delete connector, enter the connector name (DatagenSourceConnector_users), and click Confirm.
- Repeat these steps with the DatagenSourceConnector_pageviews connector.
- Delete the topics:
- From the navigation menu, click Topics, select the users topic, and choose the Configuration tab.
- Click Delete topic, enter the topic name (users), and click Continue.
- Repeat these steps with the pageviews topic.
Delete the connector:
confluent connect cluster delete <connector-id> [flags]
For example:
confluent connect cluster delete lcc-aa1234 --cluster lkc-000000
Delete the topic:
confluent kafka topic delete <topic name> [flags]
For example:
confluent kafka topic delete users --cluster lkc-000000
Delete a producer.
Request:
DELETE /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}
Host: api.confluent.cloud
Delete a topic.
Request:
DELETE /kafka/v3/clusters/{kafka_cluster_id}/topics/{topic_name}
Host: pkc-{0000}.{region}.{provider}.confluent.cloud
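For reference, a minimal curl sketch of these two DELETE calls, reusing the placeholder credentials from earlier steps (a Cloud API key for the Connect API and a cluster API key for the Kafka REST API) and the placeholder IDs used throughout this Quick Start:
# Delete the connector through the Connect API (Cloud API key).
curl -s -u "$CLOUD_API_KEY:$CLOUD_API_SECRET" -X DELETE \
  "https://api.confluent.cloud/connect/v1/environments/env-a12b34/clusters/lkc-000000/connectors/DatagenSourceConnector_users"
# Delete the topic through the Kafka REST API (cluster API key).
curl -s -u "$KAFKA_API_KEY:$KAFKA_API_SECRET" -X DELETE \
  "https://pkc-00000.us-west-2.aws.confluent.cloud/kafka/v3/clusters/lkc-000000/topics/users"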