.. _cloud-etl:

.. toctree::
   :maxdepth: 2

Cloud ETL Demo
==============

This demo showcases a cloud ETL solution composed entirely of fully-managed services on `Confluent Cloud `__. Using the |ccloud| CLI, the demo creates a source connector that reads data from an AWS Kinesis stream into |ccloud|, a |ccloud| KSQL application then processes that data, and a sink connector then writes the output into cloud storage in the provider of your choice (one of GCP GCS, AWS S3, or Azure Blob).

.. figure:: images/topology.png
   :alt: image

The end result is an event streaming ETL that runs 100% in the cloud and spans multiple cloud providers. This enables you to:

*  Build business applications on a full event streaming platform
*  Span multiple cloud providers (AWS, GCP, Azure) and on-prem datacenters
*  Use Kafka to aggregate data into a single source of truth
*  Harness the power of `KSQL `__ for stream processing

End-to-end Streaming ETL
========================

This demo showcases an entire end-to-end cloud ETL deployment, built for 100% cloud services:

-  Kinesis source connector: reads from a Kinesis stream and writes the data to a Kafka topic in |ccloud|

   -  `Amazon Kinesis Source Connector for Confluent Cloud `__

-  KSQL: streaming SQL engine that enables real-time data processing against Kafka

   -  `Confluent Cloud KSQL `__

-  Cloud storage sink connector: writes data from Kafka topics to cloud storage, one of:

   -  `Azure Blob Storage Sink Connector for Confluent Cloud `__
   -  `Google Cloud Storage Sink Connector for Confluent Cloud `__
   -  `Amazon S3 Sink Connector for Confluent Cloud `__

-  |sr-ccloud|: provides centralized management of schemas and checks compatibility as schemas evolve

Data Flow
---------

The data set is a stream of log messages, which in this demo is mock data captured in :devx-examples:`eventLogs.json|cloud-etl/eventLogs.json`.

+-----------------------+-----------------------+---------------------+
| Component             | Consumes From         | Produces To         |
+=======================+=======================+=====================+
| Kinesis source        | Kinesis stream        | Kafka topic         |
| connector             | ``demo-logs``         | ``eventLogs``       |
+-----------------------+-----------------------+---------------------+
| KSQL                  | ``eventLogs``         | KSQL streams and    |
|                       |                       | tables              |
+-----------------------+-----------------------+---------------------+
| GCS/S3/Blob sink      | KSQL tables           | GCS/S3/Blob         |
| connector             | ``COUNT_PER_SOURCE``, |                     |
|                       | ``SUM_PER_SOURCE``    |                     |
+-----------------------+-----------------------+---------------------+
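
The demo scripts create the ``demo-logs`` Kinesis stream and populate it with the mock records from :devx-examples:`eventLogs.json|cloud-etl/eventLogs.json` for you. Purely as an illustration of what lands in that stream, a single record of the same shape could be written by hand with the ``aws`` CLI. This is a hypothetical one-off command, assuming AWS CLI v2 and that the ``demo-logs`` stream already exists in your configured region:

.. code:: bash

   # Write one mock log event to the demo-logs Kinesis stream.
   # The event source IP is used as the partition key purely for illustration.
   # --cli-binary-format raw-in-base64-out lets AWS CLI v2 accept the raw JSON
   # string as the record payload (CLI v1 accepts it directly).
   aws kinesis put-record \
     --stream-name demo-logs \
     --partition-key 192.168.1.1 \
     --cli-binary-format raw-in-base64-out \
     --data '{"eventSourceIP":"192.168.1.1","eventAction":"Upload","result":"Pass","eventDuration":3}'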

Warning
=======

This demo uses real cloud resources, including |ccloud|, AWS Kinesis, and one of the cloud storage providers. To avoid unexpected charges, carefully evaluate the cost of resources before launching the demo, and ensure all resources are destroyed after you are done running it.

Prerequisites
=============

Cloud services
--------------

- `Confluent Cloud cluster `__: for development only. Do not use a production cluster.
- `Confluent Cloud KSQL `__ provisioned in your |ccloud| cluster
- AWS, GCP, or Azure access

Local Tools
-----------

- `Confluent Cloud CLI `__ v0.239.0 or later
- ``gsutil`` CLI, properly initialized with your credentials: (optional) if destination is GCP GCS
- ``aws`` CLI, properly initialized with your credentials: used for AWS Kinesis and (optional) if destination is AWS S3
- ``az`` CLI, properly initialized with your credentials: (optional) if destination is Azure Blob storage
- ``jq``
- ``curl``
- ``timeout``
- ``python``
- Download `Confluent Platform `__ |release|: (optional) for more advanced Confluent CLI functionality

Run the Demo
============

Setup
-----

Because this demo interacts with real resources in Kinesis, a destination storage service, and |ccloud|, you must set up some initial parameters to communicate with these services.

#. By default, the demo reads the configuration parameters for your |ccloud| environment from a file at ``$HOME/.ccloud/config``. You can change this filename via the parameter ``CONFIG_FILE`` in :devx-examples:`config/demo.cfg|cloud-etl/config/demo.cfg`. Enter the configuration parameters for your |ccloud| cluster, replacing the values in ``<...>`` below with those for your |ccloud| environment:

   .. code:: shell

      $ cat $HOME/.ccloud/config
      bootstrap.servers=<BROKER ENDPOINT>
      ssl.endpoint.identification.algorithm=https
      security.protocol=SASL_SSL
      sasl.mechanism=PLAIN
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<API KEY>" password\="<API SECRET>";
      schema.registry.url=https://<SR ENDPOINT>
      schema.registry.basic.auth.user.info=<SR API KEY>:<SR API SECRET>
      basic.auth.credentials.source=USER_INFO
      ksql.endpoint=https://<KSQL ENDPOINT>
      ksql.basic.auth.user.info=<KSQL API KEY>:<KSQL API SECRET>

   Retrieve the values for the endpoints and credentials in the file above from either the |ccloud| UI or the |ccloud| CLI. If you have multiple |ccloud| clusters, make sure to use the one with the associated KSQL cluster. The commands below demonstrate how to retrieve the values using the |ccloud| CLI.

   .. code:: shell

      # Login
      ccloud login --url https://confluent.cloud

      # BROKER ENDPOINT
      ccloud kafka cluster list
      ccloud kafka cluster use <CLUSTER ID>
      ccloud kafka cluster describe <CLUSTER ID>

      # SR ENDPOINT
      ccloud schema-registry cluster describe

      # KSQL ENDPOINT
      ccloud ksql app list

      # Credentials: API key and secret, one for each resource above
      ccloud api-key create

#. Clone the `examples GitHub repository `__ and check out the :litwithvars:`|release|-post` branch.

   .. codewithvars:: bash

      git clone https://github.com/confluentinc/examples
      cd examples
      git checkout |release|-post

#. Change directory to the Cloud ETL demo at ``cloud-etl``:

   .. codewithvars:: bash

      cd cloud-etl

#. Modify the demo configuration file at :devx-examples:`config/demo.cfg|cloud-etl/config/demo.cfg`. Set the proper credentials and parameters for the source, e.g. AWS Kinesis. Also set the required parameters for the respective destination cloud storage provider (see the sketch after this list for an illustration):

   - GCP GCS

     - ``DESTINATION_STORAGE='gcs'``
     - ``GCS_CREDENTIALS_FILE``
     - ``GCS_BUCKET``

   - AWS S3

     - ``DESTINATION_STORAGE='s3'``
     - ``S3_PROFILE``
     - ``S3_BUCKET``

   - Azure Blob

     - ``DESTINATION_STORAGE='az'``
     - ``AZBLOB_STORAGE_ACCOUNT``
     - ``AZBLOB_CONTAINER``
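
   For example, if the destination is AWS S3, the storage-related block of :devx-examples:`config/demo.cfg|cloud-etl/config/demo.cfg` might look like the following sketch. The parameter names come from the list above; the profile and bucket values are placeholders you must replace with your own:

   .. code:: bash

      # Destination cloud storage: one of 'gcs', 's3', or 'az'
      DESTINATION_STORAGE='s3'
      # AWS CLI profile with permission to write to the bucket (placeholder value)
      S3_PROFILE='default'
      # Destination bucket name (placeholder value)
      S3_BUCKET='my-cloud-etl-demo'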

Run
---

#. Log in to |ccloud| with the command ``ccloud login``, and use your |ccloud| username and password.

   .. code:: shell

      ccloud login --url https://confluent.cloud

#. Run the demo. It takes approximately 7 minutes to run.

   .. code:: bash

      ./start.sh

Validate
--------

#. Using the `Confluent Cloud CLI `__, list all the fully-managed connectors created in this cluster.

   .. code:: bash

      ccloud connector list

   Sample output:

   .. code:: bash

           ID     |         Name         | Status  |  Type
      +-----------+----------------------+---------+--------+
        lcc-knjgv | demo-KinesisSource   | RUNNING | source
        lcc-nwkxv | demo-GcsSink-avro    | RUNNING | sink
        lcc-3r7w2 | demo-GcsSink-no-avro | RUNNING | sink

#. The demo automatically created these connectors using the |ccloud| CLI command ``ccloud connector create``, passing in a configuration file from the :devx-examples:`connector configuration directory|cloud-etl/connectors/`. Describe any connector in more detail.

   .. code:: bash

      ccloud connector describe lcc-knjgv

   Sample output:

   .. code:: bash

      Connector Details
      +--------+--------------------+
      | ID     | lcc-knjgv          |
      | Name   | demo-KinesisSource |
      | Status | RUNNING            |
      | Type   | source             |
      +--------+--------------------+

      Task Level Details
        Task_ID |  State
      +---------+---------+
              0 | RUNNING

      Configuration Details
          Configuration   |                          Value
      +---------------------+---------------------------------------------------------+
        schema.registry.url | https://psrc-lz3xz.us-central1.gcp.confluent.cloud
        value.converter     | io.confluent.connect.replicator.util.ByteArrayConverter
        aws.access.key.id   | ****************
        kafka.region        | us-west2
        tasks.max           | 1
        aws.secret.key.id   | ****************
        kafka.topic         | eventLogs
        kafka.api.key       | ****************
        kinesis.position    | TRIM_HORIZON
        kinesis.region      | us-west-2
        kinesis.stream      | demo-logs
        cloud.environment   | prod
        connector.class     | KinesisSource
        key.converter       | org.apache.kafka.connect.storage.StringConverter
        name                | demo-KinesisSource
        kafka.api.secret    | ****************
        kafka.endpoint      | SASL_SSL://pkc-4r087.us-west2.gcp.confluent.cloud:9092

#. From the `Confluent Cloud UI `__, select your Kafka cluster and click the KSQL tab to view the `flow `__ through your KSQL application:

   .. figure:: images/flow.png
      :alt: image

#. The demo's :devx-examples:`KSQL commands|cloud-etl/ksql.commands` used KSQL's REST API to create a KSQL TABLE ``COUNT_PER_SOURCE``, formatted as JSON, whose underlying Kafka topic is ``COUNT_PER_SOURCE``, and a KSQL TABLE ``SUM_PER_SOURCE``, formatted as Avro, whose underlying Kafka topic is ``SUM_PER_SOURCE``. Use the Confluent Cloud KSQL UI or its REST API to interact with the KSQL application:

   .. code:: bash

      curl -X POST $KSQL_ENDPOINT/ksql \
           -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
           -u $KSQL_BASIC_AUTH_USER_INFO \
           -d @<(cat <<EOF
      {
        "ksql": "SHOW TABLES;",
        "streamsProperties": {}
      }
      EOF
      )

#. View the Avro value schema registered for the ``SUM_PER_SOURCE`` topic in |sr-ccloud|, replacing the values in ``<...>`` with your Schema Registry endpoint and credentials:

   .. code:: bash

      curl -u <SR API KEY>:<SR API SECRET> https://<SR ENDPOINT>/subjects/SUM_PER_SOURCE-value/versions/latest | jq -r '.schema' | jq .

   Sample output:

   .. code:: json

      {
        "type": "record",
        "name": "KsqlDataSourceSchema",
        "namespace": "io.confluent.ksql.avro_schemas",
        "fields": [
          {
            "name": "EVENTSOURCEIP",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "SUM",
            "type": [
              "null",
              "long"
            ],
            "default": null
          }
        ]
      }
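
   You can also query the tables themselves through the KSQL REST API used above. The following is a minimal sketch only, assuming the API's ``/query`` endpoint and the same ``$KSQL_ENDPOINT`` and ``$KSQL_BASIC_AUTH_USER_INFO`` values; it requests a few rows from ``COUNT_PER_SOURCE``:

   .. code:: bash

      # Issue a bounded query against the KSQL REST API /query endpoint.
      # Reading from the earliest offset so that existing rows are returned.
      curl -X POST $KSQL_ENDPOINT/query \
           -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
           -u $KSQL_BASIC_AUTH_USER_INFO \
           -d @<(cat <<EOF
      {
        "ksql": "SELECT * FROM COUNT_PER_SOURCE LIMIT 3;",
        "streamsProperties": {"ksql.streams.auto.offset.reset": "earliest"}
      }
      EOF
      )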

#. View the data from Kinesis, |ak|, and cloud storage after running the demo:

   .. code:: bash

      ./read-data.sh

   Sample output:

   .. code:: shell

      Data from Kinesis stream demo-logs --limit 10:
      {"eventSourceIP":"192.168.1.1","eventAction":"Upload","result":"Pass","eventDuration":3}
      {"eventSourceIP":"192.168.1.1","eventAction":"Create","result":"Pass","eventDuration":2}
      {"eventSourceIP":"192.168.1.1","eventAction":"Delete","result":"Fail","eventDuration":5}
      {"eventSourceIP":"192.168.1.2","eventAction":"Upload","result":"Pass","eventDuration":1}
      {"eventSourceIP":"192.168.1.2","eventAction":"Create","result":"Pass","eventDuration":3}
      {"eventSourceIP":"192.168.1.1","eventAction":"Upload","result":"Pass","eventDuration":3}
      {"eventSourceIP":"192.168.1.1","eventAction":"Create","result":"Pass","eventDuration":2}
      {"eventSourceIP":"192.168.1.1","eventAction":"Delete","result":"Fail","eventDuration":5}
      {"eventSourceIP":"192.168.1.2","eventAction":"Upload","result":"Pass","eventDuration":1}
      {"eventSourceIP":"192.168.1.2","eventAction":"Create","result":"Pass","eventDuration":3}

      Data from Kafka topic eventLogs:
      confluent local consume eventLogs -- --cloud --from-beginning --property print.key=true --max-messages 10
      5   {"eventSourceIP":"192.168.1.5","eventAction":"Upload","result":"Pass","eventDuration":4}
      5   {"eventSourceIP":"192.168.1.5","eventAction":"Create","result":"Pass","eventDuration":1}
      5   {"eventSourceIP":"192.168.1.5","eventAction":"Delete","result":"Fail","eventDuration":1}
      5   {"eventSourceIP":"192.168.1.5","eventAction":"Upload","result":"Pass","eventDuration":4}
      5   {"eventSourceIP":"192.168.1.5","eventAction":"Create","result":"Pass","eventDuration":1}
      5   {"eventSourceIP":"192.168.1.5","eventAction":"Delete","result":"Fail","eventDuration":1}
      5   {"eventSourceIP":"192.168.1.5","eventAction":"Upload","result":"Pass","eventDuration":4}
      5   {"eventSourceIP":"192.168.1.5","eventAction":"Create","result":"Pass","eventDuration":1}
      5   {"eventSourceIP":"192.168.1.5","eventAction":"Delete","result":"Fail","eventDuration":1}
      5   {"eventSourceIP":"192.168.1.5","eventAction":"Upload","result":"Pass","eventDuration":4}

      Data from Kafka topic COUNT_PER_SOURCE:
      confluent local consume COUNT_PER_SOURCE -- --cloud --from-beginning --property print.key=true --max-messages 10
      192.168.1.5   {"EVENTSOURCEIP":"192.168.1.5","COUNT":1}
      192.168.1.5   {"EVENTSOURCEIP":"192.168.1.5","COUNT":2}
      192.168.1.5   {"EVENTSOURCEIP":"192.168.1.5","COUNT":3}
      192.168.1.5   {"EVENTSOURCEIP":"192.168.1.5","COUNT":4}
      192.168.1.5   {"EVENTSOURCEIP":"192.168.1.5","COUNT":5}
      192.168.1.5   {"EVENTSOURCEIP":"192.168.1.5","COUNT":6}
      192.168.1.5   {"EVENTSOURCEIP":"192.168.1.5","COUNT":7}
      192.168.1.5   {"EVENTSOURCEIP":"192.168.1.5","COUNT":8}
      192.168.1.5   {"EVENTSOURCEIP":"192.168.1.5","COUNT":9}
      192.168.1.5   {"EVENTSOURCEIP":"192.168.1.5","COUNT":10}

      Data from Kafka topic SUM_PER_SOURCE:
      confluent local consume SUM_PER_SOURCE -- --cloud --from-beginning --property print.key=true --value-format avro --property basic.auth.credentials.source=USER_INFO --property schema.registry.basic.auth.user.info=$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO --property schema.registry.url=$SCHEMA_REGISTRY_URL --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --max-messages 10
      192.168.1.2   {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":1}}
      192.168.1.2   {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":4}}
      192.168.1.2   {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":5}}
      192.168.1.2   {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":8}}
      192.168.1.2   {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":11}}
      192.168.1.2   {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":12}}
      192.168.1.2   {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":15}}
      192.168.1.2   {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":16}}
      192.168.1.2   {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":19}}
      192.168.1.2   {"EVENTSOURCEIP":{"string":"192.168.1.2"},"SUM":{"long":22}}

      Objects in Cloud storage gcs:

      gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+1+0000000000.bin
      gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+1+0000001000.bin
      gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+1+0000002000.bin
      gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+1+0000003000.bin
      gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+1+0000004000.bin
      gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+3+0000000000.bin
      gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+3+0000001000.bin
      gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+3+0000002000.bin
      gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+3+0000003000.bin
      gs://confluent-cloud-demo/topics/COUNT_PER_SOURCE/year=2020/month=02/day=26/hour=03/COUNT_PER_SOURCE+3+0000004000.bin
      gs://confluent-cloud-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+1+0000000000.avro
      gs://confluent-cloud-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+1+0000001000.avro
      gs://confluent-cloud-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+1+0000002000.avro
      gs://confluent-cloud-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+1+0000003000.avro
      gs://confluent-cloud-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+3+0000000000.avro
      gs://confluent-cloud-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+3+0000001000.avro
      gs://confluent-cloud-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+3+0000002000.avro
      gs://confluent-cloud-demo/topics/SUM_PER_SOURCE/year=2020/month=02/day=26/hour=03/SUM_PER_SOURCE+3+0000003000.avro
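
   If you want to inspect the written objects directly, each cloud provider's CLI can list them as well. The commands below are a sketch only; they reuse the bucket, profile, container, and account parameters from :devx-examples:`config/demo.cfg|cloud-etl/config/demo.cfg`, and you should run only the command for your configured destination:

   .. code:: bash

      # Assumes config/demo.cfg has been sourced so the variables below are set.

      # GCP GCS: list everything the sink connectors wrote under topics/
      gsutil ls -r gs://$GCS_BUCKET/topics/

      # AWS S3: equivalent listing with the configured profile
      aws s3 ls s3://$S3_BUCKET/topics/ --recursive --profile $S3_PROFILE

      # Azure Blob storage: list blobs in the configured container
      az storage blob list --account-name $AZBLOB_STORAGE_ACCOUNT --container-name $AZBLOB_CONTAINER --output table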

Stop
----

#. Stop the demo and clean up all the resources: delete the Kafka topics, delete the fully-managed connectors, and delete the data in cloud storage:

   .. code:: bash

      ./stop.sh
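
   Because the demo uses real, billable cloud resources, it is worth spot-checking that nothing was left behind once ``./stop.sh`` completes. The following checks are an illustrative sketch only; run whichever apply to your configuration:

   .. code:: bash

      # No demo connectors or topics should remain in the Confluent Cloud cluster
      ccloud connector list
      ccloud kafka topic list

      # Check that the demo-logs Kinesis stream is no longer listed
      aws kinesis list-streams

      # Check that the destination storage is empty, e.g. for a GCS destination
      # (assumes config/demo.cfg has been sourced so $GCS_BUCKET is set)
      gsutil ls gs://$GCS_BUCKET/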