Flink SQL Shell Quick Start on Confluent Cloud for Apache Flink
This quick start walks you through the following steps to get you up and running with Confluent Cloud for Apache Flink®.
Prerequisites
You need the following prerequisites to use the Flink SQL shell.
Access to Confluent Cloud.
The organization ID and environment ID for your organization.
(Optional) A compute pool ID, if you want to use a specific compute pool instead of the default pool. For more information, see Compute Pools.
The FlinkDeveloper role is granted by default to all users in an environment. To create compute pools manually for workload isolation or cost control, you need the OrganizationAdmin, EnvironmentAdmin, or FlinkAdmin role. If you don’t have the appropriate role, reach out to your OrganizationAdmin or EnvironmentAdmin.
The Confluent CLI. To use the Flink SQL shell, update to the latest version of the Confluent CLI by running the following command:
confluent update --yes
If you used Homebrew to install the Confluent CLI, update the CLI by running the brew upgrade command instead of confluent update. For more information, see Confluent CLI.
Step 1: Log in to Confluent Cloud with the Confluent CLI
Run the following CLI command to log in to Confluent Cloud.
confluent login --save --organization ${ORG_ID}
Your output should resemble:
Logged in as "<your-email>" for organization "<your-org-id>" ("<your-org-name>").
Step 2: Start the Flink SQL shell
Start the Flink SQL shell by running the confluent flink shell command. The shell connects to Confluent Cloud.
Important
This guide focuses on ad-hoc statements. To run statements as long-running jobs, provide the --service-account option to the confluent flink shell command. When you start the shell without this option, statements run under your user account. For more information, see Service Accounts on Confluent Cloud.
Run the following CLI command to start the Flink SQL shell.
confluent flink shell --environment ${ENV_ID}
Your output should resemble:
Welcome!
To exit, press Ctrl-Q or type "exit".
[Ctrl-Q] Quit [Ctrl-S] Toggle Smart Completion
>
The shell automatically uses the default compute pool for your environment. To use a specific compute pool instead, add the --compute-pool option:
confluent flink shell --compute-pool ${COMPUTE_POOL_ID} --environment ${ENV_ID}
You’re ready to start processing data by submitting statements to Flink SQL.
Step 3: Submit a SQL statement
In the SQL shell, run the following statement to see Flink SQL in action. The CURRENT_TIMESTAMP function returns the local date and time.
SELECT CURRENT_TIMESTAMP;
Your output should resemble:
Creating statement: cli-2026-02-10-123456-7a8b9c0d-1234-5a6b-7890-808d0bbfecc6
Statement successfully submitted.
Finished statement execution. Statement phase: RUNNING.
+-------------------------+
| CURRENT_TIMESTAMP |
+-------------------------+
| 2026-02-10 11:25:09.903 |
+-------------------------+
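Scalar functions can also be combined in a single SELECT. As an illustrative sketch (the column aliases are arbitrary), the following statement uppercases a string literal and computes a power using the built-in UPPER and POWER functions:

```sql
-- Illustrative only: combine two built-in scalar functions in one statement.
SELECT UPPER('confluent') AS uppercased,
       POWER(2, 10) AS two_to_the_ten;
```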
For all functions and statements supported by Flink SQL, see Flink SQL Reference.
Step 4: Create and populate a table
The following steps show how to create a table, populate it with a few records, and query it to view the records it contains.
Run the following statement to create a table for storing timestamped pseudorandom float values.
CREATE TABLE random_float_table( ts TIMESTAMP_LTZ(3), random_value FLOAT);
Run the following INSERT VALUES statement to populate random_float_table with records that have a timestamp field and a float field. Timestamp values are generated by the CURRENT_TIMESTAMP function, and float values are generated by the RAND_INTEGER(INT) function multiplied by a float.
INSERT INTO random_float_table VALUES
  (CURRENT_TIMESTAMP, RAND_INTEGER(100)*0.02),
  (CURRENT_TIMESTAMP, RAND_INTEGER(1000)*0.05),
  (CURRENT_TIMESTAMP, RAND_INTEGER(10000)*0.20),
  (CURRENT_TIMESTAMP, RAND_INTEGER(100000)*0.22),
  (CURRENT_TIMESTAMP, RAND_INTEGER(1000000)*0.7);
Press ENTER to return to the SQL shell. Because INSERT INTO VALUES is a point-in-time statement, it exits after it completes inserting records.
Run the following statement to query random_float_table for all of its records.
SELECT * FROM random_float_table;
Your output should resemble:
ts                       random_value
2026-02-10 20:24:19.366  0.46
2026-02-10 20:24:19.276  28.75
2026-02-10 20:24:19.367  1467.2
2026-02-10 20:24:19.368  7953.88
2026-02-10 20:24:19.465  685883.1
Press Q to exit the results view and stop the statement.
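You can also filter the stream with an ordinary WHERE clause instead of scanning every record. A minimal sketch, with an arbitrary threshold chosen for illustration:

```sql
-- Arbitrary threshold; returns only the larger generated values.
SELECT ts, random_value
FROM random_float_table
WHERE random_value > 100;
```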
Run the SHOW JOBS statement to get the status of statements in your SQL environment.
SHOW JOBS;
Your output should resemble:
Creating statement: cli-2026-02-10-123456-7a8b9c0d-1234-5a6b-7890-808d0bbfecc6
Statement successfully submitted.
Finished statement execution. Statement phase: COMPLETED.
+----------------------------------+-----------+------------------+--------------+------------------+------------------+
|               Name               |   Phase   |    Statement     | Compute Pool |  Creation Time   |      Detail      |
+----------------------------------+-----------+------------------+--------------+------------------+------------------+
| cli-2026-02-10-080946-155640b... | COMPLETED | CREATE TABLE ... | lfcp-xxxxxx  | 2026-02-10 20... | This statemen... |
| cli-2026-02-10-123456-7a8b9c0... | COMPLETED | SELECT CURREN... | lfcp-xxxxxx  | 2026-02-10 20... |                  |
+----------------------------------+-----------+------------------+--------------+------------------+------------------+
Step 5: Query streaming data
Flink SQL enables using familiar SQL syntax to query streaming data. Confluent Cloud for Apache Flink provides example data streams that you can experiment with. In this step, you query the orders table from the marketplace database in the examples catalog.
In Flink SQL, catalog objects, like tables, are scoped by catalog and database.
A catalog is a collection of databases that share the same namespace.
A database is a collection of tables that share the same namespace.
In Confluent Cloud, an environment is mapped to a Flink catalog, and a Kafka cluster is mapped to a Flink database.
You can always use three-part identifiers for your tables, like catalog.database.table, but it’s more convenient to set a default.
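For example, assuming the examples catalog and marketplace database that this step uses, the orders table can be addressed with a fully qualified three-part identifier before any defaults are set:

```sql
-- Fully qualified name: catalog.database.table
SELECT * FROM `examples`.`marketplace`.`orders`;
```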
Run the following statement to set the default catalog.
USE CATALOG `examples`;
Your output should resemble:
+---------------------+----------+
|         Key         |  Value   |
+---------------------+----------+
| sql.current-catalog | examples |
+---------------------+----------+
Run the following statement to set the default database.
USE `marketplace`;
Your output should resemble:
+----------------------+-------------+
|         Key          |    Value    |
+----------------------+-------------+
| sql.current-database | marketplace |
+----------------------+-------------+
Run the following statement to see the list of available tables.
SHOW TABLES;
Your output should resemble:
+------------+ | Table Name | +------------+ | clicks | | customers | | orders | | products | +------------+
Run the following statement to inspect the orders data stream.
SELECT * FROM orders;
Your output should resemble:
order_id                             customer_id product_id price
36d77b21-e68f-4123-b87a-cc19ac1f36ac 3137        1305       65.71
7fd3cd2a-392b-4f8f-b953-0bfa1d331354 3063        1327       17.75
1a223c61-38a5-4b8c-8465-2a6b359bf05e 3064        1166       14.95
...
Press Q to exit the results view and stop the statement.
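Streaming tables also support continuous aggregations. As a sketch (using the column names shown in the orders output above), the following statement maintains a running order count and total spend per customer, updating as new records arrive:

```sql
-- Continuous aggregation over the streaming orders table.
SELECT
  customer_id,
  COUNT(*) AS order_count,
  SUM(price) AS total_spent
FROM orders
GROUP BY customer_id;
```

As with the other SELECT statements, press Q to exit the results view and stop the statement.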
Congratulations! You have run your first Flink SQL statements on Confluent Cloud by using the Flink SQL shell.