Flink SQL Shell Quick Start on Confluent Cloud for Apache Flink¶
This quick start walks you through the following steps to get you up and running with Confluent Cloud for Apache Flink®.
- Step 1: Log in to Confluent Cloud with the Confluent CLI
- Step 2: Start the Flink SQL shell
- Step 3: Submit a SQL statement
- Step 4: Create and populate a table
- Step 5: Query streaming data
Prerequisites¶
You need the following prerequisites to use Confluent Cloud for Apache Flink.
Access to Confluent Cloud.
The organization ID, environment ID, and compute pool ID for your organization.
The OrganizationAdmin, EnvironmentAdmin, or FlinkAdmin role for creating compute pools, or the FlinkDeveloper role if you already have a compute pool. If you don’t have the appropriate role, reach out to your OrganizationAdmin or EnvironmentAdmin.
The Confluent CLI. To use the Flink SQL shell, update to the latest version of the Confluent CLI by running the following command:
confluent update --yes
If you used Homebrew to install the Confluent CLI, update the CLI by using the brew upgrade command instead of confluent update. For more information, see Confluent CLI.
Step 1: Log in to Confluent Cloud with the Confluent CLI¶
Run the following CLI command to log in to Confluent Cloud.
confluent login --save --organization ${ORG_ID}
Your output should resemble:
Assuming https protocol.
Logged in as "<your-email>" for organization "<your-org-id>" ("<your-org-name>").
Step 2: Start the Flink SQL shell¶
Start the Flink SQL shell by running the confluent flink shell command. The shell connects to Confluent Cloud.
Important
This guide focuses on ad-hoc statements. To run statements in long-running jobs, you should provide the --service-account option in the confluent flink shell command. When you start the shell without this option, statements run with your user account. For more information, see Service Accounts on Confluent Cloud.
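For example, a session that runs statements under a service account might resemble the following; ${SA_ID} is a placeholder for your service account ID, not a value from this guide:

# ${SA_ID} is a placeholder for your service account ID.
confluent flink shell --compute-pool ${COMPUTE_POOL_ID} --environment ${ENV_ID} --service-account ${SA_ID}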
Run the following CLI command to start the Flink SQL shell.
confluent flink shell --compute-pool ${COMPUTE_POOL_ID} --environment ${ENV_ID}
Your output should resemble:
Welcome!
To exit, press Ctrl-Q or type "exit".
[Ctrl-Q] Quit [Ctrl-S] Toggle Smart Completion
>
You’re ready to start processing data by submitting statements to Flink SQL.
Step 3: Submit a SQL statement¶
In the SQL shell, run the following statement to see Flink SQL in action. The CURRENT_TIMESTAMP function returns the local date and time.
SELECT CURRENT_TIMESTAMP;
Your output should resemble:
Statement name: ab12345c-6e11-7bcd-9
Statement successfully submitted.
Fetching results...
+-------------------------+
| CURRENT_TIMESTAMP |
+-------------------------+
| 2023-07-05 18:57:53.867 |
+-------------------------+
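CURRENT_TIMESTAMP is one of many built-in scalar functions. As a quick variation, the following statement should return the current date and time in separate columns:

SELECT CURRENT_DATE, CURRENT_TIME;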
For all functions and statements supported by Flink SQL, see Flink SQL Reference.
Step 4: Create and populate a table¶
The following steps show how to create a table, populate it with a few records, and query it to view the records it contains.
Run the following statement to create a table for storing timestamped pseudorandom float values.

CREATE TABLE random_float_table (
  ts TIMESTAMP_LTZ(3),
  random_value FLOAT
);
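Optionally, run a DESCRIBE statement to confirm the new table's schema; it should list the ts and random_value columns:

DESCRIBE random_float_table;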
Run the following INSERT VALUES statement to populate random_float_table with records that have a timestamp field and a float field. The timestamp values are generated by the CURRENT_TIMESTAMP function, and the float values are generated by the RAND_INTEGER(INT) function multiplied by a float.

INSERT INTO random_float_table VALUES
  (CURRENT_TIMESTAMP, RAND_INTEGER(100)*0.02),
  (CURRENT_TIMESTAMP, RAND_INTEGER(1000)*0.05),
  (CURRENT_TIMESTAMP, RAND_INTEGER(10000)*0.20),
  (CURRENT_TIMESTAMP, RAND_INTEGER(100000)*0.22),
  (CURRENT_TIMESTAMP, RAND_INTEGER(1000000)*0.7);
Press ENTER to return to the SQL shell. Because INSERT INTO VALUES is a point-in-time statement, it exits after it completes inserting records.
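Because the statement exits when it finishes, you can append more records at any time by running another insert. For example, this sketch adds a single additional row; the bound and multiplier are arbitrary:

-- The bound and multiplier are arbitrary illustration values.
INSERT INTO random_float_table VALUES (CURRENT_TIMESTAMP, RAND_INTEGER(10)*0.5);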
Run the following statement to query random_float_table for all of its records.

SELECT * FROM random_float_table;
Your output should resemble:
ts                       random_value
2023-09-07 20:24:19.366  0.46
2023-09-07 20:24:19.276  28.75
2023-09-07 20:24:19.367  1467.2
2023-09-07 20:24:19.368  7953.88
2023-09-07 20:24:19.465  685883.1
Press Q to exit the results view and stop the statement.
Run the SHOW JOBS statement to get the status of statements in your SQL environment.
SHOW JOBS;
Your output should resemble:
Statement name: dbdb79f8-7e6e-4b03
Statement successfully submitted.
Waiting for statement to be ready.
Statement phase is PENDING.
Statement phase is COMPLETED.
+--------------------+-----------+--------------------------------+--------------+------------------+
| Name               | Phase     | Statement                      | Compute Pool | Creation Time    |
+--------------------+-----------+--------------------------------+--------------+------------------+
| f8f118e1-bd79-40c1 | COMPLETED | CREATE TABLE random_float_t... | lfcp-xxxxxx  | 2023-09-07 20... |
| a30f8a59-af67-4bf6 | COMPLETED | INSERT INTO random_float_ta... | lfcp-xxxxxx  | 2023-09-07 20... |
+--------------------+-----------+--------------------------------+--------------+------------------+
Step 5: Query streaming data¶
Flink SQL enables you to query streaming data with familiar SQL syntax.

Confluent Cloud for Apache Flink provides example data streams that you can experiment with. In this step, you query the orders table from the marketplace database in the examples catalog.
In Flink SQL, catalog objects, like tables, are scoped by catalog and database.
- A catalog is a collection of databases that share the same namespace.
- A database is a collection of tables that share the same namespace.
In Confluent Cloud, an environment is mapped to a Flink catalog, and a Kafka cluster is mapped to a Flink database.
You can always use three-part identifiers for your tables, like catalog.database.table, but it’s more convenient to set a default.
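For example, before setting any defaults, you could query the example stream by using its fully qualified name:

SELECT * FROM `examples`.`marketplace`.`orders`;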
Run the following statement to set the default catalog.
USE CATALOG `examples`;
Your output should resemble:
+---------------------+----------+
| Key                 | Value    |
+---------------------+----------+
| sql.current-catalog | examples |
+---------------------+----------+
Run the following statement to set the default database.
USE `marketplace`;
Your output should resemble:
+----------------------+-------------+
| Key                  | Value       |
+----------------------+-------------+
| sql.current-database | marketplace |
+----------------------+-------------+
Run the following statement to see the list of available tables.
SHOW TABLES;
Your output should resemble:
+------------+
| Table Name |
+------------+
| clicks     |
| customers  |
| orders     |
| products   |
+------------+
Run the following statement to inspect the orders data stream.

SELECT * FROM orders;
Your output should resemble:
order_id                             customer_id product_id price
36d77b21-e68f-4123-b87a-cc19ac1f36ac 3137        1305       65.71
7fd3cd2a-392b-4f8f-b953-0bfa1d331354 3063        1327       17.75
1a223c61-38a5-4b8c-8465-2a6b359bf05e 3064        1166       14.95
...
Press Q to exit the results view and stop the statement.
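Streaming tables support the same SQL clauses you use elsewhere, so you can also filter the stream. For example, the following sketch returns only higher-priced orders; the threshold of 50 is an arbitrary illustration value:

-- 50 is an arbitrary price threshold for illustration.
SELECT order_id, customer_id, price FROM orders WHERE price > 50;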
Congratulations, you have run your first Flink SQL statements on Confluent Cloud using the SQL Shell.