Python Table API Quick Start on Confluent Cloud for Apache Flink¶
Confluent Cloud for Apache Flink®️ supports programming applications with the Table API. Confluent provides a plugin for running applications that use the Table API on Confluent Cloud.
For more information, see Table API.
For code examples, see Python Examples for Table API on Confluent Cloud.
Prerequisites¶
- Access to Confluent Cloud
- A compute pool in Confluent Cloud
- An Apache Kafka® cluster, if you want to run examples that store data in Kafka
- Java version 11 or later
To run Table API and Flink SQL programs, you must generate an API key that’s specific to the Flink environment. Also, you need Confluent Cloud account details, like your organization and environment identifiers.
- Flink API Key: Follow the steps in Generate a Flink API key. For convenience, assign your Flink key and secret to the FLINK_API_KEY and FLINK_API_SECRET environment variables.
- Organization ID: The identifier of your organization, for example, b0b421724-4586-4a07-b787-d0bb5aacbf87. For convenience, assign your organization identifier to the ORG_ID environment variable.
- Environment ID: The identifier of the environment where your Flink SQL statements run, for example, env-z3y2x1. For convenience, assign your environment identifier to the ENV_ID environment variable.
- Cloud provider name: The name of the cloud provider where your cluster runs, for example, aws. To see the available providers, run the confluent flink region list command. For convenience, assign your cloud provider to the CLOUD_PROVIDER environment variable.
- Cloud region: The name of the region where your cluster runs, for example, us-east-1. To see the available regions, run the confluent flink region list command. For convenience, assign your cloud region to the CLOUD_REGION environment variable.
export CLOUD_PROVIDER="aws"
export CLOUD_REGION="us-east-1"
export FLINK_API_KEY="<your-flink-api-key>"
export FLINK_API_SECRET="<your-flink-api-secret>"
export ORG_ID="<your-organization-id>"
export ENV_ID="<your-environment-id>"
export COMPUTE_POOL_ID="<your-compute-pool-id>"
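Before running a program, it can help to confirm that every variable from the export commands above is set in the current shell. The following is a minimal sketch using only the Python standard library. The helper name missing_settings is hypothetical and is not part of any Confluent tooling.

```python
import os

# Variable names taken from the export commands above.
REQUIRED_VARS = (
    "CLOUD_PROVIDER",
    "CLOUD_REGION",
    "FLINK_API_KEY",
    "FLINK_API_SECRET",
    "ORG_ID",
    "ENV_ID",
    "COMPUTE_POOL_ID",
)


def missing_settings(environ=os.environ):
    """Return the names of required variables that are unset or blank.

    Hypothetical helper for illustration; not part of the Confluent plugin.
    """
    return [name for name in REQUIRED_VARS if not environ.get(name, "").strip()]


if __name__ == "__main__":
    missing = missing_settings()
    if missing:
        print("Missing environment variables: " + ", ".join(missing))
    else:
        print("All Confluent Cloud settings are present.")
```

The check only verifies that each variable is non-empty. It does not validate the values against Confluent Cloud.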
Compile and run a Table API program¶
Use poetry to create a virtual environment that contains all required dependencies and project files.
Follow the pipx installation instructions to install pipx.
Run the following command to install poetry.
pipx install poetry
Copy the following code into a file named hello_table_api.py.

from pyflink.table.confluent import ConfluentSettings, ConfluentTools
from pyflink.table import TableEnvironment, Row
from pyflink.table.expressions import col, row

def run():
    # Set up the connection to Confluent Cloud
    settings = ConfluentSettings.from_file("/cloud.properties")
    env = TableEnvironment.create(settings)

    # Run your first Flink statement in Table API
    env.from_elements([row("Hello world!")]).execute().print()

    # Or use SQL
    env.sql_query("SELECT 'Hello world!'").execute().print()

    # Structure your code with Table objects - the main ingredient of Table API.
    table = env.from_path("examples.marketplace.clicks") \
        .filter(col("user_agent").like("Mozilla%")) \
        .select(col("click_id"), col("user_id"))

    table.print_schema()
    print(table.explain())

    # Use the provided tools to test on a subset of the streaming data
    expected = ConfluentTools.collect_materialized_limit(table, 50)
    actual = [Row(42, 500)]
    if expected != actual:
        print("Results don't match!")
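The program above loads its connection settings from a properties file through ConfluentSettings.from_file. As a rough sketch, such a file could be generated from the environment variables set in the prerequisites. The client.* property keys below are an assumption about the plugin's naming and should be verified against the plugin documentation. The helpers render_properties and write_properties are hypothetical names introduced for illustration.

```python
import os

# ASSUMPTION: these "client.*" property keys are believed to match what the
# Confluent Table API plugin expects; verify them against the plugin
# documentation before relying on the generated file.
PROPERTY_TO_ENV = {
    "client.cloud": "CLOUD_PROVIDER",
    "client.region": "CLOUD_REGION",
    "client.flink-api-key": "FLINK_API_KEY",
    "client.flink-api-secret": "FLINK_API_SECRET",
    "client.organization-id": "ORG_ID",
    "client.environment-id": "ENV_ID",
    "client.compute-pool-id": "COMPUTE_POOL_ID",
}


def render_properties(environ=os.environ):
    """Build the text of a properties file from environment variables."""
    lines = []
    for key, env_name in PROPERTY_TO_ENV.items():
        value = environ.get(env_name)
        if not value:
            raise KeyError(f"environment variable {env_name} is not set")
        lines.append(f"{key}={value}")
    return "\n".join(lines) + "\n"


def write_properties(path, environ=os.environ):
    """Write the rendered properties to the given path (hypothetical helper)."""
    with open(path, "w", encoding="utf-8") as handle:
        handle.write(render_properties(environ))
```

For example, calling write_properties("cloud.properties") and passing that same path to ConfluentSettings.from_file would keep the credentials out of the source code. The generated file contains the API secret, so it should stay out of version control.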
In the directory where you created hello_table_api.py, run the following command to build a virtual environment containing all required dependencies and project files.
poetry install
Run the following command to execute the Table API program.
poetry run hello_table_api