Cloud ETL Demo¶
This demo showcases a cloud ETL solution leveraging all fully-managed services on Confluent Cloud. Using Confluent Cloud CLI, the demo creates a source connector that reads data from an AWS Kinesis stream into Confluent Cloud, then a Confluent Cloud KSQL application processes that data, and then a sink connector writes the output data into cloud storage in the provider of your choice (one of GCP GCS, AWS S3, or Azure Blob).
The end result is an event streaming ETL, running 100% in the cloud, spanning multiple cloud providers. This enables you to:
- Build business applications on a full event streaming platform
- Span multiple cloud providers (AWS, GCP, Azure) and on-prem datacenters
- Use Kafka to aggregate data into a single source of truth
- Harness the power of KSQL for stream processing
End-to-end Streaming ETL¶
This demo showcases an end-to-end cloud ETL deployment built entirely on cloud services:
- Kinesis source connector: reads from a Kinesis stream and writes the data to a Kafka topic in Confluent Cloud
- KSQL: streaming SQL engine that enables real-time data processing against Kafka
- Cloud storage sink connector: writes data from Kafka topics to cloud storage, one of GCP GCS, AWS S3, or Azure Blob
- Confluent Cloud Schema Registry: centrally manages schemas and checks compatibility as schemas evolve
Data Flow¶
The data set is a stream of log messages, which in this demo is mock data captured in eventLogs.json.
Component | Consumes From | Produces To |
---|---|---|
Kinesis source connector | Kinesis stream demo-logs | Kafka topic eventLogs |
KSQL | eventLogs | KSQL streams and tables |
GCS/S3/Blob sink connector | KSQL tables COUNT_PER_SOURCE, SUM_PER_SOURCE | GCS/S3/Blob |
Warning¶
This demo uses real cloud resources, including those of Confluent Cloud, AWS Kinesis, and one of the cloud storage providers. To avoid unexpected charges, carefully evaluate the cost of resources before launching the demo, and ensure all resources are destroyed when you are done running it.
Prerequisites¶
Cloud services¶
- Confluent Cloud cluster: for development only. Do not use a production cluster.
- Confluent Cloud KSQL provisioned in your Confluent Cloud
- AWS or GCP or Azure access
Local Tools¶
- Confluent Cloud CLI v0.239.0 or later
- gsutil CLI, properly initialized with your credentials: (optional) if destination is GCP GCS
- aws CLI, properly initialized with your credentials: used for AWS Kinesis and (optional) if destination is AWS S3
- az CLI, properly initialized with your credentials: (optional) if destination is Azure Blob storage
- jq
- curl
- timeout
- python
- Download Confluent Platform 5.4.11: for more advanced Confluent CLI functionality (optional)
Run the Demo¶
Setup¶
Because this demo interacts with real resources in Kinesis, a destination storage service, and Confluent Cloud, you must set up some initial parameters to communicate with these services.
By default, the demo reads the configuration parameters for your Confluent Cloud environment from a file at $HOME/.ccloud/config. You can change this filename via the parameter CONFIG_FILE in config/demo.cfg. Enter the configuration parameters for your Confluent Cloud cluster, replacing the values in <...> below with the values for your Confluent Cloud environment:
$ cat $HOME/.ccloud/config
bootstrap.servers=<BROKER ENDPOINT>
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<API KEY>" password\="<API SECRET>";
schema.registry.url=https://<SR ENDPOINT>
schema.registry.basic.auth.user.info=<SR API KEY>:<SR API SECRET>
basic.auth.credentials.source=USER_INFO
ksql.endpoint=https://<KSQL ENDPOINT>
ksql.basic.auth.user.info=<KSQL API KEY>:<KSQL API SECRET>
You can retrieve the values for the endpoints and credentials in the file above from either the Confluent Cloud UI or the Confluent Cloud CLI. If you have multiple Confluent Cloud clusters, make sure to use the one with the associated KSQL cluster. The commands below demonstrate how to retrieve the values using the Confluent Cloud CLI.
# Login
ccloud login --url https://confluent.cloud
# BROKER ENDPOINT
ccloud kafka cluster list
ccloud kafka cluster use
ccloud kafka cluster describe
# SR ENDPOINT
ccloud schema-registry cluster describe
# KSQL ENDPOINT
ccloud ksql app list
# Credentials: API key and secret, one for each resource above
ccloud api-key create
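Before running the demo, it can help to confirm that every placeholder in the configuration file has been replaced. The following is a minimal Python sketch of such a check, not part of the demo itself. The function names and the `REQUIRED_KEYS` list are hypothetical; the list simply mirrors the endpoint and credential keys shown in the file above, on the assumption that the demo needs all of them.

```python
import re

# Keys taken from the sample config file above (assumed to all be required).
REQUIRED_KEYS = [
    "bootstrap.servers",
    "sasl.jaas.config",
    "schema.registry.url",
    "schema.registry.basic.auth.user.info",
    "ksql.endpoint",
    "ksql.basic.auth.user.info",
]


def parse_properties(text):
    """Parse simple key=value lines, splitting on the first '=' only."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def find_problems(props):
    """Report required keys that are missing or still contain <...>."""
    problems = []
    for key in REQUIRED_KEYS:
        if key not in props:
            problems.append(f"missing: {key}")
        elif re.search(r"<[^>]+>", props[key]):
            problems.append(f"placeholder left in: {key}")
    return problems
```

To use it, read the contents of $HOME/.ccloud/config into a string, pass it to `parse_properties`, and print whatever `find_problems` returns; an empty list means no placeholders or missing keys were detected.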
Clone the examples GitHub repository and check out the 5.4.11-post branch.
git clone https://github.com/confluentinc/examples
cd examples
git checkout 5.4.11-post
Change directory to the Cloud ETL demo at cloud-etl:
cd cloud-etl
Modify the demo configuration file at config/demo.cfg. Set the proper credentials and parameters for the source, e.g. AWS Kinesis. Also set the required parameters for the respective destination cloud storage provider:
- GCP GCS
DESTINATION_STORAGE='gcs'
GCS_CREDENTIALS_FILE
GCS_BUCKET
- AWS S3
DESTINATION_STORAGE='s3'
S3_PROFILE
S3_BUCKET
- Azure Blob
DESTINATION_STORAGE='az'
AZBLOB_STORAGE_ACCOUNT
AZBLOB_CONTAINER
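The destination-specific parameters listed above can be summarized as a lookup table. The sketch below is a hypothetical Python helper, not something shipped with the demo, that reports which of those parameters are still unset for the chosen DESTINATION_STORAGE value. It assumes the settings from config/demo.cfg have already been loaded into a dictionary.

```python
# Parameters required per destination, as listed in this section.
REQUIRED_BY_DESTINATION = {
    "gcs": ["GCS_CREDENTIALS_FILE", "GCS_BUCKET"],
    "s3": ["S3_PROFILE", "S3_BUCKET"],
    "az": ["AZBLOB_STORAGE_ACCOUNT", "AZBLOB_CONTAINER"],
}


def missing_parameters(cfg):
    """Return the destination parameters that are absent or empty in cfg."""
    destination = cfg.get("DESTINATION_STORAGE")
    if destination not in REQUIRED_BY_DESTINATION:
        raise ValueError(f"unsupported DESTINATION_STORAGE: {destination!r}")
    return [
        name
        for name in REQUIRED_BY_DESTINATION[destination]
        if not cfg.get(name)
    ]
```

For example, a dictionary with DESTINATION_STORAGE set to 's3' and only S3_BUCKET filled in would report S3_PROFILE as missing.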
Run¶
Log in to Confluent Cloud with the command ccloud login, and use your Confluent Cloud username and password.
ccloud login --url https://confluent.cloud
Run the demo. It takes approximately 7 minutes to run.
./start.sh
Validate¶
Using the Confluent Cloud CLI, list all the fully-managed connectors created in this cluster.
ccloud connector list
Sample output:
     ID     |         Name         | Status  |  Type
+-----------+----------------------+---------+--------+
  lcc-knjgv | demo-KinesisSource   | RUNNING | source
  lcc-nwkxv | demo-GcsSink-avro    | RUNNING | sink
  lcc-3r7w2 | demo-GcsSink-no-avro | RUNNING | sink
The demo automatically created these connectors using the Confluent Cloud CLI command ccloud connector create, passing in a configuration file from the connector configuration directory. Describe any connector in more detail.
ccloud connector describe lcc-knjgv
Sample output:
Connector Details
+--------+--------------------+
| ID     | lcc-knjgv          |
| Name   | demo-KinesisSource |
| Status | RUNNING            |
| Type   | source             |
+--------+--------------------+

Task Level Details
  Task_ID |  State
+---------+---------+
        0 | RUNNING

Configuration Details
     Configuration    |                          Value
+---------------------+---------------------------------------------------------+
  schema.registry.url | https://psrc-lz3xz.us-central1.gcp.confluent.cloud
  value.converter     | io.confluent.connect.replicator.util.ByteArrayConverter
  aws.access.key.id   | ****************
  kafka.region        | us-west2
  tasks.max           | 1
  aws.secret.key.id   | ****************
  kafka.topic         | eventLogs
  kafka.api.key       | ****************
  kinesis.position    | TRIM_HORIZON
  kinesis.region      | us-west-2
  kinesis.stream      | demo-logs
  cloud.environment   | prod
  connector.class     | KinesisSource
  key.converter       | org.apache.kafka.connect.storage.StringConverter
  name                | demo-KinesisSource
  kafka.api.secret    | ****************
  kafka.endpoint      | SASL_SSL://pkc-4r087.us-west2.gcp.confluent.cloud:9092
From the Confluent Cloud UI, select your Kafka cluster and click the KSQL tab to view the flow through your KSQL application:
The demo’s KSQL commands used KSQL’s REST API to generate a KSQL TABLE COUNT_PER_SOURCE, formatted as JSON, and its underlying Kafka topic is COUNT_PER_SOURCE. It also generated a KSQL TABLE SUM_PER_SOURCE, formatted as Avro, and its underlying Kafka topic is SUM_PER_SOURCE. Use the Confluent Cloud KSQL UI or its REST API to interact with the KSQL application:
curl -X POST $KSQL_ENDPOINT/ksql \
     -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
     -u $KSQL_BASIC_AUTH_USER_INFO \
     -d @<(cat <<EOF
{
  "ksql": "SHOW QUERIES;",
  "streamsProperties": {}
}
EOF
)
Sample output:
[
  {
    "@type": "queries",
    "statementText": "SHOW QUERIES;",
    "queries": [
      {
        "sinks": [
          "COUNT_PER_SOURCE"
        ],
        "id": "CTAS_COUNT_PER_SOURCE_210",
        "queryString": "CREATE TABLE COUNT_PER_SOURCE WITH (KAFKA_TOPIC='COUNT_PER_SOURCE', PARTITIONS=6, REPLICAS=3) AS SELECT\n EVENTLOGS.EVENTSOURCEIP \"EVENTSOURCEIP\",\n COUNT(*) \"COUNT\"\nFROM EVENTLOGS EVENTLOGS\nGROUP BY EVENTLOGS.EVENTSOURCEIP\nEMIT CHANGES;"
      },
      {
        "sinks": [
          "SUM_PER_SOURCE"
        ],
        "id": "CTAS_SUM_PER_SOURCE_211",
        "queryString": "CREATE TABLE SUM_PER_SOURCE WITH (KAFKA_TOPIC='SUM_PER_SOURCE', PARTITIONS=6, REPLICAS=3, VALUE_FORMAT='AVRO') AS SELECT\n EVENTLOGS.EVENTSOURCEIP \"EVENTSOURCEIP\",\n SUM(EVENTLOGS.EVENTDURATION) \"SUM\"\nFROM EVENTLOGS EVENTLOGS\nWHERE (EVENTLOGS.RESULT = 'Pass')\nGROUP BY EVENTLOGS.EVENTSOURCEIP\nEMIT CHANGES;"
      }
    ],
    "warnings": []
  }
]
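To make the two persistent queries in the sample output above more concrete, the following Python sketch mimics their logic on a handful of records shaped like the demo's mock log data: a running count per eventSourceIP, and a running sum of eventDuration restricted to records whose result is Pass. It only illustrates the semantics and says nothing about how KSQL executes the queries; the function name `aggregate` and the choice to record one update per input record are assumptions made for the illustration.

```python
from collections import defaultdict


def aggregate(events):
    """Return the per-record updates for the two aggregations.

    count_updates mirrors COUNT(*) ... GROUP BY EVENTSOURCEIP.
    sum_updates mirrors SUM(EVENTDURATION) ... WHERE RESULT = 'Pass'.
    """
    counts = defaultdict(int)
    sums = defaultdict(int)
    count_updates, sum_updates = [], []
    for event in events:
        ip = event["eventSourceIP"]
        counts[ip] += 1
        count_updates.append((ip, counts[ip]))
        if event["result"] == "Pass":  # the WHERE clause filters out failures
            sums[ip] += event["eventDuration"]
            sum_updates.append((ip, sums[ip]))
    return count_updates, sum_updates


events = [
    {"eventSourceIP": "192.168.1.1", "eventAction": "Upload", "result": "Pass", "eventDuration": 3},
    {"eventSourceIP": "192.168.1.1", "eventAction": "Create", "result": "Pass", "eventDuration": 2},
    {"eventSourceIP": "192.168.1.1", "eventAction": "Delete", "result": "Fail", "eventDuration": 5},
    {"eventSourceIP": "192.168.1.2", "eventAction": "Upload", "result": "Pass", "eventDuration": 1},
    {"eventSourceIP": "192.168.1.2", "eventAction": "Create", "result": "Pass", "eventDuration": 3},
]

count_updates, sum_updates = aggregate(events)
print(count_updates[-1])  # ('192.168.1.2', 2)
print(sum_updates[-1])    # ('192.168.1.2', 4)
```

The failed Delete record contributes to the count for 192.168.1.1 but not to its sum, which is the difference between the two tables.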
View the Avro schema for the topic SUM_PER_SOURCE in Confluent Cloud Schema Registry.
curl --silent -u <SR API KEY>:<SR API SECRET> https://<SR ENDPOINT>/subjects/SUM_PER_SOURCE-value/versions/latest | jq -r '.schema' | jq .
Sample output:
{
  "type": "record",
  "name": "KsqlDataSourceSchema",
  "namespace": "io.confluent.ksql.avro_schemas",
  "fields": [
    {
      "name": "EVENTSOURCEIP",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "SUM",
      "type": [
        "null",
        "long"
      ],
      "default": null
    }
  ]
}
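The command above pipes through jq twice because Schema Registry returns the Avro schema as a JSON-encoded string inside the `schema` field of its response, so it has to be decoded a second time. A minimal Python sketch of the same two-step decoding follows; the function name `field_types` is hypothetical, and the sample response in the usage lines is constructed by hand rather than captured from a real registry.

```python
import json


def field_types(registry_response_text):
    """Map each Avro field name to its declared type.

    The outer document is the Schema Registry response; its "schema"
    value is itself a JSON string and needs a second json.loads call.
    """
    response = json.loads(registry_response_text)
    schema = json.loads(response["schema"])
    return {field["name"]: field["type"] for field in schema["fields"]}


# Hand-built stand-in for the registry response (assumed shape).
schema = {
    "type": "record",
    "name": "KsqlDataSourceSchema",
    "namespace": "io.confluent.ksql.avro_schemas",
    "fields": [
        {"name": "EVENTSOURCEIP", "type": ["null", "string"], "default": None},
        {"name": "SUM", "type": ["null", "long"], "default": None},
    ],
}
sample_response = json.dumps(
    {"subject": "SUM_PER_SOURCE-value", "schema": json.dumps(schema)}
)
print(field_types(sample_response))
```

Both fields are unions with null, which matches the wrapped values such as {"long":1} in the Avro-decoded consumer output.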
View the data from Kinesis, Kafka, and cloud storage after running the demo.
./read-data.sh
Sample output:
Data from Kinesis stream demo-logs --limit 10:
{"eventSourceIP":"192.168.1.1","eventAction":"Upload","result":"Pass","eventDuration":3}
{"eventSourceIP":"192.168.1.1","eventAction":"Create","result":"Pass","eventDuration":2}
{"eventSourceIP":"192.168.1.1","eventAction":"Delete","result":"Fail","eventDuration":5}
{"eventSourceIP":"192.168.1.2","eventAction":"Upload","result":"Pass","eventDuration":1}
{"eventSourceIP":"192.168.1.2","eventAction":"Create","result":"Pass","eventDuration":3}
{"eventSourceIP":"192.168.1.1","eventAction":"Upload","result":"Pass","eventDuration":3}
{"eventSourceIP":"192.168.1.1","eventAction":"Create","result":"Pass","eventDuration":2}
{"eventSourceIP":"192.168.1.1","eventAction":"Delete","result":"Fail","eventDuration":5}
{"eventSourceIP":"192.168.1.2","eventAction":"Upload","result":"Pass","eventDuration":1}
{"eventSourceIP":"192.168.1.2","eventAction":"Create","result":"Pass","eventDuration":3}

Data from Kafka topic eventLogs:
confluent local consume eventLogs -- --cloud --from-beginning --property print.key=true --max-messages 10
5 {"eventSourceIP":"192.168.1.5","eventAction":"Upload","result":"Pass","eventDuration":4}
5 {"eventSourceIP":"192.168.1.5","eventAction":"Create","result":"Pass","eventDuration":1}
5 {"eventSourceIP":"192.168.1.5","eventAction":"Delete","result":"Fail","eventDuration":1}
5 {"eventSourceIP":"192.168.1.5","eventAction":"Upload","result":"Pass","eventDuration":4}
5 {"eventSourceIP":"192.168.1.5","eventAction":"Create","result":"Pass","eventDuration":1}
5 {"eventSourceIP":"192.168.1.5","eventAction":"Delete","result":"Fail","eventDuration":1}
5 {"eventSourceIP":"192.168.1.5","eventAction":"Upload","result":"Pass","eventDuration":4}
5 {"eventSourceIP":"192.168.1.5","eventAction":"Create","result":"Pass","eventDuration":1}
5 {"eventSourceIP":"192.168.1.5","eventAction":"Delete","result":"Fail","eventDuration":1}
5 {"eventSourceIP":"192.168.1.5","eventAction":"Upload","result":"Pass","eventDuration":4}

Data from Kafka topic COUNT_PER_SOURCE:
confluent local consume COUNT_PER_SOURCE -- --cloud --from-beginning --property print.key=true --max-messages 10
192.168.1.5 {"EVENTSOURCEIP":"192.168.1.5","COUNT":1}
192.168.1.5 {"EVENTSOURCEIP":"192.168.1.5","COUNT":2}
192.168.1.5 {"EVENTSOURCEIP":"192.168.1.5","COUNT":3}
192.168.1.5 {"EVENTSOURCEIP":"192.168.1.5","COUNT":4}
192.168.1.5 {"EVENTSOURCEIP":"192.168.1.5","COUNT":5}
192.168.1.5 {"EVENTSOURCEIP":"192.168.1.5","COUNT":6}
192.168.1.5 {"EVENTSOURCEIP":"192.168.1.5","COUNT":7}
192.168.1.5 {"EVENTSOURCEIP":"192.168.1.5","COUNT":8}
192.168.1.5 {"EVENTSOURCEIP":"192.168.1.5","COUNT":9}
192.168.1.5 {"EVENTSOURCEIP":"192.168.1.5","COUNT":10}

Data from Kafka topic SUM_PER_SOURCE:
confluent local consume SUM_PER_SOURCE -- --cloud --from-beginning --property print.key=true --value-format avro --property basic.auth.credentials.source=USER_INFO --property schema.registry.basic.auth.user.info=$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO --property schema.registry.url=$SCHEMA_REGISTRY_URL --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --max-messages 10
192.168.1.2 {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":1}}
192.168.1.2 {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":4}}
192.168.1.2 {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":5}}
192.168.1.2 {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":8}}
192.168.1.2 {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":11}}
192.168.1.2 {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":12}}
192.168.1.2 {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":15}}
192.168.1.2 {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":16}}
192.168.1.2 {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":19}}
192.168.1.2 {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":22}}

Objects in Cloud storage gcs:
gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+1+0000000000.bin
gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+1+0000001000.bin
gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+1+0000002000.bin
gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+1+0000003000.bin
gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+1+0000004000.bin
gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+3+0000000000.bin
gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+3+0000001000.bin
gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+3+0000002000.bin
gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+3+0000003000.bin
gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+3+0000004000.bin
gs://confluent-cloud-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+1+0000000000.avro
gs://confluent-cloud-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+1+0000001000.avro
gs://confluent-cloud-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+1+0000002000.avro
gs://confluent-cloud-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+1+0000003000.avro
gs://confluent-cloud-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+3+0000000000.avro
gs://confluent-cloud-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+3+0000001000.avro
gs://confluent-cloud-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+3+0000002000.avro
gs://confluent-cloud-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+3+0000003000.avro
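The object names in the listing above appear to follow a topic+partition+starting-offset pattern under time-based directories (year, month, day, hour). Assuming that reading of the listing is correct, the Python sketch below splits one object name into its parts. The function name, the regular expression, and the dictionary keys are hypothetical and are derived only from the sample names shown here.

```python
import re

# Pattern inferred from the sample listing: the file name repeats the topic,
# then "+<partition>+<zero-padded start offset>.<extension>".
OBJECT_PATTERN = re.compile(
    r"topics/(?P<topic>[^/]+)/"
    r"year=(?P<year>\d{4})/month=(?P<month>\d{2})/"
    r"day=(?P<day>\d{2})/hour=(?P<hour>\d{2})/"
    r"(?P=topic)\+(?P<partition>\d+)\+(?P<start_offset>\d+)\.(?P<ext>\w+)$"
)


def parse_object_name(uri):
    """Split a storage object URI into topic, time bucket, partition, offset."""
    match = OBJECT_PATTERN.search(uri)
    if match is None:
        raise ValueError(f"unrecognized object name: {uri}")
    info = match.groupdict()
    info["partition"] = int(info["partition"])
    info["start_offset"] = int(info["start_offset"])
    return info


info = parse_object_name(
    "gs://confluent-cloud-demo/topics/SUM_PER_SOURCE/year=2020/month=02/"
    "day=26/hour=03/SUM_PER_SOURCE+3+0000001000.avro"
)
print(info["topic"], info["partition"], info["start_offset"], info["ext"])
```

Under this reading, the listing shows objects from partitions 1 and 3 of each topic, each starting 1000 offsets after the previous one, with the .avro extension for SUM_PER_SOURCE and .bin for COUNT_PER_SOURCE.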
Stop¶
Stop the demo and clean up all the resources. This deletes the Kafka topics, the fully-managed connectors, and the data in cloud storage:
./stop.sh