Example: Build an ETL Pipeline With Confluent Cloud¶
This example showcases an entire end-to-end cloud ETL deployment, built for 100% cloud services:
- Cloud source connectors: writes data to Kafka topics in Confluent Cloud from a cloud service, one of:
- Cloud sink connectors: writes data from Kafka topics in Confluent Cloud to cloud storage, one of:
- Confluent Cloud ksqlDB : streaming SQL engine that enables real-time data processing against Kafka
- Confluent Cloud Schema Registry: centralized management of schemas and compatibility checks as schemas evolve
Overview¶
As enterprises move more and more of their applications to the cloud, they are also moving their on-prem ETL (extract, transform, load) pipelines to the cloud, as well as building new ones. This example showcases a cloud ETL solution leveraging all fully-managed services on Confluent Cloud.
There are many powerful use cases for these real-time cloud ETL pipelines, and this example showcases one such use case—a log ingestion pipeline that spans multiple cloud providers. Using the Confluent CLI, the example creates a source connector that reads data from either an AWS Kinesis stream or AWS RDS PostgreSQL database into Confluent Cloud. Then it creates a Confluent Cloud ksqlDB application that processes that data. Finally, a sink connector writes the output data into cloud storage in the provider of your choice (one of GCP GCS, AWS S3, or Azure Blob).
The end result is an event streaming ETL, running 100% in the cloud, spanning multiple cloud providers. This enables you to:
- Build business applications on a full event streaming platform
- Span multiple cloud providers (AWS, GCP, Azure) and on-prem datacenters
- Use Kafka to aggregate data into a single source of truth
- Harness the power of ksqlDB for stream processing
Tip
For more information about building a cloud ETL pipeline on Confluent Cloud, see this blog post.
Data Flow¶
The data set is a stream of log messages, which in this example is mock data captured in eventlogs.json. It resembles this:
{"eventSourceIP":"192.168.1.1","eventAction":"Upload","result":"Pass","eventDuration":3}
{"eventSourceIP":"192.168.1.1","eventAction":"Create","result":"Pass","eventDuration":2}
{"eventSourceIP":"192.168.1.1","eventAction":"Delete","result":"Fail","eventDuration":5}
{"eventSourceIP":"192.168.1.2","eventAction":"Upload","result":"Pass","eventDuration":1}
{"eventSourceIP":"192.168.1.2","eventAction":"Create","result":"Pass","eventDuration":3}
Component | Consumes From | Produces To |
---|---|---|
Kinesis/PostgreSQL source connector | Kinesis stream or RDS PostgreSQL table | Kafka topic
eventlogs |
ksqlDB | eventlogs |
ksqlDB streams and tables |
GCS/S3/Blob sink connector | ksqlDB tables
COUNT_PER_SOURCE ,
SUM_PER_SOURCE |
GCS/S3/Blob |
Prerequisites¶
Cloud services¶
- Confluent Cloud
- Access to AWS and (optional) GCP or Azure
Local Tools¶
- Confluent CLI v3.0.0 or later, logged in with the
--save
argument which saves your Confluent Cloud user login credentials or refresh token (in the case of SSO) to the localnetrc
file. - Confluent CLI v1 (optional)
gsutil
CLI, properly initialized with your credentials: (optional) if destination is GCP GCSaws
CLI, properly initialized with your credentials: used for AWS Kinesis or RDS PostgreSQL, and (optional) if destination is AWS S3az
CLI, properly initialized with your credentials: (optional) if destination is Azure Blob storagepsql
: (optional) if source is RDS PostgreSQLjq
curl
timeout
: used by the bash scripts to terminate a consumer process after a certain period of time.timeout
is available on most Linux distributions but not on macOS. macOS users can installtimeout
viabrew install coreutils
.python
Cost to Run Tutorial¶
Caution¶
Confluent Cloud examples that use actual Confluent Cloud resources may be billable. An example may create a new Confluent Cloud environment, Kafka cluster, topics, ACLs, and service accounts, and resources that have hourly charges such as connectors and ksqlDB applications. To avoid unexpected charges, carefully evaluate the cost of resources before you start. After you are done running a Confluent Cloud example, destroy all Confluent Cloud resources to avoid accruing hourly charges for services and verify that they have been deleted.
This example also uses real resources from other cloud providers, including:
- AWS Kinesis or RDS PostgreSQL
- Cloud storage providers (one of GCP GCS, AWS S3, or Azure Blob) depending on your configuration
Run Example¶
Setup¶
Because this example interacts with real resources in Kinesis or RDS PostgreSQL, a destination storage service, and Confluent Cloud, you must set up some initial parameters to communicate with these services.
This example creates a new Confluent Cloud environment with required resources to run this example. As a reminder, this example uses real Confluent Cloud resources and you may incur charges so carefully evaluate the cost of resources before launching the example.
Clone the confluentinc/examples GitHub repository, and check out the
current-post
branch..git clone https://github.com/confluentinc/examples cd examples git checkout current-post
Change directory to the
cloud-etl
example.cd cloud-etl
Modify the example configuration file at config/demo.cfg. Set the proper credentials and parameters for the source:
- AWS Kinesis
DATA_SOURCE='kinesis'
KINESIS_STREAM_NAME
KINESIS_REGION
AWS_PROFILE
- AWS RDS (PostgreSQL)
DATA_SOURCE='rds'
DB_INSTANCE_IDENTIFIER
RDS_REGION
AWS_PROFILE
- AWS Kinesis
In the same example configuration file at config/demo.cfg, set the required parameters for the destination cloud storage provider:
- GCP GCS
DESTINATION_STORAGE='gcs'
GCS_CREDENTIALS_FILE
GCS_PROJECT_ID
GCS_BUCKET
- AWS S3
DESTINATION_STORAGE='s3'
S3_PROFILE
S3_BUCKET
- Azure Blob
DESTINATION_STORAGE='az'
AZBLOB_STORAGE_ACCOUNT
AZBLOB_CONTAINER
- GCP GCS
Log in to Confluent Cloud with the command
confluent login --save
, and use your Confluent Cloud username and password. The--save
argument saves your Confluent Cloud user login credentials or refresh token (in the case of SSO) to the localnetrc
file.confluent login --save
Run¶
This example uses the ccloud-stack utility for Confluent Cloud to automatically
create a stack of fully managed services in Confluent Cloud. By default, the ccloud-stack
utility creates resources in a
new Confluent Cloud environment in cloud provider aws
in region us-west-2
. If you want to reuse an existing Confluent Cloud
environment, or if aws
and us-west-2
are not the target provider and region, you may configure
other ccloud-stack options before you run this example.
Run the example. Set the cloud provider and region for your Confluent Cloud cluster when you start the example, so that they match the destination cloud storage provider and region. This will take several minutes to complete as it creates new resources in Confluent Cloud and other cloud providers.
# Example for running to AWS S3 in us-west-2 CLUSTER_CLOUD=aws CLUSTER_REGION=us-west-2 ./start.sh # Example for running to GCP GCS in us-west2 CLUSTER_CLOUD=gcp CLUSTER_REGION=us-west2 ./start.sh # Example for running to Azure Blob in westus2 CLUSTER_CLOUD=azure CLUSTER_REGION=westus2 ./start.sh
As part of this script run, the ccloud-stack utility for Confluent Cloud creates a new Confluent Cloud stack of fully-managed resources and also generates a local configuration file with all connection information, cluster IDs, and credentials, which is useful for other demos/automation. View this local configuration file, where
SERVICE ACCOUNT ID
is auto-generated by the script.cat stack-configs/java-service-account-<SERVICE ACCOUNT ID>.config
Your output should resemble:
# -------------------------------------- # Confluent Cloud connection information # -------------------------------------- # ENVIRONMENT ID: <ENVIRONMENT ID> # SERVICE ACCOUNT ID: <SERVICE ACCOUNT ID> # KAFKA CLUSTER ID: <KAFKA CLUSTER ID> # SCHEMA REGISTRY CLUSTER ID: <SCHEMA REGISTRY CLUSTER ID> # KSQLDB APP ID: <KSQLDB APP ID> # -------------------------------------- security.protocol=SASL_SSL sasl.mechanism=PLAIN bootstrap.servers=<BROKER ENDPOINT> sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<API KEY>' password='<API SECRET>'; basic.auth.credentials.source=USER_INFO schema.registry.basic.auth.user.info=<SR API KEY>:<SR API SECRET> schema.registry.url=https://<SR ENDPOINT> ksql.endpoint=<KSQLDB ENDPOINT> ksql.basic.auth.user.info=<KSQLDB API KEY>:<KSQLDB API SECRET>
Log into the Confluent Cloud Console .
Connectors¶
The example automatically created Kafka Connect connectors using the Confluent CLI command
confluent connect create
that included passing in connector configuration files from the connector configuration directory:- AWS Kinesis source connector configuration file
- PostgreSQL source connector configuration file
- GCS sink connector configuration file
- GCS sink connector with Avro configuration file
- S3 sink connector configuration file
- S3 sink connector with Avro configuration file
- Azure Blob sink connector configuration file
- Azure Blob sink connector with Avro configuration file
For example, if you configured the example to source data from Kinesis, it ran this AWS Kinesis connector configuration file.
{ "name": "demo-KinesisSource", "connector.class": "KinesisSource", "tasks.max": "1", "kafka.api.key": "$CLOUD_KEY", "kafka.api.secret": "$CLOUD_SECRET", "aws.access.key.id": "$AWS_ACCESS_KEY_ID", "aws.secret.key.id": "$AWS_SECRET_ACCESS_KEY", "kafka.topic": "$KAFKA_TOPIC_NAME_IN", "kinesis.region": "$KINESIS_REGION", "kinesis.stream": "$KINESIS_STREAM_NAME", "kinesis.position": "TRIM_HORIZON" }
Let’s say you ran the example with Kinesis as the source and S3 as the sink, the pipeline would resemble:
Using the Confluent CLI, list all the fully-managed connectors created in this cluster.
confluent connect list
Your output should resemble:
ID | Name | Status | Type | Trace +-----------+---------------------+---------+--------+-------+ lcc-2jrx1 | demo-S3Sink-no-avro | RUNNING | sink | lcc-vnrqp | demo-KinesisSource | RUNNING | source | lcc-5qwrn | demo-S3Sink-avro | RUNNING | sink |
Describe any running connector in more detail, in this case
lcc-vnrqp
which corresponds to the the AWS Kinesis connector.confluent connect describe lcc-vnrqp
Your output should resemble:
Connector Details +--------+--------------------+ | ID | lcc-vnrqp | | Name | demo-KinesisSource | | Status | RUNNING | | Type | source | | Trace | | +--------+--------------------+ Task Level Details TaskId | State +--------+---------+ 0 | RUNNING Configuration Details Config | Value +---------------------+---------------------------------------------------------+ name | demo-KinesisSource kafka.api.key | **************** kafka.api.secret | **************** schema.registry.url | https://psrc-4yovk.us-east-2.aws.confluent.cloud cloud.environment | prod kafka.endpoint | SASL_SSL://pkc-4kgmg.us-west-2.aws.confluent.cloud:9092 kafka.region | us-west-2 kafka.user.id | 73800 kinesis.position | TRIM_HORIZON kinesis.region | us-west-2 kinesis.stream | demo-logs aws.secret.key.id | **************** connector.class | KinesisSource tasks.max | 1 aws.access.key.id | ****************
View these same connectors from the Confluent Cloud Console.
ksqlDB¶
From the Confluent Cloud Console, select your Kafka cluster and click the ksqlDB tab to view the flow through your ksqlDB application:
This flow is the result of this set of ksqlDB statements. It generated a ksqlDB TABLE
COUNT_PER_SOURCE
, formatted as JSON, and its underlying Kafka topic isCOUNT_PER_SOURCE
. It also generated a ksqlDB TABLESUM_PER_SOURCE
, formatted as Avro, and its underlying Kafka topic isSUM_PER_SOURCE
.CREATE STREAM eventlogs (eventSourceIP VARCHAR, eventAction VARCHAR, result VARCHAR, eventDuration BIGINT) WITH (KAFKA_TOPIC='eventlogs', VALUE_FORMAT='JSON'); CREATE TABLE count_per_source WITH (KAFKA_TOPIC='COUNT_PER_SOURCE', PARTITIONS=6) AS SELECT eventSourceIP, COUNT(*) as count FROM eventlogs GROUP BY eventSourceIP EMIT CHANGES; CREATE TABLE sum_per_source WITH (KAFKA_TOPIC='SUM_PER_SOURCE', PARTITIONS=6, VALUE_FORMAT='AVRO') AS SELECT eventSourceIP as ROWKEY, as_value(eventSourceIP) as eventSourceIP, SUM(EVENTDURATION) as sum FROM eventlogs WHERE (RESULT = 'Pass') GROUP BY eventSourceIP EMIT CHANGES;
Use the Confluent Cloud ksqlDB editor or its REST API to interact with the ksqlDB application:
curl -X POST $KSQLDB_ENDPOINT/ksql \ -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \ -u $KSQLDB_BASIC_AUTH_USER_INFO \ -d @<(cat <<EOF { "ksql": "SHOW QUERIES;", "streamsProperties": {} } EOF )
Your output should resemble:
[ { "@type": "queries", "statementText": "SHOW QUERIES;", "queries": [ { "queryString": "CREATE TABLE COUNT_PER_SOURCE WITH (KAFKA_TOPIC='COUNT_PER_SOURCE', PARTITIONS=6, REPLICAS=3) AS SELECT\n EVENTLOGS.EVENTSOURCEIP EVENTSOURCEIP,\n COUNT(*) COUNT\nFROM EVENTLOGS EVENTLOGS\nGROUP BY EVENTLOGS.EVENTSOURCEIP\nEMIT CHANGES;", "sinks": [ "COUNT_PER_SOURCE" ], "sinkKafkaTopics": [ "COUNT_PER_SOURCE" ], "id": "CTAS_COUNT_PER_SOURCE_0", "statusCount": { "RUNNING": 1 }, "queryType": "PERSISTENT", "state": "RUNNING" }, { "queryString": "CREATE TABLE SUM_PER_SOURCE WITH (KAFKA_TOPIC='SUM_PER_SOURCE', PARTITIONS=6, REPLICAS=3, VALUE_FORMAT='AVRO') AS SELECT\n EVENTLOGS.EVENTSOURCEIP ROWKEY,\n AS_VALUE(EVENTLOGS.EVENTSOURCEIP) EVENTSOURCEIP,\n SUM(EVENTLOGS.EVENTDURATION) SUM\nFROM EVENTLOGS EVENTLOGS\nWHERE (EVENTLOGS.RESULT = 'Pass')\nGROUP BY EVENTLOGS.EVENTSOURCEIP\nEMIT CHANGES;", "sinks": [ "SUM_PER_SOURCE" ], "sinkKafkaTopics": [ "SUM_PER_SOURCE" ], "id": "CTAS_SUM_PER_SOURCE_5", "statusCount": { "RUNNING": 1 }, "queryType": "PERSISTENT", "state": "RUNNING" } ], "warnings": [] } ]
View the Avro schema for the topic
SUM_PER_SOURCE
in Confluent Cloud Schema Registry.curl --silent -u <SR API KEY>:<SR API SECRET> https://<SR ENDPOINT>/subjects/SUM_PER_SOURCE-value/versions/latest | jq -r '.schema' | jq .
Your output should resemble:
{ "type": "record", "name": "KsqlDataSourceSchema", "namespace": "io.confluent.ksql.avro_schemas", "fields": [ { "name": "EVENTSOURCEIP", "type": [ "null", "string" ], "default": null }, { "name": "SUM", "type": [ "null", "long" ], "default": null } ] }
View these same queries from the Confluent Cloud Console.
Validate¶
View the data from Kinesis, Kafka, and cloud storage after running the example, running the read-data.sh script.
./read-data.sh stack-configs/java-service-account-<SERVICE ACCOUNT ID>.config
Your output should resemble:
Data from Kinesis stream demo-logs --limit 10: {"eventSourceIP":"192.168.1.1","eventAction":"Upload","result":"Pass","eventDuration":3} {"eventSourceIP":"192.168.1.1","eventAction":"Create","result":"Pass","eventDuration":2} {"eventSourceIP":"192.168.1.1","eventAction":"Delete","result":"Fail","eventDuration":5} {"eventSourceIP":"192.168.1.2","eventAction":"Upload","result":"Pass","eventDuration":1} {"eventSourceIP":"192.168.1.2","eventAction":"Create","result":"Pass","eventDuration":3} {"eventSourceIP":"192.168.1.1","eventAction":"Upload","result":"Pass","eventDuration":3} {"eventSourceIP":"192.168.1.1","eventAction":"Create","result":"Pass","eventDuration":2} {"eventSourceIP":"192.168.1.1","eventAction":"Delete","result":"Fail","eventDuration":5} {"eventSourceIP":"192.168.1.2","eventAction":"Upload","result":"Pass","eventDuration":1} {"eventSourceIP":"192.168.1.2","eventAction":"Create","result":"Pass","eventDuration":3} Data from Kafka topic eventlogs: confluent-v1 local services kafka consume eventlogs --cloud --config stack-configs/java-service-account-<SERVICE ACCOUNT ID>.config --from-beginning --property print.key=true --max-messages 10 5 {"eventSourceIP":"192.168.1.5","eventAction":"Upload","result":"Pass","eventDuration":4} 5 {"eventSourceIP":"192.168.1.5","eventAction":"Create","result":"Pass","eventDuration":1} 5 {"eventSourceIP":"192.168.1.5","eventAction":"Delete","result":"Fail","eventDuration":1} 5 {"eventSourceIP":"192.168.1.5","eventAction":"Upload","result":"Pass","eventDuration":4} 5 {"eventSourceIP":"192.168.1.5","eventAction":"Create","result":"Pass","eventDuration":1} 5 {"eventSourceIP":"192.168.1.5","eventAction":"Delete","result":"Fail","eventDuration":1} 5 {"eventSourceIP":"192.168.1.5","eventAction":"Upload","result":"Pass","eventDuration":4} 5 {"eventSourceIP":"192.168.1.5","eventAction":"Create","result":"Pass","eventDuration":1} 5 {"eventSourceIP":"192.168.1.5","eventAction":"Delete","result":"Fail","eventDuration":1} 5 {"eventSourceIP":"192.168.1.5","eventAction":"Upload","result":"Pass","eventDuration":4} Data from Kafka topic COUNT_PER_SOURCE: confluent-v1 local services kafka consume COUNT_PER_SOURCE --cloud --config stack-configs/java-service-account-<SERVICE ACCOUNT ID>.config --from-beginning --property print.key=true --max-messages 10 192.168.1.5 {"EVENTSOURCEIP":"192.168.1.5","COUNT":1} 192.168.1.5 {"EVENTSOURCEIP":"192.168.1.5","COUNT":2} 192.168.1.5 {"EVENTSOURCEIP":"192.168.1.5","COUNT":3} 192.168.1.5 {"EVENTSOURCEIP":"192.168.1.5","COUNT":4} 192.168.1.5 {"EVENTSOURCEIP":"192.168.1.5","COUNT":5} 192.168.1.5 {"EVENTSOURCEIP":"192.168.1.5","COUNT":6} 192.168.1.5 {"EVENTSOURCEIP":"192.168.1.5","COUNT":7} 192.168.1.5 {"EVENTSOURCEIP":"192.168.1.5","COUNT":8} 192.168.1.5 {"EVENTSOURCEIP":"192.168.1.5","COUNT":9} 192.168.1.5 {"EVENTSOURCEIP":"192.168.1.5","COUNT":10} Data from Kafka topic SUM_PER_SOURCE: confluent-v1 local services kafka consume SUM_PER_SOURCE --cloud --config stack-configs/java-service-account-<SERVICE ACCOUNT ID>.config --from-beginning --property print.key=true --value-format avro --property basic.auth.credentials.source=USER_INFO --property schema.registry.basic.auth.user.info=$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO --property schema.registry.url=$SCHEMA_REGISTRY_URL --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --max-messages 10 192.168.1.2 {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":1}} 192.168.1.2 {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":4}} 192.168.1.2 {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":5}} 192.168.1.2 {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":8}} 192.168.1.2 {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":11}} 192.168.1.2 {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":12}} 192.168.1.2 {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":15}} 192.168.1.2 {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":16}} 192.168.1.2 {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":19}} 192.168.1.2 {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":22}} Objects in Cloud storage gcs: gs://confluent-cloud-etl-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+1+0000000000.bin gs://confluent-cloud-etl-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+1+0000001000.bin gs://confluent-cloud-etl-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+1+0000002000.bin gs://confluent-cloud-etl-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+1+0000003000.bin gs://confluent-cloud-etl-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+1+0000004000.bin gs://confluent-cloud-etl-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+3+0000000000.bin gs://confluent-cloud-etl-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+3+0000001000.bin gs://confluent-cloud-etl-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+3+0000002000.bin gs://confluent-cloud-etl-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+3+0000003000.bin gs://confluent-cloud-etl-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+3+0000004000.bin gs://confluent-cloud-etl-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+1+0000000000.avro gs://confluent-cloud-etl-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+1+0000001000.avro gs://confluent-cloud-etl-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+1+0000002000.avro gs://confluent-cloud-etl-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+1+0000003000.avro gs://confluent-cloud-etl-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+3+0000000000.avro gs://confluent-cloud-etl-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+3+0000001000.avro gs://confluent-cloud-etl-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+3+0000002000.avro gs://confluent-cloud-etl-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+3+0000003000.avro
Add more entries in the source and see them propagate through the pipeline by viewing messages in the Confluent Cloud Console or CLI.
If you are running Kinesis:
./add_entries_kinesis.sh
If you are running RDS PostgreSQL:
./add_entries_rds.sh
View the new messages from the Confluent Cloud Console.
Stop Example¶
Any Confluent Cloud example uses real Confluent Cloud resources. After you are done running a Confluent Cloud example, manually verify that all Confluent Cloud resources are destroyed to avoid unexpected charges.
Details¶
Stop the example and clean up all the resources, delete Kafka topics, delete the fully-managed connectors, delete the data in the cloud storage:
./stop.sh stack-configs/java-service-account-<SERVICE ACCOUNT ID>.config
Always verify that resources in Confluent Cloud have been destroyed.
Additional Resources¶
- To find additional Confluent Cloud examples, see Confluent Cloud Tutorials.