Cloud ETL Demo

This demo showcases a cloud ETL solution built entirely from fully-managed services on Confluent Cloud. Using the Confluent Cloud CLI, the demo creates a source connector that reads data from an AWS Kinesis stream into Confluent Cloud, a Confluent Cloud KSQL application that processes that data, and a sink connector that writes the output to cloud storage in the provider of your choice (GCP GCS, AWS S3, or Azure Blob).


The end result is an event streaming ETL, running 100% in the cloud, spanning multiple cloud providers. This enables you to:

  • Build business applications on a full event streaming platform
  • Span multiple cloud providers (AWS, GCP, Azure) and on-prem datacenters
  • Use Kafka to aggregate data in a single source of truth
  • Harness the power of KSQL for stream processing

End-to-end Streaming ETL

This demo showcases an end-to-end cloud ETL deployment built entirely on fully-managed cloud services.

Data Flow

The data set is a stream of log messages, which in this demo is mock data captured in eventLogs.json.
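
Each log message is a small JSON record; for example, a representative record (also visible in the sample output later in this document) looks like this:

    {"eventSourceIP":"192.168.1.1","eventAction":"Upload","result":"Pass","eventDuration":3}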

Component                  | Consumes From                                | Produces To
---------------------------+----------------------------------------------+-------------------------
Kinesis source connector   | Kinesis stream demo-logs                     | Kafka topic eventLogs
KSQL                       | eventLogs                                    | KSQL streams and tables
GCS/S3/Blob sink connector | KSQL tables COUNT_PER_SOURCE, SUM_PER_SOURCE | GCS/S3/Blob

Warning

This demo uses real cloud resources, including Confluent Cloud, AWS Kinesis, and one of the cloud storage providers. To avoid unexpected charges, carefully evaluate the cost of resources before launching the demo and ensure all resources are destroyed after you are done running it.

Prerequisites

Cloud services

  • Confluent Cloud cluster with a provisioned KSQL application
  • AWS account, used for the Kinesis stream
  • Destination cloud storage account: GCP GCS, AWS S3, or Azure Blob storage

Local Tools

  • Confluent Cloud CLI v0.239.0 or later
  • gsutil CLI, properly initialized with your credentials: (optional) if destination is GCP GCS
  • aws CLI, properly initialized with your credentials: used for AWS Kinesis and (optional) if destination is AWS S3
  • az CLI, properly initialized with your credentials: (optional) if destination is Azure Blob storage
  • jq
  • curl
  • timeout
  • python
  • Download Confluent Platform 5.4.2: for more advanced Confluent CLI functionality (optional)
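
As a quick sanity check before you start, you can verify that the required local tools are on your PATH. This snippet is a convenience sketch only, not part of the demo scripts:

    # Verify the local tools listed above are installed
    for cmd in ccloud aws jq curl timeout python; do
      command -v "$cmd" >/dev/null || echo "missing: $cmd"
    done
    ccloud version   # should report v0.239.0 or later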

Run the Demo

Setup

Because this demo interacts with real resources in Kinesis, a destination storage service, and Confluent Cloud, you must set up some initial parameters to communicate with these services.

  1. By default, the demo reads the configuration parameters for your Confluent Cloud environment from a file at $HOME/.ccloud/config. You can change this filename via the parameter CONFIG_FILE in config/demo.cfg. Enter the configuration parameters for your Confluent Cloud cluster, replacing the values in <...> below with those for your Confluent Cloud environment:

    $ cat $HOME/.ccloud/config
    bootstrap.servers=<BROKER ENDPOINT>
    ssl.endpoint.identification.algorithm=https
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<API KEY>" password\="<API SECRET>";
    schema.registry.url=https://<SR ENDPOINT>
    schema.registry.basic.auth.user.info=<SR API KEY>:<SR API SECRET>
    basic.auth.credentials.source=USER_INFO
    ksql.endpoint=https://<KSQL ENDPOINT>
    ksql.basic.auth.user.info=<KSQL API KEY>:<KSQL API SECRET>
    

    To retrieve the values for the endpoints and credentials in the file above, find them using either the Confluent Cloud UI or Confluent Cloud CLI commands. If you have multiple Confluent Cloud clusters, make sure to use the one with the associated KSQL cluster. The commands below demonstrate how to retrieve the values using the Confluent Cloud CLI.

    # Login
    ccloud login --url https://confluent.cloud
    
    # BROKER ENDPOINT
    ccloud kafka cluster list
    ccloud kafka cluster use <CLUSTER ID>
    ccloud kafka cluster describe
    
    # SR ENDPOINT
    ccloud schema-registry cluster describe
    
    # KSQL ENDPOINT
    ccloud ksql app list
    
    # Credentials: API key and secret, one for each resource above
    ccloud api-key create
    
  2. Clone the examples GitHub repository and check out the 5.4.2-post branch.

    git clone https://github.com/confluentinc/examples
    cd examples
    git checkout 5.4.2-post
    
  3. Change directory to the Cloud ETL demo at cloud-etl:

    cd cloud-etl
    
  4. Modify the demo configuration file at config/demo.cfg. Set the proper credentials and parameters for the source, e.g. AWS Kinesis, and set the required parameters for the respective destination cloud storage provider (a hypothetical example follows this list):

    • GCP GCS
      • DESTINATION_STORAGE='gcs'
      • GCS_CREDENTIALS_FILE
      • GCS_BUCKET
    • AWS S3
      • DESTINATION_STORAGE='s3'
      • S3_PROFILE
      • S3_BUCKET
    • Azure Blob
      • DESTINATION_STORAGE='az'
      • AZBLOB_STORAGE_ACCOUNT
      • AZBLOB_CONTAINER
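
    As an illustrative sketch only, a config/demo.cfg targeting AWS S3 as the destination might include entries like the following; the values are placeholders, and the Kinesis source parameters (defined in the repo's config file) are not shown:

    # Destination storage provider (one of 'gcs', 's3', 'az')
    DESTINATION_STORAGE='s3'
    # AWS CLI profile with write access to the bucket (placeholder value)
    S3_PROFILE='default'
    # Destination bucket name (placeholder value)
    S3_BUCKET='my-cloud-etl-demo-bucket'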

Run

  1. Log in to Confluent Cloud with the command ccloud login, and use your Confluent Cloud username and password.

    ccloud login --url https://confluent.cloud
    
  2. Run the demo. It takes approximately 7 minutes to complete.

    ./start.sh
    

Validate

  1. Using the Confluent Cloud CLI, list all the fully-managed connectors created in this cluster.

    ccloud connector list
    

    Sample output:

         ID     |         Name         | Status  |  Type
    +-----------+----------------------+---------+--------+
      lcc-knjgv | demo-KinesisSource   | RUNNING | source
      lcc-nwkxv | demo-GcsSink-avro    | RUNNING | sink
      lcc-3r7w2 | demo-GcsSink-no-avro | RUNNING | sink
    
  2. The demo automatically created these connectors by running the Confluent Cloud CLI command ccloud connector create and passing in a configuration file from the connector configuration directory (a sketch of that command follows the sample output below). Describe any connector in more detail:

    ccloud connector describe lcc-knjgv
    

    Sample output:

    Connector Details
    +--------+--------------------+
    | ID     | lcc-knjgv          |
    | Name   | demo-KinesisSource |
    | Status | RUNNING            |
    | Type   | source             |
    +--------+--------------------+
    
    
    Task Level Details
      Task_ID |  State
    +---------+---------+
            0 | RUNNING
    
    
    Configuration Details
         Configuration    |                          Value
    +---------------------+---------------------------------------------------------+
      schema.registry.url | https://psrc-lz3xz.us-central1.gcp.confluent.cloud
      value.converter     | io.confluent.connect.replicator.util.ByteArrayConverter
      aws.access.key.id   | ****************
      kafka.region        | us-west2
      tasks.max           |                                                       1
      aws.secret.key.id   | ****************
      kafka.topic         | eventLogs
      kafka.api.key       | ****************
      kinesis.position    | TRIM_HORIZON
      kinesis.region      | us-west-2
      kinesis.stream      | demo-logs
      cloud.environment   | prod
      connector.class     | KinesisSource
      key.converter       | org.apache.kafka.connect.storage.StringConverter
      name                | demo-KinesisSource
      kafka.api.secret    | ****************
      kafka.endpoint      | SASL_SSL://pkc-4r087.us-west2.gcp.confluent.cloud:9092
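
    For reference, the demo created each connector with a command along the lines of the sketch below. The configuration file path is a placeholder for one of the files in the repo's connector configuration directory, and the exact flag name may vary by CLI version:

    ccloud connector create --config <path to connector configuration file>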
    
  3. From the Confluent Cloud UI, select your Kafka cluster and click the KSQL tab to view the flow through your KSQL application.

  4. The demo’s KSQL commands used the KSQL REST API to create a KSQL TABLE COUNT_PER_SOURCE, formatted as JSON and backed by the Kafka topic COUNT_PER_SOURCE, and a KSQL TABLE SUM_PER_SOURCE, formatted as Avro and backed by the Kafka topic SUM_PER_SOURCE. Use the Confluent Cloud KSQL UI or its REST API to interact with the KSQL application.
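
    To call the REST API from a shell, set the KSQL endpoint and credentials as environment variables. The lines below are a sketch of how to derive them from the $HOME/.ccloud/config file created during Setup; the demo scripts read the same file, though not necessarily with this exact extraction:

    # Pull the KSQL endpoint and credentials from the Confluent Cloud config file
    CONFIG_FILE=$HOME/.ccloud/config
    KSQL_ENDPOINT=$(grep '^ksql.endpoint=' "$CONFIG_FILE" | cut -d'=' -f2)
    KSQL_BASIC_AUTH_USER_INFO=$(grep '^ksql.basic.auth.user.info=' "$CONFIG_FILE" | cut -d'=' -f2)

    With those variables set, the following request lists the running queries: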

    curl -X POST $KSQL_ENDPOINT/ksql \
           -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
           -u $KSQL_BASIC_AUTH_USER_INFO \
           -d @<(cat <<EOF
    {
      "ksql": "SHOW QUERIES;",
      "streamsProperties": {}
    }
    EOF
    )
    

    Sample output:

    [
      {
        "@type": "queries",
        "statementText": "SHOW QUERIES;",
        "queries": [
          {
            "sinks": [
              "COUNT_PER_SOURCE"
            ],
            "id": "CTAS_COUNT_PER_SOURCE_210",
            "queryString": "CREATE TABLE COUNT_PER_SOURCE WITH (KAFKA_TOPIC='COUNT_PER_SOURCE', PARTITIONS=6, REPLICAS=3) AS SELECT\n  EVENTLOGS.EVENTSOURCEIP \"EVENTSOURCEIP\",\n  COUNT(*) \"COUNT\"\nFROM EVENTLOGS EVENTLOGS\nGROUP BY EVENTLOGS.EVENTSOURCEIP\nEMIT CHANGES;"
          },
          {
            "sinks": [
              "SUM_PER_SOURCE"
            ],
            "id": "CTAS_SUM_PER_SOURCE_211",
            "queryString": "CREATE TABLE SUM_PER_SOURCE WITH (KAFKA_TOPIC='SUM_PER_SOURCE', PARTITIONS=6, REPLICAS=3, VALUE_FORMAT='AVRO') AS SELECT\n  EVENTLOGS.EVENTSOURCEIP \"EVENTSOURCEIP\",\n  SUM(EVENTLOGS.EVENTDURATION) \"SUM\"\nFROM EVENTLOGS EVENTLOGS\nWHERE (EVENTLOGS.RESULT = 'Pass')\nGROUP BY EVENTLOGS.EVENTSOURCEIP\nEMIT CHANGES;"
          }
        ],
        "warnings": []
      }
    ]
    
  5. View the Avro schema for the topic SUM_PER_SOURCE in Confluent Cloud Schema Registry.

    curl --silent -u <SR API KEY>:<SR API SECRET> https://<SR ENDPOINT>/subjects/SUM_PER_SOURCE-value/versions/latest | jq -r '.schema' | jq .
    

    Sample output:

    {
      "type": "record",
      "name": "KsqlDataSourceSchema",
      "namespace": "io.confluent.ksql.avro_schemas",
      "fields": [
        {
          "name": "EVENTSOURCEIP",
          "type": [
            "null",
            "string"
          ],
          "default": null
        },
        {
          "name": "SUM",
          "type": [
            "null",
            "long"
          ],
          "default": null
        }
      ]
    }
    
  6. View the data from Kinesis, Kafka, and cloud storage after running the demo:

    ./read-data.sh
    

    Sample output:

    Data from Kinesis stream demo-logs --limit 10:
    {"eventSourceIP":"192.168.1.1","eventAction":"Upload","result":"Pass","eventDuration":3}
    {"eventSourceIP":"192.168.1.1","eventAction":"Create","result":"Pass","eventDuration":2}
    {"eventSourceIP":"192.168.1.1","eventAction":"Delete","result":"Fail","eventDuration":5}
    {"eventSourceIP":"192.168.1.2","eventAction":"Upload","result":"Pass","eventDuration":1}
    {"eventSourceIP":"192.168.1.2","eventAction":"Create","result":"Pass","eventDuration":3}
    {"eventSourceIP":"192.168.1.1","eventAction":"Upload","result":"Pass","eventDuration":3}
    {"eventSourceIP":"192.168.1.1","eventAction":"Create","result":"Pass","eventDuration":2}
    {"eventSourceIP":"192.168.1.1","eventAction":"Delete","result":"Fail","eventDuration":5}
    {"eventSourceIP":"192.168.1.2","eventAction":"Upload","result":"Pass","eventDuration":1}
    {"eventSourceIP":"192.168.1.2","eventAction":"Create","result":"Pass","eventDuration":3}
    
    Data from Kafka topic eventLogs:
    confluent local consume eventLogs -- --cloud --from-beginning --property print.key=true --max-messages 10
    5   {"eventSourceIP":"192.168.1.5","eventAction":"Upload","result":"Pass","eventDuration":4}
    5   {"eventSourceIP":"192.168.1.5","eventAction":"Create","result":"Pass","eventDuration":1}
    5   {"eventSourceIP":"192.168.1.5","eventAction":"Delete","result":"Fail","eventDuration":1}
    5   {"eventSourceIP":"192.168.1.5","eventAction":"Upload","result":"Pass","eventDuration":4}
    5   {"eventSourceIP":"192.168.1.5","eventAction":"Create","result":"Pass","eventDuration":1}
    5   {"eventSourceIP":"192.168.1.5","eventAction":"Delete","result":"Fail","eventDuration":1}
    5   {"eventSourceIP":"192.168.1.5","eventAction":"Upload","result":"Pass","eventDuration":4}
    5   {"eventSourceIP":"192.168.1.5","eventAction":"Create","result":"Pass","eventDuration":1}
    5   {"eventSourceIP":"192.168.1.5","eventAction":"Delete","result":"Fail","eventDuration":1}
    5   {"eventSourceIP":"192.168.1.5","eventAction":"Upload","result":"Pass","eventDuration":4}
    
    Data from Kafka topic COUNT_PER_SOURCE:
    confluent local consume COUNT_PER_SOURCE -- --cloud --from-beginning --property print.key=true --max-messages 10
    192.168.1.5       {"EVENTSOURCEIP":"192.168.1.5","COUNT":1}
    192.168.1.5       {"EVENTSOURCEIP":"192.168.1.5","COUNT":2}
    192.168.1.5       {"EVENTSOURCEIP":"192.168.1.5","COUNT":3}
    192.168.1.5       {"EVENTSOURCEIP":"192.168.1.5","COUNT":4}
    192.168.1.5       {"EVENTSOURCEIP":"192.168.1.5","COUNT":5}
    192.168.1.5       {"EVENTSOURCEIP":"192.168.1.5","COUNT":6}
    192.168.1.5       {"EVENTSOURCEIP":"192.168.1.5","COUNT":7}
    192.168.1.5       {"EVENTSOURCEIP":"192.168.1.5","COUNT":8}
    192.168.1.5       {"EVENTSOURCEIP":"192.168.1.5","COUNT":9}
    192.168.1.5       {"EVENTSOURCEIP":"192.168.1.5","COUNT":10}
    
    Data from Kafka topic SUM_PER_SOURCE:
    confluent local consume SUM_PER_SOURCE -- --cloud --from-beginning --property print.key=true --value-format avro --property basic.auth.credentials.source=USER_INFO --property schema.registry.basic.auth.user.info=$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO --property schema.registry.url=$SCHEMA_REGISTRY_URL --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --max-messages 10
    192.168.1.2       {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":1}}
    192.168.1.2       {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":4}}
    192.168.1.2       {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":5}}
    192.168.1.2       {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":8}}
    192.168.1.2       {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":11}}
    192.168.1.2       {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":12}}
    192.168.1.2       {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":15}}
    192.168.1.2       {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":16}}
    192.168.1.2       {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":19}}
    192.168.1.2       {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":22}}
    
    Objects in Cloud storage gcs:
    
    gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+1+0000000000.bin
    gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+1+0000001000.bin
    gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+1+0000002000.bin
    gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+1+0000003000.bin
    gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+1+0000004000.bin
    gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+3+0000000000.bin
    gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+3+0000001000.bin
    gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+3+0000002000.bin
    gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+3+0000003000.bin
    gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+3+0000004000.bin
    gs://confluent-cloud-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+1+0000000000.avro
    gs://confluent-cloud-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+1+0000001000.avro
    gs://confluent-cloud-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+1+0000002000.avro
    gs://confluent-cloud-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+1+0000003000.avro
    gs://confluent-cloud-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+3+0000000000.avro
    gs://confluent-cloud-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+3+0000001000.avro
    gs://confluent-cloud-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+3+0000002000.avro
    gs://confluent-cloud-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+3+0000003000.avro
    

Stop

  1. To stop the demo, clean up all the resources, delete the Kafka topics and fully-managed connectors, and delete the data in cloud storage, run:

    ./stop.sh
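
    After the script completes, you can optionally confirm that nothing is left running (and billing). The checks below are a sketch; the storage listing shown assumes a GCS destination, so substitute the equivalent aws s3 ls or az storage command for other providers:

    ccloud connector list
    ccloud kafka topic list
    gsutil ls gs://<GCS BUCKET>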