Tableflow Quick Start with Managed Storage in Confluent Cloud

Confluent Tableflow lets you expose Apache Kafka® topics and Apache Flink® tables as Apache Iceberg™ tables by using Confluent managed storage.

Before you materialize your Kafka topic as an Iceberg table, you must configure the storage bucket where the resulting Iceberg tables are stored.

This quick start uses Confluent Managed Storage (CMS), so you don’t need to configure storage explicitly. When you enable Tableflow on your topic, you select the CMS option.

In this quick start, you perform the following steps:

  • Step 1: Create a topic and publish data
  • Step 2: Enable Tableflow on your topic
  • Step 3: Set up access to the Iceberg REST Catalog
  • Step 4: Query Iceberg tables from Spark
  • Step 5: Read Iceberg data from analytics engines (optional)
  • Step 6: Query data with other analytics engines (optional)

Prerequisites

  • Access to Confluent Cloud and a Kafka cluster running in an AWS region. The Spark and Athena steps in this quick start assume AWS and require the client to run in the same region as the cluster.

Step 1: Create a topic and publish data

In this step, you create a stock-trades topic by using the Confluent Cloud Console. Click Add topic, provide the topic name, and create the topic with the default settings. You can skip defining a data contract.

Publish data to the stock-trades topic by using the Datagen Source connector with the Stock Trades data set. When you configure the Datagen connector, click Additional configuration and proceed through the provisioning workflow. When you reach the Configuration step, in the Select output record value format field, select Avro. Click Continue and keep the default settings. For more information, see the Datagen Source Connector Quick Start.
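
If you prefer to script the topic creation instead of clicking through the Cloud Console, the following is a minimal sketch that uses the confluent-kafka Python client. The bootstrap endpoint and Kafka API key placeholders are assumptions for illustration; you still configure the Datagen Source connector in the Console as described above.

    # Minimal sketch (assumes `pip install confluent-kafka`): create the
    # stock-trades topic programmatically. Replace the placeholders with your
    # cluster's bootstrap endpoint and a Kafka API key and secret.
    from confluent_kafka.admin import AdminClient, NewTopic

    admin = AdminClient({
        "bootstrap.servers": "<BOOTSTRAP_ENDPOINT>",
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": "<KAFKA_API_KEY>",
        "sasl.password": "<KAFKA_API_SECRET>",
    })

    # create_topics returns a dict of topic -> future; result() raises on failure.
    futures = admin.create_topics([NewTopic("stock-trades", num_partitions=6)])
    for topic, future in futures.items():
        future.result()
        print(f"Created topic {topic}")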

Step 2: Enable Tableflow on your topic

  1. Navigate to the stock-trades topic and click Enable Tableflow.

  2. In the Enable Tableflow dialog, click Use Confluent storage.

    The topic status updates to Tableflow Syncing.

Materializing a newly created topic as an Iceberg table can take a few minutes.

For low-throughput topics in which Kafka segments have not been filled, Tableflow attempts to publish data approximately every 15 minutes. This is best-effort and not guaranteed.

Step 3: Set up access to the Iceberg REST Catalog

To access Tableflow Iceberg tables through the built-in Iceberg REST Catalog, you need the catalog endpoint URI and an API key with its associated secret.

  1. In the navigation menu, click Tableflow to open the Tableflow overview page.

  2. In the API access section, copy the REST Catalog Endpoint, which resembles:

    https://tableflow.{CLOUD_REGION}.aws.confluent.cloud/iceberg/catalog/organizations/{ORG_ID}/environments/{ENV_ID}
    
  3. In the API access section, click Create/View API keys.

  4. In the API keys page, click Add API key.

  5. In the Create API key page, select either My account or Service account, and click Next. Ensure that the user account or service account you select has the DeveloperRead role or higher permission level for the Kafka cluster.

  6. In the Resource scope page, select Tableflow and click Next.

  7. (Optional) Enter a name and description for your API key.

  8. Click Create API key and save your key and secret in a secure location.

  9. Click Complete.

    After you obtain the API key and secret values, use them in the format <apikey>:<secret> wherever you need to specify the Iceberg REST Catalog credentials for your Iceberg Reader applications or compute engines, as in the PyIceberg sketch at the end of this step.

It can take around 10 minutes to materialize a newly created topic as an Iceberg table.
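
To confirm that the endpoint and credentials work before moving on, you can connect with any Iceberg REST-compatible client. The following is a minimal sketch using the PyIceberg library; the catalog name is arbitrary, and your environment may require additional properties, so treat it as an illustration rather than a complete configuration.

    # Minimal sketch (assumes `pip install pyiceberg`): connect to the Tableflow
    # Iceberg REST Catalog and list its namespaces. The catalog name "tableflow"
    # is arbitrary; additional properties may be required in your environment.
    from pyiceberg.catalog import load_catalog

    catalog = load_catalog(
        "tableflow",
        **{
            "type": "rest",
            "uri": "<Tableflow REST Catalog URI>",
            "credential": "<apikey>:<secret>",
        },
    )
    print(catalog.list_namespaces())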

Step 4: Query Iceberg tables from Spark

In this step, you read Iceberg tables created by Tableflow by using PySpark.

  Before you start, ensure that Docker is installed and running in your development environment.

  1. Run the following command to start PySpark in a Docker container. Replace ${YOUR_CLUSTER_REGION} with the region of your Kafka cluster, for example, us-west-2; the AWS_REGION value must match your cluster's region.

    docker run -d \
      --name spark-iceberg \
      -v $(pwd)/warehouse:/home/iceberg/warehouse \
      -v $(pwd)/notebooks:/home/iceberg/notebooks/notebooks \
      -e AWS_REGION=${YOUR_CLUSTER_REGION} \
      -p 8888:8888 \
      -p 8080:8080 \
      -p 10000:10000 \
      -p 10001:10001 \
      tabulario/spark-iceberg
    

    Once the container has started successfully, you can access Jupyter notebooks in your browser by going to http://localhost:8888.

    Screenshot of Jupyter notebooks in PySpark
  2. Upload the following ipynb file by clicking Upload. This file pre-populates the notebook that you use to test Tableflow.

    tableflow-quickstart.ipynb
    {
       "cells": [
    {
       "cell_type": "markdown",
       "id": "2b3b8256-432a-46a8-8542-837777aada52",
       "metadata": {},
       "source": [
       "## Register rest catalog as default catalog for Spark"
       ]
    },
    {
       "cell_type": "code",
       "execution_count": 1,
       "id": "e4d27656-867c-464e-a8c0-4b590fd7aae2",
       "metadata": {},
       "outputs": [
       {
       "name": "stderr",
       "output_type": "stream",
       "text": [
          "24/05/18 07:27:44 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.\n"
       ]
       }
       ],
       "source": [
       "from pyspark.sql import SparkSession\n",
       "\n",
       "conf = (\n",
       "    pyspark.SparkConf()\n",
       "        .setAppName('Jupyter')\n",
       "        .set(\"spark.sql.extensions\", \"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\")\n",
       "        .set(\"spark.sql.catalog.tableflowdemo\", \"org.apache.iceberg.spark.SparkCatalog\")\n",
       "        .set(\"spark.sql.catalog.tableflowdemo.type\", \"rest\")\n",
       "        .set(\"spark.sql.catalog.tableflowdemo.uri\", \"<Tableflow REST Catalog URI>\")\n",
       "        .set(\"spark.sql.catalog.tableflowdemo.credential\", \"<api_key>:<secret>\")\n",
       "        .set(\"spark.sql.catalog.tableflowdemo.io-impl\", \"org.apache.iceberg.aws.s3.S3FileIO\")\n",
       "        .set(\"spark.sql.catalog.tableflowdemo.rest-metrics-reporting-enabled\", \"false\")\n",
       "        .set(\"spark.sql.defaultCatalog\", \"tableflowdemo\")\n",
       "        .set(\"spark.sql.catalog.tableflowdemo.s3.remote-signing-enabled\", \"true\")\n",
       ")\n",
       "spark = SparkSession.builder.config(conf=conf).getOrCreate()\n"
       ]
    },
    {
       "cell_type": "markdown",
       "id": "3f7f0ed8-39bf-4ad1-ad72-d2f6e010c4b5",
       "metadata": {},
       "source": [
       "## List all the tables in the db"
       ]
    },
    {
       "cell_type": "code",
       "execution_count": null,
       "id": "89fc7044-4f9e-47f7-8fca-05b15da88a9c",
       "metadata": {},
       "outputs": [],
       "source": [
       "%%sql \n",
       "SHOW TABLES in `<your_cluster_name>`"
       ]
    },
    {
       "cell_type": "markdown",
       "id": "9282572f-557d-4bb7-9f3e-511e86889304",
       "metadata": {},
       "source": [
       "## Query all records in the table"
       ]
    },
    {
       "cell_type": "code",
       "execution_count": null,
       "id": "345f8fef-9d1f-4cc5-8015-babdf4102988",
       "metadata": {},
       "outputs": [],
       "source": [
       "%%sql \n",
       "SELECT *\n",
       "FROM `<your_cluster_name>`.tableflow-ea-stock-trades;"
       ]
    }
    ],
    "metadata": {
    "kernelspec": {
       "display_name": "Python 3 (ipykernel)",
       "language": "python",
       "name": "python3"
    },
    "language_info": {
       "codemirror_mode": {
       "name": "ipython",
       "version": 3
       },
       "file_extension": ".py",
       "mimetype": "text/x-python",
       "name": "python",
       "nbconvert_exporter": "python",
       "pygments_lexer": "ipython3",
       "version": "3.9.18"
    }
    },
    "nbformat": 4,
    "nbformat_minor": 5
    }
    

    A new notebook named tableflow-quickstart appears.

  3. Double-click the tableflow-quickstart notebook to open it.

    Screenshot of Jupyter notebooks in PySpark showing the Confluent Tableflow Playground notebook
  4. Update the following properties of the Spark configuration with the values from Step 3.

    • spark.sql.catalog.tableflowdemo.uri: the Tableflow REST Catalog endpoint
    • spark.sql.catalog.tableflowdemo.credential: your API key and secret in the format <apikey>:<secret>
  5. Update each query with the names of your cluster and topic, then run each cell from the Run menu.

    The cells’ output shows the list of tables and the table data. For a version of these queries without the %%sql magic, see the sketch after this list.
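
The following is an optional, minimal sketch that runs the same queries directly against the SparkSession created in the notebook's first cell, without the %%sql cell magic. The cluster-name placeholder and the LIMIT clause are illustrative; the table name assumes the stock-trades topic from Step 1.

    # Optional sketch: query the Tableflow tables through the SparkSession
    # created in the notebook's first cell, without the %%sql magic.
    # Replace <your_cluster_name> with your Kafka cluster name; the table
    # name matches the stock-trades topic created in Step 1.
    spark.sql("SHOW TABLES IN `<your_cluster_name>`").show(truncate=False)

    trades = spark.sql(
        "SELECT * FROM `<your_cluster_name>`.`stock-trades` LIMIT 10"
    )
    trades.show(truncate=False)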

Step 5: Read Iceberg data from analytics engines (optional)

Amazon Athena for Spark (PySpark)

Amazon Athena PySpark supports reading Iceberg tables using the Iceberg REST Catalog. This means that you can use Athena to query Tableflow Iceberg tables.

  1. Ensure that you have the values from Step 3.

    • Tableflow REST Catalog endpoint
    • Credentials to access the Tableflow REST Catalog
  2. Log in to Amazon Athena.

  3. Create a workgroup. Provide a name, like “tableflow-wg”, for the workgroup and select Apache Spark as the analytics engine. Ensure that PySpark engine version 3 is selected. Click Create to create the workgroup.

    Screenshot showing the Create Workgroup page in Amazon Athena for Confluent Tableflow
  4. Under the workgroup you created in the previous step, create a new notebook. Name your notebook, and in the Apache Spark properties section, select Custom to provide Spark properties in JSON format.

  5. Copy the following Spark properties configuration to Athena. Update spark.sql.catalog.tableflow-cluster.uri and spark.sql.catalog.tableflow-cluster.credential with the values from Step 3.

    {
      "spark.sql.catalog.tableflow-cluster": "org.apache.iceberg.spark.SparkCatalog",
      "spark.sql.catalog.tableflow-cluster.catalog-impl": "org.apache.iceberg.rest.RESTCatalog",
      "spark.sql.catalog.tableflow-cluster.uri": "<Tableflow-REST-Catalog-URI>",
      "spark.sql.catalog.tableflow-cluster.credential": "<cloud-api-key>:<secret>",
      "spark.sql.catalog.tableflowdemo.s3.remote-signing-enabled": "true",
      "spark.sql.defaultCatalog": "tableflow-cluster",
      "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    }
    
  6. Run the following query in Athena PySpark to see the Iceberg tables in the Tableflow Catalog.

    %%sql
    SHOW TABLES in `<your-kafka-cluster-name>`
    
  7. Select a table of your choice and query it with the following PySpark query. For this quick start, replace <table-name> with stock-trades, quoted in backticks because the name contains a hyphen. For a programmatic variant, see the sketch after the note below.

    %%sql
    SELECT * FROM `<your-kafka-cluster-name>`.`<table-name>`;
    

Note

If you get an exception like org.apache.iceberg.exceptions.ForbiddenException: Forbidden: Forbidden: not authorized to sign the request for, check whether the Spark client is configured to run in the same region as your Kafka cluster.
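
You can also run the same checks programmatically in the Athena notebook. The following is a minimal sketch that uses the spark session Athena provides; the cluster-name placeholder is yours to fill in, and the table name assumes the stock-trades topic from Step 1.

    # Minimal sketch for the Athena PySpark notebook: Athena provides the
    # `spark` session. Replace <your-kafka-cluster-name> with your cluster name;
    # the table name assumes the stock-trades topic from Step 1.
    spark.sql("SHOW TABLES IN `<your-kafka-cluster-name>`").show(truncate=False)

    row_count = spark.sql(
        "SELECT COUNT(*) AS n FROM `<your-kafka-cluster-name>`.`stock-trades`"
    ).collect()[0]["n"]
    print(f"Rows materialized so far: {row_count}")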

Step 6: Query data with other analytics engines (optional)

Explore other integration options for using Tableflow with analytics engines.