Example: Build an ETL Pipeline With Confluent Cloud

This example showcases an end-to-end cloud ETL deployment built entirely on cloud services:

image

Overview

As enterprises move more and more of their applications to the cloud, they are also moving their on-prem ETL (extract, transform, load) pipelines to the cloud, as well as building new ones. This example showcases a cloud ETL solution leveraging all fully-managed services on Confluent Cloud.

image

These real-time cloud ETL pipelines have many use cases. This example showcases one of them: a log ingestion pipeline that spans multiple cloud providers. Using the Confluent CLI, the example creates a source connector that reads data from either an AWS Kinesis stream or an AWS RDS PostgreSQL database into Confluent Cloud. It then creates a Confluent Cloud ksqlDB application that processes that data. Finally, a sink connector writes the output data to cloud storage in the provider of your choice (one of GCP GCS, AWS S3, or Azure Blob).

The end result is an event streaming ETL, running 100% in the cloud, spanning multiple cloud providers. This enables you to:

  • Build business applications on a full event streaming platform
  • Span multiple cloud providers (AWS, GCP, Azure) and on-prem datacenters
  • Use Kafka to aggregate data into a single source of truth
  • Harness the power of ksqlDB for stream processing

Tip

For more information about building a cloud ETL pipeline on Confluent Cloud, see this blog post.

Data Flow

The data set is a stream of log messages, which in this example is mock data captured in eventlogs.json. It resembles this:

{"eventSourceIP":"192.168.1.1","eventAction":"Upload","result":"Pass","eventDuration":3}
{"eventSourceIP":"192.168.1.1","eventAction":"Create","result":"Pass","eventDuration":2}
{"eventSourceIP":"192.168.1.1","eventAction":"Delete","result":"Fail","eventDuration":5}
{"eventSourceIP":"192.168.1.2","eventAction":"Upload","result":"Pass","eventDuration":1}
{"eventSourceIP":"192.168.1.2","eventAction":"Create","result":"Pass","eventDuration":3}

Component                           | Consumes From                                  | Produces To
------------------------------------+------------------------------------------------+---------------------------
Kinesis/PostgreSQL source connector | Kinesis stream or RDS PostgreSQL table         | Kafka topic eventlogs
ksqlDB                              | eventlogs                                      | ksqlDB streams and tables
GCS/S3/Blob sink connector          | ksqlDB tables COUNT_PER_SOURCE, SUM_PER_SOURCE | GCS/S3/Blob
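As a minimal sketch of the record shape, the following Python snippet parses newline-delimited JSON like the sample log messages above into typed records. The names EventLog and parse_events are illustrative and are not part of the example repository.

```python
import json
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class EventLog:
    """One log message, using the field names from the sample data."""
    eventSourceIP: str
    eventAction: str
    result: str
    eventDuration: int


def parse_events(text: str) -> List[EventLog]:
    """Parse newline-delimited JSON into EventLog records, skipping blank lines."""
    return [EventLog(**json.loads(line)) for line in text.splitlines() if line.strip()]


# Two of the sample messages shown above.
SAMPLE = """\
{"eventSourceIP":"192.168.1.1","eventAction":"Upload","result":"Pass","eventDuration":3}
{"eventSourceIP":"192.168.1.1","eventAction":"Delete","result":"Fail","eventDuration":5}
"""

if __name__ == "__main__":
    for event in parse_events(SAMPLE):
        print(event)
```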

Prerequisites

Cloud services

Local Tools

  • Confluent CLI v4.0.0 or later, logged in with the --save argument which saves your Confluent Cloud user login credentials or refresh token (in the case of SSO) to the local .netrc file.
  • gsutil CLI, properly initialized with your credentials: optional, needed only if the destination is GCP GCS
  • aws CLI, properly initialized with your credentials: used for AWS Kinesis or RDS PostgreSQL, and also if the destination is AWS S3
  • az CLI, properly initialized with your credentials: optional, needed only if the destination is Azure Blob storage
  • psql: optional, needed only if the source is RDS PostgreSQL
  • jq
  • curl
  • timeout: used by the bash scripts to terminate a consumer process after a certain period of time. timeout is available on most Linux distributions but not on macOS. macOS users can install timeout via brew install coreutils.
  • python
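Before you start, it can help to confirm that the tools listed above are on your PATH. The following is a minimal sketch, not part of the example repository. The function name missing_tools and the REQUIRED_TOOLS list are assumptions made for illustration, and the optional tools (gsutil, az, psql) are left out because they depend on your chosen source and destination.

```python
import shutil
from typing import Callable, Iterable, List, Optional

# Tools from the list above that apply to every configuration.
# On some systems the Python executable is named "python3" instead of "python".
REQUIRED_TOOLS = ["confluent", "aws", "jq", "curl", "timeout", "python"]


def missing_tools(
    tools: Iterable[str],
    which: Callable[[str], Optional[str]] = shutil.which,
) -> List[str]:
    """Return the tools that cannot be found on the PATH."""
    return [tool for tool in tools if which(tool) is None]


if __name__ == "__main__":
    missing = missing_tools(REQUIRED_TOOLS)
    if missing:
        print("Missing tools:", ", ".join(missing))
    else:
        print("All required tools were found on the PATH.")
```

The which parameter exists only so that the lookup can be replaced in tests. By default the sketch uses shutil.which from the Python standard library.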

Cost to Run Tutorial

Caution

Confluent Cloud examples that use actual Confluent Cloud resources may be billable. An example may create a new Confluent Cloud environment, Kafka cluster, topics, ACLs, and service accounts, and resources that have hourly charges such as connectors and ksqlDB applications. To avoid unexpected charges, carefully evaluate the cost of resources before you start. After you are done running a Confluent Cloud example, destroy all Confluent Cloud resources to avoid accruing hourly charges for services and verify that they have been deleted.

This example also uses real resources from other cloud providers, including:

  • AWS Kinesis or RDS PostgreSQL
  • Cloud storage providers (one of GCP GCS, AWS S3, or Azure Blob) depending on your configuration

Run Example

Setup

Because this example interacts with real resources in Kinesis or RDS PostgreSQL, a destination storage service, and Confluent Cloud, you must set up some initial parameters to communicate with these services.

  1. This example creates a new Confluent Cloud environment with the resources required to run it. As a reminder, this example uses real Confluent Cloud resources and you may incur charges, so carefully evaluate the cost of resources before launching the example.

  2. Clone the confluentinc/examples GitHub repository, and check out the master branch.

    git clone https://github.com/confluentinc/examples
    cd examples
    git checkout master
    
  3. Change directory to the cloud-etl example.

    cd cloud-etl
    
  4. Modify the example configuration file at config/demo.cfg. Set the proper credentials and parameters for the source:

    • AWS Kinesis
      • DATA_SOURCE='kinesis'
      • KINESIS_STREAM_NAME
      • KINESIS_REGION
      • AWS_PROFILE
    • AWS RDS (PostgreSQL)
      • DATA_SOURCE='rds'
      • DB_INSTANCE_IDENTIFIER
      • RDS_REGION
      • AWS_PROFILE
  5. In the same example configuration file at config/demo.cfg, set the required parameters for the destination cloud storage provider:

    • GCP GCS
      • DESTINATION_STORAGE='gcs'
      • GCS_CREDENTIALS_FILE
      • GCS_PROJECT_ID
      • GCS_BUCKET
    • AWS S3
      • DESTINATION_STORAGE='s3'
      • S3_PROFILE
      • S3_BUCKET
    • Azure Blob
      • DESTINATION_STORAGE='az'
      • AZBLOB_STORAGE_ACCOUNT
      • AZBLOB_CONTAINER
  6. Log in to Confluent Cloud with the command confluent login --save, using your Confluent Cloud username and password. The --save argument saves your Confluent Cloud user login credentials or refresh token (in the case of SSO) to the local .netrc file.

    confluent login --save
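The parameters from steps 4 and 5 can be checked before you run the example. The sketch below assumes that config/demo.cfg consists of shell-style KEY='value' lines, as the DATA_SOURCE='kinesis' notation suggests. The function names parse_cfg and missing_parameters are hypothetical, and the values in EXAMPLE_CFG are placeholders.

```python
import shlex

# Required parameters for each choice, taken from the lists in steps 4 and 5.
SOURCE_PARAMS = {
    "kinesis": ["KINESIS_STREAM_NAME", "KINESIS_REGION", "AWS_PROFILE"],
    "rds": ["DB_INSTANCE_IDENTIFIER", "RDS_REGION", "AWS_PROFILE"],
}
DESTINATION_PARAMS = {
    "gcs": ["GCS_CREDENTIALS_FILE", "GCS_PROJECT_ID", "GCS_BUCKET"],
    "s3": ["S3_PROFILE", "S3_BUCKET"],
    "az": ["AZBLOB_STORAGE_ACCOUNT", "AZBLOB_CONTAINER"],
}


def parse_cfg(text):
    """Parse KEY=value lines (assumed shell-style) into a dict, ignoring comments."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, raw = line.partition("=")
        parts = shlex.split(raw, comments=True)  # strips quotes and trailing comments
        values[key.strip()] = parts[0] if parts else ""
    return values


def missing_parameters(cfg):
    """Return required parameters that are unset or empty for the chosen source and destination."""
    missing = []
    source = cfg.get("DATA_SOURCE", "")
    destination = cfg.get("DESTINATION_STORAGE", "")
    if source not in SOURCE_PARAMS:
        missing.append("DATA_SOURCE")
    if destination not in DESTINATION_PARAMS:
        missing.append("DESTINATION_STORAGE")
    required = SOURCE_PARAMS.get(source, []) + DESTINATION_PARAMS.get(destination, [])
    missing += [name for name in required if not cfg.get(name)]
    return missing


# Placeholder values for illustration only; S3_BUCKET is deliberately left empty.
EXAMPLE_CFG = """\
DATA_SOURCE='kinesis'
KINESIS_STREAM_NAME='demo-logs'
KINESIS_REGION='us-west-2'
AWS_PROFILE='default'
DESTINATION_STORAGE='s3'
S3_PROFILE='default'
S3_BUCKET=''
"""

if __name__ == "__main__":
    print(missing_parameters(parse_cfg(EXAMPLE_CFG)))
```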
    

Run

This example uses the ccloud-stack utility for Confluent Cloud to automatically create a stack of fully managed services in Confluent Cloud. By default, the ccloud-stack utility creates resources in a new Confluent Cloud environment in cloud provider aws in region us-west-2. If you want to reuse an existing Confluent Cloud environment, or if aws and us-west-2 are not the target provider and region, you may configure other ccloud-stack options before you run this example.

  1. Run the example. Set the cloud provider and region for your Confluent Cloud cluster when you start the example, so that they match the destination cloud storage provider and region. This will take several minutes to complete as it creates new resources in Confluent Cloud and other cloud providers.

    # Example for running to AWS S3 in us-west-2
    CLUSTER_CLOUD=aws CLUSTER_REGION=us-west-2 ./start.sh
    
    # Example for running to GCP GCS in us-west2
    CLUSTER_CLOUD=gcp CLUSTER_REGION=us-west2 ./start.sh
    
    # Example for running to Azure Blob in westus2
    CLUSTER_CLOUD=azure CLUSTER_REGION=westus2 ./start.sh
    
  2. As part of this script run, the ccloud-stack utility for Confluent Cloud creates a new Confluent Cloud stack of fully-managed resources and also generates a local configuration file with all connection information, cluster IDs, and credentials, which is useful for other demos/automation. View this local configuration file, where SERVICE ACCOUNT ID is auto-generated by the script.

    cat stack-configs/java-service-account-<SERVICE ACCOUNT ID>.config
    

    Your output should resemble:

    # --------------------------------------
    # Confluent Cloud connection information
    # --------------------------------------
    # ENVIRONMENT ID: <ENVIRONMENT ID>
    # SERVICE ACCOUNT ID: <SERVICE ACCOUNT ID>
    # KAFKA CLUSTER ID: <KAFKA CLUSTER ID>
    # SCHEMA REGISTRY CLUSTER ID: <SCHEMA REGISTRY CLUSTER ID>
    # KSQLDB APP ID: <KSQLDB APP ID>
    # --------------------------------------
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    bootstrap.servers=<BROKER ENDPOINT>
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<API KEY>' password='<API SECRET>';
    basic.auth.credentials.source=USER_INFO
    schema.registry.basic.auth.user.info=<SR API KEY>:<SR API SECRET>
    schema.registry.url=https://<SR ENDPOINT>
    ksql.endpoint=<KSQLDB ENDPOINT>
    ksql.basic.auth.user.info=<KSQLDB API KEY>:<KSQLDB API SECRET>
    
  3. Log in to the Confluent Cloud Console.
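The generated configuration file from step 2 uses plain key=value lines, so other automation can read it with a few lines of code. The following sketch is an illustration under that assumption. It is not a full Java properties parser (no line continuations or escapes), and the function names parse_properties and kafka_credentials are hypothetical.

```python
import re


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first '=' only, because values such as sasl.jaas.config contain '='.
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def kafka_credentials(props):
    """Extract the API key and secret embedded in sasl.jaas.config."""
    match = re.search(
        r"username='([^']*)'\s+password='([^']*)'",
        props.get("sasl.jaas.config", ""),
    )
    if match is None:
        raise ValueError("sasl.jaas.config does not contain a username and password")
    return match.group(1), match.group(2)


# A shortened copy of the output shown in step 2, with its placeholders left in place.
EXAMPLE = """\
# KAFKA CLUSTER ID: <KAFKA CLUSTER ID>
bootstrap.servers=<BROKER ENDPOINT>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<API KEY>' password='<API SECRET>';
schema.registry.url=https://<SR ENDPOINT>
"""

if __name__ == "__main__":
    props = parse_properties(EXAMPLE)
    print(props["bootstrap.servers"])
    print(kafka_credentials(props))
```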

Connectors

  1. The example automatically created Kafka Connect connectors with the Confluent CLI command confluent connect create, passing in connector configuration files from the connector configuration directory.

    For example, if you configured the example to source data from Kinesis, it used this AWS Kinesis connector configuration file:

    {
        "name": "demo-KinesisSource",
        "connector.class": "KinesisSource",
        "tasks.max": "1",
        "kafka.api.key": "$CLOUD_KEY",
        "kafka.api.secret": "$CLOUD_SECRET",
        "aws.access.key.id": "$AWS_ACCESS_KEY_ID",
        "aws.secret.key.id": "$AWS_SECRET_ACCESS_KEY",
        "kafka.topic": "$KAFKA_TOPIC_NAME_IN",
        "kinesis.region": "$KINESIS_REGION",
        "kinesis.stream": "$KINESIS_STREAM_NAME",
        "kinesis.position": "TRIM_HORIZON"
    }
    
  2. If you ran the example with Kinesis as the source and S3 as the sink, the pipeline would resemble:

    image
  3. Using the Confluent CLI, list all the fully-managed connectors created in this cluster.

    confluent connect list
    

    Your output should resemble:

         ID     |        Name         | Status  |  Type  | Trace
    +-----------+---------------------+---------+--------+-------+
      lcc-2jrx1 | demo-S3Sink-no-avro | RUNNING | sink   |
      lcc-vnrqp | demo-KinesisSource  | RUNNING | source |
      lcc-5qwrn | demo-S3Sink-avro    | RUNNING | sink   |
    
  4. Describe any running connector in more detail, in this case lcc-vnrqp, which corresponds to the AWS Kinesis connector.

    confluent connect describe lcc-vnrqp
    

    Your output should resemble:

    Connector Details
    +--------+--------------------+
    | ID     | lcc-vnrqp          |
    | Name   | demo-KinesisSource |
    | Status | RUNNING            |
    | Type   | source             |
    | Trace  |                    |
    +--------+--------------------+
    
    
    Task Level Details
      TaskId |  State
    +--------+---------+
           0 | RUNNING
    
    
    Configuration Details
            Config        |                          Value
    +---------------------+---------------------------------------------------------+
      name                | demo-KinesisSource
      kafka.api.key       | ****************
      kafka.api.secret    | ****************
      schema.registry.url | https://psrc-4yovk.us-east-2.aws.confluent.cloud
      cloud.environment   | prod
      kafka.endpoint      | SASL_SSL://pkc-4kgmg.us-west-2.aws.confluent.cloud:9092
      kafka.region        | us-west-2
      kafka.user.id       |                                                   73800
      kinesis.position    | TRIM_HORIZON
      kinesis.region      | us-west-2
      kinesis.stream      | demo-logs
      aws.secret.key.id   | ****************
      connector.class     | KinesisSource
      tasks.max           |                                                       1
      aws.access.key.id   | ****************
    
  5. View these same connectors from the Confluent Cloud Console.

    image
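The connector configuration file in step 1 contains $VARIABLE placeholders that must be filled in before the connector is created. This page does not show how the example scripts perform that substitution, so the following is only a sketch of one way to do it, using string.Template from the Python standard library. The function name render_connector_config is hypothetical, and the template is a shortened copy of the file above without the credential fields.

```python
import json
from string import Template

# Shortened version of the Kinesis connector configuration shown in step 1.
CONNECTOR_TEMPLATE = """\
{
    "name": "demo-KinesisSource",
    "connector.class": "KinesisSource",
    "kafka.topic": "$KAFKA_TOPIC_NAME_IN",
    "kinesis.region": "$KINESIS_REGION",
    "kinesis.stream": "$KINESIS_STREAM_NAME"
}
"""


def render_connector_config(template_text, values):
    """Substitute $VARIABLE placeholders and return the parsed JSON config.

    Template.substitute raises KeyError if a placeholder has no value,
    which surfaces a missing setting before the config is used.
    """
    rendered = Template(template_text).substitute(values)
    return json.loads(rendered)


if __name__ == "__main__":
    config = render_connector_config(
        CONNECTOR_TEMPLATE,
        {
            "KAFKA_TOPIC_NAME_IN": "eventlogs",
            "KINESIS_REGION": "us-west-2",
            "KINESIS_STREAM_NAME": "demo-logs",
        },
    )
    print(json.dumps(config, indent=4))
```

This sketch inserts values as plain text, so a value that contains a double quote or a backslash would produce invalid JSON. Parsing the rendered text with json.loads catches that case early.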

ksqlDB

  1. From the Confluent Cloud Console, select your Kafka cluster and click the ksqlDB tab to view the flow through your ksqlDB application:

    image
  2. This flow is the result of this set of ksqlDB statements. It generated a ksqlDB TABLE COUNT_PER_SOURCE, formatted as JSON, and its underlying Kafka topic is COUNT_PER_SOURCE. It also generated a ksqlDB TABLE SUM_PER_SOURCE, formatted as Avro, and its underlying Kafka topic is SUM_PER_SOURCE.

    CREATE STREAM eventlogs (eventSourceIP VARCHAR, eventAction VARCHAR, result VARCHAR, eventDuration BIGINT) WITH (KAFKA_TOPIC='eventlogs', VALUE_FORMAT='JSON');
    CREATE TABLE count_per_source WITH (KAFKA_TOPIC='COUNT_PER_SOURCE', PARTITIONS=6) AS SELECT eventSourceIP, COUNT(*) as count FROM eventlogs GROUP BY eventSourceIP EMIT CHANGES;
    CREATE TABLE sum_per_source WITH (KAFKA_TOPIC='SUM_PER_SOURCE', PARTITIONS=6, VALUE_FORMAT='AVRO') AS SELECT eventSourceIP as ROWKEY, as_value(eventSourceIP) as eventSourceIP, SUM(EVENTDURATION) as sum FROM eventlogs WHERE (RESULT = 'Pass') GROUP BY eventSourceIP EMIT CHANGES;
    
  3. Use the Confluent Cloud ksqlDB editor or its REST API to interact with the ksqlDB application:

    curl -X POST $KSQLDB_ENDPOINT/ksql \
           -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
           -u $KSQLDB_BASIC_AUTH_USER_INFO \
           -d @<(cat <<EOF
    {
      "ksql": "SHOW QUERIES;",
      "streamsProperties": {}
    }
    EOF
    )
    

    Your output should resemble:

    [
      {
        "@type": "queries",
        "statementText": "SHOW QUERIES;",
        "queries": [
          {
            "queryString": "CREATE TABLE COUNT_PER_SOURCE WITH (KAFKA_TOPIC='COUNT_PER_SOURCE', PARTITIONS=6, REPLICAS=3) AS SELECT\n  EVENTLOGS.EVENTSOURCEIP EVENTSOURCEIP,\n  COUNT(*) COUNT\nFROM EVENTLOGS EVENTLOGS\nGROUP BY EVENTLOGS.EVENTSOURCEIP\nEMIT CHANGES;",
            "sinks": [
              "COUNT_PER_SOURCE"
            ],
            "sinkKafkaTopics": [
              "COUNT_PER_SOURCE"
            ],
            "id": "CTAS_COUNT_PER_SOURCE_0",
            "statusCount": {
              "RUNNING": 1
            },
            "queryType": "PERSISTENT",
            "state": "RUNNING"
          },
          {
            "queryString": "CREATE TABLE SUM_PER_SOURCE WITH (KAFKA_TOPIC='SUM_PER_SOURCE', PARTITIONS=6, REPLICAS=3, VALUE_FORMAT='AVRO') AS SELECT\n  EVENTLOGS.EVENTSOURCEIP ROWKEY,\n  AS_VALUE(EVENTLOGS.EVENTSOURCEIP) EVENTSOURCEIP,\n  SUM(EVENTLOGS.EVENTDURATION) SUM\nFROM EVENTLOGS EVENTLOGS\nWHERE (EVENTLOGS.RESULT = 'Pass')\nGROUP BY EVENTLOGS.EVENTSOURCEIP\nEMIT CHANGES;",
            "sinks": [
              "SUM_PER_SOURCE"
            ],
            "sinkKafkaTopics": [
              "SUM_PER_SOURCE"
            ],
            "id": "CTAS_SUM_PER_SOURCE_5",
            "statusCount": {
              "RUNNING": 1
            },
            "queryType": "PERSISTENT",
            "state": "RUNNING"
          }
        ],
        "warnings": []
      }
    ]
    
  4. View the Avro schema for the topic SUM_PER_SOURCE in Confluent Cloud Schema Registry.

    curl --silent -u <SR API KEY>:<SR API SECRET> https://<SR ENDPOINT>/subjects/SUM_PER_SOURCE-value/versions/latest | jq -r '.schema' | jq .
    

    Your output should resemble:

    {
      "type": "record",
      "name": "KsqlDataSourceSchema",
      "namespace": "io.confluent.ksql.avro_schemas",
      "fields": [
        {
          "name": "EVENTSOURCEIP",
          "type": [
            "null",
            "string"
          ],
          "default": null
        },
        {
          "name": "SUM",
          "type": [
            "null",
            "long"
          ],
          "default": null
        }
      ]
    }
    
  5. View these same queries from the Confluent Cloud Console.

image
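To make the two aggregations in step 2 concrete, the following sketch reproduces their logic in plain Python over the five sample events from the Data Flow section. It illustrates the GROUP BY semantics only and does not show how ksqlDB executes the queries. It emits an update after every matching event, whereas a real ksqlDB table may not emit every intermediate value.

```python
from collections import defaultdict

# The five sample events from the Data Flow section.
EVENTS = [
    {"eventSourceIP": "192.168.1.1", "eventAction": "Upload", "result": "Pass", "eventDuration": 3},
    {"eventSourceIP": "192.168.1.1", "eventAction": "Create", "result": "Pass", "eventDuration": 2},
    {"eventSourceIP": "192.168.1.1", "eventAction": "Delete", "result": "Fail", "eventDuration": 5},
    {"eventSourceIP": "192.168.1.2", "eventAction": "Upload", "result": "Pass", "eventDuration": 1},
    {"eventSourceIP": "192.168.1.2", "eventAction": "Create", "result": "Pass", "eventDuration": 3},
]


def count_per_source(events):
    """Yield (key, count) after every event, like COUNT(*) ... GROUP BY eventSourceIP."""
    counts = defaultdict(int)
    for event in events:
        key = event["eventSourceIP"]
        counts[key] += 1
        yield key, counts[key]


def sum_per_source(events):
    """Yield (key, sum) for 'Pass' events only, like SUM(eventDuration) ... WHERE result = 'Pass'."""
    sums = defaultdict(int)
    for event in events:
        if event["result"] != "Pass":
            continue
        key = event["eventSourceIP"]
        sums[key] += event["eventDuration"]
        yield key, sums[key]


if __name__ == "__main__":
    print(list(count_per_source(EVENTS)))
    print(list(sum_per_source(EVENTS)))
```

Each yielded pair corresponds to one update in the table's changelog topic, so a consumer that reads the topic from the beginning sees a growing count or sum for each key.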

Validate

  1. After running the example, run the read-data.sh script to view the data from Kinesis, Kafka, and cloud storage.

    ./read-data.sh stack-configs/java-service-account-<SERVICE ACCOUNT ID>.config
    

    Your output should resemble:

    Data from Kinesis stream demo-logs --limit 10:
    {"eventSourceIP":"192.168.1.1","eventAction":"Upload","result":"Pass","eventDuration":3}
    {"eventSourceIP":"192.168.1.1","eventAction":"Create","result":"Pass","eventDuration":2}
    {"eventSourceIP":"192.168.1.1","eventAction":"Delete","result":"Fail","eventDuration":5}
    {"eventSourceIP":"192.168.1.2","eventAction":"Upload","result":"Pass","eventDuration":1}
    {"eventSourceIP":"192.168.1.2","eventAction":"Create","result":"Pass","eventDuration":3}
    {"eventSourceIP":"192.168.1.1","eventAction":"Upload","result":"Pass","eventDuration":3}
    {"eventSourceIP":"192.168.1.1","eventAction":"Create","result":"Pass","eventDuration":2}
    {"eventSourceIP":"192.168.1.1","eventAction":"Delete","result":"Fail","eventDuration":5}
    {"eventSourceIP":"192.168.1.2","eventAction":"Upload","result":"Pass","eventDuration":1}
    {"eventSourceIP":"192.168.1.2","eventAction":"Create","result":"Pass","eventDuration":3}
    
    Data from Kafka topic eventlogs:
    confluent-v1 local services kafka consume eventlogs --cloud --config stack-configs/java-service-account-<SERVICE ACCOUNT ID>.config --from-beginning --property print.key=true --max-messages 10
    5   {"eventSourceIP":"192.168.1.5","eventAction":"Upload","result":"Pass","eventDuration":4}
    5   {"eventSourceIP":"192.168.1.5","eventAction":"Create","result":"Pass","eventDuration":1}
    5   {"eventSourceIP":"192.168.1.5","eventAction":"Delete","result":"Fail","eventDuration":1}
    5   {"eventSourceIP":"192.168.1.5","eventAction":"Upload","result":"Pass","eventDuration":4}
    5   {"eventSourceIP":"192.168.1.5","eventAction":"Create","result":"Pass","eventDuration":1}
    5   {"eventSourceIP":"192.168.1.5","eventAction":"Delete","result":"Fail","eventDuration":1}
    5   {"eventSourceIP":"192.168.1.5","eventAction":"Upload","result":"Pass","eventDuration":4}
    5   {"eventSourceIP":"192.168.1.5","eventAction":"Create","result":"Pass","eventDuration":1}
    5   {"eventSourceIP":"192.168.1.5","eventAction":"Delete","result":"Fail","eventDuration":1}
    5   {"eventSourceIP":"192.168.1.5","eventAction":"Upload","result":"Pass","eventDuration":4}
    
    Data from Kafka topic COUNT_PER_SOURCE:
    confluent-v1 local services kafka consume COUNT_PER_SOURCE --cloud --config stack-configs/java-service-account-<SERVICE ACCOUNT ID>.config --from-beginning --property print.key=true --max-messages 10
    192.168.1.5       {"EVENTSOURCEIP":"192.168.1.5","COUNT":1}
    192.168.1.5       {"EVENTSOURCEIP":"192.168.1.5","COUNT":2}
    192.168.1.5       {"EVENTSOURCEIP":"192.168.1.5","COUNT":3}
    192.168.1.5       {"EVENTSOURCEIP":"192.168.1.5","COUNT":4}
    192.168.1.5       {"EVENTSOURCEIP":"192.168.1.5","COUNT":5}
    192.168.1.5       {"EVENTSOURCEIP":"192.168.1.5","COUNT":6}
    192.168.1.5       {"EVENTSOURCEIP":"192.168.1.5","COUNT":7}
    192.168.1.5       {"EVENTSOURCEIP":"192.168.1.5","COUNT":8}
    192.168.1.5       {"EVENTSOURCEIP":"192.168.1.5","COUNT":9}
    192.168.1.5       {"EVENTSOURCEIP":"192.168.1.5","COUNT":10}
    
    Data from Kafka topic SUM_PER_SOURCE:
    confluent-v1 local services kafka consume SUM_PER_SOURCE --cloud --config stack-configs/java-service-account-<SERVICE ACCOUNT ID>.config --from-beginning --property print.key=true --value-format avro --property basic.auth.credentials.source=USER_INFO --property schema.registry.basic.auth.user.info=$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO --property schema.registry.url=$SCHEMA_REGISTRY_URL --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --max-messages 10
    192.168.1.2       {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":1}}
    192.168.1.2       {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":4}}
    192.168.1.2       {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":5}}
    192.168.1.2       {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":8}}
    192.168.1.2       {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":11}}
    192.168.1.2       {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":12}}
    192.168.1.2       {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":15}}
    192.168.1.2       {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":16}}
    192.168.1.2       {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":19}}
    192.168.1.2       {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":22}}
    
    Objects in Cloud storage gcs:
    
    gs://confluent-cloud-etl-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+1+0000000000.bin
    gs://confluent-cloud-etl-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+1+0000001000.bin
    gs://confluent-cloud-etl-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+1+0000002000.bin
    gs://confluent-cloud-etl-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+1+0000003000.bin
    gs://confluent-cloud-etl-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+1+0000004000.bin
    gs://confluent-cloud-etl-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+3+0000000000.bin
    gs://confluent-cloud-etl-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+3+0000001000.bin
    gs://confluent-cloud-etl-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+3+0000002000.bin
    gs://confluent-cloud-etl-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+3+0000003000.bin
    gs://confluent-cloud-etl-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+3+0000004000.bin
    gs://confluent-cloud-etl-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+1+0000000000.avro
    gs://confluent-cloud-etl-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+1+0000001000.avro
    gs://confluent-cloud-etl-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+1+0000002000.avro
    gs://confluent-cloud-etl-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+1+0000003000.avro
    gs://confluent-cloud-etl-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+3+0000000000.avro
    gs://confluent-cloud-etl-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+3+0000001000.avro
    gs://confluent-cloud-etl-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+3+0000002000.avro
    gs://confluent-cloud-etl-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+3+0000003000.avro
    
  2. Add more entries in the source and see them propagate through the pipeline by viewing messages in the Confluent Cloud Console or CLI.

    If you are running Kinesis:

    ./add_entries_kinesis.sh
    

    If you are running RDS PostgreSQL:

    ./add_entries_rds.sh
    
  3. View the new messages from the Confluent Cloud Console.

image
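In the SUM_PER_SOURCE output above, each value is wrapped in an object such as {"string": ...} or {"long": ...}. This is the Avro JSON encoding of a union type: the schema declares both fields as a union of null and a concrete type, and a non-null value is written with the name of its type as the key. The sketch below unwraps those values into a plain record. The function name unwrap_unions is hypothetical, and the sketch handles only the two branch types that appear in this schema.

```python
import json

# Non-null branches used by the schema shown in the ksqlDB section:
# ["null", "string"] for EVENTSOURCEIP and ["null", "long"] for SUM.
UNION_BRANCHES = {"string", "long"}


def unwrap_unions(record):
    """Replace {"<type>": value} wrappers with the bare value; leave null (None) unchanged."""
    plain = {}
    for field, value in record.items():
        if isinstance(value, dict) and len(value) == 1 and next(iter(value)) in UNION_BRANCHES:
            plain[field] = next(iter(value.values()))
        else:
            plain[field] = value
    return plain


# One value copied from the SUM_PER_SOURCE output above.
LINE = '{"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":4}}'

if __name__ == "__main__":
    print(unwrap_unions(json.loads(LINE)))
```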

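The object names in the cloud storage listing above appear to follow a regular layout: a topic directory, a time-based partition (year, month, day, hour), and a file name made of the topic, the Kafka partition, and the starting offset. The sketch below parses that layout as it appears in the listing. The pattern is inferred from the sample output, and the function name parse_object_name is hypothetical.

```python
import re

# Layout inferred from the listing above:
# topics/<topic>/year=YYYY/month=MM/day=DD/hour=HH/<topic>+<partition>+<start offset>.<extension>
OBJECT_PATTERN = re.compile(
    r"topics/(?P<topic>[^/]+)/year=(?P<year>\d{4})/month=(?P<month>\d{2})"
    r"/day=(?P<day>\d{2})/hour=(?P<hour>\d{2})"
    r"/(?P=topic)\+(?P<partition>\d+)\+(?P<start_offset>\d+)\.(?P<extension>\w+)$"
)


def parse_object_name(name):
    """Split an object name into topic, time partition, Kafka partition, and start offset."""
    match = OBJECT_PATTERN.search(name)
    if match is None:
        raise ValueError(f"unrecognized object name: {name}")
    parts = match.groupdict()
    for key in ("partition", "start_offset"):
        parts[key] = int(parts[key])  # drops the zero padding of the offset
    return parts


# One object name copied from the listing above.
EXAMPLE = (
    "gs://confluent-cloud-etl-demo/topics/COUNT_PER_SOURCE/"
    "year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+1+0000001000.bin"
)

if __name__ == "__main__":
    print(parse_object_name(EXAMPLE))
```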
Stop Example

Any Confluent Cloud example uses real Confluent Cloud resources. After you are done running a Confluent Cloud example, manually verify that all Confluent Cloud resources are destroyed to avoid unexpected charges.

Details

  1. Stop the example and clean up all resources, which deletes the Kafka topics, the fully managed connectors, and the data in cloud storage:

    ./stop.sh stack-configs/java-service-account-<SERVICE ACCOUNT ID>.config
    
  2. Always verify that resources in Confluent Cloud have been destroyed.

Additional Resources