Flink SQL Quick Start with Confluent Cloud Console¶
Important
Confluent Cloud for Apache Flink®️ is currently available for Preview. A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing Preview releases of the Preview features at any time in Confluent’s sole discretion. Check out Getting Help for questions, feedback and requests.
For Flink SQL features and limitations in the preview program, see Notable Limitations in Public Preview.
This quick start gets you up and running with Confluent Cloud for Apache Flink®️. The following steps show how to create a compute pool for running SQL statements on streaming data.
In this quick start guide, you perform the following steps:
- Step 1: Create a Flink compute pool
- Step 2: Create a workspace
- Step 3: Run a SQL statement
- Step 4a: (Optional) Query existing topics
- Step 4b: (Optional) Use sample data
Prerequisites¶
You need the OrganizationAdmin, EnvironmentAdmin, or FlinkAdmin role to create compute pools, or the FlinkDeveloper role if you already have a compute pool. If you don’t have the appropriate role, reach out to your OrganizationAdmin or EnvironmentAdmin.
Step 1: Create a Flink compute pool¶
A compute pool represents the compute resources that are used to run your SQL statements. These resources are shared among all statements that use the pool, which lets you limit or guarantee resources as your use cases require. A compute pool is bound to a region.
Log in to Confluent Cloud Console at https://confluent.cloud/login.
In the navigation menu, click Environments and click the tile for the environment where you want to use Flink SQL.
In the environment details page, click Flink (preview).
In the Flink (preview) page, click Compute pools, if it isn’t already selected.
Click Create compute pool to open the Create compute pool page.
In the Region dropdown, select the region that hosts the data you want to process with Flink SQL, or use any region if you just want to try out Flink using sample data. Click Continue.
In the Pool name textbox, enter “my-compute-pool”.
In the Max Confluent Flink Units (CFU) dropdown, select 10. For more information, see Confluent Flink Units (CFUs).
Click Continue, and on the Review and create page, click Finish.
A tile for your compute pool appears on the Flink (preview) page. It shows the pool in the Provisioning state. It may take a few minutes for the pool to enter the Running state.
Tip
The tile for your compute pool provides the Confluent CLI command for using the pool from the CLI. Learn more about the CLI in the Flink SQL Quick Start with the SQL Shell.
Step 2: Create a workspace¶
When your compute pool is in the Running state, you can create a SQL workspace. SQL workspaces provide an intuitive, flexible UI for dynamically exploring and interacting with all of your data on Confluent Cloud using Flink SQL. In workspaces, you can save your queries, run multiple queries simultaneously in a single view, and browse your catalogs, databases and tables.
In the my-compute-pool tile, click Open SQL workspace.
A new workspace page opens, containing a SQL code editor.
Step 3: Run a SQL statement¶
In the code editor, or cell, of the new workspace, you can start running SQL statements.
Copy the following SQL and paste it into the cell.
SELECT CURRENT_TIMESTAMP;
Click Run.
Information about the statement is displayed, including its status and unique identifier. Click the Statement ID link to open the statement details pane.
After a few seconds, the result from the statement is displayed.
Your output should resemble:
CURRENT_TIMESTAMP
2023-09-14 17:35:48.322
At this point, you have verified that Confluent Cloud for Flink is working as expected. You now have two options for how to proceed.
Step 4a: (Optional) Query existing topics¶
If you’ve created the compute pool in a region where you already have Kafka clusters and topics, you can explore this data with Flink SQL. If not, proceed to Step 4b.
In Flink SQL, catalog objects (e.g. tables) are scoped by catalog and database.
- A catalog is a collection of databases that share the same namespace.
- A database is a collection of tables that share the same namespace.
Tip
In Confluent Cloud, an environment is mapped to a Flink catalog, and a Kafka cluster is mapped to a Flink database.
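To explore what’s available before picking defaults, you can also list catalogs and databases from a cell with standard Flink SQL SHOW statements. Note that SHOW DATABASES lists the databases of the currently selected catalog, so run it after a catalog has been chosen.
SHOW CATALOGS;
SHOW DATABASES;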
Set a default catalog and database. While you can always use three-part identifiers for your tables (like catalog.database.table), it’s more convenient to set a default. You can do this by using the Use catalog and Use database dropdown menus in the top-right corner of the SQL workspace. First, choose an environment as your default catalog, then set the database to one of the Kafka clusters in that environment.
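If you prefer SQL over the dropdown menus, Flink SQL also provides USE statements for the same purpose. The following is a minimal sketch that assumes a hypothetical environment named my-environment and a hypothetical cluster named my-cluster; substitute your own names, and quote identifiers that contain hyphens with backticks.
USE CATALOG `my-environment`;
USE `my-cluster`;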
Verify this worked by creating a new cell and running the following statement.
SHOW TABLES;
This statement lists all the tables of the Kafka cluster that you have just selected as the default.
In another cell, you can now browse any of these tables by running a SELECT statement.
SELECT * FROM <table_name>;
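If you want to check a table’s schema before selecting from it, Flink SQL also supports the DESCRIBE statement, shown here with the same placeholder table name.
DESCRIBE <table_name>;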
Step 4b: (Optional) Use sample data¶
If you don’t have any existing clusters and topics in the region where you created the compute pool, you can use Flink to create a topic, populate it with data, and then query it.
Follow the steps in Manage Kafka Clusters on Confluent Cloud to create a Kafka cluster named “my-kafka-cluster” in the same region that you created the compute pool.
Now you can set the default catalog and database. While you can always use three-part identifiers for your tables (like catalog.database.table), it’s more convenient to set a default. You can do this by using the dropdown menus in the top-right corner of the SQL workspace. First, choose an environment as your default catalog, then choose “my-kafka-cluster” as your default database.
Tip
In Confluent Cloud, an environment is mapped to a Flink catalog, and a Kafka cluster is mapped to a Flink database.
Create a new cell, and run the following statement to create a table in the default catalog and database. Flink SQL automatically creates the backing topic in Kafka and the schema in Schema Registry.
CREATE TABLE random_int_table (
  ts TIMESTAMP_LTZ(3),
  random_value INT
);
Once the statement completes, you should see the table and its schema in the left-side catalog browser.
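To see the full DDL that was registered for the new table, including any options that Confluent Cloud filled in for you, Flink SQL supports the SHOW CREATE TABLE statement; the exact output depends on your environment.
SHOW CREATE TABLE random_int_table;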
In the next cell, run the following INSERT statement to populate random_int_table with records that have a timestamp field and an integer field. Timestamp values are generated by the CURRENT_TIMESTAMP function, and integer values are generated by the RAND_INTEGER(INT) function.
INSERT INTO random_int_table VALUES
  (CURRENT_TIMESTAMP, RAND_INTEGER(100)),
  (CURRENT_TIMESTAMP, RAND_INTEGER(1000)),
  (CURRENT_TIMESTAMP, RAND_INTEGER(10000)),
  (CURRENT_TIMESTAMP, RAND_INTEGER(100000)),
  (CURRENT_TIMESTAMP, RAND_INTEGER(1000000));
Finally, in the next cell, run the following statement to query random_int_table for all of its records.
SELECT * FROM random_int_table;
The statement continues to run, waiting for more records to be produced to the topic that backs the table. You can stop the query by clicking Stop.
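To see a continuously updating result instead of a stream of raw rows, you can also try a simple aggregation in another cell. This is a standard Flink SQL continuous query; the count and average update each time new records are inserted into random_int_table.
SELECT COUNT(*) AS record_count, AVG(random_value) AS avg_value
FROM random_int_table;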